Coverage for portality / crosswalks / oaipmh.py: 64%
369 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 09:41 +0100
1import base64, sys, re
2from lxml import etree
3from portality.core import app
4from portality import datasets
5from copy import deepcopy
7from portality.lib import dates
8from portality.lib.dates import FMT_DATE_STD
11#####################################################################
12# Crosswalks for OAI-PMH
13#####################################################################
class OAI_Crosswalk(object):
    """
    Base class for the OAI-PMH crosswalks.

    Carries the XML namespaces shared by the crosswalks and a helper which
    writes ``setSpec`` elements into a record header.  ``crosswalk`` and
    ``header`` must be provided by subclasses.
    """
    PMH_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    PMH = "{" + PMH_NAMESPACE + "}"

    XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
    XSI = "{" + XSI_NAMESPACE + "}"

    XMLNS_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    XMLNS = "{" + XMLNS_NAMESPACE + "}"

    # prefix -> namespace URI; the None key is the default namespace
    NSMAP = {
        None: PMH_NAMESPACE,
        "xsi": XSI_NAMESPACE,
        "xmlns": XMLNS_NAMESPACE,
    }

    def crosswalk(self, record):
        """Produce the metadata element for ``record``; subclasses must override."""
        raise NotImplementedError()

    def header(self, record):
        """Produce the header element for ``record``; subclasses must override."""
        raise NotImplementedError()

    def _generate_header_subjects(self, parent_element, subjects):
        """
        Append one ``setSpec`` child to ``parent_element`` per subject which has a term.

        :param parent_element: element that receives the ``setSpec`` children
        :param subjects: iterable of dicts read via ``scheme`` and ``term`` keys, or None
        """
        entries = [] if subjects is None else subjects

        for entry in entries:
            term = entry.get("term", '')
            if not term:
                # nothing to build a set spec from
                continue

            scheme = entry.get("scheme", '')
            prefix = scheme + ':' if scheme else ''

            spec_el = etree.SubElement(parent_element, self.PMH + "setSpec")
            set_text(spec_el, make_set_spec(prefix + term))
class OAI_DC(OAI_Crosswalk):
    """
    ~~OAIDC:Crosswalk->OAIPMH:Feature~~

    Common base for the ``oai_dc`` crosswalks: adds the OAI-DC and Dublin Core
    namespaces and the helper which writes ``dc:subject`` elements.
    """
    OAIDC_NAMESPACE = "http://www.openarchives.org/OAI/2.0/oai_dc/"
    OAIDC = "{%s}" % OAIDC_NAMESPACE

    DC_NAMESPACE = "http://purl.org/dc/elements/1.1/"
    DC = "{%s}" % DC_NAMESPACE

    # copy the parent's map so that adding prefixes here does not alter it
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_dc": OAIDC_NAMESPACE, "dc": DC_NAMESPACE})

    def _generate_subjects(self, parent_element, subjects, keywords):
        """
        Append ``dc:subject`` elements for the keywords and the classified subjects.

        Each keyword becomes one ``dc:subject``.  Each subject contributes up to
        two: one for its term and one for its code.  LCC subjects are written
        without a prefix and carry an ``xsi:type`` of ``dcterms:LCC``; subjects
        from any other scheme are prefixed with ``<scheme>:``.  A subject with no
        scheme is written without a prefix (previously this raised a TypeError
        when concatenating ``None``).

        :param parent_element: element that receives the ``dc:subject`` children
        :param subjects: iterable of dicts read via ``scheme``, ``code`` and ``term``, or None
        :param keywords: iterable of keyword strings, or None
        """
        if keywords is None:
            keywords = []
        if subjects is None:
            subjects = []

        for keyword in keywords:
            subj = etree.SubElement(parent_element, self.DC + "subject")
            set_text(subj, keyword)

        for subs in subjects:
            scheme = subs.get("scheme")
            code = subs.get("code")
            term = subs.get("term")

            if scheme and scheme.lower() == 'lcc':
                attrib = {"{{{nspace}}}type".format(nspace=self.XSI_NAMESPACE): "dcterms:LCC"}
                prefix = ''
            else:
                attrib = {}
                # a missing scheme means there is nothing to prefix with
                prefix = scheme + ':' if scheme else ''

            termtext = prefix + term if term else None
            codetext = prefix + code if code else None

            if termtext:
                subel = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(subel, termtext)

            if codetext:
                sel2 = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                set_text(sel2, codetext)
class OAI_DC_Article(OAI_DC):
    """
    ~~OAIArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders an article as OAI-PMH ``oai_dc`` metadata and header elements.
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element for an article.

        :param record: article object; ``is_in_doaj()``, ``bibjson()`` and ``id`` are used
        :return: the ``metadata`` element, or None when the article is not in DOAJ
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata")
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc", nsmap=self.NSMAP)
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd")

        def add_dc(tag, text):
            # append a Dublin Core child to the oai_dc container and set its text
            node = etree.SubElement(oai_dc, self.DC + tag)
            set_text(node, text)
            return node

        if bibjson.title is not None:
            add_dc("title", bibjson.title)

        # all the external identifiers (ISSNs, etc)
        for identifier in bibjson.get_identifiers():
            add_dc("identifier", identifier.get("id"))

        # our internal identifier
        add_dc("identifier", app.config['BASE_URL'] + "/article/" + record.id)

        # work out the date of publication
        pub_date = bibjson.get_publication_date()
        if pub_date != "":
            add_dc("date", pub_date)

        for link in bibjson.get_urls():
            add_dc("relation", link.get("url"))

        # one relation per journal ISSN, pointing at the table of contents
        issns = bibjson.get_identifiers(idtype=bibjson.P_ISSN) + bibjson.get_identifiers(idtype=bibjson.E_ISSN)
        for issn in issns:
            add_dc("relation", app.config['BASE_URL'] + "/toc/" + issn)

        if bibjson.abstract is not None:
            add_dc("description", bibjson.abstract)

        for author in bibjson.author:
            creator = add_dc("creator", author.get("name"))
            if author.get("orcid_id"):
                creator.set('id', author.get("orcid_id"))

        if bibjson.publisher is not None:
            add_dc("publisher", bibjson.publisher)

        add_dc("type", "article")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        journal_languages = bibjson.journal_language
        if journal_languages is not None:
            for language in journal_languages:
                add_dc("language", language)

        citation = self._make_citation(bibjson)
        if citation is not None:
            add_dc("source", citation)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element for an article.

        The header is marked ``status="deleted"`` when the article is not in DOAJ.

        :param record: article object; ``bibjson()``, ``is_in_doaj()``, ``id`` and ``last_updated`` are used
        :return: the ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        id_el = etree.SubElement(head, self.PMH + "identifier")
        set_text(id_el, make_oai_identifier(record.id, "article"))

        stamp_el = etree.SubElement(head, self.PMH + "datestamp")
        set_text(stamp_el, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

    def _make_citation(self, bibjson):
        """
        Assemble a citation string from the article's journal details.

        :param bibjson: object exposing ``journal_title``, ``volume``, ``number``,
            ``start_page``, ``end_page`` and ``year``
        :return: the citation, or None if none of the parts is available
        """
        # [title], Vol [vol], Iss [iss], Pp [start]-end (year)
        title = bibjson.journal_title
        volume = bibjson.volume
        issue = bibjson.number
        first_page = bibjson.start_page
        last_page = bibjson.end_page
        year = bibjson.year

        # comma-separated parts; an empty title contributes nothing
        segments = []
        if title is not None and title != "":
            segments.append(title)
        if volume is not None:
            segments.append("Vol " + volume)
        if issue is not None:
            segments.append("Iss " + issue)

        if first_page is not None and last_page is not None:
            segments.append("Pp " + first_page + "-" + last_page)
        elif first_page is not None:
            segments.append("p " + first_page)
        elif last_page is not None:
            segments.append("p " + last_page)

        citation = ", ".join(segments)

        # the year is separated by a space rather than a comma
        if year is not None:
            if citation != "":
                citation += " "
            citation += "(" + year + ")"

        return citation if citation != "" else None
class OAI_DC_Journal(OAI_DC):
    """
    ~~OAIJournalXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders a journal as OAI-PMH ``oai_dc`` metadata and header elements.
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element for a journal.

        :param record: journal object; ``is_in_doaj()``, ``bibjson()``, ``toc_id``
            and ``created_date`` are used
        :return: the ``metadata`` element, or None when the journal is not in DOAJ
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata")
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc", nsmap=self.NSMAP)
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/oai_dc/ http://www.openarchives.org/OAI/2.0/oai_dc.xsd")

        if bibjson.title is not None:
            title = etree.SubElement(oai_dc, self.DC + "title")
            set_text(title, bibjson.title)

        # external identifiers (ISSNs, etc)
        for identifier in bibjson.get_identifiers():
            idel = etree.SubElement(oai_dc, self.DC + "identifier")
            set_text(idel, identifier.get("id"))

        # our internal identifier
        url = app.config["BASE_URL"] + "/toc/" + record.toc_id
        idel = etree.SubElement(oai_dc, self.DC + "identifier")
        set_text(idel, url)

        if bibjson.language is not None and len(bibjson.language) > 0:
            for language in bibjson.language:
                lang = etree.SubElement(oai_dc, self.DC + "language")
                set_text(lang, language)

        if bibjson.licenses is not None and len(bibjson.licenses) > 0:
            for license in bibjson.licenses:
                rights = etree.SubElement(oai_dc, self.DC + "rights")
                set_text(rights, license.get("type"))

        if bibjson.publisher is not None:
            pub = etree.SubElement(oai_dc, self.DC + "publisher")
            set_text(pub, bibjson.publisher)

        # We have removed the list of URLs in model v2, so we need to gather the URLs one by one
        all_urls = [
            bibjson.journal_url,
            bibjson.aims_scope_url,
            bibjson.author_instructions_url,
            bibjson.waiver_url
        ]
        # Drop unset URLs and duplicates while keeping the order above.  An
        # explicit "is not None" test is used because None.__ne__ returns
        # NotImplemented for other types, and relying on its truth value is
        # deprecated (and an error on recent Python versions).
        all_urls_dedupe = list(dict.fromkeys(link for link in all_urls if link is not None))

        for link in all_urls_dedupe:
            urlel = etree.SubElement(oai_dc, self.DC + "relation")
            set_text(urlel, link)

        created = etree.SubElement(oai_dc, self.DC + "date")
        set_text(created, normalise_date(record.created_date))

        objecttype = etree.SubElement(oai_dc, self.DC + "type")
        set_text(objecttype, "journal")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element for a journal.

        The header is marked ``status="deleted"`` when the journal is not in DOAJ.

        :param record: journal object; ``bibjson()``, ``is_in_doaj()``, ``id`` and ``last_updated`` are used
        :return: the ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "journal"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head
class OAI_DOAJ_Article(OAI_Crosswalk):
    """
    ~~OAIDOAJArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->DOAJArticleXML:Schema~~

    Renders an article as OAI-PMH ``oai_doaj`` (doajArticle) metadata and
    header elements.
    """
    OAI_DOAJ_NAMESPACE = "http://doaj.org/features/oai_doaj/1.0/"
    OAI_DOAJ = "{%s}" % OAI_DOAJ_NAMESPACE

    # copy the parent's map so that adding the prefix here does not alter it
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_doaj": OAI_DOAJ_NAMESPACE})

    def crosswalk(self, record):
        """
        Build the ``metadata`` element for an article in the doajArticle format.

        :param record: article object; ``is_in_doaj()``, ``bibjson()`` and
            ``publisher_record_id()`` are used
        :return: the ``metadata`` element, or None when the article is not in DOAJ
        """
        if not record.is_in_doaj():
            return None

        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata")
        oai_doaj_article = etree.SubElement(metadata, self.OAI_DOAJ + "doajArticle", nsmap=self.NSMAP)
        oai_doaj_article.set(self.XSI + "schemaLocation",
                             "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd http://doaj.org/features/oai_doaj/1.0/ https://doaj.org/static/doaj/doajArticles.xsd")

        def add(parent, tag, text, **attrib):
            # append an oai_doaj child to ``parent`` and set its text
            element = etree.SubElement(parent, self.OAI_DOAJ + tag, **attrib)
            set_text(element, text)
            return element

        # look up the journal's language
        jlangs = bibjson.journal_language
        # first, if there are any languages recorded, get the 3-char code
        # corresponding to the first language (only a non-empty list is consulted)
        language = None
        if jlangs and isinstance(jlangs, list):
            lang = datasets.language_for(jlangs[0])
            if lang is not None:
                language = lang.alpha_3

        # if the language code lookup was successful, add it to the
        # result
        if language:
            add(oai_doaj_article, "language", language)

        if bibjson.publisher:
            add(oai_doaj_article, "publisher", bibjson.publisher)

        if bibjson.journal_title:
            add(oai_doaj_article, "journalTitle", bibjson.journal_title)

        # all the external identifiers (ISSNs, etc)
        pissn = bibjson.get_one_identifier(bibjson.P_ISSN)
        if pissn:
            add(oai_doaj_article, "issn", pissn)

        eissn = bibjson.get_one_identifier(bibjson.E_ISSN)
        if eissn:
            add(oai_doaj_article, "eissn", eissn)

        # work out the date of publication
        date = bibjson.get_publication_date()
        # convert it to the format required by the XML schema by parsing
        # it into a Python datetime and getting it back out as string.
        # If it's not coming back properly from the bibjson, throw it
        # away.
        try:
            date = dates.parse(date).strftime(FMT_DATE_STD)
        except Exception:
            # best effort: an unparseable date is omitted, but interpreter-exit
            # signals such as KeyboardInterrupt are no longer swallowed
            date = ""

        if date:
            add(oai_doaj_article, "publicationDate", date)

        if bibjson.volume:
            add(oai_doaj_article, "volume", bibjson.volume)

        if bibjson.number:
            add(oai_doaj_article, "issue", bibjson.number)

        if bibjson.start_page:
            add(oai_doaj_article, "startPage", bibjson.start_page)

        if bibjson.end_page:
            add(oai_doaj_article, "endPage", bibjson.end_page)

        doi = bibjson.get_one_identifier(bibjson.DOI)
        if doi:
            add(oai_doaj_article, "doi", doi)

        publisher_record_id = record.publisher_record_id()
        if publisher_record_id:
            add(oai_doaj_article, "publisherRecordId", publisher_record_id)

        # document type
        # as of Mar 2015 this was not being ingested when people upload XML
        # conforming to the doajArticle schema, so it's not being output either

        if bibjson.title is not None:
            add(oai_doaj_article, "title", bibjson.title)

        # (id, name) pairs collected from the authors, written out after them
        affiliations = []
        if bibjson.author:
            authors_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "authors")
            for author in bibjson.author:  # bibjson.author is a list, despite the name
                author_elem = etree.SubElement(authors_elem, self.OAI_DOAJ + "author")
                if author.get('name'):
                    add(author_elem, "name", author.get('name'))
                if author.get('email'):
                    add(author_elem, "email", author.get('email'))
                if author.get('affiliation'):
                    new_affid = len(affiliations)  # use the length of the list as the id for each new item
                    affiliations.append((new_affid, author['affiliation']))
                    add(author_elem, "affiliationId", str(new_affid))
                if author.get('orcid_id'):
                    add(author_elem, "orcid_id", author.get("orcid_id"))

        if affiliations:
            affiliations_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "affiliationsList")
            for affid, affiliation in affiliations:
                add(affiliations_elem, "affiliationName", affiliation, affiliationId=str(affid))

        if bibjson.abstract:
            add(oai_doaj_article, "abstract", bibjson.abstract)

        ftobj = bibjson.get_single_url('fulltext', unpack_urlobj=False)
        if ftobj:
            attrib = {}
            if "content_type" in ftobj:
                attrib['format'] = ftobj['content_type']

            # the element is written even when the url object carries no "url"
            fulltext_url_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + "fullTextUrl", **attrib)

            if "url" in ftobj:
                set_text(fulltext_url_elem, ftobj['url'])

        if bibjson.keywords:
            keywords_elem = etree.SubElement(oai_doaj_article, self.OAI_DOAJ + 'keywords')
            for keyword in bibjson.keywords:
                add(keywords_elem, 'keyword', keyword)

        return metadata

    def header(self, record):
        """
        Build the ``header`` element for an article.

        The header is marked ``status="deleted"`` when the article is not in DOAJ.

        :param record: article object; ``bibjson()``, ``is_in_doaj()``, ``id`` and ``last_updated`` are used
        :return: the ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        if not record.is_in_doaj():
            head.set("status", "deleted")

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "article"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head
# Registry of crosswalk classes: metadata prefix -> key -> crosswalk class.
# The "article,article_tombstone" keys map to the same class as "article".
CROSSWALKS = {
    "oai_dc": {
        "article": OAI_DC_Article,
        "journal": OAI_DC_Journal,
        "article,article_tombstone": OAI_DC_Article,
    },
    # there is no journal crosswalk registered for the oai_doaj format
    "oai_doaj": {
        "article": OAI_DOAJ_Article,
        "article,article_tombstone": OAI_DOAJ_Article,
    },
}
502#####################################################################
503# Utility methods/objects
504#####################################################################
def make_set_spec(setspec):
    """
    Encode a set name as a setSpec token.

    The name is UTF-8 encoded, passed through URL-safe base64, and the ``=``
    padding characters are replaced with ``~``.

    :param setspec: the set name as a string
    :return: the encoded token as a string
    """
    encoded = base64.urlsafe_b64encode(setspec.encode("utf-8")).decode("utf-8")
    return encoded.replace('=', '~')
def make_oai_identifier(identifier, qualifier):
    """
    Build the OAI identifier ``oai:<namespace>/<qualifier>:<identifier>``.

    The namespace is read from the ``OAIPMH_IDENTIFIER_NAMESPACE`` app setting.

    :param identifier: the record's id
    :param qualifier: the record type label placed before the id
    :return: the identifier string
    """
    namespace = app.config.get("OAIPMH_IDENTIFIER_NAMESPACE")
    return "oai:" + namespace + "/" + qualifier + ":" + identifier
def normalise_date(date):
    """
    Return ``date`` in a form suitable for output.

    If ``dates.parse`` accepts the value it is returned unchanged.  Otherwise
    the spaces in it are replaced with ``T`` and a ``Z`` is appended.

    :param date: the date string to normalise
    :return: the original or the rewritten date string
    """
    # FIXME: do we need a more powerful date normalisation routine?
    try:
        dates.parse(date)
    except Exception:
        # any parse failure falls back to the rewrite; unlike a bare except
        # this lets KeyboardInterrupt / SystemExit propagate
        return "T".join(date.split(" ")) + "Z"
    return date
526###########################################################
527# XML Character encoding hacks
528###########################################################
530_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
531 (0x7F, 0x84), (0x86, 0x9F),
532 (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]
533if sys.maxunicode >= 0x10000: # not narrow build
534 _illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
535 (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
536 (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
537 (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
538 (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
539 (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
540 (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
541 (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)])
542_illegal_ranges = ["%s-%s" % (chr(low), chr(high))
543 for (low, high) in _illegal_unichrs]
544_illegal_xml_chars_RE = re.compile('[%s]' % ''.join(_illegal_ranges))
def valid_XML_char_ordinal(i):
    """
    Tell whether the code point ``i`` is accepted as an XML character.

    Accepted are tab, line feed, carriage return and the ranges
    0x20-0xD7FF, 0xE000-0xFFFD and 0x10000-0x10FFFF.

    :param i: the code point as an integer
    :return: True if accepted, False otherwise
    """
    if i in (0x9, 0xA, 0xD):
        return True
    accepted_ranges = ((0x20, 0xD7FF), (0xE000, 0xFFFD), (0x10000, 0x10FFFF))
    return any(low <= i <= high for low, high in accepted_ranges)
def clean_unreadable(input_string):
    """
    Strip the characters matched by ``_illegal_xml_chars_RE`` from a string.

    A ``str`` (including subclasses of ``str``) is cleaned directly; any other
    value is first decoded as UTF-8 via its ``decode`` method.

    :param input_string: the text to clean, as ``str`` or as an object with ``decode``
    :return: the cleaned string, or None if a TypeError occurred (which is logged)
    """
    try:
        # isinstance rather than an exact type comparison, so that str
        # subclasses are not sent down the decode branch
        if isinstance(input_string, str):
            return _illegal_xml_chars_RE.sub("", input_string)
        return _illegal_xml_chars_RE.sub("", input_string.decode("utf-8"))
    except TypeError:
        app.logger.error("Unable to strip illegal XML chars from: {x}, {y}".format(x=input_string, y=type(input_string)))
        return None
def xml_clean(input_string):
    """
    Return ``input_string`` with every character rejected by
    ``valid_XML_char_ordinal`` removed.

    :param input_string: the text to filter
    :return: the filtered string
    """
    kept = filter(lambda char: valid_XML_char_ordinal(ord(char)), input_string)
    return ''.join(kept)
def set_text(element, input_string):
    """
    Assign cleaned text to ``element.text``.

    Nothing happens when ``input_string`` is None.  Otherwise the value is run
    through ``clean_unreadable`` and assigned; if the assignment raises a
    ValueError, the cleaned value is filtered again with ``xml_clean`` and
    assigned a second time.

    :param element: the object whose ``text`` attribute is set
    :param input_string: the text to assign, or None
    """
    if input_string is not None:
        cleaned = clean_unreadable(input_string)
        try:
            element.text = cleaned
        except ValueError:
            # the first pass was not enough for this value; filter per character
            element.text = xml_clean(cleaned)