comparison: env/lib/python3.7/site-packages/rdflib_jsonld/context.py @ 0:26e78fe6e8c4 (draft)
commit message: "planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author: shellac
date: Sat, 02 May 2020 07:14:21 -0400
parents: (none; file added in this changeset)
children: (none)
# -*- coding: utf-8 -*-
"""
Implementation of the JSON-LD Context structure. See:

    http://json-ld.org/

"""
from collections import namedtuple
from rdflib.namespace import RDF

from .keys import (BASE, CONTAINER, CONTEXT, GRAPH, ID, INDEX, LANG, LIST,
                   REV, SET, TYPE, VALUE, VOCAB)
from . import errors
from .util import source_to_json, urljoin, urlsplit, split_iri, norm_url


NODE_KEYS = set([LANG, ID, TYPE, VALUE, LIST, SET, REV, GRAPH])

class Defined(int): pass
UNDEF = Defined(0)
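# Note (added): UNDEF is a falsy sentinel distinct from None; it marks
# "not specified" for the coercion, container and language slots of term
# definitions, so lookups can distinguish an explicit None from an unset value.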


class Context(object):

    def __init__(self, source=None, base=None):
        self.language = None
        self.vocab = None
        self.base = base
        self.doc_base = base
        self.terms = {}
        self._alias = {}
        self._lookup = {}
        self._prefixes = {}
        self.active = False
        if source:
            self.load(source)

    @property
    def base(self):
        return self._base

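    # Note (added): setting base strips any fragment part; once a base is
    # already present, a new (possibly relative) value is resolved against it
    # via resolve_iri().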
    @base.setter
    def base(self, base):
        if base:
            hash_index = base.find('#')
            if hash_index > -1:
                base = base[0:hash_index]
        self._base = self.resolve_iri(base) if (
            hasattr(self, '_base') and base is not None) else base
        self._basedomain = '%s://%s' % urlsplit(base)[0:2] if base else None

    def subcontext(self, source):
        # IMPROVE: to optimize, implement SubContext with parent fallback support
        ctx = Context()
        ctx.language = self.language
        ctx.vocab = self.vocab
        ctx.base = self.base
        ctx.doc_base = self.doc_base
        ctx._alias = self._alias.copy()
        ctx.terms = self.terms.copy()
        ctx._lookup = self._lookup.copy()
        ctx._prefixes = self._prefixes.copy()
        ctx.load(source)
        return ctx

    def get_id(self, obj):
        return self._get(obj, ID)

    def get_type(self, obj):
        return self._get(obj, TYPE)

    def get_language(self, obj):
        return self._get(obj, LANG)

    def get_value(self, obj):
        return self._get(obj, VALUE)

    def get_graph(self, obj):
        return self._get(obj, GRAPH)

    def get_list(self, obj):
        return self._get(obj, LIST)

    def get_set(self, obj):
        return self._get(obj, SET)

    def get_rev(self, obj):
        return self._get(obj, REV)

    def _get(self, obj, key):
        return obj.get(self._alias.get(key)) or obj.get(key)

    def get_key(self, key):
        return self._alias.get(key, key)

    lang_key = property(lambda self: self.get_key(LANG))
    id_key = property(lambda self: self.get_key(ID))
    type_key = property(lambda self: self.get_key(TYPE))
    value_key = property(lambda self: self.get_key(VALUE))
    list_key = property(lambda self: self.get_key(LIST))
    rev_key = property(lambda self: self.get_key(REV))
    graph_key = property(lambda self: self.get_key(GRAPH))

    def add_term(self, name, idref, coercion=UNDEF, container=UNDEF,
                 language=UNDEF, reverse=False):
        term = Term(idref, name, coercion, container, language, reverse)
        self.terms[name] = term
        self._lookup[(idref, coercion or language, container, reverse)] = term
        self._prefixes[idref] = name

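    # Note (added): terms are indexed in self._lookup under the 4-tuple key
    # (idref, coercion or language, container, reverse); find_term probes
    # progressively less specific keys until one matches.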
    def find_term(self, idref, coercion=None, container=UNDEF,
                  language=None, reverse=False):
        lu = self._lookup
        if coercion is None:
            coercion = language
        if coercion is not UNDEF and container:
            found = lu.get((idref, coercion, container, reverse))
            if found: return found
        if coercion is not UNDEF:
            found = lu.get((idref, coercion, UNDEF, reverse))
            if found: return found
        if container:
            found = lu.get((idref, coercion, container, reverse))
            if found: return found
        elif language:
            found = lu.get((idref, UNDEF, LANG, reverse))
            if found: return found
        else:
            found = lu.get((idref, coercion or UNDEF, SET, reverse))
            if found: return found
        return lu.get((idref, UNDEF, UNDEF, reverse))

    def resolve(self, curie_or_iri):
        iri = self.expand(curie_or_iri, False)
        if self.isblank(iri):
            return iri
        return self.resolve_iri(iri)

    def resolve_iri(self, iri):
        return norm_url(self._base, iri)

    def isblank(self, ref):
        return ref.startswith('_:')

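    # Illustrative behaviour of expand() (added note), assuming a context whose
    # vocab is "http://schema.org/" and which maps the prefix "dc" to
    # "http://purl.org/dc/terms/" (both are assumptions, not part of this module):
    #   expand("name")     -> "http://schema.org/name"          (vocab term)
    #   expand("dc:title") -> "http://purl.org/dc/terms/title"  (CURIE)
    #   expand("_:b0")     -> "_:b0"                            (blank node id kept)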
    def expand(self, term_curie_or_iri, use_vocab=True):
        if use_vocab:
            term = self.terms.get(term_curie_or_iri)
            if term:
                return term.id
        is_term, pfx, local = self._prep_expand(term_curie_or_iri)
        if pfx == '_':
            return term_curie_or_iri
        if pfx is not None:
            ns = self.terms.get(pfx)
            if ns and ns.id:
                return ns.id + local
        elif is_term and use_vocab:
            if self.vocab:
                return self.vocab + term_curie_or_iri
            return None
        return self.resolve_iri(term_curie_or_iri)

    def shrink_iri(self, iri):
        ns, name = split_iri(str(iri))
        pfx = self._prefixes.get(ns)
        if pfx:
            return ":".join((pfx, name))
        elif self._base:
            if str(iri) == self._base:
                return ""
            elif iri.startswith(self._basedomain):
                return iri[len(self._basedomain):]
        return iri

    def to_symbol(self, iri):
        iri = str(iri)
        term = self.find_term(iri)
        if term:
            return term.name
        ns, name = split_iri(iri)
        if ns == self.vocab:
            return name
        pfx = self._prefixes.get(ns)
        if pfx:
            return ":".join((pfx, name))
        return iri

    def load(self, source, base=None):
        self.active = True
        sources = []
        source = source if isinstance(source, list) else [source]
        self._prep_sources(base, source, sources)
        for source_url, source in sources:
            self._read_source(source, source_url)

    def _prep_sources(self, base, inputs, sources, referenced_contexts=None,
                      in_source_url=None):
        referenced_contexts = referenced_contexts or set()
        for source in inputs:
            if isinstance(source, str):
                source_url = urljoin(base, source)
                if source_url in referenced_contexts:
                    raise errors.RECURSIVE_CONTEXT_INCLUSION
                referenced_contexts.add(source_url)
                source = source_to_json(source_url)
                if CONTEXT not in source:
                    raise errors.INVALID_REMOTE_CONTEXT
            else:
                source_url = in_source_url

            if isinstance(source, dict):
                if CONTEXT in source:
                    source = source[CONTEXT]
                    source = source if isinstance(source, list) else [source]
            if isinstance(source, list):
                self._prep_sources(base, source, sources, referenced_contexts, source_url)
            else:
                sources.append((source_url, source))

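    # Note (added): _read_source applies a single, already dereferenced context
    # object. @language and @vocab update the context state, @base is honoured
    # only for locally supplied contexts (no source_url), and every other key
    # is treated as a term definition.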
    def _read_source(self, source, source_url=None):
        self.vocab = source.get(VOCAB, self.vocab)
        for key, value in list(source.items()):
            if key == LANG:
                self.language = value
            elif key == VOCAB:
                continue
            elif key == BASE:
                if source_url:
                    continue
                self.base = value
            else:
                self._read_term(source, key, value)

    def _read_term(self, source, name, dfn):
        if isinstance(dfn, dict):
            #term = self._create_term(source, key, value)
            rev = dfn.get(REV)
            idref = rev or dfn.get(ID, UNDEF)
            if idref == TYPE:
                idref = str(RDF.type)
            elif idref is not UNDEF:
                idref = self._rec_expand(source, idref)
            elif ':' in name:
                idref = self._rec_expand(source, name)
            elif self.vocab:
                idref = self.vocab + name
            coercion = dfn.get(TYPE, UNDEF)
            if coercion and coercion not in (ID, TYPE, VOCAB):
                coercion = self._rec_expand(source, coercion)
            self.add_term(name, idref, coercion,
                          dfn.get(CONTAINER, UNDEF), dfn.get(LANG, UNDEF), bool(rev))
        else:
            idref = self._rec_expand(source, dfn)
            self.add_term(name, idref)
        if idref in NODE_KEYS:
            self._alias[idref] = name

    def _rec_expand(self, source, expr, prev=None):
        if expr == prev or expr in NODE_KEYS:
            return expr
        is_term, pfx, nxt = self._prep_expand(expr)
        if pfx:
            iri = self._get_source_id(source, pfx) or self.expand(pfx)
            if iri is None:
                nxt = expr
            else:
                nxt = iri + nxt
        else:
            nxt = self._get_source_id(source, nxt) or nxt
        if ':' not in nxt and self.vocab:
            return self.vocab + nxt
        return self._rec_expand(source, nxt, expr)

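    # Note (added): _prep_expand splits an expression into (is_term, prefix, local), e.g.:
    #   "name"              -> (True,  None,  "name")              plain vocab term
    #   "dc:title"          -> (False, "dc",  "title")             CURIE candidate
    #   "http://example/x"  -> (False, None,  "http://example/x")  absolute IRI left intact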
    def _prep_expand(self, expr):
        if ':' not in expr:
            return True, None, expr
        pfx, local = expr.split(':', 1)
        if not local.startswith('//'):
            return False, pfx, local
        else:
            return False, None, expr

    def _get_source_id(self, source, key):
        # .. from source dict or if already defined
        term = source.get(key)
        if term is None:
            dfn = self.terms.get(key)
            if dfn:
                term = dfn.id
        elif isinstance(term, dict):
            term = term.get(ID)
        return term


Term = namedtuple('Term',
                  'id, name, type, container, language, reverse')
Term.__new__.__defaults__ = (UNDEF, UNDEF, UNDEF, False)
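
# Minimal usage sketch (added; not part of the upstream module). The vocabulary
# IRI and term names below are illustrative assumptions. Because this file uses
# relative imports, run it as a module, e.g. `python -m rdflib_jsonld.context`.
if __name__ == '__main__':
    demo_ctx = Context({
        "@vocab": "http://schema.org/",
        "name": "http://schema.org/name",
        "id": "@id",
    })
    print(demo_ctx.expand("name"))                        # http://schema.org/name
    print(demo_ctx.to_symbol("http://schema.org/name"))   # name
    print(demo_ctx.get_key(ID))                           # "id" (alias registered for @id)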