Commit 3bf26a6f authored by 20after4

Added a CLI script that uses ddd to process transactions and produce workboard metrics.

The output is suitable for generating a timeline with data points for each time a task enters or leaves a column.

The column_metrics data set looks like this:

| project_phid | column_phid | task_id | ts_begin (unix ts)       | ts_end (unix ts)      | duration (seconds)     |
|--------------|-------------|---------|--------------------------|-----------------------|------------------------|
| PHID-PROJ-1  | PHID-PCOL-1 | 123456  | Time task entered column | Time task left column | How long task remained |

We can also derive other metrics from this same dataset: to calculate global workboard metrics
and task-specific metrics we simply summarize the above data (a sketch follows the list below).

* Each task will have a duration value for each column on the workboard.
** The lead time for the task is the duration in the default column.
** The cycle time is the total of all durations except for the default column.
* Global workboard metrics are the mean of all durations in each column.
** We can calculate the metrics using a moving window in order to capture the change in metric performance over time.
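
As a rough sketch (not part of this commit): given the layout described above, the per-task
summary could be computed with a query like the one below, where `default_column_phid` is a
hypothetical placeholder for the workboard's default column.

```python
import sqlite3

def task_lead_and_cycle(con: sqlite3.Connection, default_column_phid: str):
    """Summarize column_metrics rows into per-task lead and cycle time."""
    return con.execute(
        """
        SELECT task_id,
               SUM(CASE WHEN column_phid = ? THEN duration ELSE 0 END) AS lead_time,
               SUM(CASE WHEN column_phid <> ? THEN duration ELSE 0 END) AS cycle_time
        FROM column_metrics
        GROUP BY task_id
        """,
        (default_column_phid, default_column_phid),
    ).fetchall()
```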

Change-Id: Ie416494100b4833006bd974a758ab27e4c04a6b4
parent 8d7c1607
@@ -21,10 +21,25 @@ let alone completely implemented. Stay tuned or get involved.
# Usage
The most useful bits of code that can be found in this repo are demonstrated
with the following code examples:
## boardmetrics.py
This tool extracts data from Phabricator and organizes it into a structure that facilitates further analysis.
The analysis of task activities can provide some insight into workflows.
The output of this tool will be used as the data source for charts that visualize certain agile project planning metrics.

Example usage (this is rough and can be simplified with a bit more refinement), from the project directory:
```bash
./ddd/boardmetrics.py --project=PHID-PROJ-fmcvjrkfvvzz3gxavs3a --mock=test/train.transactions.json --dump=json > metrics.json
```
This calculates data for the given project PHID, using data from a mock API call result (to speed up testing), and dumps the output as JSON.
If you omit the --mock argument, the script requests a rather large amount of data from the Phabricator API, which takes an extra 20+ seconds to fetch.
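
A quick way to inspect the resulting file (a sketch; the top-level keys match the ones
written by the dump code in boardmetrics.py):

```python
import json

with open("metrics.json") as f:
    data = json.load(f)

# keys written by --dump=json: columns, tasks, events, phids
print(sorted(data.keys()))
```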
# Example code:
```python
from ddd.phab import Conduit
......
#!/usr/bin/python3
import io
import json
import pathlib
import sqlite3
import subprocess
import sys
from collections import deque
from operator import itemgetter
from typing import Mapping
import click
from rich import inspect, pretty
from rich.console import Console
from rich.pretty import pprint
from ddd.data import PropertyMatcher
from ddd.mw import MWVersion, version
from ddd.phab import Conduit
from ddd.phobjects import (
    PHID,
    DataCache,
    EdgeType,
    PHIDRef,
    PHObject,
    PHObjectEncoder,
    sqlite_connect,
    sqlite_insert_statement,
)
metrics = {}
class ObjectMetrics(object):
"""Used to track how long a task exists in each workboard column"""
phid: PHID
metrics: dict = {}
def __init__(self, id):
self.id = id
self.metrics = {}
self.started = None
self.last_column = None
def metric(self, metric_name: str):
metric_name = str(metric_name)
if metric_name not in self.metrics:
self.metrics[metric_name] = {"start": 0, "end": 0}
return self.metrics[metric_name]
def start(self, metric: str, value: int):
if self.started:
if metric == self.started:
return
record = self.metric(self.started)
record["next"] = metric
self.end(value)
record = self.metric(metric)
record["start"] = value
self.set_if_earlier("start", value)
self.started = metric
self.last_column = metric
def end(self, value: int):
if self.started is not None:
metric = self.started
record = self.metric(metric)
record["end"] = value
record["duration"] = value - record["start"]
self.set_if_later("end", value)
self.started = None
def set_if_later(self, metric: str, value: int):
record = self.metric(metric)
if value > record["end"]:
record["end"] = value
if record["start"] == 0:
record["start"] = record["end"]
def set_if_earlier(self, metric: str, value: int):
record = self.metric(metric)
if value < record["start"]:
record["start"] = value
if record["end"] == 0:
record["end"] = record["start"]
    def duration(self, metric):
        """Calculate the duration of the metric"""
        record = self.metric(metric)
        return record["end"] - record["start"]

    def lead(self):
        # Lead time for the task: the span from the earliest recorded start
        # to the latest recorded end.
        return self.metric("end")["end"] - self.metric("start")["start"]
    def all(self):
        total = 0
        for metric, record in self.metrics.items():
            if isinstance(record, dict) and "duration" in record:
                total += record["duration"]
                next_metric = record.get("next")
                yield [
                    self.id,
                    metric,
                    record["start"],
                    record["end"],
                    record["duration"],
                    next_metric,
                ]
def MetricsFor(id) -> ObjectMetrics:
global metrics
if id not in metrics:
metrics[id] = ObjectMetrics(id)
return metrics[id]
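

# Example (illustrative only, with made-up PHIDs and timestamps): moving a
# task through two columns records one metric per column, each with start,
# end and duration values:
#
#   m = MetricsFor("T123")
#   m.start("PHID-PCOL-backlog", 1000)
#   m.start("PHID-PCOL-doing", 2000)  # closes "backlog" with duration 1000
#   m.end(3500)                       # closes "doing" with duration 1500
#   rows = list(m.all())              # one row per column with a duration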
#%%
def maptransactions(
    project_phid,
    transactions: Mapping,
    con: sqlite3.Connection,
    console: Console,
    phab: Conduit,
):
    mapper = PropertyMatcher()
    # Functions decorated with @mapper(...) take a transaction object and
    # distill it down to just the information we care about.
    # Add new functions to match other transaction types.
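    # For example, a hypothetical matcher for priority transactions might
    # look like this (not part of this commit):
    #
    #   @mapper("transactionType=priority")
    #   def priority(t):
    #       return [("priority", "global", t["oldValue"], t["newValue"])]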
@@ -30,69 +143,379 @@ def gettransactions(phab:Conduit, taskids):
meta['edge:type']. Edge type constants are defined in
the enum `ddd.phobjects.EdgeType`
"""
        oldValue = [p for p in t["oldValue"]]
        newValue = [p for p in t["newValue"]]
        move = ["projects", "global", oldValue, newValue]
        return [move]
@mapper("transactionType=core:edge", "meta.edge:type=51")
def mention_this(t):
return [
["mention-this", None, t["taskID"], PHIDRef(taskid)]
for taskid in t["newValue"]
]
@mapper("transactionType=core:edge", "meta.edge:type=52")
def mention_task(t):
return [
["mention", None, t["taskID"], PHIDRef(taskid)] for taskid in t["newValue"]
]
@mapper("transactionType=core:edge", "meta.edge:type=3")
def subtask(t):
removed = [p for p in t["oldValue"] if p not in t["newValue"]]
added = [p for p in t["newValue"] if p not in t["oldValue"]]
res = []
for r in removed:
res.append(["removed_subtask", None, t["taskID"], PHIDRef(r)])
for a in added:
res.append(["added_subtask", None, t["taskID"], PHIDRef(a)])
return res
# @mapper("transactionType=unblock")
def unblock(t):
pass
@mapper("transactionType=status")
def status(t):
return [("status",'', t["oldValue"], t["newValue"])]
return [("status", "global", t["oldValue"], t["newValue"])]
# @mapper("transactionType=core:create")
def create(t):
return [("status", "global", None, "open")]
@mapper("transactionType=core:columns")
def columns(t):
newv = t["newValue"]
res = []
        for obj in newv:
            # if obj["boardPHID"] != project_phid:
            #     continue
            tocol = str(obj["columnPHID"])
            if obj["fromColumnPHIDs"]:
                [fromcol] = obj["fromColumnPHIDs"]
                fromcol = str(fromcol)
            else:
                fromcol = None
            res.append(("columns", PHIDRef(obj["boardPHID"]), fromcol, tocol))
return res
metrics = {}
days = 60 * 60 * 24
datapoints = deque()
insert_stmt = None
for taskid, t in transactions.items():
st = sorted(t, key=itemgetter("dateCreated"))
task_metrics = MetricsFor(taskid)
metrics[taskid] = task_metrics
for record in st:
ts = int(record["dateCreated"])
trnsid = record["transactionID"]
for row in mapper.run(record):
if row:
what, project, old, new = row
task_metrics.set_if_earlier("start", ts)
task_metrics.set_if_later("end", ts)
if what == "projects":
if project_phid in new:
task_metrics.start(project_phid, ts)
for project in new:
if project not in old:
datapoints.append(
(trnsid, ts, project, None, taskid, what, 1)
)
elif project_phid in old:
task_metrics.end(ts)
for project in old:
if project not in new:
datapoints.append(
(trnsid, ts, project, None, taskid, what, -1)
)
elif what == "columns":
if old:
datapoints.append((trnsid, ts, project, old, taskid, what, -1))
if new:
datapoints.append((trnsid, ts, project, new, taskid, what, 1))
task_metrics.start(new, ts)
elif what == "status":
if (old == "open" or old == "stalled") and not (
new == "open" or new == "stalled"
):
if task_metrics.last_column:
datapoints.append(
(
trnsid,
ts,
project_phid,
task_metrics.last_column,
taskid,
what,
-1,
)
)
task_metrics.end(ts)
elif new == "open" and task_metrics.last_column:
datapoints.append(
(
trnsid,
ts,
project_phid,
task_metrics.last_column,
taskid,
what,
1,
)
)
event = {
"ts": ts,
"task": taskid,
"user": record["authorPHID"],
"event": what,
"project": project,
"old": old,
"new": new,
}
if insert_stmt is None:
insert_stmt = sqlite_insert_statement("events", event, replace=True)
con.execute(insert_stmt, event)
con.commit()
return datapoints
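
# Note on the datapoints shape: each tuple is (trnsid, ts, project, column,
# task, type, value), where value is +1 when a task enters a column or
# project and -1 when it leaves. A running sum per column therefore yields
# the number of tasks in each column over time, e.g. (a sketch; window
# functions require SQLite >= 3.25):
#
#   SELECT ts, column,
#          SUM(value) OVER (PARTITION BY column ORDER BY ts) AS tasks_in_column
#   FROM column_metrics
#   ORDER BY column, ts;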
thisdir = pathlib.Path(__file__).parent
mock_default = thisdir / ".." / "test" / "transactions.json"
all_tables = ["columns", "events", "column_metrics", "task_metrics", "phobjects", "all"]
@click.command()
@click.option(
"--project",
default="PHID-PROJ-uier7rukzszoewbhj7ja",
metavar="PHID",
help="PHID of the project to fetch.",
)
@click.option(
"--mock",
help="Skip calling the Phabricator API and instead use test data from a file.",
is_flag=False,
flag_value=mock_default,
type=click.Path(exists=True, dir_okay=False),
)
@click.option(
"--db",
nargs=1,
required=False,
default="./metrics.db",
help="Path to sqlite database.",
show_default=True,
type=click.Path(),
)
@click.option(
"--dump",
nargs=1,
help="Dump metrics in the specified format. Supported formats: json, csv.",
required=False,
metavar="FORMAT",
)
@click.option(
"--table",
nargs=1,
help="With --dump, specifies which table to dump. Default=all",
required=False,
metavar="TABLE",
default="all",
type=click.Choice(all_tables, case_sensitive=True),
)
@click.option(
"--cache-columns",
help="Download and cache column names for a project.",
is_flag=True,
)
def main(project, mock, db, dump, table, cache_columns):
"""Gather workboard metrics from Phabricator"""
console = Console(stderr=True)
db_path = db
table = [t for t in all_tables if t != "all"] if table == "all" else [table]
if dump == "csv":
for t in table:
console.log(f"Dumping {t} as csv")
res = subprocess.run(
["sqlite3", "--csv", "--header", db_path, f"SELECT * from {t}"]
)
sys.exit(res.returncode)
phab = Conduit()
# find all train blocker tasks
# project_phid = "PHID-PROJ-q7wvv5i67p7tbg2kuret"
project_phid = project
column_keys = (
"project",
"phid",
"name",
"status",
"proxyPHID",
"dateCreated",
"dateModified",
)
column_names = ",".join(column_keys)
console.log(f"Opening db: {db_path}")
con = sqlite_connect(db_path)
with con:
console.log("Creating db schema.")
# sqlite db schema:
con.executescript(
f"""
CREATE TABLE IF NOT EXISTS column_metrics (trnsid, ts, project phid, column phid, task, type, value);
CREATE TABLE IF NOT EXISTS columns({column_names});
CREATE INDEX IF NOT EXISTS ts_column_value on column_metrics(column, task, ts, value);
CREATE UNIQUE INDEX IF NOT EXISTS trnsid on column_metrics(ts, column, task, value);
CREATE TABLE IF NOT EXISTS task_metrics(task, metric phid, next_metric phid, ts, ts2, duration);
CREATE UNIQUE INDEX IF NOT EXISTS task_metric ON task_metrics(task, metric);
CREATE TABLE IF NOT EXISTS events(ts, task, project phid, user phid, event, old, new);
CREATE UNIQUE INDEX IF NOT EXISTS events_pk on events(ts, task, event);
"""
)
if cache_columns:
console.log(f"Cache columns for project {project_phid}")
placeholders = DataCache.placeholders(len(column_keys))
cur = con.execute("select distinct(column) from column_metrics;")
all_columns = [str(c[0]) for c in cur.fetchall()]
r = phab.project_columns(column_phids=all_columns)
r.fetch_all()
data = r.data
rows = [[str(col[key]) for key in column_keys] for col in data]
# console.log(rows)
cur = con.executemany(
f"REPLACE INTO columns ({column_names}) values ({placeholders})", rows
)
con.commit()
with console.status("[bold green]Processing...") as status:
if mock:
console.log(f"Running with mock data from [bold blue]{mock}")
with io.open(mock) as jsonfile:
transactions = json.load(jsonfile)
transactions = transactions["result"]
else:
console.log(f"finding all tasks for the project [bold blue]{project_phid}")
            r = phab.request(
                "maniphest.project.task.transactions",
                {"project": project_phid},
            )
# r.fetch_all()
transactions = r.result
console.log("Processing task transactions.")
sts = console.status("[bold green]Processing column_metrics...")
with con, sts:
datapoints = maptransactions(project_phid, transactions, con, console, phab)
console.log("Inserting workboard column metrics")
count = 0
total = len(datapoints)
for metric in datapoints:
count += 1
if count % 10 == 0:
sts.update(f"[bold green]Inserting {count} of {total} data points.")
con.execute(
"REPLACE INTO column_metrics (trnsid, ts, project, column, task, type, value) VALUES (?, ?, ?, ?, ?, ?, ?)",
metric,
)
console.log(f"Inserted [bold green]{count}[/bold green] column_metrics.")
sts = console.status("[bold green]Processing task_metrics...")
# and collect the task metrics
metric_totals = [row for obj in metrics.values() for row in obj.all()]
with con, sts:
console.log(
f"Inserting [bold green]{len(metric_totals)}[/bold green] task metrics"
)
con.executemany(
"REPLACE INTO task_metrics (task, metric, ts, ts2, duration, next_metric) VALUES (?, ?, ?, ?, ?, ?)",
metric_totals,
)
if dump:
with con:
console.log("Querying metrics")
cur = con.execute(
"""--sql
SELECT
c.name,
m.trnsid,
m.ts,
m.project,
m.column,
m.task,
m.type,
m.value
FROM column_metrics m
JOIN columns c ON m.`column`=c.phid
where m.project=? and m.column != m.project and m.value > 0
order by m.column, ts
""",
[project_phid],
)
column_metrics = [{k: row[k] for k in row.keys()} for row in cur.fetchall()]
cur = con.execute(
"""
SELECT m.*, c.name from task_metrics m
LEFT JOIN columns c on m.metric=c.phid
"""
)
task_metrics = [{k: row[k] for k in row.keys()} for row in cur.fetchall()]
cur = con.execute(
"""
SELECT e.* FROM events e
"""
)
events = [{k: row[k] for k in row.keys()} for row in cur.fetchall()]
cache = DataCache(db)
phobs = PHObject.resolve_phids(phab, cache)
if dump in ("js", "json"):
if dump == "js":
print("function metrics() { return ", end="")
encoder = PHObjectEncoder()
outstr = ""
for output in encoder.iterencode(
{
"columns": column_metrics,
"tasks": task_metrics,
"events": events,
"phids": {key: val.data for key, val in phobs.items()},
}
):
outstr += output
print(outstr, end="")
if dump == "js":
print("; }")
print("")
con.close()
if __name__ == "__main__":
main()
@@ -10,7 +10,7 @@ from typing import Optional, Union
class PropertyMatcher(object):
""" Usage example:
"""Usage example:
def process_transactions(transactions):
mapper = PropertyMatcher()
@@ -36,6 +36,7 @@ class PropertyMatcher(object):
# do something with row
"""
def __init__(self):
self.matchers = []
self.patterns = []
@@ -54,11 +55,12 @@ class PropertyMatcher(object):
self.patterns = []
for arg in args:
pat, val = arg.split("=")
            pattern = (pat.split("."), val)