Coverage for portality/events/kafka_consumer.py: 0%
17 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1import faust
2import json
4from portality.app import app as doajapp
5from portality.bll import DOAJ
6from portality.models import Event
8broker = doajapp.config.get("KAFKA_BROKER")
9topic_name = doajapp.config.get("KAFKA_EVENTS_TOPIC")
11app = faust.App('events', broker=broker, value_serializer='json')
12topic = app.topic(topic_name)
15@app.agent(topic)
16async def handle_event(stream):
17 with doajapp.test_request_context("/"):
18 svc = DOAJ.eventsService()
19 async for event in stream:
20 svc.consume(Event(raw=json.loads(event)))
23if __name__ == '__main__':
24 app.main()