Source code for tinychain.uri



[docs]def validate(name, state=None): """ Return the given `name` as allowed for use as a path segment in a :class:`URI`, or raise a :class:`KeyError`. """ name = str(name) name = str(name[1:]) if name.startswith('$') else name if not name or '/' in name or '$' in name or '<' in name or '>' in name or ' ' in name: if state: raise KeyError(f"invalid ID for {state}: {name}") else: raise KeyError(f"invalid ID: {name}") return name
[docs]class URI(object): """ An absolute or relative link to a :class:`State`. Examples: .. highlight:: python .. code-block:: python URI("https://example.com/myservice/value_name") URI("$other_state/method_name") URI("/state/scalar/value/none") """
[docs] @staticmethod def join(segments): """Join the given segments together to make a new URI.""" if segments: return URI(segments[0], *segments[1:]) else: return URI('/')
def __init__(self, subject, *path): path = [validate(segment) for segment in path if str(segment)] if isinstance(subject, URI): self._subject = subject._subject self._path = list(subject._path) self._path.extend(path) return if isinstance(subject, str): if subject.startswith("$$"): raise ValueError(f"invalid reference: {subject}") elif subject.startswith('$'): subject = subject[1:] else: self._subject = subject self._subject = subject self._path = path assert not str(self).startswith("//"), f"invalid URI prefix: {self}" def __add__(self, other): other = str(other) assert not other.startswith('$') if other.__contains__("://"): raise ValueError(f"cannot append {other} to {self}") if not other or other == "/": return self elif other.startswith('/'): other = other[1:] path = list(self._path) path.extend(other.split('/')) return URI(self._subject, *path) def __radd__(self, other): return URI(other) + str(self) def __bool__(self): return True def __eq__(self, other): return str(self) == str(other) def __getitem__(self, item): segments = self.split() if isinstance(item, int): if item == 0: return URI(segments[0]) else: return URI('/').append(segments[item]) elif isinstance(item, slice): if not segments[item]: return URI('/') if self.startswith('$'): return URI.join(segments[item]) elif "://" in segments[item][0]: return URI.join(segments[item]) elif len(segments[item]) == 1: [segment] = segments[item] if "://" in segment or segment[0] in ['/', '$']: return URI(segment) else: return URI('/').append(segment) else: return URI('/').extend(*segments[item]) else: raise TypeError(f"cannot index a URI with {item}") def __gt__(self, other): return self.startswith(other) and len(self) > len(other) def __ge__(self, other): return self.startswith(other) def __len__(self): return len(self.split()) def __lt__(self, other): return other.startswith(self) and len(self) < len(other) def __le__(self, other): return other.startswith(self) def __hash__(self): return hash(str(self)) def __json__(self): return {str(self): []} def __ns__(self, cxt, name_hint): from .scalar.ref import is_literal, is_op_ref from .state import State cxt.deanonymize(self._subject, name_hint + "_subject") if is_op_ref(self._subject): cxt.assign(self._subject, name_hint + "_subject") elif isinstance(self._subject, State) and is_literal(self._subject): cxt.assign(self._subject, name_hint + "_subject") def __repr__(self): if self._path: return f"URI({self._subject}/{'/'.join(self._path)})" else: return f"URI({self._subject})" def __str__(self): if hasattr(self._subject, "__uri__"): root = str(self._subject.__uri__) else: root = str(self._subject) if root.startswith('/') or root.startswith('$') or "://" in root: pass else: root = f"${root}" path = '/'.join(self._path) if path: if root == '/': return f"/{path}" else: return f"{root}/{path}" else: return root
[docs] def append(self, suffix): """ Construct a new `URI` beginning with this `URI` and ending with the given `suffix`. Example: .. highlight:: python .. code-block:: python value = OpRef.Get(URI("http://example.com/myapp").append("value/name")) """ suffix = str(suffix) if suffix in ["", "/"]: return self if "://" in suffix: raise ValueError(f"cannot append {suffix} to {self}") if suffix.startswith('/'): suffix = suffix[1:] path = list(self._path) path.extend(suffix.split('/')) return URI(self._subject, *path)
[docs] def extend(self, *segments): validated = [] for segment in segments: segment = str(segment) if "://" in segment: raise ValueError(f"cannot append {segment} to {self}") if segment.startswith('/'): segment = segment[1:] if '/' in segment: for subsegment in segment.split('/'): validated.append(validate(subsegment)) else: validated.append(validate(segment)) return URI(self._subject, *list(self._path) + validated)
[docs] def id(self): """Return the ID segment of this `URI`, if present.""" this = str(self) if this.startswith('$'): if '/' in this: end = this.index("/") return this[1:end] else: return this[1:]
[docs] def is_id(self): """Return `True` if this URI is a simple ID, like `$foo`.""" return '/' not in str(self)
[docs] def host(self): """Return the host segment of this `URI`, if present.""" uri = str(self) if "://" not in uri: return None start = uri.index("://") + 3 if '/' not in uri[start:]: return uri[start:] end = ( uri.index(':', start) if ':' in uri[start:] else uri.index('/', start)) if end > start: return uri[start:end] else: return uri[start:]
[docs] def path(self): """Return the path segment of this `URI`, if present.""" uri = str(self) if "://" not in uri: return URI(uri[uri.index('/'):]) start = uri.index("://") if '/' not in uri[(start + 3):]: return None start = uri.index('/', start + 3) return URI(uri[start:])
[docs] def port(self): """Return the port of this `URI`, if present.""" prefix = self.protocol() + "://" if self.protocol() else "" prefix += self.host() if self.host() else "" uri = str(self) if uri == prefix: return None if prefix and uri[len(prefix)] == ':': end = uri.index('/', len(prefix)) return int(uri[(len(prefix) + 1):end])
[docs] def protocol(self): """Return the protocol of this `URI` (e.g. "http"), if present.""" uri = str(self) if "://" in uri: i = uri.index("://") if i > 0: return uri[:i]
[docs] def split(self): """ Split this URI into its individual segments. The host (if absolute) or ID (if relative) is treated as the first segment, if present. """ if self.startswith('/'): segments = str(self)[1:].split('/') segments[0] = f"/{segments[0]}" return segments elif self.startswith('$'): return str(self).split('/') else: host = f"{self.host()}:{self.port()}" if self.port() else self.host() return [f"{self.protocol()}://{host}"] + str(self.path()).split('/')[1:]
[docs] def startswith(self, prefix): """Return `True` if this :class:`URI` starts with the given `prefix`.""" return str(self).startswith(str(prefix))