Commit 95a4349a authored by 20after4's avatar 20after4
Browse files

New PropertyMatcher class and example usage in boardmetrics.py

More or less complete usage example:

```py
        def process_transactions(transactions):
            mapper = PropertyMatcher()

            @mapper("transactionType=core:edge", "meta.edge:type=41")
            def edge(t):
                ''' match project edge transactions '''
                oldValue = [PHIDRef(p) for p in t["oldValue"]]
                newValue = [PHIDRef(p) for p in t["newValue"]]
                return [["projects", '', oldValue, newValue]]

            for taskid, t in transactions.result.items():
                st = sorted(t, key=itemgetter("dateCreated"))
                for record in st:
                    for row in mapper.run(record):
                        if row:
                            yield row

        transactions = get_some_transactions()

        for row in process_transactions(transactions):
            ''' do something with row '''
```

Change-Id: I6eac22574b02df75b9fd8c38d1a29856dd05f130
parent 26c7d88a
#!/usr/bin/python3
from collections.abc import Callable
from operator import itemgetter
import pandas as pd
from IPython.display import display
from pprint import pprint
import ddd
from ddd.mw import MWVersion, version
from ddd.phab import Conduit
from ddd.phobjects import PHIDRef, PHObject, EdgeType
from ddd.data import PropertyMatcher
pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.max_colwidth = 50
pd.options.display.width = 400
#%%
# Shared Conduit API client used by every request in this notebook-style script.
phab = Conduit()

# find all train blocker tasks
r = phab.request(
    "maniphest.search",
    {
        "constraints": {"projects": ["release-engineering-team"]},
        "limit": "30",
        # also fetch each task's project tags and workboard column placement
        "attachments": {"projects": True, "columns": True},
    },
)
#%%
def gettransactions(taskids):
    """
    Fetch all transactions for the given tasks and yield one flattened row
    per interesting transaction.

    Each yielded row has the shape:
        [dateCreated, "T" + taskid, description, what, from, to]

    taskids: a mapping whose keys are numeric task ids (e.g. the dict
             returned by a conduit cursor's asdict()).
    """
    mapper = PropertyMatcher()
    ids = list(taskids.keys())
    transactions = phab.request(
        "maniphest.gettasktransactions",
        {
            "ids": ids,
        },
    )

    # Functions decorated with @mapper take a transaction object and distill
    # it down to just the information we care about.
    # Add new functions to match other transaction types.

    @mapper("transactionType=core:edge", "meta.edge:type=41")
    def edge(t):
        """
        Edge transactions point to related objects such as subtasks,
        mentioned tasks and project tags.
        The type of relationship is specified by an integer in
        meta['edge:type']. Edge type constants are defined in
        the enum `ddd.phobjects.EdgeType`
        """
        oldValue = [PHIDRef(p) for p in t["oldValue"]]
        newValue = [PHIDRef(p) for p in t["newValue"]]
        return [["projects", '', oldValue, newValue]]

    @mapper("transactionType=status")
    def status(t):
        """Task status changes (open, resolved, ...)."""
        return [("status", '', t["oldValue"], t["newValue"])]

    # Not currently registered with the mapper; kept for reference.
    def unblock(t):
        """a subtask was closed or otherwise changed status"""
        nv = t["newValue"]
        for item in nv.items():
            phid, action = item
            return [[action, PHIDRef(phid)]]

    # Not currently registered with the mapper; kept for reference.
    def comment(t):
        # todo: we could check for the risky revision template here, if we care
        # to count that.
        nl = "\\n"
        txt = str(t["comments"]).replace("\n", nl)
        return [["comment", txt]]

    # BUG FIX: the pattern was 'ransactionType=core:customfield' (missing
    # leading 't'), so this matcher never fired.
    @mapper("transactionType=core:customfield")
    def customfield(t):
        """Collect the version number from the release.version custom field"""
        nv = version(str(t["newValue"]))
        if nv:
            # BUG FIX: the original assigned to an undefined name `train`
            # (NameError once the pattern typo was fixed). Emit a row so the
            # version is not silently dropped.
            return [["version", '', t["oldValue"], nv]]
        return None

    @mapper('transactionType=core:columns')
    def columns(t):
        """Workboard column moves: one row per (board, from, to) move."""
        newv = t["newValue"]
        res = []
        for obj in newv:
            fromcol = ""
            for col in obj["fromColumnPHIDs"]:
                fromcol = PHIDRef(col)
            res.append(('columns', obj["boardPHID"], fromcol, obj["columnPHID"]))
        return res

    for taskid, t in transactions.result.items():
        # Process each task's transactions in chronological order.
        st = sorted(t, key=itemgetter("dateCreated"))
        for record in st:
            for row in mapper.run(record):
                if row:
                    newrow = [record['dateCreated'], "T" + taskid]
                    newrow.extend(row)
                    yield newrow
#%%
# r.fetch_all()  # uncomment to page through every result, not just the first page
tasks = r.asdict()

# now collect all of the formatted transaction details
rows = [row for row in gettransactions(tasks)]

# Resolve all PHIDs referenced while formatting rows into named objects.
PHObject.resolve_phids(phab)

#%%
data = pd.DataFrame.from_records(
    rows,
    columns=["ts", "task", "description", "what", "from", "to"],
    index=["ts", "task"],
)
display(data)
from __future__ import annotations from __future__ import annotations
from collections import Callable
from collections import UserDict from collections import UserDict
from collections import UserList from collections import UserList
from collections.abc import Collection
from collections.abc import Iterable from collections.abc import Iterable
from collections.abc import MutableMapping from collections.abc import MutableMapping
from collections.abc import MutableSequence from collections.abc import MutableSequence
from collections.abc import Iterator from collections.abc import Iterator
import json
import pprint
from typing import Optional, Union from typing import Optional, Union
class PropertyMatcher(object):
    """
    Register callbacks that run only on objects whose (possibly nested)
    properties match all of the given "dotted.path=value" patterns.

    Usage example:

        def process_transactions(transactions):
            mapper = PropertyMatcher()

            @mapper("transactionType=core:edge", "meta.edge:type=41")
            def edge(t):
                ''' match project edge transactions '''
                oldValue = [PHIDRef(p) for p in t["oldValue"]]
                newValue = [PHIDRef(p) for p in t["newValue"]]
                return [["projects", '', oldValue, newValue]]

            for taskid, t in transactions.result.items():
                st = sorted(t, key=itemgetter("dateCreated"))
                for record in st:
                    for row in mapper.run(record):
                        if row:
                            yield row

        transactions = get_some_transactions()

        for row in process_transactions(transactions):
            # do something with row
    """

    def __init__(self):
        # One matcher callable per decorated function.
        self.matchers = []
        # Patterns parsed by the most recent decorator call.
        self.patterns = []

    def run(self, obj):
        """Yield every row produced by every registered matcher for obj."""
        for matcher in self.matchers:
            res = matcher(obj)
            if res:
                for row in res:
                    yield row

    def __call__(self, *args):
        if len(args) == 1 and callable(args[0]):
            # Called as a bare decorator with no patterns.
            # BUG FIX: previously returned None here, which replaced the
            # decorated function with None at the call site.
            self.fn = args[0]
            return args[0]

        self.patterns = []
        for arg in args:
            pat, val = arg.split("=")
            pattern = (pat.split('.'), val)
            self.patterns.append(pattern)

        def wrapper(func):
            # Bind the patterns parsed above; self.patterns is reset on the
            # next decorator call.
            patterns = self.patterns

            def matcher(obj):
                # All patterns must match (logical AND) for func to run.
                matched = False
                for path, val in patterns:
                    matched = False
                    try:
                        target = obj
                        for key in path:
                            target = target[key]
                        if target == val:
                            matched = True
                    except Exception:
                        # Missing key / non-subscriptable value: no match.
                        matched = False
                    if not matched:
                        return False
                if matched:
                    return func(obj)

            self.matchers.append(matcher)
            return matcher

        return wrapper
class DataIterator(Iterator): class DataIterator(Iterator):
......
import re import re
from typing import NewType, Optional
# Distinct type for validated MediaWiki version strings.
MWVersion = NewType('MWVersion', str)


def version(ver: str) -> Optional[MWVersion]:
    """
    Validate our version number formats.

    Returns the leading MediaWiki train version (e.g. "1.38.0-wmf.19" or
    "1.38wmf19") found at the start of `ver`, or None when `ver` does not
    begin with a recognized version.
    """
    try:
        # re.match anchors at the start; group(0) keeps only the version
        # prefix, discarding any trailing text.
        return MWVersion(
            re.match(r"(\d+\.\d+(\.\d+-)?wmf\.?\d+)", ver).group(0))
    except Exception:
        # No match (AttributeError on None) or non-string input.
        return None
...@@ -8,10 +8,12 @@ from collections.abc import ( ...@@ -8,10 +8,12 @@ from collections.abc import (
Collection, Collection,
Mapping, Mapping,
MutableMapping, MutableMapping,
MutableSequence) MutableSequence,
)
from pprint import pprint from pprint import pprint
from tokenize import Number from tokenize import Number
from typing import Union from typing import Sequence, Union
from operator import itemgetter
# todo: remove dependency on requests # todo: remove dependency on requests
import requests import requests
...@@ -20,6 +22,43 @@ from numpy import real ...@@ -20,6 +22,43 @@ from numpy import real
from ddd.data import Data, DataIterator, wrapitem from ddd.data import Data, DataIterator, wrapitem
from ddd.phobjects import PHID, PHObject, PhabObjectBase, isPHID, json_object_hook from ddd.phobjects import PHID, PHObject, PhabObjectBase, isPHID, json_object_hook
class Cursor(object):
    # Base class for paged conduit results. Concrete subclasses (e.g.
    # ConduitCursor) populate the attributes below and implement
    # handle_result().

    args: MutableMapping     # request arguments sent to conduit
    cursor: MutableMapping   # paging cursor from the most recent response
    conduit: Conduit         # client used to issue follow-up requests
    data: deque[Data]        # records accumulated across all fetched pages
    result: MutableMapping   # raw result mapping from the last response

    def asdict(self, key="id"):
        # Index the fetched records by the given field (default: "id").
        return {obj[key]: obj for obj in self.data}

    def next_page(self):
        """
        Load the next page of results from conduit, using the cursor that was
        returned by the most recently fetched page to specify the starting
        point. This is specified by an "after" argument added to the request.

        Raises ConduitException when there is no further page.
        """
        after = self.cursor.get("after", None)
        if after is None:
            raise ConduitException(
                conduit=self.conduit, message="Cannot fetch pages beyond the last."
            )
        args = self.args
        args["after"] = after
        # NOTE(review): relies on self.method, which is not declared on this
        # base class — subclasses must define it. Confirm before subclassing.
        res = self.conduit.raw_request(method=self.method, args=args)
        self.handle_result(res)

    def has_more(self):
        # Truthy when the server reported another page (an "after" cursor).
        return self.cursor.get("after", None)

    def fetch_all(self):
        """ Sequentially Fetch all pages of results from the server. """
        while self.has_more():
            self.next_page()
        return self.data

    def handle_result(self, response):
        # Subclasses parse the response and update data/cursor/result.
        raise ConduitException('Not Implemented')
class Conduit(object): class Conduit(object):
phab_url: str = "https://phabricator.wikimedia.org/api/" phab_url: str = "https://phabricator.wikimedia.org/api/"
...@@ -62,7 +101,7 @@ class Conduit(object): ...@@ -62,7 +101,7 @@ class Conduit(object):
r = requests.post(f"{self.phab_url}{method}", data=req) r = requests.post(f"{self.phab_url}{method}", data=req)
return r return r
def request(self, method: str, args: MutableMapping) -> ConduitCursor: def request(self, method: str, args: MutableMapping) -> Cursor:
r = self.raw_request(method=method, args=args) r = self.raw_request(method=method, args=args)
return ConduitCursor(conduit=self, res=r, method=method, args=args) return ConduitCursor(conduit=self, res=r, method=method, args=args)
...@@ -89,20 +128,22 @@ class Conduit(object): ...@@ -89,20 +128,22 @@ class Conduit(object):
raise ConduitException(conduit=self, message=json["error_info"]) raise ConduitException(conduit=self, message=json["error_info"])
return json return json
def project_search(self, queryKey="all",
constraints: MutableMapping={}) -> Cursor:
class ConduitCursor(object): return self.request('project.search', {
"queryKey": queryKey,
"constraints": constraints
})
class ConduitCursor(Cursor):
""" """
ConduitCursor handles fetching multiple pages of records from the conduit ConduitCursor handles fetching multiple pages of records from the conduit
api so that one api call can be treated as a single collection of results even api so that one api call can be treated as a single collection of results even
though it's split across multiple requests to the server. though it's split across multiple requests to the server.
""" """
conduit: Conduit
result: MutableMapping
method: str method: str
args: MutableMapping
data: deque[Data]
cursor: MutableMapping
def __init__( def __init__(
self, self,
...@@ -122,31 +163,7 @@ class ConduitCursor(object): ...@@ -122,31 +163,7 @@ class ConduitCursor(object):
res = self.conduit.raw_request(method=self.method, args=self.args) res = self.conduit.raw_request(method=self.method, args=self.args)
self.handle_result(res) self.handle_result(res)
def next_page(self): def handle_result(self, res: requests.Response):
"""
Load the next page of results from conduit, using the cursor that was
returned by the most recently fetched page to specify the starting
point. This is specified by an "after" argument added to the request.
"""
after = self.cursor.get("after", None)
if after is None:
raise ConduitException(
conduit=self.conduit, message="Cannot fetch pages beyond the last."
)
args = self.args
args["after"] = after
res = self.conduit.raw_request(method=self.method, args=args)
self.handle_result(res)
def fetch_all(self):
while self.has_more():
self.next_page()
return self.data
def asdict(self, key="id"):
return { obj[key]:obj for obj in self.data }
def handle_result(self, res:requests.Response):
""" """
Process the result from a conduit call and store the records, along Process the result from a conduit call and store the records, along
with a cursor for fetching further pages when the result exceeds the with a cursor for fetching further pages when the result exceeds the
...@@ -172,8 +189,7 @@ class ConduitCursor(object): ...@@ -172,8 +189,7 @@ class ConduitCursor(object):
# Older methods just return a result: # Older methods just return a result:
self.data.extend(self.result.values()) self.data.extend(self.result.values())
def has_more(self):
return self.cursor.get("after", None)
def __iter__(self): def __iter__(self):
return DataIterator(self.data) return DataIterator(self.data)
...@@ -203,7 +219,7 @@ class ConduitException(Exception): ...@@ -203,7 +219,7 @@ class ConduitException(Exception):
return "ConduitException: " + self.message return "ConduitException: " + self.message
def flatten_for_post(h, result:dict=None, kk=None) -> dict[str, str]: def flatten_for_post(h, result: dict = None, kk=None) -> dict[str, str]:
""" """
Since phab expects x-url-encoded form post data (meaning each Since phab expects x-url-encoded form post data (meaning each
individual list element is named). AND because, evidently, requests individual list element is named). AND because, evidently, requests
......
from __future__ import annotations from __future__ import annotations
from collections import UserDict from collections import UserDict, deque
from importlib.resources import path
from itertools import repeat from itertools import repeat
import json import json
...@@ -11,9 +12,11 @@ from enum import Enum ...@@ -11,9 +12,11 @@ from enum import Enum
from functools import total_ordering from functools import total_ordering
from pprint import pprint from pprint import pprint
from sqlite3 import Connection from sqlite3 import Connection
from typing import (Any, ClassVar, Generic, NewType, Optional, Type, TypeVar, from typing import (Any, ClassVar, Deque, Generic, NewType, Optional, Type, TypeVar,
Union) Union)
from pathlib import Path
""" """
Phabricator Objects Phabricator Objects
PHObject and its subclasses are proxy objects that model PHObject and its subclasses are proxy objects that model
...@@ -289,10 +292,15 @@ class PHIDRef(object): ...@@ -289,10 +292,15 @@ class PHIDRef(object):
self.relation = None self.relation = None
def __repr__(self) -> str: def __repr__(self) -> str:
return "PHIDRef('%s')" % self.toPHID return f"PHIDRef('{self.toPHID}', object='{self.object}')"
def __str__(self) -> str: def __str__(self) -> str:
return self.toPHID return f"{self.object}"
def __eq__(self, other) -> bool:
return other is self.object or (
isinstance(other, PHIDRef) and
other.object is self.object)
def json_object_hook(obj:dict): def json_object_hook(obj:dict):
...@@ -309,37 +317,69 @@ def json_object_hook(obj:dict): ...@@ -309,37 +317,69 @@ def json_object_hook(obj:dict):
return obj return obj
def SQLType(name: str, pythontype: Type, sqlkeyword: Optional[str] = None):
    """
    Build a typing.NewType tagged with the SQL keyword to use when declaring
    a column of this type.

    When sqlkeyword is omitted, the type's own name doubles as the keyword
    (e.g. TEXT, REAL).
    """
    sql_type = NewType(name, pythontype)
    sql_type.sqlkw = sqlkeyword or name
    return sql_type
class DataCache(object): class DataCache(object):
con:sqlite3.Connection
replace_phobject = """ @staticmethod
REPLACE INTO phobjects (phid, name, dateCreated, dateModified, data) def placeholders(count):
VALUES ( ?, ?, ?, ?, ?) return ",".join(repeat("?", count))
"""
def __init__(self, db): PRIMARYKEY = SQLType('PRIMARY', str, 'PRIMARY KEY')
REAL = SQLType('REAL', float)
TEXT = SQLType('TEXT', str)
con:sqlite3.Connection
table_name:str = "phobjects"
id:REAL
phid:PRIMARYKEY
authorPHID:TEXT
name:TEXT
fullname:TEXT
dateCreated:REAL
dateModified:REAL
status:TEXT
data:TEXT
ram_cache:deque = Deque(maxlen=1000)
def cols(self) -> Sequence:
cols = []
for field in self.__annotations__.items():
if field[1] in (SQLType.types):
field = (field[0], str(globals()[field[1]].sqlkw))
cols.append(f"{' '.join(field)}")
return cols
def sql_insert(self, cols=None, replace=True):
if not cols:
cols = self.cols()
op = "REPLACE" if replace else "INSERT"
sql = f"""{op} INTO {self.table_name} ({", ".join(cols)})
VALUES ({self.placeholders(len(cols))})"""
return sql
def __init__(self, db:Path):
self.con = sqlite3.connect(db) self.con = sqlite3.connect(db)
self.con.row_factory = sqlite3.Row() self.con.row_factory = sqlite3.Row
self.con.execute( self.cur = self.con.cursor()
"""CREATE TABLE if not exists phobjects ( cols = ", ".join(self.cols())
id real, sql = f"""CREATE TABLE if not exists {self.table_name} ({cols});"""
phid TEXT PRIMARY KEY, self.cur.execute(sql)
authorPHID text,
name TEXT,
fullname TEXT,
dateCreated real,
dateModified real,
status TEXT,
data TEXT
); """)
def load(self, phids): def load(self, phids):
placeholders = ",".join(repeat("?", len(phids))) ph = self.placeholders(len(phids))
select = f"SELECT * FROM phobjects where phid in ({placeholders})" select = f"SELECT * FROM {self.table_name} where phid in ({ph})"
if isinstance(phids, list): if isinstance(phids, (PHID,str)):
return self.con.executemany(select, phids) return self.cur.execute(select, phids)
else: else:
return self.con.execute(select, phids) return self.cur.executemany(select, phids)
def row(self, item): def row(self, item):
data = json.dumps(item) data = json.dumps(item)
...@@ -347,8 +387,8 @@ class DataCache(object): ...@@ -347,8 +387,8 @@ class DataCache(object):
def store_all(self, items:Sequence[PHObject]): def store_all(self, items:Sequence[PHObject]):
rows = [self.row(item) for item in items] rows = [self.row(item) for item in items]
self.con.executemany(self.replace_phobject, rows) return self.cur.executemany(self.sql_insert(replace=True), rows)
def store_one(self, item:PHObject): def store_one(self, item:PHObject):
values = self.row(item) values = self.row(item)
self.con.execute(self.replace_phobject, values) return self.cur.execute(self.sql_insert(replace=True), values)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment