Commit 54e83da7 authored by 20after4's avatar 20after4
Browse files

hardcore type annotations and general framework improvements

parent 838272ed
from __future__ import annotations
from collections import UserDict
from collections import UserList
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import MutableMapping
from collections.abc import MutableSequence
from collections.abc import Iterator
import json
import pprint
from enum import Enum
from collections import UserDict, UserList, deque
from collections.abc import MutableMapping, Iterable
import sqlite3
from sqlite3.dbapi2 import Connection
from typing import Optional, Union
con = sqlite3.connect(':memory:')
class DataCache(object):
con: sqlite3.Connection
class DataIterator(object):
""" DataIterator iterates over a list of raw data, returning each record
def __init__(self, db):
self.con = sqlite3.connect(db)
self.con.execute(
"create table phobjects (phid TEXT PRIMARY KEY, name TEXT, data TEXT)"
)
def row(self, item):
data = json.dumps(item)
return (item.phid, item.name, data)
def store_all(self, items):
for item in items:
self.row(item)
def store_one(self, item, data):
values = (item.phid, item.name, data)
self.con.execute("replace into phobjects values (?, ?, ?)", values)
class DataIterator(Iterator):
"""DataIterator iterates over a list of raw data, returning each record
wrapped in a Data instance.
"""
data = None
def __init__(self, data):
data: Iterator
def __init__(self, data, parent=None):
self.data = data.__iter__()
self.parent = parent
def __iter__(self):
return self
def __next__(self):
return Data(next(self.data))
item = next(self.data)
return wrapitem(item, self.parent)
class Data(MutableMapping):
data = None
Data_Class = locals().__class__
def __init__(self, data, parent=None):
# print(type(data))
data: Union[dict, list, MutableMapping, MutableSequence]
_parent: Optional[Data_Class]
def __new__(cls, data, *args, **kwargs):
if isinstance(data, MutableMapping):
subcls = DataDict
elif isinstance(data, MutableSequence):
subcls = DataList
else:
raise TypeError(
"data argument must be a list or a mapping, not %s" % type(data)
)
if super().__new__ is object.__new__ and cls.__init__ is not object.__init__:
obj = super().__new__(subcls)
else:
obj = super().__new__(subcls, *args, **kwargs)
return obj
def __init__(self, data, parent: Data_Class = None):
self.data = data
if parent:
self.parent = parent
self._parent = parent
else:
self._parent = None
def parent(self):
return self._parent
def siblings(self):
"""Iterate over the children of this element's parent data structure"""
return iter(self._parent)
def __getattr__(self, attr):
return self.__getitem__(attr)
def __getitem__(self, item):
itemdata = self.data[item]
if isinstance(itemdata, (dict, UserDict)):
return Data(itemdata, self)
elif isinstance(itemdata, (list, UserList, Iterable)):
return DataList(itemdata, self)
else:
return itemdata
def __dir__(self):
return self.data.keys() + self.__dict__.keys()
item = self.data[item]
return wrapitem(item, self)
def __iter__(self):
return DataIterator(self.data)
def __len__(self):
return len(self.data)
def __contains__(self, item):
return item in self.data
def __repr__(self):
return pprint.pformat(self.data, indent=2)
class DataList(Data):
def __getattr__(self, attr):
raise AttributeError()
return DataIterator(self.data, self)
def __dir__(self):
return dir(self.__dict__)
class Token(Enum):
ATTR = 1
ITEM = 2
PARENT = 3
return dir(self.data)
class QueryBuilder(object):
def __init__(self, data):
self.data = data
self.query = deque()
class DataDict(Data, UserDict):
data: MutableMapping
def __getitem__(self, key):
self.query.append((Token.ITEM, key))
return self
class DataList(Data, UserList):
data: MutableSequence
def __getattr__(self, key):
self.query.append((Token.ATTR, key))
return self
def __delitem__(self, v):
del self.data[v]
def parent(self):
self.query.append((Token.PARENT))
def __setitem__(self, item, val):
self.data[item] = val
def wrapitem(item, parent=None):
if isinstance(item, (Data, str)):
return item
if isinstance(item, (dict, UserDict)):
return Data(item, parent)
if isinstance(item, (list, UserList, Iterable)):
return DataList(item, parent)
return item
import requests
import sys
import json
from pprint import pprint
from ddd.phab import Conduit
phab = Conduit()
r = phab.request('maniphest.search', {'queryKey': "KpRagEN3fCBC",
"limit": "40",
"attachments": {
"projects": True,
"columns": True
}})
r.fetch_all()
ids = [f"T{obj.id}" for obj in r]
print(ids)
ids = []
url = 'https://gerrit.wikimedia.org/r/changes/'
for tid in ids:
query = {'q': f"bug:{tid}"}
res = requests.get(url, params=query)
jsontxt = res.text[4:]
objs = json.loads(jsontxt)
for obj in objs:
print(f"{tid},{obj['change_id']},-{obj['deletions']},+{obj['insertions']}")
if __name__ == "__main__":
pass
from __future__ import annotations
from builtins import str
from collections import UserDict, UserString, deque
from collections import UserDict, UserList, deque
from collections.abc import Iterable
import json
import os
from pprint import pprint
from tokenize import Number
from typing import Collection, MutableMapping, MutableSequence, Union
from numpy import real
# todo: remove dependency on requests
import requests
from ddd.data import DataIterator, Data
from ddd.phobjects import *
from ddd.data import Data, DataIterator, wrapitem
from ddd.phobjects import PHObject, isPHID
class Conduit(object):
phab_url = 'https://phabricator.wikimedia.org/api/'
phab_url: str = "https://phabricator.wikimedia.org/api/"
token: str = ""
def __init__(self, phab_url: str = None):
def __init__(self, phab_url: str = None, token: str = None):
if phab_url:
self.phab_url = phab_url
self.conduit_token = self._get_token()
if self.conduit_token is None:
if token:
self.token = token
else:
self.token = self._get_token()
if self.token is None:
err = "Unable to find a conduit token in ~/.arcrc or environment"
raise ConduitException(self, None, err)
raise ConduitException(conduit=self, message=err)
def _get_token(self):
"""
Use the $CONDUIT_TOKEN envvar, fallback to whatever is in ~/.arcrc
"""
token = None
token_path = os.path.expanduser('~/.arcrc')
token_path = os.path.expanduser("~/.arcrc")
if os.path.exists(token_path):
with open(token_path) as f:
arcrc = json.load(f)
if (self.phab_url in arcrc['hosts']):
token = arcrc['hosts'][self.phab_url]['token']
if self.phab_url in arcrc["hosts"]:
token = arcrc["hosts"][self.phab_url]["token"]
return os.environ.get('CONDUIT_TOKEN', token)
return os.environ.get("CONDUIT_TOKEN", token)
def request(self, method: str, args: dict, raw: bool = False):
def raw_request(self, method: str, args: MutableMapping):
"""
Helper method to call a phabricator api and return a ConduitResult
Helper method to call a phabricator api and return a ConduitCursor
which can be used to iterate over all of the resulting records.
"""
data = flatten_for_post(args)
data['api.token'] = self.conduit_token
r = requests.post(f"{self.phab_url}{method}", data=data)
if raw:
return r
return ConduitResult(conduit=self, res=r, method=method, args=args)
def edit(self, method: str, objectidentifier: str, transactions: list):
req = flatten_for_post(args)
req["api.token"] = self.token
r = requests.post(f"{self.phab_url}{method}", data=req)
return r
def request(self, method: str, args: MutableMapping):
r = self.raw_request(method=method, args=args)
return ConduitCursor(conduit=self, res=r, method=method, args=args)
def edit(self, method: str, objectidentifier: str, transactions: list):
"""
Calls a conduit "edit" method which applies a list of transactions
to a specified object (specified by a phabricator identifier such as
......@@ -59,39 +72,47 @@ class Conduit(object):
Raises an exception if something goes wrong and returns the decoded
conduit response otherwise.
"""
data = {
req = {
"parameters": {
"transactions": transactions,
"objectidentifier": objectidentifier
"objectidentifier": objectidentifier,
}
}
data = flatten_for_post(args)
data['api.token'] = self.conduit_token
r = requests.post(f"{self.phab_url}{method}", data=data)
req = flatten_for_post(req)
req["api.token"] = self.token
r = requests.post(f"{self.phab_url}{method}", data=req)
json = r.json()
if json['error_info'] is not None:
raise ConduitException(self.conduit, self, json['error_info'])
if json["error_info"] is not None:
raise ConduitException(conduit=self, message=json["error_info"])
return json
class ConduitResult(object):
class ConduitCursor(object):
"""
ConduitResult handles fetching multiple pages of records from the conduit
api so that the results can be treated as a single collection of records.
ConduitCursor handles fetching multiple pages of records from the conduit
api so that one api call can be treated as a single collection of results even
though it's split across multiple requests to the server.
"""
conduit = None
result = None
method = None
args = None
data = None
cursor = None
def __init__(self, conduit: Conduit, res: requests.Response,
method: str, args: dict):
conduit: Conduit
result: MutableMapping
method: str
args: MutableMapping
data: deque[Data]
cursor: MutableMapping
def __init__(
self,
conduit: Conduit,
res: requests.Response,
method: str,
args: MutableMapping,
):
self.conduit = conduit
self.method = method
self.args = args
self.cursor = {}
self.data = deque()
self.handle_result(res)
def retry(self):
......@@ -103,13 +124,14 @@ class ConduitResult(object):
returned by the most recently fetched page to specify the starting
point. This is specified by an "after" argument added to the request.
"""
after = self.cursor.get('after', None)
after = self.cursor.get("after", None)
if after is None:
raise ConduitException(self.conduit, self,
'Cannot fetch pages beyond the last.')
raise ConduitException(
conduit=self.conduit, message="Cannot fetch pages beyond the last."
)
args = self.args
args['after'] = after
res = self.conduit.request(method=self.method, args=args, raw=True)
args["after"] = after
res = self.conduit.raw_request(method=self.method, args=args)
self.handle_result(res)
def fetch_all(self):
......@@ -123,51 +145,52 @@ class ConduitResult(object):
limit for a single request. The default and maximum limit is 100.
"""
json = res.json()
if json['error_info'] is not None:
raise ConduitException(self.conduit, self, json['error_info'])
if json["error_info"] is not None:
raise ConduitException(conduit=self.conduit, message=json["error_info"])
self.result = json['result']
self.result = json["result"]
if "cursor" in self.result:
self.cursor = self.result['cursor']
self.cursor = self.result["cursor"]
else:
self.cursor = {}
# pprint(self.result)
if "data" in self.result:
# Modern conduit methods return a result[data] and result{cursor}
if self.data is None:
self.data = deque()
self.data.extend(self.result['data'])
elif self.data is None:
# Modern conduit methods return a result map with the key "data"
# mapped to a list of records and the key "cursor" maps to a record
# of pagination details.
self.data.extend(self.result["data"])
else:
# Older methods just return a result:
self.data = self.result
self.data.extend(self.result.values())
def has_more(self):
return self.cursor.get('after', None)
return self.cursor.get("after", None)
def resolve_phids(self, data=False):
if data is False:
def resolve_phids(self, data=None):
if data is None:
data = self.data
if isinstance(data, dict):
if isinstance(data, MutableMapping):
iter = data.items()
elif isinstance(data, (Iterable, list)) :
elif isinstance(data, (Iterable, MutableSequence)):
iter = enumerate(data)
else:
return
for key, val in iter:
if isPHID(val):
if key != "phid" and isPHID(val):
data[key] = PHObject.instance(val)
elif isinstance(val, (list, dict)):
elif isinstance(val, (MutableSequence, MutableMapping)):
self.resolve_phids(data=val)
if data is self.data:
if data is self.data and len(PHObject.instances):
phids = [phid for phid in PHObject.instances.keys()]
args = {'phids': phids}
res = self.conduit.request(method='phid.query', args=args, raw=True)
pprint(phids)
res = self.conduit.raw_request(method="phid.query", args={"phids": phids})
pprint(res.text)
objs = res.json()
for key, vals in objs['result'].items():
for key, vals in objs["result"].items():
PHObject.instances[key].update(vals)
# for attr in vals.keys():
......@@ -177,8 +200,10 @@ class ConduitResult(object):
def __iter__(self):
return DataIterator(self.data)
def __getitem__(self, item):
return Data(self.data[item])
def __getitem__(self, key):
if key not in self.data:
raise KeyError(key)
return wrapitem(self.data[key])
def __len__(self):
return len(self.data)
......@@ -187,13 +212,15 @@ class ConduitResult(object):
return item in self.data
class ConduitException(Exception):
def __init__(self, conduit: Conduit, result: ConduitResult, message: str):
def __init__(
self, message: str, conduit: Conduit = None, result: ConduitCursor = None
):
self.conduit = conduit
self.result = result
self.message = message
def flatten_for_post(h, result=None, kk=None):
"""
Since phab expects x-url-encoded form post data (meaning each
......@@ -207,17 +234,21 @@ def flatten_for_post(h, result=None, kk=None):
if result is None:
result = {}
if isinstance(h, str) or isinstance(h, bool):
if isinstance(h, (str, bool)):
result[kk] = h
elif isinstance(h, list) or isinstance(h, tuple):
for i, v1 in enumerate(h):
flatten_for_post(v1, result, '%s[%d]' % (kk, i))
elif isinstance(h, dict):
elif isinstance(h, (int, float)):
# Because conduit response comes back empty when you pass raw int
# values:
result[kk] = str(h)
elif hasattr(h, "items"):
for (k, v) in h.items():
key = k if kk is None else "%s[%s]" % (kk, k)
if isinstance(v, dict):
if hasattr(v, "items"):
for i, v1 in v.items():
flatten_for_post(v1, result, '%s[%s]' % (key, i))
flatten_for_post(v1, result, "%s[%s]" % (key, i))
else:
flatten_for_post(v, result, key)
elif isinstance(h, Iterable):
for i, v1 in enumerate(h):
flatten_for_post(v1, result, "%s[%d]" % (kk, i))
return result
from __future__ import annotations
from enum import Enum
from typing import ClassVar
from functools import total_ordering
from pydoc import classname
from symbol import classdef
import time
from typing import ClassVar, Dict, Generic, Mapping, NewType, Type, TypeVar, Union
from datetime import datetime
"""
Phabricator Objects
......@@ -19,6 +24,9 @@ from typing import ClassVar
"""
PHID = NewType("PHID", str)
class PHIDTypes(Enum):
PHOB = "PHObject"
PROJ = "Project"
......@@ -28,64 +36,116 @@ class PHIDTypes(Enum):
STRY = "FeedStory"
APPS = "Application"
PCOL = "ProjectColumn"
USER = "User"
def isPHID(value):
def isPHID(value: str):
return isinstance(value, str) and value.startswith("PHID-")
def PHIDType(phid):
parts = phid.split('-')
def PHIDType(phid: PHID):
parts = phid.split("-")
phidtype = parts[1]
if phidtype in PHIDTypes.__members__:
classname = PHIDTypes[phidtype].value
return classname
return phidtype
class PHObject(object):
"""
PHObjects represent Phabricator objects such as Users and Tasks.
This class handles caching and insures that there is at most one instance
per unique phid.
"""
id: int = 0
phid: str = ""
TID = TypeVar("TID")
T = TypeVar("T")
name: str = ""
dateCreated: int = 0
dateModified: int = 0
class SubclassCache(Generic[TID, T]):
instances: ClassVar[dict] = {}
subclasses: ClassVar[dict] = {}
instances: ClassVar[dict[TID, T]] = {}
subclasses: ClassVar[dict[str, type]] = {}
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
__class__.subclasses[cls.__name__] = cls
def __init__(self, phid, **kwargs):
@classmethod
def subclass(cls, classname: str, default: Type[T] = None) -> Union[Type[T], None]: