Coverage for portality / crosswalks / oaipmh.py: 64%

369 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 00:09 +0100

1import base64, sys, re 

2from lxml import etree 

3from portality.core import app 

4from portality import datasets 

5from copy import deepcopy 

6 

7from portality.lib import dates 

8from portality.lib.dates import FMT_DATE_STD 

9 

10 

11##################################################################### 

12# Crosswalks for OAI-PMH 

13##################################################################### 

14 

class OAI_Crosswalk(object):
    """
    Common base for the OAI-PMH crosswalks.

    Carries the namespace constants used by every crosswalk and the helper
    that writes ``setSpec`` elements into a header.  ``crosswalk`` and
    ``header`` are abstract here and raise NotImplementedError.
    """
    PMH_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    PMH = "{" + PMH_NAMESPACE + "}"

    XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
    XSI = "{" + XSI_NAMESPACE + "}"

    XMLNS_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    XMLNS = "{" + XMLNS_NAMESPACE + "}"

    NSMAP = {
        None: PMH_NAMESPACE,
        "xsi": XSI_NAMESPACE,
        "xmlns": XMLNS_NAMESPACE,
    }

    def crosswalk(self, record):
        """Produce the metadata element for ``record``; implemented by subclasses."""
        raise NotImplementedError()

    def header(self, record):
        """Produce the header element for ``record``; implemented by subclasses."""
        raise NotImplementedError()

    def _generate_header_subjects(self, parent_element, subjects):
        """
        Append one ``setSpec`` child to ``parent_element`` per subject that has a term.

        :param parent_element: element that receives the ``setSpec`` children
        :param subjects: iterable of dicts read via ``.get("scheme")`` / ``.get("term")``;
            ``None`` is treated as no subjects
        """
        entries = [] if subjects is None else subjects

        for entry in entries:
            term = entry.get("term", '')
            if not term:
                # subjects without a term produce no setSpec
                continue

            scheme = entry.get("scheme", '')
            # the set name is "<scheme>:<term>", or just the term when there is no scheme
            set_name = scheme + ':' + term if scheme else term

            spec_el = etree.SubElement(parent_element, self.PMH + "setSpec")
            set_text(spec_el, make_set_spec(set_name))

48 

49 

class OAI_DC(OAI_Crosswalk):
    """
    ~~OAIDC:Crosswalk->OAIPMH:Feature~~

    Shared base for the oai_dc crosswalks: adds the oai_dc and Dublin Core
    namespaces to the namespace map and provides the ``dc:subject`` writer.
    """
    OAIDC_NAMESPACE = "http://www.openarchives.org/OAI/2.0/oai_dc/"
    OAIDC = "{%s}" % OAIDC_NAMESPACE

    DC_NAMESPACE = "http://purl.org/dc/elements/1.1/"
    DC = "{%s}" % DC_NAMESPACE

    # copy the base map so the update below does not leak into OAI_Crosswalk.NSMAP
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_dc": OAIDC_NAMESPACE, "dc": DC_NAMESPACE})

    def _generate_subjects(self, parent_element, subjects, keywords):
        """
        Append ``dc:subject`` children for every keyword and subject classification.

        Keywords are written first, one element each.  For each subject, the
        term and the code (when present) are each written as their own element.
        Subjects whose scheme is ``lcc`` (case-insensitive) are written bare with
        an ``xsi:type="dcterms:LCC"`` attribute; all others are written as
        ``<scheme>:<value>``, or just ``<value>`` when the subject has no scheme.

        :param parent_element: element that receives the ``dc:subject`` children
        :param subjects: iterable of dicts read via ``.get("scheme"/"code"/"term")``, or None
        :param keywords: iterable of keyword strings, or None
        """
        if keywords is None:
            keywords = []
        if subjects is None:
            subjects = []

        for keyword in keywords:
            subj = etree.SubElement(parent_element, self.DC + "subject")
            set_text(subj, keyword)

        for subs in subjects:
            scheme = subs.get("scheme")
            code = subs.get("code")
            term = subs.get("term")

            if scheme and scheme.lower() == 'lcc':
                attrib = {"{{{nspace}}}type".format(nspace=self.XSI_NAMESPACE): "dcterms:LCC"}
                termtext = term
                codetext = code
            else:
                attrib = {}
                # A subject without a scheme used to raise TypeError (None + ':');
                # fall back to an empty prefix, as _generate_header_subjects does.
                prefix = scheme + ':' if scheme else ''
                termtext = prefix + term if term else None
                codetext = prefix + code if code else None

            if termtext:
                subel = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(subel, termtext)

            if codetext:
                sel2 = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(sel2, codetext)

94 

95 

class OAI_DC_Article(OAI_DC):
    """
    ~~OAIArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders an article record as an oai_dc ``metadata`` element and as an
    OAI-PMH ``header`` element.
    """
    def _dc_child(self, parent, name, text):
        # append a dc:<name> child to parent and give it the supplied text
        child = etree.SubElement(parent, self.DC + name)
        set_text(child, text)
        return child

    def crosswalk(self, record):
        """
        Build the ``metadata`` element for an article.

        :param record: article object exposing ``is_in_doaj()``, ``bibjson()`` and ``id``
        :return: the ``metadata`` element, or None when ``record.is_in_doaj()`` is false
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()
        base_url = app.config['BASE_URL']

        metadata = etree.Element(self.PMH + "metadata")
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc", nsmap=self.NSMAP)
        oai_dc.set(
            self.XSI + "schemaLocation",
            "http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd",
        )

        if bibjson.title is not None:
            self._dc_child(oai_dc, "title", bibjson.title)

        # all the external identifiers (ISSNs, etc)
        for ident in bibjson.get_identifiers():
            self._dc_child(oai_dc, "identifier", ident.get("id"))

        # our internal identifier
        self._dc_child(oai_dc, "identifier", base_url + "/article/" + record.id)

        # work out the date of publication
        pub_date = bibjson.get_publication_date()
        if pub_date != "":
            self._dc_child(oai_dc, "date", pub_date)

        for link in bibjson.get_urls():
            self._dc_child(oai_dc, "relation", link.get("url"))

        # one relation per print/electronic ISSN, pointing at the journal's table of contents
        issns = bibjson.get_identifiers(idtype=bibjson.P_ISSN) + bibjson.get_identifiers(idtype=bibjson.E_ISSN)
        for issn in issns:
            self._dc_child(oai_dc, "relation", base_url + "/toc/" + issn)

        if bibjson.abstract is not None:
            self._dc_child(oai_dc, "description", bibjson.abstract)

        authors = bibjson.author
        if len(authors) > 0:
            for author in authors:
                creator = self._dc_child(oai_dc, "creator", author.get("name"))
                orcid = author.get("orcid_id")
                if orcid:
                    creator.set('id', orcid)

        if bibjson.publisher is not None:
            self._dc_child(oai_dc, "publisher", bibjson.publisher)

        self._dc_child(oai_dc, "type", "article")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        journal_languages = bibjson.journal_language
        if journal_languages is not None:
            for lang in journal_languages:
                self._dc_child(oai_dc, "language", lang)

        citation = self._make_citation(bibjson)
        if citation is not None:
            self._dc_child(oai_dc, "source", citation)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element for an article.

        The header carries ``status="deleted"`` when ``record.is_in_doaj()`` is
        false, followed by the identifier, the datestamp and the subject setSpecs.
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        id_el = etree.SubElement(head, self.PMH + "identifier")
        set_text(id_el, make_oai_identifier(record.id, "article"))

        stamp_el = etree.SubElement(head, self.PMH + "datestamp")
        set_text(stamp_el, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

    def _make_citation(self, bibjson):
        """
        Assemble a citation string from the article's journal metadata.

        :return: the citation, or None when none of the fields contributed anything
        """
        # [title], Vol [vol], Iss [iss], Pp [start]-end (year)
        journal_title = bibjson.journal_title
        volume = bibjson.volume
        issue = bibjson.number
        first_page = bibjson.start_page
        last_page = bibjson.end_page
        year = bibjson.year

        # comma-separated portion of the citation
        parts = []
        if journal_title is not None and journal_title != "":
            parts.append(journal_title)

        if volume is not None:
            parts.append("Vol " + volume)

        if issue is not None:
            parts.append("Iss " + issue)

        have_first = first_page is not None
        have_last = last_page is not None
        if have_first or have_last:
            # "Pp" for a full range, "p" when only one end of it is known
            label = "Pp " if (have_first and have_last) else "p "
            pages = "-".join(p for p in (first_page, last_page) if p is not None)
            parts.append(label + pages)

        citation = ", ".join(parts)

        if year is not None:
            # the year is space-separated rather than comma-separated
            if citation != "":
                citation += " "
            citation += "(" + year + ")"

        if citation == "":
            return None
        return citation

232 

233 

class OAI_DC_Journal(OAI_DC):
    """
    ~~OAIJournalXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders a journal record as an oai_dc ``metadata`` element and as an
    OAI-PMH ``header`` element.
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element for a journal.

        :param record: journal object exposing ``is_in_doaj()``, ``bibjson()``,
            ``toc_id`` and ``created_date``
        :return: the ``metadata`` element, or None when ``record.is_in_doaj()`` is false
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata")
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc", nsmap=self.NSMAP)
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd")
        if bibjson.title is not None:
            title = etree.SubElement(oai_dc, self.DC + "title")
            set_text(title, bibjson.title)

        # external identifiers (ISSNs, etc)
        for identifier in bibjson.get_identifiers():
            idel = etree.SubElement(oai_dc, self.DC + "identifier")
            set_text(idel, identifier.get("id"))

        # our internal identifier
        url = app.config["BASE_URL"] + "/toc/" + record.toc_id
        idel = etree.SubElement(oai_dc, self.DC + "identifier")
        set_text(idel, url)

        if bibjson.language is not None and len(bibjson.language) > 0:
            for language in bibjson.language:
                lang = etree.SubElement(oai_dc, self.DC + "language")
                set_text(lang, language)

        if bibjson.licenses is not None and len(bibjson.licenses) > 0:
            for license in bibjson.licenses:
                rights = etree.SubElement(oai_dc, self.DC + "rights")
                set_text(rights, license.get("type"))

        if bibjson.publisher is not None:
            pub = etree.SubElement(oai_dc, self.DC + "publisher")
            set_text(pub, bibjson.publisher)

        # We have removed the list of URLs in model v2, so we need to gather the URLs one by one
        all_urls = [
            bibjson.journal_url,
            bibjson.aims_scope_url,
            bibjson.author_instructions_url,
            bibjson.waiver_url
        ]
        # Drop unset URLs and duplicates.  An explicit identity test is used
        # because filter(None.__ne__, ...) depends on the truthiness of
        # NotImplemented (deprecated in Python 3.9, a TypeError in 3.14).
        # dict.fromkeys keeps first-seen order, so the relation elements are
        # emitted in a stable order rather than set-iteration order.
        all_urls_dedupe = list(dict.fromkeys(u for u in all_urls if u is not None))

        for link in all_urls_dedupe:
            urlel = etree.SubElement(oai_dc, self.DC + "relation")
            set_text(urlel, link)

        created = etree.SubElement(oai_dc, self.DC + "date")
        set_text(created, normalise_date(record.created_date))

        objecttype = etree.SubElement(oai_dc, self.DC + "type")
        set_text(objecttype, "journal")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element for a journal.

        The header carries ``status="deleted"`` when ``record.is_in_doaj()`` is
        false, followed by the identifier, the datestamp and the subject setSpecs.
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "journal"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

315 

316 

class OAI_DOAJ_Article(OAI_Crosswalk):
    """
    ~~OAIDOAJArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->DOAJArticleXML:Schema~~

    Renders an article record as an ``oai_doaj:doajArticle`` metadata element
    and as an OAI-PMH ``header`` element.
    """
    OAI_DOAJ_NAMESPACE = "http://doaj.org/features/oai_doaj/1.0/"
    OAI_DOAJ = "{%s}" % OAI_DOAJ_NAMESPACE

    # copy the base map so the update below does not leak into OAI_Crosswalk.NSMAP
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_doaj": OAI_DOAJ_NAMESPACE})

    def crosswalk(self, record):
        """
        Build the ``metadata`` element for an article in the oai_doaj format.

        Child elements are appended in a fixed order; do not reorder the
        sections below without checking the target schema.

        :param record: article object exposing ``is_in_doaj()``, ``bibjson()``
            and ``publisher_record_id()``
        :return: the ``metadata`` element, or None when ``record.is_in_doaj()`` is false
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata")
        oai_doaj_article = etree.SubElement(metadata, self.OAI_DOAJ + "doajArticle", nsmap=self.NSMAP)
        oai_doaj_article.set(self.XSI + "schemaLocation",
                             "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd http://doaj.org/features/oai_doaj/1.0/ https://doaj.org/static/doaj/doajArticles.xsd")

        # look up the journal's language
        jlangs = bibjson.journal_language
        # first, if there are any languages recorded, get the 3-char code
        # corresponding to the first language.  Only a non-empty list is used;
        # any other value yields no language element.
        language = None
        if jlangs and isinstance(jlangs, list):
            lang = datasets.language_for(jlangs[0])
            if lang is not None:
                language = lang.alpha_3

        # if the language code lookup was successful, add it to the
        # result
        if language:
            langel = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "language")
            set_text(langel, language)

        if bibjson.publisher:
            publel = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "publisher")
            set_text(publel, bibjson.publisher)

        if bibjson.journal_title:
            journtitel = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "journalTitle")
            set_text(journtitel, bibjson.journal_title)

        # all the external identifiers (ISSNs, etc)
        pissn_value = bibjson.get_one_identifier(bibjson.P_ISSN)
        if pissn_value:
            issn = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "issn")
            set_text(issn, pissn_value)

        eissn_value = bibjson.get_one_identifier(bibjson.E_ISSN)
        if eissn_value:
            eissn = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "eissn")
            set_text(eissn, eissn_value)

        # work out the date of publication
        date = bibjson.get_publication_date()
        # convert it to the format required by the XML schema by parsing
        # it into a Python datetime and getting it back out as string.
        # If it's not coming back properly from the bibjson, throw it
        # away.  Catch Exception rather than using a bare except so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        try:
            date = dates.parse(date).strftime(FMT_DATE_STD)
        except Exception:
            date = ""

        if date:
            monthyear = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "publicationDate")
            set_text(monthyear, date)

        if bibjson.volume:
            volume = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "volume")
            set_text(volume, bibjson.volume)

        if bibjson.number:
            issue = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "issue")
            set_text(issue, bibjson.number)

        if bibjson.start_page:
            start_page = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "startPage")
            set_text(start_page, bibjson.start_page)

        if bibjson.end_page:
            end_page = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "endPage")
            set_text(end_page, bibjson.end_page)

        doi_value = bibjson.get_one_identifier(bibjson.DOI)
        if doi_value:
            doi = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "doi")
            set_text(doi, doi_value)

        publisher_record_id = record.publisher_record_id()
        if publisher_record_id:
            pubrecid = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "publisherRecordId")
            set_text(pubrecid, publisher_record_id)

        # document type
        # as of Mar 2015 this was not being ingested when people upload XML
        # conforming to the doajArticle schema, so it's not being output either

        if bibjson.title is not None:
            title = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "title")
            set_text(title, bibjson.title)

        # (id, name) pairs collected while writing authors, written out afterwards
        affiliations = []
        if bibjson.author:
            authors_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "authors")
            for author in bibjson.author:  # bibjson.author is a list, despite the name
                author_elem = etree.SubElement(authors_elem, self.OAI_DOAJ + "author")
                if author.get('name'):
                    name_elem = etree.SubElement(author_elem, self.OAI_DOAJ + "name")
                    set_text(name_elem, author.get('name'))
                if author.get('email'):
                    email_elem = etree.SubElement(author_elem, self.OAI_DOAJ + "email")
                    set_text(email_elem, author.get('email'))
                if author.get('affiliation'):
                    new_affid = len(affiliations)  # use the length of the list as the id for each new item
                    affiliations.append((new_affid, author['affiliation']))
                    author_affiliation_elem = etree.SubElement(author_elem, self.OAI_DOAJ + "affiliationId")
                    set_text(author_affiliation_elem, str(new_affid))
                if author.get('orcid_id'):
                    orcid_elem = etree.SubElement(author_elem, self.OAI_DOAJ + "orcid_id")
                    set_text(orcid_elem, author.get("orcid_id"))

        if affiliations:
            affiliations_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "affiliationsList")
            for affid, affiliation in affiliations:
                attrib = {"affiliationId": str(affid)}
                affiliation_elem = etree.SubElement(affiliations_elem, self.OAI_DOAJ + "affiliationName", **attrib)
                set_text(affiliation_elem, affiliation)

        if bibjson.abstract:
            abstract = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "abstract")
            set_text(abstract, bibjson.abstract)

        ftobj = bibjson.get_single_url('fulltext', unpack_urlobj=False)
        if ftobj:
            attrib = {}
            if "content_type" in ftobj:
                attrib['format'] = ftobj['content_type']

            fulltext_url_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "fullTextUrl", **attrib)

            if "url" in ftobj:
                set_text(fulltext_url_elem, ftobj['url'])

        if bibjson.keywords:
            keywords_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + 'keywords')
            for keyword in bibjson.keywords:
                kel = etree.SubElement(keywords_elem, self.OAI_DOAJ + 'keyword')
                set_text(kel, keyword)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element for an article.

        The header carries ``status="deleted"`` when ``record.is_in_doaj()`` is
        false, followed by the identifier, the datestamp and the subject setSpecs.
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "article"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

487 

488 

# Lookup table of crosswalk classes (not instances).
# Outer key: metadata format name; inner key: record type string.
# NOTE(review): the outer keys are presumably OAI-PMH metadataPrefix values and
# the inner keys the record type(s) being served - confirm against the callers.
CROSSWALKS = {
    "oai_dc": {
        "article": OAI_DC_Article,
        "journal": OAI_DC_Journal,
        "article,article_tombstone": OAI_DC_Article
    },
    'oai_doaj': {
        "article": OAI_DOAJ_Article,
        "article,article_tombstone": OAI_DOAJ_Article
    }
}

500 

501 

502##################################################################### 

503# Utility methods/objects 

504##################################################################### 

505 

def make_set_spec(setspec):
    """
    Encode a set name as a setSpec token.

    The name is UTF-8 encoded, URL-safe base64 encoded, and every ``=``
    padding character is then replaced with ``~``.

    :param setspec: the set name as a string
    :return: the encoded token as a string
    """
    encoded = base64.urlsafe_b64encode(setspec.encode("utf-8")).decode("utf-8")
    return encoded.replace('=', '~')

511 

512 

def make_oai_identifier(identifier, qualifier):
    """
    Build an identifier of the form ``oai:<namespace>/<qualifier>:<identifier>``.

    The namespace is read from the ``OAIPMH_IDENTIFIER_NAMESPACE`` app config key.

    :param identifier: the record's own id
    :param qualifier: the record type part of the identifier, e.g. "article"
    :return: the assembled identifier string
    """
    namespace = app.config.get("OAIPMH_IDENTIFIER_NAMESPACE")
    scoped_id = qualifier + ":" + identifier
    return "oai:" + namespace + "/" + scoped_id

515 

516 

def normalise_date(date):
    """
    Return a datestamp string for ``date``.

    If ``dates.parse`` accepts the value it is returned unchanged.  Otherwise
    the value is rewritten by replacing each space with ``T`` and appending ``Z``.

    :param date: the date as a string
    :return: the original or rewritten date string
    """
    # FIXME: do we need a more powerful date normalisation routine?
    try:
        dates.parse(date)
        return date
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed by the fallback.
        return "T".join(date.split(" ")) + "Z"

524 

525 

526########################################################### 

527# XML Character encoding hacks 

528########################################################### 

529 

# Inclusive (low, high) code point ranges that are stripped from text before
# it is placed into an XML element.
_illegal_unichrs = [
    (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
    (0x7F, 0x84), (0x86, 0x9F),
    (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
]
if sys.maxunicode >= 0x10000:  # not narrow build
    # the last two code points of each of the planes 1 through 16
    _illegal_unichrs.extend(
        ((plane << 16) | 0xFFFE, (plane << 16) | 0xFFFF)
        for plane in range(0x01, 0x11)
    )

# one "low-high" character-class range per entry, combined into a single class
_illegal_ranges = [chr(low) + "-" + chr(high) for (low, high) in _illegal_unichrs]
_illegal_xml_chars_RE = re.compile('[' + ''.join(_illegal_ranges) + ']')

545 

546 

def valid_XML_char_ordinal(i):
    """
    Tell whether code point ``i`` is accepted as an XML character.

    Accepted values are tab, line feed, carriage return, and the ranges
    0x20-0xD7FF, 0xE000-0xFFFD and 0x10000-0x10FFFF (all inclusive).

    :param i: the code point as an integer
    :return: True if accepted, False otherwise
    """
    accepted_ranges = (
        (0x20, 0xD7FF),
        (0xE000, 0xFFFD),
        (0x10000, 0x10FFFF),
    )
    if i in (0x9, 0xA, 0xD):
        return True
    return any(low <= i <= high for low, high in accepted_ranges)

554 

555 

def clean_unreadable(input_string):
    """
    Remove every character matched by ``_illegal_xml_chars_RE`` from the input.

    A ``str`` (including subclasses) is cleaned directly; any other value is
    first decoded with ``.decode("utf-8")``.  Input that cannot be handled
    that way is logged through ``app.logger`` and yields None.

    :param input_string: text as ``str``, or an object with a ``decode`` method
    :return: the cleaned string, or None if the input could not be processed
    :raises UnicodeDecodeError: if the input decodes but is not valid UTF-8
    """
    try:
        # isinstance rather than type(...) == str, so str subclasses are not
        # sent down the decode path
        if isinstance(input_string, str):
            text = input_string
        else:
            text = input_string.decode("utf-8")
        return _illegal_xml_chars_RE.sub("", text)
    except (TypeError, AttributeError):
        # AttributeError covers values with no .decode (e.g. numbers), which
        # previously escaped this handler and crashed the caller
        app.logger.error("Unable to strip illegal XML chars from: {x}, {y}".format(x=input_string, y=type(input_string)))
        return None

565 

566 

def xml_clean(input_string):
    """
    Return ``input_string`` with every character dropped whose code point
    ``valid_XML_char_ordinal`` rejects.

    :param input_string: the text to filter
    :return: the filtered string
    """
    kept = [ch for ch in input_string if valid_XML_char_ordinal(ord(ch))]
    return ''.join(kept)

570 

571 

def set_text(element, input_string):
    """
    Assign cleaned text to ``element.text``.

    Nothing happens when ``input_string`` is None.  Otherwise the value is
    passed through ``clean_unreadable`` and assigned; if the assignment raises
    ValueError, the cleaned value is filtered again with ``xml_clean`` and
    assigned a second time.

    :param element: object whose ``text`` attribute is set
    :param input_string: the text to assign, or None
    """
    if input_string is not None:
        cleaned = clean_unreadable(input_string)
        try:
            element.text = cleaned
        except ValueError:
            # fall back to the stricter per-character filter
            element.text = xml_clean(cleaned)