Coverage for portality/events/kafka_consumer.py: 0%

17 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1import faust 

2import json 

3 

4from portality.app import app as doajapp 

5from portality.bll import DOAJ 

6from portality.models import Event 

7 

# Kafka connection settings are read from the DOAJ application's configuration.
broker = doajapp.config.get("KAFKA_BROKER")
topic_name = doajapp.config.get("KAFKA_EVENTS_TOPIC")

# Faust application for this consumer; message values use the 'json' serializer.
app = faust.App(
    'events',
    broker=broker,
    value_serializer='json',
)

# Topic that the agent defined in this module subscribes to.
topic = app.topic(topic_name)

13 

14 

@app.agent(topic)
async def handle_event(stream):
    """
    Consume messages from the events topic and pass each one to the events service.

    Every message is wrapped in an ``Event`` (via its ``raw`` argument) and handed
    to ``DOAJ.eventsService().consume``. A message that arrives as a ``dict`` is
    used as-is; any other payload is decoded with ``json.loads`` first.

    :param stream: async iterable of messages, supplied by the ``app.agent`` decorator
    """
    # The whole consume loop runs inside a single request context for "/".
    # NOTE(review): presumably needed because the events service relies on the
    # web application's context -- confirm.
    with doajapp.test_request_context("/"):
        svc = DOAJ.eventsService()
        async for event in stream:
            # The app's 'json' value serializer may already have decoded the
            # message; json.loads() raises TypeError when given a dict, so only
            # decode payloads that are not dicts already.
            raw = event if isinstance(event, dict) else json.loads(event)
            svc.consume(Event(raw=raw))

21 

22 

# Start the Faust command-line entry point when this module is run as a script.
if __name__ == '__main__':
    app.main()