Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/schema_salad/utils.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children | |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import json | |
| 2 import os | |
| 3 from typing import ( | |
| 4 IO, | |
| 5 TYPE_CHECKING, | |
| 6 Any, | |
| 7 Callable, | |
| 8 Dict, | |
| 9 Iterable, | |
| 10 Mapping, | |
| 11 MutableSequence, | |
| 12 Tuple, | |
| 13 TypeVar, | |
| 14 Union, | |
| 15 ) | |
| 16 | |
| 17 import requests | |
| 18 from rdflib.graph import Graph | |
| 19 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
| 20 | |
| 21 if TYPE_CHECKING: | |
| 22 from .fetcher import Fetcher | |
| 23 | |
# Type aliases shared across the package.  "Fetcher" is referenced as a
# string because it is only imported under TYPE_CHECKING.
ContextType = Dict[str, Union[Dict[str, Any], str, Iterable[str]]]
DocumentType = TypeVar("DocumentType", CommentedSeq, CommentedMap)
DocumentOrStrType = TypeVar("DocumentOrStrType", CommentedSeq, CommentedMap, str)
FieldType = TypeVar("FieldType", str, CommentedSeq, CommentedMap)
ResolveType = Union[int, float, str, CommentedMap, CommentedSeq, None]
ResolvedRefType = Tuple[ResolveType, CommentedMap]
IdxResultType = Union[CommentedMap, CommentedSeq, str, None]
IdxType = Dict[str, IdxResultType]
CacheType = Dict[str, Union[str, Graph, bool]]
FetcherCallableType = Callable[[CacheType, requests.sessions.Session], "Fetcher"]
AttachmentsType = Callable[[Union[CommentedMap, CommentedSeq]], bool]
| 35 | |
| 36 | |
def add_dictlist(di, key, val):  # type: (Dict[Any, Any], Any, Any) -> None
    """
    Append ``val`` to the list stored under ``key`` in ``di``.

    An empty list is created first when ``key`` is not yet present.  ``di``
    is modified in place and nothing is returned.
    """
    di.setdefault(key, []).append(val)
| 41 | |
| 42 | |
def aslist(thing):  # type: (Any) -> MutableSequence[Any]
    """
    Convenience function to wrap single items and lists.

    Return lists unchanged: any ``MutableSequence`` is handed back as the
    same object (no copy is made).  Every other value, including ``None``,
    strings and tuples, is wrapped in a new one-element list.
    """
    if isinstance(thing, MutableSequence):
        return thing
    return [thing]
| 54 | |
| 55 | |
# http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html


def flatten(thing, ltypes=(list, tuple)):
    # type: (Any, Any) -> Any
    """
    Flatten arbitrarily nested sequences into a single-level sequence.

    ``None`` yields an empty list, and a value that is not an instance of
    ``ltypes`` is wrapped in a one-element list.  Otherwise every nested
    ``ltypes`` instance is expanded in place (empty ones are dropped) and
    the result is rebuilt using the type of ``thing``.

    :param thing: the value to flatten
    :param ltypes: type, or tuple of types, treated as nested sequences
    """
    if thing is None:
        return []
    if not isinstance(thing, ltypes):
        return [thing]

    ltype = type(thing)
    # Work on a copy so the caller's sequence is never modified.
    lst = list(thing)
    i = 0
    while i < len(lst):
        # Keep expanding slot i until it holds a non-sequence item.
        while isinstance(lst[i], ltypes):
            if not lst[i]:
                # Drop the empty sequence and step back, so the increment
                # below re-examines whatever slid into this slot.
                lst.pop(i)
                i -= 1
                break
            else:
                # Splice the nested items in where the nested sequence was.
                lst[i : i + 1] = lst[i]
        i += 1
    return ltype(lst)
| 79 | |
| 80 | |
def onWindows():
    # type: () -> bool
    """Check if we are on windows OS (i.e. ``os.name`` is ``"nt"``)."""
    return os.name == "nt"
| 85 | |
| 86 | |
def convert_to_dict(j4):  # type: (Any) -> Any
    """
    Recursively copy ``j4`` into plain ``dict`` and ``list`` containers.

    Any ``Mapping`` becomes a ``dict`` and any ``MutableSequence`` becomes a
    ``list``, with their values converted the same way.  Every other value
    is returned unchanged.
    """
    if isinstance(j4, Mapping):
        return {k: convert_to_dict(v) for k, v in j4.items()}
    if isinstance(j4, MutableSequence):
        return [convert_to_dict(v) for v in j4]
    return j4
| 94 | |
| 95 | |
def json_dump(
    obj,  # type: Any
    fp,  # type: IO[str]
    **kwargs  # type: Any
):  # type: (...) -> None
    """
    Write ``obj`` to ``fp`` as JSON.

    ``obj`` is first passed through ``convert_to_dict``; ``kwargs`` are
    forwarded unchanged to ``json.dump``.
    """
    json.dump(convert_to_dict(obj), fp, **kwargs)
| 103 | |
| 104 | |
def json_dumps(
    obj,  # type: Any
    **kwargs  # type: Any
):  # type: (...) -> str
    """
    Return ``obj`` serialized as a JSON string.

    ``obj`` is first passed through ``convert_to_dict``; ``kwargs`` are
    forwarded unchanged to ``json.dumps``.
    """
    return json.dumps(convert_to_dict(obj), **kwargs)
