Coverage for portality/crosswalks/oaipmh.py: 64%

354 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2022-07-22 15:59 +0100

1import base64, sys, re 

2from lxml import etree 

3from datetime import datetime 

4from portality.core import app 

5from portality import datasets 

6from copy import deepcopy 

7 

8 

9##################################################################### 

10# Crosswalks for OAI-PMH 

11##################################################################### 

12 

class OAI_Crosswalk(object):
    """
    Common base for the OAI-PMH crosswalks.

    Holds the OAI-PMH and XML Schema instance namespaces shared by every
    crosswalk, and the helper that writes ``setSpec`` elements into a record
    header.  Subclasses are expected to provide ``crosswalk`` and ``header``.
    """
    PMH_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    PMH = "{" + PMH_NAMESPACE + "}"

    XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
    XSI = "{" + XSI_NAMESPACE + "}"

    NSMAP = {None: PMH_NAMESPACE, "xsi": XSI_NAMESPACE}

    def crosswalk(self, record):
        """Produce the metadata element for ``record``; must be overridden."""
        raise NotImplementedError()

    def header(self, record):
        """Produce the header element for ``record``; must be overridden."""
        raise NotImplementedError()

    def _generate_header_subjects(self, parent_element, subjects):
        """
        Append one ``setSpec`` child to ``parent_element`` per subject that
        has a term.  The set spec is the encoded form of ``scheme:term`` (or
        just ``term`` when the subject carries no scheme).

        :param parent_element: element to receive the setSpec children
        :param subjects: iterable of subject dicts, or None for no subjects
        """
        for subject in ([] if subjects is None else subjects):
            scheme = subject.get("scheme", '')
            term = subject.get("term", '')

            # a subject without a term cannot be turned into a set spec
            if not term:
                continue

            prefix = scheme + ':' if scheme else ''
            spec_element = etree.SubElement(parent_element, self.PMH + "setSpec")
            set_text(spec_element, make_set_spec(prefix + term))

43 

44 

class OAI_DC(OAI_Crosswalk):
    """
    ~~OAIDC:Crosswalk->OAIPMH:Feature~~
    """
    OAIDC_NAMESPACE = "http://www.openarchives.org/OAI/2.0/oai_dc/"
    OAIDC = "{%s}" % OAIDC_NAMESPACE

    DC_NAMESPACE = "http://purl.org/dc/elements/1.1/"
    DC = "{%s}" % DC_NAMESPACE

    # extend a copy of the base namespace map so the parent's map is untouched
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_dc": OAIDC_NAMESPACE, "dc": DC_NAMESPACE})

    def _generate_subjects(self, parent_element, subjects, keywords):
        """
        Append ``dc:subject`` children to ``parent_element``.

        Every keyword becomes one subject element.  Every classification
        subject contributes up to two elements, one for its term and one for
        its code.  LCC subjects are written bare with an ``xsi:type`` of
        ``dcterms:LCC``; any other subject is prefixed with ``scheme:`` when
        a scheme is present.

        :param parent_element: element to receive the subject children
        :param subjects: iterable of subject dicts (keys ``scheme``, ``code``,
            ``term``), or None
        :param keywords: iterable of keyword strings, or None
        """
        if keywords is None:
            keywords = []
        if subjects is None:
            subjects = []

        for keyword in keywords:
            subj = etree.SubElement(parent_element, self.DC + "subject")
            set_text(subj, keyword)

        for subs in subjects:
            scheme = subs.get("scheme")
            code = subs.get("code")
            term = subs.get("term")

            if scheme and scheme.lower() == 'lcc':
                attrib = {"{{{nspace}}}type".format(nspace=self.XSI_NAMESPACE): "dcterms:LCC"}
                termtext = term
                codetext = code
            else:
                attrib = {}
                # A subject without a scheme used to raise TypeError here
                # (None + ':'); emit it unprefixed instead, as the header
                # setSpec generation does.
                prefix = scheme + ':' if scheme else ''
                termtext = prefix + term if term else None
                codetext = prefix + code if code else None

            if termtext:
                subel = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(subel, termtext)

            if codetext:
                sel2 = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(sel2, codetext)

89 

90 

class OAI_DC_Article(OAI_DC):
    """
    ~~OAIArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element carrying the oai_dc description of an
        article record.

        :param record: article object exposing ``bibjson()`` and ``id``
        :return: the ``metadata`` element
        """
        bj = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        dc_root = etree.SubElement(metadata, self.OAIDC + "dc")
        dc_root.set(self.XSI + "schemaLocation",
                    "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd")

        def add(tag, text):
            # create a dc:<tag> child of the oai_dc root and fill in its text
            node = etree.SubElement(dc_root, self.DC + tag)
            set_text(node, text)
            return node

        if bj.title is not None:
            add("title", bj.title)

        # all the external identifiers (ISSNs, etc)
        for ident in bj.get_identifiers():
            add("identifier", ident.get("id"))

        # our internal identifier
        add("identifier", app.config['BASE_URL'] + "/article/" + record.id)

        # work out the date of publication
        published = bj.get_publication_date()
        if published != "":
            add("date", published)

        for link in bj.get_urls():
            add("relation", link.get("url"))

        # link back to the table of contents of the journal, once per ISSN
        issns = bj.get_identifiers(idtype=bj.P_ISSN) + bj.get_identifiers(idtype=bj.E_ISSN)
        for issn in issns:
            add("relation", app.config['BASE_URL'] + "/toc/" + issn)

        if bj.abstract is not None:
            add("description", bj.abstract)

        if len(bj.author) > 0:
            for author in bj.author:
                creator = add("creator", author.get("name"))
                if author.get("orcid_id"):
                    creator.set('id', author.get("orcid_id"))

        if bj.publisher is not None:
            add("publisher", bj.publisher)

        add("type", "article")

        self._generate_subjects(parent_element=dc_root, subjects=bj.subjects(), keywords=bj.keywords)

        languages = bj.journal_language
        if languages is not None:
            for lang_code in languages:
                add("language", lang_code)

        citation = self._make_citation(bj)
        if citation is not None:
            add("source", citation)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element (identifier, datestamp, set specs) for an
        article record.

        :param record: article object exposing ``bibjson()``, ``id`` and
            ``last_updated``
        :return: the ``header`` element
        """
        bj = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        set_text(etree.SubElement(head, self.PMH + "identifier"),
                 make_oai_identifier(record.id, "article"))

        set_text(etree.SubElement(head, self.PMH + "datestamp"),
                 normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bj.subjects())
        return head

    def _make_citation(self, bibjson):
        """
        Assemble a citation string of the form
        ``[title], Vol [vol], Iss [iss], Pp [start]-[end] ([year])`` from
        whichever of those fields are not None.

        :param bibjson: object exposing journal_title, volume, number,
            start_page, end_page and year
        :return: the citation string, or None when nothing could be assembled
        """
        first_page = bibjson.start_page
        last_page = bibjson.end_page

        # comma-separated leading segments: title, volume, issue, pages
        segments = []
        if bibjson.journal_title is not None:
            segments.append(bibjson.journal_title)
        if bibjson.volume is not None:
            segments.append("Vol " + bibjson.volume)
        if bibjson.number is not None:
            segments.append("Iss " + bibjson.number)

        if first_page is not None and last_page is not None:
            segments.append("Pp " + first_page + "-" + last_page)
        elif first_page is not None:
            segments.append("p " + first_page)
        elif last_page is not None:
            segments.append("p " + last_page)

        # an empty title contributes nothing and must not attract a separator
        citation = ", ".join(seg for seg in segments if seg != "")

        year = bibjson.year
        if year is not None:
            if citation != "":
                citation += " "
            citation += "(" + year + ")"

        if citation == "":
            return None
        return citation

221 

222 

class OAI_DC_Journal(OAI_DC):
    """
    ~~OAIJournalXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element carrying the oai_dc description of a
        journal record.

        :param record: journal object exposing ``bibjson()``, ``toc_id`` and
            ``created_date``
        :return: the ``metadata`` element
        """
        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc")
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd")

        if bibjson.title is not None:
            title = etree.SubElement(oai_dc, self.DC + "title")
            set_text(title, bibjson.title)

        # external identifiers (ISSNs, etc)
        for identifier in bibjson.get_identifiers():
            idel = etree.SubElement(oai_dc, self.DC + "identifier")
            set_text(idel, identifier.get("id"))

        # our internal identifier
        url = app.config["BASE_URL"] + "/toc/" + record.toc_id
        idel = etree.SubElement(oai_dc, self.DC + "identifier")
        set_text(idel, url)

        if bibjson.language is not None and len(bibjson.language) > 0:
            for language in bibjson.language:
                lang = etree.SubElement(oai_dc, self.DC + "language")
                set_text(lang, language)

        if bibjson.licenses is not None and len(bibjson.licenses) > 0:
            for lic in bibjson.licenses:
                rights = etree.SubElement(oai_dc, self.DC + "rights")
                set_text(rights, lic.get("type"))

        if bibjson.publisher is not None:
            pub = etree.SubElement(oai_dc, self.DC + "publisher")
            set_text(pub, bibjson.publisher)

        # We have removed the list of URLs in model v2, so we need to gather the URLs one by one
        all_urls = [
            bibjson.oa_statement_url,
            bibjson.journal_url,
            bibjson.aims_scope_url,
            bibjson.author_instructions_url,
            bibjson.waiver_url
        ]
        # Drop unset URLs and de-duplicate while keeping the order above, so
        # the serialised record is the same on every run.  (The previous
        # filter(None.__ne__, ...) depended on NotImplemented being truthy,
        # which newer Python versions reject.)
        all_urls_dedupe = list(dict.fromkeys(u for u in all_urls if u is not None))

        for link in all_urls_dedupe:
            urlel = etree.SubElement(oai_dc, self.DC + "relation")
            set_text(urlel, link)

        created = etree.SubElement(oai_dc, self.DC + "date")
        set_text(created, normalise_date(record.created_date))

        objecttype = etree.SubElement(oai_dc, self.DC + "type")
        set_text(objecttype, "journal")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element (identifier, datestamp, set specs) for a
        journal record.

        :param record: journal object exposing ``bibjson()``, ``id`` and
            ``last_updated``
        :return: the ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "journal"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

300 

301 

class OAI_DOAJ_Article(OAI_Crosswalk):
    """
    ~~OAIDOAJArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->DOAJArticleXML:Schema~~
    """
    OAI_DOAJ_NAMESPACE = "http://doaj.org/features/oai_doaj/1.0/"
    OAI_DOAJ = "{%s}" % OAI_DOAJ_NAMESPACE

    # extend a copy of the base namespace map so the parent's map is untouched
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_doaj": OAI_DOAJ_NAMESPACE})

    def _add_text_element(self, parent, name, text=None, **attrib):
        """
        Create an element called ``name`` in the oai_doaj namespace under
        ``parent``, set its text (when ``text`` is not None) and return it.
        """
        element = etree.SubElement(parent, self.OAI_DOAJ + name, **attrib)
        set_text(element, text)
        return element

    def crosswalk(self, record):
        """
        Build the ``metadata`` element carrying the doajArticle description of
        an article record.

        :param record: article object exposing ``bibjson()`` and
            ``publisher_record_id()``
        :return: the ``metadata`` element
        """
        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        oai_doaj_article = etree.SubElement(metadata, self.OAI_DOAJ + "doajArticle")
        oai_doaj_article.set(self.XSI + "schemaLocation",
                             "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd http://doaj.org/features/oai_doaj/1.0/ https://doaj.org/static/doaj/doajArticles.xsd")

        # look up the journal's language: if there are any languages
        # recorded, get the 3-char code corresponding to the first one
        jlangs = bibjson.journal_language
        language = None
        if jlangs and isinstance(jlangs, list):
            lang = datasets.language_for(jlangs[0])
            if lang is not None:
                language = lang.alpha_3

        # if the language code lookup was successful, add it to the result
        if language:
            self._add_text_element(oai_doaj_article, "language", language)

        if bibjson.publisher:
            self._add_text_element(oai_doaj_article, "publisher", bibjson.publisher)

        if bibjson.journal_title:
            self._add_text_element(oai_doaj_article, "journalTitle", bibjson.journal_title)

        # all the external identifiers (ISSNs, etc)
        pissn = bibjson.get_one_identifier(bibjson.P_ISSN)
        if pissn:
            self._add_text_element(oai_doaj_article, "issn", pissn)

        eissn = bibjson.get_one_identifier(bibjson.E_ISSN)
        if eissn:
            self._add_text_element(oai_doaj_article, "eissn", eissn)

        # Work out the date of publication and convert it to the format
        # required by the XML schema by parsing it into a Python datetime and
        # getting it back out as a string.  If it's not coming back properly
        # from the bibjson (wrong type or wrong format), throw it away.
        date = bibjson.get_publication_date()
        try:
            date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y-%m-%d")
        except (TypeError, ValueError):
            date = ""

        if date:
            self._add_text_element(oai_doaj_article, "publicationDate", date)

        if bibjson.volume:
            self._add_text_element(oai_doaj_article, "volume", bibjson.volume)

        if bibjson.number:
            self._add_text_element(oai_doaj_article, "issue", bibjson.number)

        if bibjson.start_page:
            self._add_text_element(oai_doaj_article, "startPage", bibjson.start_page)

        if bibjson.end_page:
            self._add_text_element(oai_doaj_article, "endPage", bibjson.end_page)

        doi = bibjson.get_one_identifier(bibjson.DOI)
        if doi:
            self._add_text_element(oai_doaj_article, "doi", doi)

        publisher_record_id = record.publisher_record_id()
        if publisher_record_id:
            self._add_text_element(oai_doaj_article, "publisherRecordId", publisher_record_id)

        # document type
        # as of Mar 2015 this was not being ingested when people upload XML
        # conforming to the doajArticle schema, so it's not being output either

        if bibjson.title is not None:
            self._add_text_element(oai_doaj_article, "title", bibjson.title)

        # (id, name) pairs collected from the authors, written out afterwards
        affiliations = []
        if bibjson.author:
            authors_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "authors")
            for author in bibjson.author:  # bibjson.author is a list, despite the name
                author_elem = etree.SubElement(authors_elem, self.OAI_DOAJ + "author")
                if author.get('name'):
                    self._add_text_element(author_elem, "name", author.get('name'))
                if author.get('email'):
                    self._add_text_element(author_elem, "email", author.get('email'))
                if author.get('affiliation'):
                    new_affid = len(affiliations)  # use the length of the list as the id for each new item
                    affiliations.append((new_affid, author['affiliation']))
                    self._add_text_element(author_elem, "affiliationId", str(new_affid))
                if author.get('orcid_id'):
                    self._add_text_element(author_elem, "orcid_id", author.get("orcid_id"))

        if affiliations:
            affiliations_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "affiliationsList")
            for affid, affiliation in affiliations:
                self._add_text_element(affiliations_elem, "affiliationName", affiliation,
                                       affiliationId=str(affid))

        if bibjson.abstract:
            self._add_text_element(oai_doaj_article, "abstract", bibjson.abstract)

        ftobj = bibjson.get_single_url('fulltext', unpack_urlobj=False)
        if ftobj:
            attrib = {}
            if "content_type" in ftobj:
                attrib['format'] = ftobj['content_type']

            # the element is emitted even when the url object carries no url
            fulltext_url_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "fullTextUrl", **attrib)

            if "url" in ftobj:
                set_text(fulltext_url_elem, ftobj['url'])

        if bibjson.keywords:
            keywords_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + 'keywords')
            for keyword in bibjson.keywords:
                self._add_text_element(keywords_elem, 'keyword', keyword)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element (identifier, datestamp, set specs) for an
        article record.

        :param record: article object exposing ``bibjson()``, ``id`` and
            ``last_updated``
        :return: the ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "article"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

466 

467 

# Registry of crosswalk classes: metadata prefix -> record type -> class.
CROSSWALKS = dict(
    oai_dc=dict(article=OAI_DC_Article, journal=OAI_DC_Journal),
    oai_doaj=dict(article=OAI_DOAJ_Article),
)

477 

478 

479##################################################################### 

480# Utility methods/objects 

481##################################################################### 

482 

def make_set_spec(setspec):
    """
    Encode a set name as a set spec: URL-safe base64 of its UTF-8 bytes,
    with the ``=`` padding characters swapped for ``~``.

    :param setspec: the set name as a string
    :return: the encoded set spec string
    """
    encoded = base64.urlsafe_b64encode(setspec.encode("utf-8")).decode("utf-8")
    return encoded.replace('=', '~')

488 

489 

def make_oai_identifier(identifier, qualifier):
    """
    Build an OAI identifier of the form
    ``oai:<namespace>/<qualifier>:<identifier>``, where the namespace is read
    from the ``OAIPMH_IDENTIFIER_NAMESPACE`` application setting.

    :param identifier: the record's own identifier
    :param qualifier: the record type label placed before the identifier
    :return: the OAI identifier string
    """
    namespace = app.config.get("OAIPMH_IDENTIFIER_NAMESPACE")
    authority = "oai:" + namespace + "/"
    return authority + qualifier + ":" + identifier

492 

493 

def normalise_date(date):
    """
    Return ``date`` in the ``YYYY-MM-DDTHH:MM:SSZ`` form.

    A value already in that form is returned unchanged.  Anything else is
    converted on a best-effort basis by replacing each space with ``T`` and
    appending ``Z``; the result of that fallback is not validated.

    :param date: the date string to normalise
    :return: the normalised date string
    """
    # FIXME: do we need a more powerful date normalisation routine?
    try:
        datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
        return date
    except (TypeError, ValueError):
        # only "wrong type" / "wrong format" trigger the fallback; anything
        # else (e.g. KeyboardInterrupt) is no longer swallowed
        return "T".join(date.split(" ")) + "Z"

501 

502 

503########################################################### 

504# XML Character encoding hacks 

505########################################################### 

506 

# Inclusive (low, high) code point ranges that are removed from text before
# it is placed into an XML element.
_illegal_unichrs = [
    (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
    (0x7F, 0x84), (0x86, 0x9F),
    (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
]
if sys.maxunicode >= 0x10000:  # not narrow build
    # the last two code points of each of the planes 1 to 16
    _illegal_unichrs += [((plane << 16) | 0xFFFE, (plane << 16) | 0xFFFF)
                         for plane in range(0x01, 0x11)]

# one "low-high" character-class range per pair, combined into a single class
_illegal_ranges = ["{0}-{1}".format(chr(first), chr(last))
                   for first, last in _illegal_unichrs]
_illegal_xml_chars_RE = re.compile("[" + "".join(_illegal_ranges) + "]")

522 

523 

def valid_XML_char_ordinal(i):
    """
    Tell whether the code point ``i`` falls in one of the accepted ranges:
    0x20-0xD7FF, tab / line feed / carriage return, 0xE000-0xFFFD or
    0x10000-0x10FFFF.

    :param i: integer code point
    :return: True if the code point is accepted, False otherwise
    """
    # checks ordered by presumed frequency
    if 0x20 <= i <= 0xD7FF:
        return True
    if i in (0x9, 0xA, 0xD):
        return True
    if 0xE000 <= i <= 0xFFFD:
        return True
    return 0x10000 <= i <= 0x10FFFF

531 

532 

def clean_unreadable(input_string):
    """
    Strip the characters matched by ``_illegal_xml_chars_RE`` from the input.

    Strings are cleaned directly; any other value is first decoded as UTF-8
    via its ``decode`` method.  A value that cannot be handled this way is
    logged as an error and None is returned.

    :param input_string: str, or an object with ``decode`` (e.g. bytes)
    :return: the cleaned string, or None if the input could not be processed
    """
    try:
        if isinstance(input_string, str):
            text = input_string
        else:
            text = input_string.decode("utf-8")
        return _illegal_xml_chars_RE.sub("", text)
    except (TypeError, AttributeError):
        # AttributeError covers values with no decode() at all (ints, lists,
        # ...), which previously escaped the handler and crashed the caller
        app.logger.error("Unable to strip illegal XML chars from: {x}, {y}".format(x=input_string, y=type(input_string)))
        return None

542 

543 

def xml_clean(input_string):
    """
    Return ``input_string`` with every character dropped whose code point is
    rejected by ``valid_XML_char_ordinal``.

    :param input_string: the string to filter
    :return: the filtered string
    """
    kept = [ch for ch in input_string if valid_XML_char_ordinal(ord(ch))]
    return ''.join(kept)

547 

548 

def set_text(element, input_string):
    """
    Set ``element.text`` to a cleaned copy of ``input_string``.

    None leaves the element untouched.  Otherwise the value is passed through
    ``clean_unreadable``; if the element still rejects it with a ValueError,
    it is filtered again with ``xml_clean`` and assigned.

    :param element: the element whose text is to be set
    :param input_string: the text to set, or None
    """
    if input_string is not None:
        cleaned = clean_unreadable(input_string)
        try:
            element.text = cleaned
        except ValueError:
            # second, stricter pass for anything the first pass let through
            element.text = xml_clean(cleaned)