Coverage for portality/crosswalks/oaipmh.py: 64%
354 statements
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
« prev ^ index » next coverage.py v6.4.2, created at 2022-07-22 15:59 +0100
1import base64, sys, re
2from lxml import etree
3from datetime import datetime
4from portality.core import app
5from portality import datasets
6from copy import deepcopy
9#####################################################################
10# Crosswalks for OAI-PMH
11#####################################################################
class OAI_Crosswalk(object):
    """
    Common base for the OAI-PMH crosswalks.

    Holds the OAI-PMH and XML Schema-instance namespaces and the helper that
    writes ``setSpec`` elements into a record header.  ``crosswalk`` and
    ``header`` are left for subclasses to implement.
    """
    PMH_NAMESPACE = "http://www.openarchives.org/OAI/2.0/"
    PMH = "{" + PMH_NAMESPACE + "}"  # Clark-notation prefix for element names

    XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
    XSI = "{" + XSI_NAMESPACE + "}"

    NSMAP = {None: PMH_NAMESPACE, "xsi": XSI_NAMESPACE}

    def crosswalk(self, record):
        """Produce the metadata element for ``record``; must be overridden."""
        raise NotImplementedError()

    def header(self, record):
        """Produce the header element for ``record``; must be overridden."""
        raise NotImplementedError()

    def _generate_header_subjects(self, parent_element, subjects):
        """
        Append one ``setSpec`` child to ``parent_element`` per subject that has a term.

        :param parent_element: element the ``setSpec`` children are attached to
        :param subjects: iterable of dicts with optional "scheme" and "term"
            keys, or None (treated as no subjects)
        """
        for entry in (subjects if subjects is not None else []):
            term = entry.get("term", '')
            if not term:
                # a subject without a term yields no set
                continue

            scheme = entry.get("scheme", '')
            qualified = scheme + ':' + term if scheme else term

            spec_el = etree.SubElement(parent_element, self.PMH + "setSpec")
            set_text(spec_el, make_set_spec(qualified))
class OAI_DC(OAI_Crosswalk):
    """
    ~~OAIDC:Crosswalk->OAIPMH:Feature~~

    Shared base for the oai_dc crosswalks: adds the oai_dc and Dublin Core
    namespaces and the helper that writes ``dc:subject`` elements.
    """
    OAIDC_NAMESPACE = "http://www.openarchives.org/OAI/2.0/oai_dc/"
    OAIDC = "{%s}" % OAIDC_NAMESPACE

    DC_NAMESPACE = "http://purl.org/dc/elements/1.1/"
    DC = "{%s}" % DC_NAMESPACE

    # copy the parent's map so that adding prefixes here does not leak into OAI_Crosswalk
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_dc": OAIDC_NAMESPACE, "dc": DC_NAMESPACE})

    def _generate_subjects(self, parent_element, subjects, keywords):
        """
        Append ``dc:subject`` children to ``parent_element``.

        Each keyword becomes one element.  Each subject contributes up to two
        elements, one for its term and one for its code.  Subjects whose
        scheme is "lcc" (case-insensitive) are written unprefixed and carry
        ``xsi:type="dcterms:LCC"``; other subjects are written as
        ``scheme:value``, or as the bare value when the subject has no scheme
        (previously a missing scheme raised TypeError).

        :param parent_element: element the subjects are attached to
        :param subjects: iterable of dicts with optional "scheme", "code" and
            "term" keys, or None
        :param keywords: iterable of keyword strings, or None
        """
        if keywords is None:
            keywords = []
        if subjects is None:
            subjects = []

        for keyword in keywords:
            subj = etree.SubElement(parent_element, self.DC + "subject")
            set_text(subj, keyword)

        for subject in subjects:
            scheme = subject.get("scheme")
            code = subject.get("code")
            term = subject.get("term")

            if scheme and scheme.lower() == 'lcc':
                attrib = {self.XSI + "type": "dcterms:LCC"}
                prefix = ''
            else:
                attrib = {}
                # only qualify the value when there is a scheme to qualify it with
                prefix = scheme + ':' if scheme else ''

            # term first, then code, matching the established output order
            for value in (term, code):
                if value:
                    subel = etree.SubElement(parent_element, self.DC + "subject", **attrib)
                    set_text(subel, prefix + value)
class OAI_DC_Article(OAI_DC):
    """
    ~~OAIArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders article records as oai_dc metadata and OAI-PMH headers.
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element containing the ``oai_dc:dc`` rendering of an article.

        :param record: article object providing ``bibjson()`` and ``id``
        :return: the populated ``metadata`` element
        """
        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc")
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd")

        def add(tag, text):
            # append a dc:<tag> child carrying the (cleaned) text and hand it back
            child = etree.SubElement(oai_dc, self.DC + tag)
            set_text(child, text)
            return child

        if bibjson.title is not None:
            add("title", bibjson.title)

        # all the external identifiers (ISSNs, etc)
        for ident in bibjson.get_identifiers():
            add("identifier", ident.get("id"))

        # our internal identifier
        base_url = app.config['BASE_URL']
        add("identifier", base_url + "/article/" + record.id)

        # work out the date of publication
        pub_date = bibjson.get_publication_date()
        if pub_date != "":
            add("date", pub_date)

        for link in bibjson.get_urls():
            add("relation", link.get("url"))

        # link back to the table of contents of the journal, once per ISSN
        issns = bibjson.get_identifiers(idtype=bibjson.P_ISSN) + bibjson.get_identifiers(idtype=bibjson.E_ISSN)
        for issn in issns:
            add("relation", base_url + "/toc/" + issn)

        if bibjson.abstract is not None:
            add("description", bibjson.abstract)

        for author in bibjson.author:
            creator = add("creator", author.get("name"))
            orcid = author.get("orcid_id")
            if orcid:
                creator.set('id', orcid)

        if bibjson.publisher is not None:
            add("publisher", bibjson.publisher)

        add("type", "article")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        jlangs = bibjson.journal_language
        if jlangs is not None:
            for language in jlangs:
                add("language", language)

        citation = self._make_citation(bibjson)
        if citation is not None:
            add("source", citation)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element: identifier, datestamp and subject setSpecs.

        :param record: article object providing ``bibjson()``, ``id`` and ``last_updated``
        :return: the populated ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        id_el = etree.SubElement(head, self.PMH + "identifier")
        set_text(id_el, make_oai_identifier(record.id, "article"))

        stamp_el = etree.SubElement(head, self.PMH + "datestamp")
        set_text(stamp_el, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head

    def _make_citation(self, bibjson):
        """
        Assemble a citation string of the form
        ``[title], Vol [vol], Iss [iss], Pp [start]-[end] ([year])``.

        Parts whose source value is None are left out; a single page is
        labelled "p", a page range "Pp".

        :return: the citation, or None when nothing could be assembled
        """
        title = bibjson.journal_title
        volume = bibjson.volume
        issue = bibjson.number
        first_page = bibjson.start_page
        last_page = bibjson.end_page
        year = bibjson.year

        # comma-separated segments; an empty title contributes nothing, so it
        # must not produce a leading separator either
        segments = []
        if title is not None and title != "":
            segments.append(title)

        if volume is not None:
            segments.append("Vol " + volume)

        if issue is not None:
            segments.append("Iss " + issue)

        has_first = first_page is not None
        has_last = last_page is not None
        if has_first and has_last:
            segments.append("Pp " + first_page + "-" + last_page)
        elif has_first:
            segments.append("p " + first_page)
        elif has_last:
            segments.append("p " + last_page)

        citation = ", ".join(segments)

        if year is not None:
            year_part = "(" + year + ")"
            # the year is separated by a space rather than a comma
            citation = citation + " " + year_part if citation != "" else year_part

        return citation if citation != "" else None
class OAI_DC_Journal(OAI_DC):
    """
    ~~OAIJournalXML:Crosswalk->OAIPMH:Feature~~
    ~~->OAIDC:Crosswalk~~

    Renders journal records as oai_dc metadata and OAI-PMH headers.
    """
    def crosswalk(self, record):
        """
        Build the ``metadata`` element containing the ``oai_dc:dc`` rendering of a journal.

        :param record: journal object providing ``bibjson()``, ``toc_id`` and ``created_date``
        :return: the populated ``metadata`` element
        """
        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        oai_dc = etree.SubElement(metadata, self.OAIDC + "dc")
        oai_dc.set(self.XSI + "schemaLocation",
                   "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd")

        def add(tag, text):
            # append a dc:<tag> child carrying the (cleaned) text
            set_text(etree.SubElement(oai_dc, self.DC + tag), text)

        if bibjson.title is not None:
            add("title", bibjson.title)

        # external identifiers (ISSNs, etc)
        for identifier in bibjson.get_identifiers():
            add("identifier", identifier.get("id"))

        # our internal identifier
        add("identifier", app.config["BASE_URL"] + "/toc/" + record.toc_id)

        for language in bibjson.language or []:
            add("language", language)

        for licence in bibjson.licenses or []:
            add("rights", licence.get("type"))

        if bibjson.publisher is not None:
            add("publisher", bibjson.publisher)

        # We have removed the list of URLs in model v2, so we need to gather the URLs one by one
        all_urls = [
            bibjson.oa_statement_url,
            bibjson.journal_url,
            bibjson.aims_scope_url,
            bibjson.author_instructions_url,
            bibjson.waiver_url
        ]
        # Drop unset URLs and duplicates.  dict.fromkeys keeps first-seen order,
        # so the dc:relation elements come out in the same order on every run
        # (a set would not guarantee that), and the explicit "is not None"
        # avoids relying on the truthiness of NotImplemented.
        unique_urls = list(dict.fromkeys(u for u in all_urls if u is not None))

        for link in unique_urls:
            add("relation", link)

        add("date", normalise_date(record.created_date))
        add("type", "journal")

        self._generate_subjects(parent_element=oai_dc, subjects=bibjson.subjects(), keywords=bibjson.keywords)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element: identifier, datestamp and subject setSpecs.

        :param record: journal object providing ``bibjson()``, ``id`` and ``last_updated``
        :return: the populated ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "journal"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head
class OAI_DOAJ_Article(OAI_Crosswalk):
    """
    ~~OAIDOAJArticleXML:Crosswalk->OAIPMH:Feature~~
    ~~->DOAJArticleXML:Schema~~

    Renders article records in the oai_doaj format (``doajArticle`` element)
    and produces their OAI-PMH headers.
    """
    OAI_DOAJ_NAMESPACE = "http://doaj.org/features/oai_doaj/1.0/"
    OAI_DOAJ = "{%s}" % OAI_DOAJ_NAMESPACE

    # copy the parent's map so that adding prefixes here does not leak into OAI_Crosswalk
    NSMAP = deepcopy(OAI_Crosswalk.NSMAP)
    NSMAP.update({"oai_doaj": OAI_DOAJ_NAMESPACE})

    def crosswalk(self, record):
        """
        Build the ``metadata`` element containing the ``doajArticle`` rendering of an article.

        Optional fields are only written when the record has a value for them.

        :param record: article object providing ``bibjson()`` and ``publisher_record_id()``
        :return: the populated ``metadata`` element
        """
        bibjson = record.bibjson()

        metadata = etree.Element(self.PMH + "metadata", nsmap=self.NSMAP)
        article = etree.SubElement(metadata, self.OAI_DOAJ + "doajArticle")
        article.set(self.XSI + "schemaLocation",
                    "http://www.openarchives.org/OAI/2.0/ http://www.openarchives.org/OAI/2.0/OAI-PMH.xsd http://doaj.org/features/oai_doaj/1.0/ https://doaj.org/static/doaj/doajArticles.xsd")

        def add(parent, tag, text=None, **attrib):
            # append an oai_doaj:<tag> child; set_text leaves the element empty when text is None
            child = etree.SubElement(parent, self.OAI_DOAJ + tag, **attrib)
            set_text(child, text)
            return child

        # look up the journal's language: if any languages are recorded, use
        # the 3-char code corresponding to the first one
        jlangs = bibjson.journal_language
        language = None
        if jlangs and isinstance(jlangs, list):
            lang = datasets.language_for(jlangs[0])
            if lang is not None:
                language = lang.alpha_3

        # if the language code lookup was successful, add it to the result
        if language:
            add(article, "language", language)

        publisher = bibjson.publisher
        if publisher:
            add(article, "publisher", publisher)

        journal_title = bibjson.journal_title
        if journal_title:
            add(article, "journalTitle", journal_title)

        # all the external identifiers (ISSNs, etc)
        pissn = bibjson.get_one_identifier(bibjson.P_ISSN)
        if pissn:
            add(article, "issn", pissn)

        eissn = bibjson.get_one_identifier(bibjson.E_ISSN)
        if eissn:
            add(article, "eissn", eissn)

        # Work out the date of publication and convert it to the format
        # required by the XML schema by parsing it into a datetime and
        # formatting it back out.  A missing (TypeError) or malformed
        # (ValueError) date is thrown away; anything else is a genuine error
        # and is allowed to propagate.
        date = bibjson.get_publication_date()
        try:
            date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y-%m-%d")
        except (TypeError, ValueError):
            date = ""

        if date:
            add(article, "publicationDate", date)

        volume = bibjson.volume
        if volume:
            add(article, "volume", volume)

        number = bibjson.number
        if number:
            add(article, "issue", number)

        start_page = bibjson.start_page
        if start_page:
            add(article, "startPage", start_page)

        end_page = bibjson.end_page
        if end_page:
            add(article, "endPage", end_page)

        doi = bibjson.get_one_identifier(bibjson.DOI)
        if doi:
            add(article, "doi", doi)

        publisher_record_id = record.publisher_record_id()
        if publisher_record_id:
            add(article, "publisherRecordId", publisher_record_id)

        # document type
        # as of Mar 2015 this was not being ingested when people upload XML
        # conforming to the doajArticle schema, so it's not being output either

        if bibjson.title is not None:
            add(article, "title", bibjson.title)

        # (id, name) pairs collected while writing authors, emitted afterwards
        # as the affiliationsList the authors' affiliationId values point into
        affiliations = []
        if bibjson.author:
            authors_elem = add(article, "authors")
            for author in bibjson.author:  # bibjson.author is a list, despite the name
                author_elem = add(authors_elem, "author")
                if author.get('name'):
                    add(author_elem, "name", author.get('name'))
                if author.get('email'):
                    add(author_elem, "email", author.get('email'))
                if author.get('affiliation'):
                    new_affid = len(affiliations)  # use the length of the list as the id for each new item
                    affiliations.append((new_affid, author['affiliation']))
                    add(author_elem, "affiliationId", str(new_affid))
                if author.get('orcid_id'):
                    add(author_elem, "orcid_id", author.get("orcid_id"))

        if affiliations:
            affiliations_elem = add(article, "affiliationsList")
            for affid, affiliation in affiliations:
                add(affiliations_elem, "affiliationName", affiliation, affiliationId=str(affid))

        abstract = bibjson.abstract
        if abstract:
            add(article, "abstract", abstract)

        ftobj = bibjson.get_single_url('fulltext', unpack_urlobj=False)
        if ftobj:
            attrib = {}
            if "content_type" in ftobj:
                attrib['format'] = ftobj['content_type']
            # the element is written even when the url object has no "url" key
            fulltext_url = ftobj['url'] if "url" in ftobj else None
            add(article, "fullTextUrl", fulltext_url, **attrib)

        keywords = bibjson.keywords
        if keywords:
            keywords_elem = add(article, 'keywords')
            for keyword in keywords:
                add(keywords_elem, 'keyword', keyword)

        return metadata

    def header(self, record):
        """
        Build the OAI-PMH ``header`` element: identifier, datestamp and subject setSpecs.

        :param record: article object providing ``bibjson()``, ``id`` and ``last_updated``
        :return: the populated ``header`` element
        """
        bibjson = record.bibjson()
        head = etree.Element(self.PMH + "header", nsmap=self.NSMAP)

        identifier = etree.SubElement(head, self.PMH + "identifier")
        set_text(identifier, make_oai_identifier(record.id, "article"))

        datestamp = etree.SubElement(head, self.PMH + "datestamp")
        set_text(datestamp, normalise_date(record.last_updated))

        self._generate_header_subjects(parent_element=head, subjects=bibjson.subjects())
        return head
# Crosswalk classes, keyed first by format name and then by record type
CROSSWALKS = {
    "oai_dc": {"article": OAI_DC_Article, "journal": OAI_DC_Journal},
    "oai_doaj": {"article": OAI_DOAJ_Article},
}
479#####################################################################
480# Utility methods/objects
481#####################################################################
def make_set_spec(setspec):
    """
    Encode a set name as URL-safe base64 of its UTF-8 bytes, with the "="
    padding characters replaced by "~".

    :param setspec: the set name as a string
    :return: the encoded set spec string
    """
    encoded = base64.urlsafe_b64encode(setspec.encode("utf-8")).decode("utf-8")
    return encoded.replace('=', '~')
def make_oai_identifier(identifier, qualifier):
    """
    Build an identifier of the form ``oai:<namespace>/<qualifier>:<identifier>``,
    where the namespace is read from the OAIPMH_IDENTIFIER_NAMESPACE app setting.

    :param identifier: the record's own id
    :param qualifier: label placed before the id (callers pass "article" or "journal")
    :return: the assembled identifier string
    """
    namespace = app.config.get("OAIPMH_IDENTIFIER_NAMESPACE")
    return "".join(["oai:", namespace, "/", qualifier, ":", identifier])
def normalise_date(date):
    """
    Return ``date`` in ``YYYY-MM-DDThh:mm:ssZ`` form.

    A value already in that form is returned unchanged.  Anything else is
    converted by replacing spaces with "T" and appending "Z".

    :param date: timestamp string
    :return: the normalised timestamp string
    """
    # FIXME: do we need a more powerful date normalisation routine?
    try:
        datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
    except (TypeError, ValueError):
        # not in the target format (or not a string at all): fall back to the
        # simple textual conversion instead of swallowing every exception
        return "T".join(date.split(" ")) + "Z"
    return date
503###########################################################
504# XML Character encoding hacks
505###########################################################
# Inclusive (low, high) code point ranges that are stripped from text before
# it is written into XML.
_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
                    (0x7F, 0x84), (0x86, 0x9F),
                    (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]
if sys.maxunicode >= 0x10000:  # not narrow build
    # the last two code points of each of planes 1 to 16 (0x1FFFE-0x1FFFF ... 0x10FFFE-0x10FFFF)
    _illegal_unichrs.extend(
        (plane * 0x10000 + 0xFFFE, plane * 0x10000 + 0xFFFF)
        for plane in range(1, 0x11)
    )
# one "low-high" character-class range per pair, joined into a single class
_illegal_ranges = ["{}-{}".format(chr(low), chr(high))
                   for low, high in _illegal_unichrs]
_illegal_xml_chars_RE = re.compile('[' + ''.join(_illegal_ranges) + ']')
def valid_XML_char_ordinal(i):
    """
    Tell whether code point ``i`` is accepted by this module's XML character
    filter: tab, line feed, carriage return, or a value in 0x20-0xD7FF,
    0xE000-0xFFFD or 0x10000-0x10FFFF.

    :param i: code point as an integer
    :return: True when the code point is accepted, False otherwise
    """
    if i in (0x9, 0xA, 0xD):
        return True
    return 0x20 <= i <= 0xD7FF or 0xE000 <= i <= 0xFFFD or 0x10000 <= i <= 0x10FFFF
def clean_unreadable(input_string):
    """
    Remove the characters matched by ``_illegal_xml_chars_RE`` from a string.

    Non-str input is decoded as UTF-8 first.  If the value cannot be handled
    (it has no ``decode`` method, is not valid UTF-8, or cannot be passed to
    the regex), the problem is logged and None is returned instead of
    raising.

    :param input_string: str, or an object with ``decode`` (e.g. bytes)
    :return: the cleaned str, or None when the input could not be processed
    """
    try:
        # isinstance also accepts str subclasses, which need no decoding
        text = input_string if isinstance(input_string, str) else input_string.decode("utf-8")
        return _illegal_xml_chars_RE.sub("", text)
    except (TypeError, AttributeError, UnicodeDecodeError):
        # AttributeError: value is neither str nor decodable (e.g. an int);
        # UnicodeDecodeError: bytes that are not UTF-8
        app.logger.error("Unable to strip illegal XML chars from: {x}, {y}".format(x=input_string, y=type(input_string)))
        return None
def xml_clean(input_string):
    """
    Return ``input_string`` with every character rejected by
    ``valid_XML_char_ordinal`` removed.

    :param input_string: the string to filter
    :return: the filtered string
    """
    kept = [ch for ch in input_string if valid_XML_char_ordinal(ord(ch))]
    return ''.join(kept)
def set_text(element, input_string):
    """
    Assign ``input_string`` to ``element.text`` after cleaning it.

    None is ignored and the element is left untouched.  Otherwise the value is
    passed through ``clean_unreadable``; if the assignment still raises
    ValueError, it is retried with the stricter ``xml_clean`` filter.

    :param element: the element whose ``text`` is to be set
    :param input_string: the text to write, or None
    """
    if input_string is not None:
        cleaned = clean_unreadable(input_string)
        try:
            element.text = cleaned
        except ValueError:
            element.text = xml_clean(cleaned)