"""PROV-RDF serializers for ProvDocument
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
__author__ = 'Satrajit S. Ghosh'
__email__ = 'satra@mit.edu'
import logging
logger = logging.getLogger('rdf')
from collections import defaultdict, OrderedDict
import datetime
import io
from prov.serializers import Serializer, Error
from prov.constants import *
from prov.model import (Literal, Identifier, QualifiedName,
Namespace, ProvDocument, ProvBundle, first,
parse_xsd_datetime, ProvRecord)
import six
from six import text_type
import base64
import datetime
import dateutil.parser
import prov.model as pm
def attr2rdf(attr):
    """Return the PROV-O predicate ``URIRef`` for a PROV formal attribute.

    ``PROV_ID_ATTRIBUTES_MAP[attr]`` is expected to be a ``'prov:<name>'``
    string; ``<name>`` is resolved in the ``PROV`` namespace.
    """
    local_name = PROV_ID_ATTRIBUTES_MAP[attr].split('prov:')[1]
    return URIRef(PROV[local_name].uri)
from rdflib.term import URIRef, BNode
from rdflib.term import Literal as RDFLiteral
from rdflib.graph import ConjunctiveGraph
from rdflib.namespace import RDF, RDFS, XSD
class ProvRDFException(Error):
    """Exception type for the PROV-RDF serializer (not raised in this module's visible code)."""
class AnonymousIDGenerator(object):
    """Hand out stable blank-node style identifiers for unnamed objects.

    The first time an object is seen it is assigned ``_:<prefix><n>``
    (``n`` counting up from 1 across all prefixes); later calls with the
    same object return the cached URI.
    """

    def __init__(self):
        self._cache = {}  # object -> previously issued identifier URI
        self._count = 0   # number of identifiers issued so far

    def get_anon_id(self, obj, local_prefix="id"):
        """Return the anonymous identifier URI for *obj*, creating it if needed.

        :param obj: Hashable object to identify.
        :param local_prefix: Text placed between ``_:`` and the counter.
        """
        try:
            return self._cache[obj]
        except KeyError:
            pass
        self._count += 1
        anon_uri = Identifier('_:%s%d' % (local_prefix, self._count)).uri
        self._cache[obj] = anon_uri
        return anon_uri
# Reverse map for prov.model.XSD_DATATYPE_PARSERS: Python type -> XSD
# datatype used by encode_rdf_representation for plain Python values.
# Boolean and string values are supported natively by PROV-RDF;
# datetime values are converted separately.
LITERAL_XSDTYPE_MAP = {
    float: XSD['double'],
    int: XSD['int'],
    text_type: XSD['string'],
}

# Python 2 only: six.integer_types ends with ``long``, which maps to
# xsd:long. On Python 3 the last integer type is ``int``, already present.
LITERAL_XSDTYPE_MAP.setdefault(six.integer_types[-1], XSD['long'])
def valid_qualified_name(bundle, value, xsd_qname=False):
    """Resolve *value* to a qualified name within *bundle*.

    :param bundle: Object providing ``valid_qualified_name(value)``.
    :param value: Name to resolve; ``None`` short-circuits to ``None``.
    :param xsd_qname: When true, wrap the result with ``XSD_QNAME``.
    """
    if value is None:
        return None
    resolved = bundle.valid_qualified_name(value)
    if xsd_qname:
        # NOTE(review): this calls XSD_QNAME as a function; confirm that the
        # constant from prov.constants is actually callable.
        return XSD_QNAME(resolved)
    return resolved
class ProvRDFSerializer(Serializer):
    """
    PROV-O serializer for :class:`~prov.model.ProvDocument`
    """

    def serialize(self, stream=None, rdf_format='trig', **kwargs):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to
        `Prov-O <https://www.w3.org/TR/prov-o/>`_.

        :param stream: Where to save the output. Must provide ``write``;
            an :class:`io.TextIOBase` receives UTF-8 decoded text, any other
            stream receives raw bytes.
        :param rdf_format: rdflib serialization format (default ``'trig'``).
        :param kwargs: Extra keyword arguments forwarded to rdflib's
            ``serialize``.
        """
        container = self.encode_document(self.document)
        newargs = kwargs.copy()
        newargs['format'] = rdf_format

        if newargs['format'] == 'trig':
            # Re-load the quads into a fresh context-aware ConjunctiveGraph
            # and carry over any namespace bindings it does not yet have.
            gr = ConjunctiveGraph()
            gr.context_aware = True
            gr.parse(data=container.serialize(format='nquads'),
                     format='nquads')
            for namespace in container.namespaces():
                if namespace not in list(gr.namespaces()):
                    gr.bind(namespace[0], namespace[1])
            container = gr

        # The former Python 2 and Python 3 branches were identical, so a
        # single code path is used.
        with io.BytesIO() as buf:
            container.serialize(buf, **newargs)
            buf.seek(0, 0)
            # The buffer holds bytes. If the target is a text stream it must
            # be decoded; UTF-8 is assumed, which covers almost every case.
            if isinstance(stream, io.TextIOBase):
                stream.write(buf.read().decode('utf-8'))
            else:
                stream.write(buf.read())

    def deserialize(self, stream, rdf_format='trig', **kwargs):
        """
        Deserialize from the `Prov-O <https://www.w3.org/TR/prov-o/>`_
        representation to a :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data, passed to rdflib's ``parse``.
        :param rdf_format: rdflib parser format (default ``'trig'``).
        :return: The newly built :class:`~prov.model.ProvDocument`.
        """
        newargs = kwargs.copy()
        newargs['format'] = rdf_format
        container = ConjunctiveGraph()
        container.parse(stream, **newargs)
        document = ProvDocument()
        self.document = document
        self.decode_document(container, document)
        return document

    def valid_identifier(self, value):
        """Resolve *value* against ``self.document``'s namespaces.

        Callers in this class treat a ``None`` result as "no known
        namespace" and register one themselves.
        """
        return self.document.valid_qualified_name(value)

    def encode_rdf_representation(self, value):
        """Convert a PROV/Python value into an rdflib term.

        URIRefs pass through; prov Literals, datetimes, QualifiedNames and
        plain Identifiers get dedicated encodings; other values are typed
        via ``LITERAL_XSDTYPE_MAP`` when their exact type is listed there,
        otherwise left untyped.
        """
        if isinstance(value, URIRef):
            return value
        elif isinstance(value, Literal):
            return literal_rdf_representation(value)
        elif isinstance(value, datetime.datetime):
            return RDFLiteral(value.isoformat(), datatype=XSD['dateTime'])
        elif isinstance(value, QualifiedName):
            return URIRef(value.uri)
        elif isinstance(value, Identifier):
            return RDFLiteral(value.uri, datatype=XSD['anyURI'])
        # Exact type match on purpose: e.g. bool (an int subclass) must not
        # be mapped to xsd:int.
        elif type(value) in LITERAL_XSDTYPE_MAP:
            return RDFLiteral(value, datatype=LITERAL_XSDTYPE_MAP[type(value)])
        else:
            return RDFLiteral(value)

    def decode_rdf_representation(self, literal, graph):
        """Convert an rdflib term back into a PROV value.

        :param literal: rdflib ``Literal``, ``URIRef`` or any other value.
        :param graph: Graph whose namespace manager is used to mint a
            prefix for URIs not covered by the document's namespaces.
        :return: a prov ``Literal`` or ``datetime`` for RDF literals, a
            ``QualifiedName`` for URIRefs, otherwise *literal* unchanged.
        """
        if isinstance(literal, RDFLiteral):
            value = literal.value if literal.value is not None else literal
            datatype = literal.datatype if hasattr(literal, 'datatype') else None
            langtag = literal.language if hasattr(literal, 'language') else None
            if datatype and 'XMLLiteral' in datatype:
                value = literal
            if datatype and 'base64Binary' in datatype:
                value = base64.standard_b64encode(value)
            if datatype == XSD['QName']:
                return pm.Literal(literal, datatype=XSD_QNAME)
            if datatype == XSD['dateTime']:
                return dateutil.parser.parse(literal)
            else:
                # The literal of standard Python types is not converted here.
                # It will be automatically converted when added to a record
                # by _auto_literal_conversion().
                return Literal(value, self.valid_identifier(datatype), langtag)
        elif isinstance(literal, URIRef):
            rval = self.valid_identifier(literal)
            if rval is None:
                # Unknown namespace: register one derived from the graph.
                prefix, iri, _ = graph.namespace_manager.compute_qname(literal)
                ns = self.document.add_namespace(prefix, iri)
                rval = pm.QualifiedName(ns, literal.replace(ns.uri, ''))
            return rval
        else:
            # simple type, just return it
            return literal

    def encode_document(self, document):
        """Encode *document* and each of its bundles into one graph.

        Each bundle is encoded into a graph identified by the bundle's URI
        and its quads are added to the document's graph.
        """
        container = self.encode_container(document)
        for item in document.bundles:
            # encoding the sub-bundle
            bundle = self.encode_container(item, identifier=item.identifier.uri)
            container.addN(bundle.quads())
        return container

    def encode_container(self, bundle, container=None, identifier=None):
        """Encode the records of *bundle* as PROV-O triples.

        :param bundle: Document or bundle whose ``_records`` are encoded.
        :param container: Graph to add to; a new ``ConjunctiveGraph`` with
            graph identifier *identifier* is created when ``None``.
        :param identifier: Graph identifier used only when creating
            *container*.
        :return: The graph holding the encoded triples.

        Identified records get an ``rdf:type`` triple. A relation is emitted
        as a direct property between its first two formal arguments where
        possible, plus a qualified node (the record's identifier or a fresh
        BNode) carrying any remaining attributes.
        """
        if container is None:
            container = ConjunctiveGraph(identifier=identifier)
            nm = container.namespace_manager
            nm.bind('prov', PROV.uri)

        for namespace in bundle.namespaces:
            container.bind(namespace.prefix, namespace.uri)

        id_generator = AnonymousIDGenerator()

        def real_or_anon_id(record):
            # Use the record's own identifier, else a stable anonymous one.
            if record._identifier:
                return record._identifier.uri
            return id_generator.get_anon_id(record)

        for record in bundle._records:
            rec_type = record.get_type()
            if hasattr(record, 'identifier') and record.identifier:
                identifier = URIRef(text_type(real_or_anon_id(record)))
                container.add((identifier, RDF.type, URIRef(rec_type.uri)))
            else:
                identifier = None
            if not record.attributes:
                continue

            bnode = None
            formal_objects = []
            used_objects = []
            all_attributes = list(record.formal_attributes) + list(record.attributes)
            # Decide whether a qualified node is needed: any formal value
            # (with an identifier), or any formal value beyond the first two
            # (without one).
            formal_qualifiers = False
            for attrid, (attr, value) in enumerate(list(record.formal_attributes)):
                if (identifier is not None and value is not None) or \
                        (identifier is None and value is not None and attrid > 1):
                    formal_qualifiers = True
            has_qualifiers = len(record.extra_attributes) > 0 or formal_qualifiers

            for attr, value in all_attributes:
                if record.is_relation():
                    pred = URIRef(PROV[PROV_N_MAP[rec_type]].uri)
                    # One-time setup per relation: emit the direct triple and,
                    # if needed, the qualified node.
                    if bnode is None:
                        valid_formal_indices = set()
                        for fidx, (key, val) in enumerate(record.formal_attributes):
                            formal_objects.append(key)
                            if val:
                                valid_formal_indices.add(fidx)
                        used_objects = [record.formal_attributes[0][0]]
                        subj = None
                        if record.formal_attributes[0][1]:
                            subj = URIRef(record.formal_attributes[0][1].uri)
                        if identifier is None and subj is not None:
                            try:
                                obj_val = record.formal_attributes[1][1]
                            except IndexError:
                                obj_val = None
                            if obj_val and (rec_type not in [PROV_END,
                                                             PROV_START,
                                                             PROV_USAGE,
                                                             PROV_GENERATION,
                                                             PROV_DERIVATION,
                                                             PROV_INVALIDATION] or
                                            (valid_formal_indices == {0, 1} and
                                             len(record.extra_attributes) == 0)):
                                used_objects.append(record.formal_attributes[1][0])
                                obj_val = self.encode_rdf_representation(obj_val)
                                if rec_type == PROV_ALTERNATE:
                                    subj, obj_val = obj_val, subj
                                container.add((subj, pred, obj_val))
                                if rec_type == PROV_MENTION:
                                    if record.formal_attributes[2][1]:
                                        used_objects.append(record.formal_attributes[2][0])
                                        obj_val = self.encode_rdf_representation(
                                            record.formal_attributes[2][1])
                                        container.add((subj, URIRef(PROV['asInBundle'].uri),
                                                       obj_val))
                                    has_qualifiers = False
                        if rec_type in [PROV_ALTERNATE]:
                            # alternateOf has no qualified form.
                            continue
                        if subj and (has_qualifiers or identifier):
                            qualifier = rec_type._localpart
                            rec_uri = rec_type.uri
                            for attr_name, val in record.extra_attributes:
                                if attr_name == PROV['type']:
                                    if PROV['Revision'] == val or \
                                            PROV['Quotation'] == val or \
                                            PROV['PrimarySource'] == val:
                                        # Use the derivation subtype as the
                                        # qualifier and node type instead.
                                        qualifier = val._localpart
                                        rec_uri = val.uri
                                        if identifier is not None:
                                            container.remove((identifier,
                                                              RDF.type,
                                                              URIRef(rec_type.uri)))
                            QRole = URIRef(PROV['qualified' + qualifier].uri)
                            if identifier is not None:
                                container.add((subj, QRole, identifier))
                            else:
                                # From here on the remaining attributes hang
                                # off this blank node.
                                bnode = identifier = BNode()
                                container.add((subj, QRole, identifier))
                                container.add((identifier, RDF.type,
                                               URIRef(rec_uri)))
                    if value is not None and attr not in used_objects:
                        if attr in formal_objects:
                            pred = attr2rdf(attr)
                        elif attr == PROV['role']:
                            pred = URIRef(PROV['hadRole'].uri)
                        elif attr == PROV['plan']:
                            pred = URIRef(PROV['hadPlan'].uri)
                        elif attr == PROV['type']:
                            pred = RDF.type
                        elif attr == PROV['label']:
                            pred = RDFS.label
                        elif isinstance(attr, QualifiedName):
                            pred = URIRef(attr.uri)
                        else:
                            pred = self.encode_rdf_representation(attr)
                        # Rename PROV-N attribute names to PROV-O predicates.
                        if PROV['plan'].uri in pred:
                            pred = URIRef(PROV['hadPlan'].uri)
                        if PROV['informant'].uri in pred:
                            pred = URIRef(PROV['activity'].uri)
                        if PROV['responsible'].uri in pred:
                            pred = URIRef(PROV['agent'].uri)
                        if rec_type == PROV_DELEGATION and PROV['activity'].uri in pred:
                            pred = URIRef(PROV['hadActivity'].uri)
                        if (rec_type in [PROV_END, PROV_START] and
                                PROV['trigger'].uri in pred) or \
                                (rec_type in [PROV_USAGE] and PROV['used'].uri in pred):
                            pred = URIRef(PROV['entity'].uri)
                        if rec_type in [PROV_GENERATION, PROV_END,
                                        PROV_START, PROV_USAGE,
                                        PROV_INVALIDATION]:
                            if PROV['time'].uri in pred:
                                pred = URIRef(PROV['atTime'].uri)
                            if PROV['ender'].uri in pred:
                                pred = URIRef(PROV['hadActivity'].uri)
                            if PROV['starter'].uri in pred:
                                pred = URIRef(PROV['hadActivity'].uri)
                            if PROV['location'].uri in pred:
                                pred = URIRef(PROV['atLocation'].uri)
                        if rec_type in [PROV_ACTIVITY]:
                            # Compare URIs: `QualifiedName in str` raises
                            # TypeError.
                            if PROV_ATTR_STARTTIME.uri in pred:
                                pred = URIRef(PROV['startedAtTime'].uri)
                            if PROV_ATTR_ENDTIME.uri in pred:
                                pred = URIRef(PROV['endedAtTime'].uri)
                        if rec_type == PROV_DERIVATION:
                            if PROV['activity'].uri in pred:
                                pred = URIRef(PROV['hadActivity'].uri)
                            if PROV['generation'].uri in pred:
                                pred = URIRef(PROV['hadGeneration'].uri)
                            if PROV['usage'].uri in pred:
                                pred = URIRef(PROV['hadUsage'].uri)
                            if PROV['usedEntity'].uri in pred:
                                pred = URIRef(PROV['entity'].uri)
                        container.add((identifier, pred,
                                       self.encode_rdf_representation(value)))
                    continue

                # Non-relation records (entities, activities, agents, ...).
                if value is None:
                    continue
                if isinstance(value, ProvRecord):
                    obj = URIRef(text_type(real_or_anon_id(value)))
                else:
                    # Assuming this is a datetime value
                    obj = self.encode_rdf_representation(value)
                if attr == PROV['location']:
                    pred = URIRef(PROV['atLocation'].uri)
                    container.add((identifier, pred,
                                   self.encode_rdf_representation(obj)))
                    continue
                if attr == PROV['type']:
                    pred = RDF.type
                elif attr == PROV['label']:
                    pred = RDFS.label
                elif attr == PROV_ATTR_STARTTIME:
                    pred = URIRef(PROV['startedAtTime'].uri)
                elif attr == PROV_ATTR_ENDTIME:
                    pred = URIRef(PROV['endedAtTime'].uri)
                else:
                    pred = self.encode_rdf_representation(attr)
                container.add((identifier, pred, obj))
        return container

    def decode_document(self, content, document):
        """Populate *document* from the rdflib graph *content*.

        Namespace bindings are copied first. When *content* exposes
        ``contexts()``, each graph identified by a BNode is decoded into
        *document* itself and every other graph into a bundle named after
        the graph identifier.
        """
        for prefix, url in content.namespaces():
            document.add_namespace(prefix, text_type(url))
        if hasattr(content, 'contexts'):
            for graph in content.contexts():
                if isinstance(graph.identifier, BNode):
                    self.decode_container(graph, document)
                else:
                    bundle_id = text_type(graph.identifier)
                    bundle = document.bundle(bundle_id)
                    self.decode_container(graph, bundle)
        else:
            self.decode_container(content, document)

    def decode_container(self, graph, bundle):
        """Rebuild PROV records in *bundle* from the triples of *graph*.

        Pass 1 scans ``rdf:type`` triples to find record nodes. Pass 2
        walks every triple, emitting unqualified relations directly and
        collecting formal/extra attributes for record nodes. Pass 3 creates
        the records, one per combination when a formal attribute received
        several values.
        """
        ids = {}                # node id -> PROV record type
        formal_attributes = {}  # node id -> OrderedDict of formal attrs
        unique_sets = {}        # node id -> all values seen per formal attr
        prov_cls_map = {key.uri: val for key, val in PROV_BASE_CLS.items()}
        relation_mapper = {URIRef(PROV['alternateOf'].uri): 'alternate',
                           URIRef(PROV['actedOnBehalfOf'].uri): 'delegation',
                           URIRef(PROV['specializationOf'].uri): 'specialization',
                           URIRef(PROV['mentionOf'].uri): 'mention',
                           URIRef(PROV['wasAssociatedWith'].uri): 'association',
                           URIRef(PROV['wasDerivedFrom'].uri): 'derivation',
                           URIRef(PROV['wasAttributedTo'].uri): 'attribution',
                           URIRef(PROV['wasInformedBy'].uri): 'communication',
                           URIRef(PROV['wasGeneratedBy'].uri): 'generation',
                           URIRef(PROV['wasInfluencedBy'].uri): 'influence',
                           URIRef(PROV['wasInvalidatedBy'].uri): 'invalidation',
                           URIRef(PROV['wasEndedBy'].uri): 'end',
                           URIRef(PROV['wasStartedBy'].uri): 'start',
                           URIRef(PROV['hadMember'].uri): 'membership',
                           URIRef(PROV['used'].uri): 'usage',
                           }
        predicate_mapper = {RDFS.label: pm.PROV['label'],
                            URIRef(PROV['atLocation'].uri): PROV_LOCATION,
                            URIRef(PROV['startedAtTime'].uri): PROV_ATTR_STARTTIME,
                            URIRef(PROV['endedAtTime'].uri): PROV_ATTR_ENDTIME,
                            URIRef(PROV['atTime'].uri): PROV_ATTR_TIME,
                            URIRef(PROV['hadRole'].uri): PROV_ROLE,
                            URIRef(PROV['hadPlan'].uri): pm.PROV_ATTR_PLAN,
                            URIRef(PROV['hadUsage'].uri): pm.PROV_ATTR_USAGE,
                            URIRef(PROV['hadGeneration'].uri): pm.PROV_ATTR_GENERATION,
                            URIRef(PROV['hadActivity'].uri): pm.PROV_ATTR_ACTIVITY,
                            }
        other_attributes = {}   # node id -> list of (attr, value)

        # Pass 1: identify record nodes from their rdf:type.
        for stmt in graph.triples((None, RDF.type, None)):
            node_id = text_type(stmt[0])
            obj = text_type(stmt[2])
            if obj in prov_cls_map:
                if not isinstance(stmt[0], BNode) and \
                        self.valid_identifier(node_id) is None:
                    prefix, iri, _ = graph.namespace_manager.compute_qname(node_id)
                    self.document.add_namespace(prefix, iri)
                # Membership was checked above, so this lookup cannot fail.
                prov_obj = prov_cls_map[obj]
                add_attr = True
                isderivation = pm.PROV['Revision'].uri in stmt[2] or \
                    pm.PROV['Quotation'].uri in stmt[2] or \
                    pm.PROV['PrimarySource'].uri in stmt[2]
                if node_id not in ids and prov_obj and (prov_obj.uri == obj or
                                                       isderivation or
                                                       isinstance(stmt[0], BNode)):
                    ids[node_id] = prov_obj
                    klass = pm.PROV_REC_CLS[prov_obj]
                    formal_attributes[node_id] = OrderedDict(
                        [(key, None) for key in klass.FORMAL_ATTRIBUTES])
                    unique_sets[node_id] = OrderedDict(
                        [(key, []) for key in klass.FORMAL_ATTRIBUTES])
                    # Keep the type as an extra prov:type only when it is a
                    # subtype (e.g. Revision) rather than the base class.
                    add_attr = ((isinstance(stmt[0], BNode) or isderivation) and
                                prov_obj.uri != obj)
                if add_attr:
                    if node_id not in other_attributes:
                        other_attributes[node_id] = []
                    obj_formatted = self.decode_rdf_representation(stmt[2], graph)
                    other_attributes[node_id].append((pm.PROV['type'], obj_formatted))
            else:
                if node_id not in other_attributes:
                    other_attributes[node_id] = []
                obj = self.decode_rdf_representation(stmt[2], graph)
                other_attributes[node_id].append((pm.PROV['type'], obj))

        # Pass 2: relations and attributes.
        for node_id, pred, obj in graph:
            node_id = text_type(node_id)
            if node_id not in other_attributes:
                other_attributes[node_id] = []
            if pred == RDF.type:
                continue
            if pred in relation_mapper:
                if 'alternateOf' in pred:
                    getattr(bundle, relation_mapper[pred])(obj, node_id)
                elif 'mentionOf' in pred:
                    mentionBundle = None
                    for stmt in graph.triples((URIRef(node_id),
                                               URIRef(pm.PROV['asInBundle'].uri),
                                               None)):
                        mentionBundle = stmt[2]
                    getattr(bundle, relation_mapper[pred])(node_id, text_type(obj),
                                                           mentionBundle)
                elif 'actedOnBehalfOf' in pred or 'wasAssociatedWith' in pred:
                    # If a qualified node exists, fill its first two formal
                    # attributes instead of creating a separate record.
                    qualifier = 'qualified' + relation_mapper[pred].upper()[0] + \
                        relation_mapper[pred][1:]
                    qualifier_bnode = None
                    for stmt in graph.triples((URIRef(node_id),
                                               URIRef(pm.PROV[qualifier].uri),
                                               None)):
                        qualifier_bnode = stmt[2]
                    if qualifier_bnode is None:
                        getattr(bundle, relation_mapper[pred])(node_id, text_type(obj))
                    else:
                        qkey = text_type(qualifier_bnode)
                        fakeys = list(formal_attributes[qkey].keys())
                        formal_attributes[qkey][fakeys[0]] = node_id
                        formal_attributes[qkey][fakeys[1]] = text_type(obj)
                else:
                    getattr(bundle, relation_mapper[pred])(node_id, text_type(obj))
            elif node_id in ids:
                obj1 = self.decode_rdf_representation(obj, graph)
                if obj is not None and obj1 is None:
                    raise ValueError(('Error transforming', obj))
                pred_new = pred
                if pred in predicate_mapper:
                    pred_new = predicate_mapper[pred]
                # Map PROV-O predicates back to record-specific PROV-N names.
                if ids[node_id] == PROV_COMMUNICATION and 'activity' in text_type(pred_new):
                    pred_new = PROV_ATTR_INFORMANT
                if ids[node_id] == PROV_DELEGATION and 'agent' in text_type(pred_new):
                    pred_new = PROV_ATTR_RESPONSIBLE
                if ids[node_id] in [PROV_END, PROV_START] and 'entity' in text_type(pred_new):
                    pred_new = PROV_ATTR_TRIGGER
                if ids[node_id] in [PROV_END] and 'activity' in text_type(pred_new):
                    pred_new = PROV_ATTR_ENDER
                if ids[node_id] in [PROV_START] and 'activity' in text_type(pred_new):
                    pred_new = PROV_ATTR_STARTER
                if ids[node_id] == PROV_DERIVATION and 'entity' in text_type(pred_new):
                    pred_new = PROV_ATTR_USED_ENTITY
                if text_type(pred_new) in [val.uri for val in formal_attributes[node_id]]:
                    qname_key = self.valid_identifier(pred_new)
                    formal_attributes[node_id][qname_key] = obj1
                    unique_sets[node_id][qname_key].append(obj1)
                    # Several values: resolved per combination in pass 3.
                    if len(unique_sets[node_id][qname_key]) > 1:
                        formal_attributes[node_id][qname_key] = None
                else:
                    if 'qualified' not in text_type(pred_new) and \
                            'asInBundle' not in text_type(pred_new):
                        other_attributes[node_id].append((text_type(pred_new), obj1))
            local_key = text_type(obj)
            if local_key in ids:
                if 'qualified' in pred:
                    # The subject of a qualifiedX triple is the first formal
                    # argument of the qualified record.
                    first_key = list(formal_attributes[local_key].keys())[0]
                    formal_attributes[local_key][first_key] = node_id

        # Pass 3: create the records.
        for node_id in ids:
            attrs = other_attributes.get(node_id)
            items_to_walk = [(qname, values)
                             for qname, values in unique_sets[node_id].items()
                             if values and len(values) > 1]
            if items_to_walk:
                for subset in list(walk(items_to_walk)):
                    for key, value in subset.items():
                        formal_attributes[node_id][key] = value
                    bundle.new_record(ids[node_id], node_id,
                                      formal_attributes[node_id], attrs)
            else:
                bundle.new_record(ids[node_id], node_id,
                                  formal_attributes[node_id], attrs)
            ids[node_id] = None
            if attrs is not None:
                other_attributes[node_id] = []
        # NOTE(review): any non-empty list left here belongs to a node that
        # is not a record (ids has no entry, or its entry is None). In that
        # case this lookup fails. Confirm the intended behavior.
        for key, val in other_attributes.items():
            if val:
                ids[key].add_attributes(val)
def walk(children, level=0, path=None, usename=True):
    """Generate all the full paths in a tree, as a dict.

    Each element of *children* is a ``(name, values)`` pair, where *values*
    is an iterable. One dict is yielded per combination of values (the
    Cartesian product, with the first pair varying slowest).

    :param children: Sequence of ``(name, values)`` pairs.
    :param level: Current recursion depth; callers should leave it at 0.
    :param path: Partial result shared through the recursion. It is
        replaced by a fresh dict when *level* is 0.
    :param usename: Key each dict by ``name`` when true, otherwise by the
        recursion depth (0, 1, ...).

    Examples
    --------
    >>> iterables = [('a', [1, 2]), ('b', [3, 4])]
    >>> [val['a'] for val in walk(iterables)]
    [1, 1, 2, 2]
    >>> [val['b'] for val in walk(iterables)]
    [3, 4, 3, 4]
    """
    # Entry point: start with an empty path.
    if level == 0:
        path = {}
    # Exit condition: every level is assigned, so yield a snapshot.
    if not children:
        yield path.copy()
        return
    # Tree recursion
    head, tail = children[0], children[1:]
    name, values = head
    for child in values:
        # We can use the arg name or the tree level as a key.
        path[name if usename else level] = child
        # Recurse into the next level.
        for child_paths in walk(tail, level + 1, path, usename):
            yield child_paths
def literal_rdf_representation(literal):
    """Convert a prov ``Literal`` into an rdflib ``Literal``.

    :param literal: prov Literal with ``value``, ``langtag`` and
        ``datatype`` attributes.
    :return: A language-tagged rdflib literal when ``langtag`` is set.
        Otherwise, a literal typed with ``datatype.uri``, or an untyped
        literal when there is no datatype.
    """
    # ``is not None`` rather than truthiness, so that falsy values such as
    # the empty string are encoded as their own text instead of falling back
    # to the prov Literal object itself.
    value = text_type(literal.value) if literal.value is not None else literal
    if literal.langtag:
        # a language tag can only go with prov:InternationalizedString
        return RDFLiteral(value, lang=str(literal.langtag))
    datatype = literal.datatype
    if datatype is None:
        # Previously this path raised AttributeError on ``datatype.uri``.
        return RDFLiteral(value)
    if 'base64Binary' in datatype.uri:
        if six.PY2:
            value = base64.standard_b64encode(value)
        else:
            value = literal.value.encode()
    return RDFLiteral(value, datatype=datatype.uri)