Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/schema_salad/utils.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import json | |
2 import os | |
3 from typing import ( | |
4 IO, | |
5 TYPE_CHECKING, | |
6 Any, | |
7 Callable, | |
8 Dict, | |
9 Iterable, | |
10 Mapping, | |
11 MutableSequence, | |
12 Tuple, | |
13 TypeVar, | |
14 Union, | |
15 ) | |
16 | |
17 import requests | |
18 from rdflib.graph import Graph | |
19 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
20 | |
21 if TYPE_CHECKING: | |
22 from .fetcher import Fetcher | |
23 | |
# Shared type aliases used in type comments.

# String-keyed mapping whose values are a nested dict, a single string,
# or an iterable of strings.
ContextType = Dict[str, Union[Dict[str, Any], str, Iterable[str]]]
# Constrained type variables: each one resolves to exactly one of the
# listed types at a given use site.
DocumentType = TypeVar("DocumentType", CommentedSeq, CommentedMap)
DocumentOrStrType = TypeVar("DocumentOrStrType", CommentedSeq, CommentedMap, str)
FieldType = TypeVar("FieldType", str, CommentedSeq, CommentedMap)
# A scalar, a ruamel.yaml container, or None.
ResolveType = Union[int, float, str, CommentedMap, CommentedSeq, None]
# Pair of a ResolveType value and a CommentedMap.
# NOTE(review): presumably (resolved value, metadata) - confirm against callers.
ResolvedRefType = Tuple[ResolveType, CommentedMap]
# Value type of an IdxType mapping, and the string-keyed mapping itself.
IdxResultType = Union[CommentedMap, CommentedSeq, str, None]
IdxType = Dict[str, IdxResultType]
# String-keyed mapping holding strings, rdflib Graphs, or booleans.
CacheType = Dict[str, Union[str, Graph, bool]]
# Callable taking a CacheType and a requests Session and returning a Fetcher.
# "Fetcher" is a string forward reference because the class is imported only
# under TYPE_CHECKING.
FetcherCallableType = Callable[[CacheType, requests.sessions.Session], "Fetcher"]
# Callable taking a CommentedMap or CommentedSeq and returning a bool.
AttachmentsType = Callable[[Union[CommentedMap, CommentedSeq]], bool]
35 | |
36 | |
def add_dictlist(di, key, val):  # type: (Dict[Any, Any], Any, Any) -> None
    """
    Append val to the list stored under key in di.

    When key is not present yet, an empty list is stored under it first.
    The dictionary is modified in place and nothing is returned.
    """
    di.setdefault(key, []).append(val)
41 | |
42 | |
def aslist(thing):  # type: (Any) -> MutableSequence[Any]
    """
    Return thing as a mutable sequence.

    A MutableSequence is handed back as the very same object; any other
    value (including None, strings and tuples) is wrapped in a new
    one-element list.
    """
    return thing if isinstance(thing, MutableSequence) else [thing]
54 | |
55 | |
56 # http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html | |
57 | |
58 | |
def flatten(thing, ltypes=(list, tuple)):
    # type: (Any, Any) -> Any
    """
    Flatten arbitrarily nested sequences into a single level.

    None yields an empty list and a value that is not an instance of
    ltypes yields a one-element list. Otherwise every nested instance of
    ltypes is expanded in order (empty ones vanish) and the result is
    rebuilt with the type of the outermost container.
    """
    if thing is None:
        return []
    if not isinstance(thing, ltypes):
        return [thing]

    flat = []
    # Stack of iterators: the top one walks the container currently being
    # expanded, the ones below resume where their parent was interrupted.
    pending = [iter(thing)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, ltypes):
                # Descend into the nested container before continuing here.
                pending.append(iter(item))
                break
            flat.append(item)
        else:
            # Current container exhausted; go back to its parent.
            pending.pop()
    return type(thing)(flat)
79 | |
80 | |
def onWindows():
    # type: () -> (bool)
    """Tell whether we are running on Windows, i.e. os.name is "nt"."""
    return "nt" == os.name
85 | |
86 | |
def convert_to_dict(j4):  # type: (Any) -> Any
    """
    Recursively copy j4 into plain dict and list containers.

    Any Mapping becomes a dict and any MutableSequence becomes a list,
    with their contents converted the same way. Every other value
    (scalars, tuples, None, ...) is returned unchanged.
    """
    if isinstance(j4, Mapping):
        return dict((key, convert_to_dict(value)) for key, value in j4.items())
    if isinstance(j4, MutableSequence):
        return list(map(convert_to_dict, j4))
    return j4
94 | |
95 | |
def json_dump(obj, fp, **kwargs):
    # type: (Any, IO[str], **Any) -> None
    """
    Write obj to fp as JSON.

    obj is first passed through convert_to_dict so that nested mappings
    and mutable sequences are plain dicts and lists; all keyword
    arguments are forwarded to json.dump.
    """
    plain = convert_to_dict(obj)
    json.dump(plain, fp, **kwargs)
103 | |
104 | |
def json_dumps(obj, **kwargs):
    # type: (Any, **Any) -> str
    """
    Return obj serialized as a JSON string.

    obj is first passed through convert_to_dict so that nested mappings
    and mutable sequences are plain dicts and lists; all keyword
    arguments are forwarded to json.dumps.
    """
    plain = convert_to_dict(obj)
    return json.dumps(plain, **kwargs)