Coverage for portality / models / journal_csv.py: 86%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1from portality.lib.seamless import SeamlessMixin 

2from portality.dao import DomainObject 

3from portality.lib.coerce import COERCE_MAP 

4from datetime import datetime 

5from portality.lib import dates, es_data_mapping 

6from typing import Union, List 

7from portality.core import app 

8 

9 

# Seamless struct for a journal CSV record: each field name is mapped to the
# name of the coercion applied to its value.
JOURNAL_CSV_STRUCT = {
    "fields": {
        # Record bookkeeping fields.
        "id": {"coerce": "unicode"},
        "created_date": {"coerce": "utcdatetime"},
        "last_updated": {"coerce": "utcdatetime"},
        "es_type": {"coerce": "unicode"},
        # Fields describing the exported CSV file itself.
        "export_date": {"coerce": "utcdatetime"},
        "container": {"coerce": "unicode"},
        "filename": {"coerce": "unicode"},
        "url": {"coerce": "unicode"},
        "size": {"coerce": "integer"},
    }
}

23 

# Options handed to es_data_mapping.create_mapping() when building the index
# mapping for JournalCSV records.
MAPPING_OPTS = {
    # NOTE(review): None presumably means "do not set an explicit dynamic
    # mapping policy" - confirm against es_data_mapping.create_mapping.
    "dynamic": None,
    # Coerce-name -> mapping defaults, taken from application configuration.
    "coerces": app.config["DATAOBJ_TO_MAPPING_DEFAULTS"],
}

28 

class JournalCSV(SeamlessMixin, DomainObject):
    """Domain object recording one exported journal CSV file.

    The record's data is held in a seamless object whose shape is defined by
    ``JOURNAL_CSV_STRUCT``: the export date plus the container, filename, url
    and size of the stored file.
    """

    __type__ = "journal_csv"

    __SEAMLESS_STRUCT__ = JOURNAL_CSV_STRUCT
    __SEAMLESS_COERCE__ = COERCE_MAP

    def __init__(self, **kwargs):
        """Build the record from keyword data.

        If the keywords contain a ``"_source"`` entry, that entry's value is
        used as the raw record data instead of the keywords themselves.
        """
        # FIXME: hack, to deal with ES integration layer being improperly abstracted
        if "_source" in kwargs:
            kwargs = kwargs["_source"]
        super(JournalCSV, self).__init__(raw=kwargs)

    def mappings(self):
        """Return the index mapping generated from this object's struct."""
        return es_data_mapping.create_mapping(self.__seamless_struct__.raw, MAPPING_OPTS)

    @property
    def data(self):
        """The underlying data held by the seamless object."""
        return self.__seamless__.data

    @classmethod
    def all_csvs_before(cls, cutoff: datetime) -> List['JournalCSV']:
        """Query for records whose export date is before ``cutoff``.

        Results are requested oldest first.

        NOTE(review): the query sets no ``size``, so the number of records
        actually returned depends on what ``object_query`` / the backend does
        by default - confirm this really yields *all* matches.
        """
        q = CutoffQuery(cutoff)
        return cls.object_query(q.query())

    @classmethod
    def find_by_filename(cls, filename: str) -> List['JournalCSV']:
        """Query for records whose filename exactly matches ``filename``."""
        q = FilenameQuery(filename)
        return cls.object_query(q.query())

    @classmethod
    def find_latest(cls) -> Union[None, 'JournalCSV']:
        """Return the record with the most recent export date, or None."""
        q = LatestQuery()
        res = cls.object_query(q.query())
        if res is not None and len(res) > 0:
            return res[0]
        return None

    @classmethod
    def first_csv_after(cls, cutoff: datetime) -> Union[None, 'JournalCSV']:
        """Return the earliest record exported at or after ``cutoff``, or None."""
        q = FirstAfterQuery(cutoff)
        res = cls.object_query(q.query())
        if res is not None and len(res) > 0:
            return res[0]
        return None

    @property
    def export_date(self):
        """The export date, read through the ``"datetime"`` coercion."""
        return self.__seamless__.get_single("export_date", coerce=COERCE_MAP["datetime"])

    @export_date.setter
    def export_date(self, dump_date: Union[str, datetime]):
        # The struct's coercion for "export_date" is applied on the way in.
        self.__seamless__.set_with_struct("export_date", dump_date)

    @property
    def export_day(self):
        """The export date, read through the ``"bigenddate"`` coercion.

        NOTE(review): presumably a date-only (year-month-day) rendering -
        confirm against COERCE_MAP.
        """
        return self.__seamless__.get_single("export_date", coerce=COERCE_MAP["bigenddate"])

    def set_csv(self, container, filename, size, url):
        """Set the four fields describing the stored CSV file in one call."""
        self.__seamless__.set_with_struct("container", container)
        self.__seamless__.set_with_struct("filename", filename)
        self.__seamless__.set_with_struct("url", url)
        self.__seamless__.set_with_struct("size", size)

    @property
    def container(self):
        """The ``container`` field of the record."""
        return self.__seamless__.get_single("container")

    @property
    def filename(self):
        """The ``filename`` field of the record."""
        return self.__seamless__.get_single("filename")

    @property
    def url(self):
        """The ``url`` field of the record."""
        return self.__seamless__.get_single("url")

    @property
    def size(self):
        """The ``size`` field of the record (coerced as an integer by the struct)."""
        return self.__seamless__.get_single("size")

    @property
    def size_human(self):
        """The size rendered as a human-readable string, or None if unset."""
        value = self.size
        if value is not None:
            return self._int_to_filesize(value)
        return None

    def _int_to_filesize(self, value):
        """Format a byte count using 1024-based units, to two decimal places.

        Returns e.g. ``"512.00 B"`` or ``"1.50 KB"``; values of 1024 TB or
        more are reported in PB. Returns None when ``value`` is None.
        """
        if value is not None:
            for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
                if value < 1024.0:
                    return f"{value:.2f} {unit}"
                # Too large for this unit: scale down and try the next one.
                value /= 1024.0
            return f"{value:.2f} PB"
        return None

123 

124 

class CutoffQuery(object):
    """Query for records exported strictly before a cutoff, oldest first."""

    def __init__(self, cutoff: datetime):
        """Store the cutoff that ``query()`` will filter against."""
        self.cutoff = cutoff

    def query(self):
        """Return the query body as a dict.

        The body carries no ``size``, so the number of hits returned is left
        to whatever default applies downstream.
        """
        return {
            "query": {
                "range": {
                    "export_date": {
                        "lt": dates.format(self.cutoff)
                    }
                }
            },
            "sort": {
                "export_date": {
                    "order": "asc"  # oldest first
                }
            }
        }

144 

145 

class FirstAfterQuery(object):
    """Query for the single earliest record exported at or after a cutoff."""

    def __init__(self, cutoff: datetime):
        """Store the cutoff that ``query()`` will filter against."""
        self.cutoff = cutoff

    def query(self):
        """Return the query body as a dict, limited to one result."""
        return {
            "query": {
                "range": {
                    "export_date": {
                        # "gte": the cutoff instant itself is included.
                        "gte": dates.format(self.cutoff)
                    }
                }
            },
            "sort": {
                "export_date": {
                    "order": "asc"
                }
            },
            "size": 1
        }

166 

167 

class LatestQuery:
    """Query for the single record with the most recent export date."""

    def query(self):
        """Return the query body as a dict: match everything, newest first, one hit."""
        return {
            "query": {
                "match_all": {}
            },
            "sort": {
                "export_date": {
                    "order": "desc"
                }
            },
            "size": 1
        }

181 

182 

class FilenameQuery(object):
    """Query for records whose filename exactly matches a given value."""

    def __init__(self, filename: str):
        """Store the filename that ``query()`` will match on."""
        self.filename = filename

    def query(self):
        """Return the query body as a dict: a term match on ``filename.exact``."""
        return {
            "query": {
                "bool": {
                    "must": [
                        {
                            "term": {
                                "filename.exact": self.filename
                            }
                        }
                    ]
                }
            }
        }