Coverage for portality / events / consumers / update_request_publisher_submitted_notify.py: 98%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1# ~~UpdateRequestPublisherSubmittedNotify:Consumer~~ 

2from typing import TypedDict 

3 

4from portality import constants 

5from portality import models 

6from portality.bll import DOAJ 

7from portality.events import consumer_utils 

8from portality.events.consumer import EventConsumer 

9from portality.lib import dates 

10from portality.models import Account 

11from portality.util import url_for 

12 

13 

14class UpdateRequestPublisherSubmittedNotify(EventConsumer): 

15 ID = "update_request:publisher:submitted:notify" 

16 

17 class Context(TypedDict): 

18 # no usage, just for developer reference 

19 application: dict 

20 

21 @classmethod 

22 def should_consume(cls, event): 

23 if event.id != constants.EVENT_APPLICATION_UR_SUBMITTED: 

24 return False 

25 

26 if not Account.is_enable_publisher_email(): 

27 return False 

28 

29 app_source = event.context.get("application") 

30 if app_source is None: 

31 return False 

32 

33 application = consumer_utils.parse_application(app_source) 

34 if application.application_type != constants.APPLICATION_TYPE_UPDATE_REQUEST: 

35 return False 

36 

37 if not application.owner: 

38 return False 

39 

40 return True 

41 

42 @classmethod 

43 def consume(cls, event): 

44 application = consumer_utils.parse_application(event.context.get("application")) 

45 

46 # ~~-> Notifications:Service ~~ 

47 svc = DOAJ.notificationsService() 

48 

49 notification = models.Notification() 

50 notification.who = application.owner 

51 notification.created_by = cls.ID 

52 notification.classification = constants.NOTIFICATION_CLASSIFICATION_STATUS_CHANGE 

53 

54 notification.long = svc.long_notification(cls.ID).format( 

55 application_title=application.bibjson().title, 

56 date_applied=dates.human_date(application.date_applied), 

57 ) 

58 notification.short = svc.short_notification(cls.ID).format( 

59 issns=application.bibjson().issns_as_text() 

60 ) 

61 

62 notification.action = url_for("publisher.updates_in_progress") 

63 

64 svc.notify(notification) 

65 return notification