
Commit 7db82a66 authored by 20after4

Fix columns table to include project_phid, move mapper to separate module.

Change-Id: I66b9a6df5358a0698a0c56cd6467da0d9a968d35
parent 95925040
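Before the diff itself, a rough sketch of how the refactored entry point is wired up after this change: maptransactions now lives in ddd.boardmetrics_mapper, takes the shared metrics dict as a parameter, and the caller unpacks both datapoints and metrics. The client/connection setup below is illustrative only (Conduit() construction and sqlite3.connect stand in for whatever the repo actually uses), not code from the repository.

# Illustrative wiring of the refactored mapper module (names and setup are assumptions):
import sqlite3

from rich.console import Console

from ddd.boardmetrics_mapper import maptransactions  # mapper moved to its own module
from ddd.phab import Conduit

phab = Conduit()                      # assumed default construction
con = sqlite3.connect("metrics.db")   # stand-in for ddd.phobjects.sqlite_connect
console = Console(stderr=True)

metrics: dict = {}                    # per-task ObjectMetrics registry, now passed in
transactions: dict = {}               # taskID -> list of transaction dicts from Phabricator

# maptransactions() now accepts the metrics dict and returns it alongside datapoints
datapoints, metrics = maptransactions(
    "PHID-PROJ-example", transactions, con, console, phab, metrics
)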
@@ -27,269 +27,7 @@ from ddd.phobjects import (
    sqlite_connect,
    sqlite_insert_statement,
)
metrics = {}


class ObjectMetrics(object):
    """Used to track how long a task exists in each workboard column"""

    phid: PHID
    metrics: dict = {}

    def __init__(self, id):
        self.id = id
        self.metrics = {}
        self.started = None
        self.last_column = None

    def metric(self, metric_name: str):
        metric_name = str(metric_name)
        if metric_name not in self.metrics:
            self.metrics[metric_name] = {"start": 0, "end": 0}
        return self.metrics[metric_name]

    def start(self, metric: str, value: int):
        if self.started:
            if metric == self.started:
                return
            record = self.metric(self.started)
            record["next"] = metric
            self.end(value)
        record = self.metric(metric)
        record["start"] = value
        self.set_if_earlier("start", value)
        self.started = metric
        self.last_column = metric

    def end(self, value: int):
        if self.started is not None:
            metric = self.started
            record = self.metric(metric)
            record["end"] = value
            record["duration"] = value - record["start"]
            self.set_if_later("end", value)
            self.started = None

    def set_if_later(self, metric: str, value: int):
        record = self.metric(metric)
        if value > record["end"]:
            record["end"] = value
        if record["start"] == 0:
            record["start"] = record["end"]

    def set_if_earlier(self, metric: str, value: int):
        record = self.metric(metric)
        if value < record["start"]:
            record["start"] = value
        if record["end"] == 0:
            record["end"] = record["start"]

    def duration(self, metric):
        """Calculate the duration of the metric"""
        record = self.metric(metric)
        start = record["start"] if "start" in record else self.metrics["start"]
        end = record["end"] if "end" in record else self.metrics["end"]
        return end - start

    def lead(self):
        return self.metrics["start"] - self.metrics["start"]

    def all(self):
        total = 0
        for metric, record in self.metrics.items():
            if isinstance(record, dict) and "duration" in record:
                total += record["duration"]
                next_metric = record["next"] if "next" in record else None
                yield [
                    self.id,
                    metric,
                    record["start"],
                    record["end"],
                    record["duration"],
                    next_metric,
                ]
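ObjectMetrics accumulates per-metric intervals keyed by metric name (typically a workboard column PHID); calling start() on a new metric records a "next" pointer on the previous one and closes out its interval. A rough usage sketch, with made-up timestamps and column names:

# Usage sketch only; "col-A"/"col-B" stand in for column PHIDs, timestamps are invented.
m = ObjectMetrics("T123")
m.start("col-A", 1000)   # task enters column A at ts=1000
m.start("col-B", 1600)   # moving to column B closes the "col-A" interval
m.end(2200)              # task leaves the board (or is closed) at ts=2200

print(m.duration("col-A"))   # 600: time spent in column A
for row in m.all():          # yields [id, metric, start, end, duration, next]
    print(row)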
def MetricsFor(id) -> ObjectMetrics:
    global metrics

    if id not in metrics:
        metrics[id] = ObjectMetrics(id)
    return metrics[id]
def maptransactions(
    project_phid,
    transactions: Mapping,
    con: sqlite3.Connection,
    console: Console,
    phab: Conduit,
):
    mapper = PropertyMatcher()

    # functions decorated with @ttype will take a transaction object and distill
    # it down to just the information we care about.
    # add new functions to match other transaction types.

    # ids = [id for id in tasks.keys()]

    @mapper("transactionType=core:edge", "meta.edge:type=41")
    def edge(t):
        """
        edge transactions point to related objects such as subtasks,
        mentioned tasks and project tags.

        The type of relationship is specified by an integer in
        meta['edge:type']. Edge type constants are defined in
        the enum `ddd.phobjects.EdgeType`
        """
        oldValue = [p for p in t["oldValue"]]
        newValue = [p for p in t["newValue"]]
        move = ["projects", "global", oldValue, newValue]
        return [move]

    @mapper("transactionType=core:edge", "meta.edge:type=51")
    def mention_this(t):
        return [
            ["mention-this", None, t["taskID"], PHIDRef(taskid)]
            for taskid in t["newValue"]
        ]

    @mapper("transactionType=core:edge", "meta.edge:type=52")
    def mention_task(t):
        return [
            ["mention", None, t["taskID"], PHIDRef(taskid)] for taskid in t["newValue"]
        ]

    @mapper("transactionType=core:edge", "meta.edge:type=3")
    def subtask(t):
        removed = [p for p in t["oldValue"] if p not in t["newValue"]]
        added = [p for p in t["newValue"] if p not in t["oldValue"]]
        res = []
        for r in removed:
            res.append(["removed_subtask", None, t["taskID"], PHIDRef(r)])
        for a in added:
            res.append(["added_subtask", None, t["taskID"], PHIDRef(a)])
        return res

    # @mapper("transactionType=unblock")
    def unblock(t):
        pass

    @mapper("transactionType=status")
    def status(t):
        return [("status", "global", t["oldValue"], t["newValue"])]

    # @mapper("transactionType=core:create")
    def create(t):
        return [("status", "global", None, "open")]

    @mapper("transactionType=core:columns")
    def columns(t):
        newv = t["newValue"]
        res = []
        for obj in newv:
            # if obj["boardPHID"] != project_phid:
            #     continue
            tocol = str(obj["columnPHID"])
            if obj["fromColumnPHIDs"]:
                [fromcol] = obj["fromColumnPHIDs"]
                fromcol = str(fromcol)
            else:
                fromcol = None
            res.append(("columns", PHIDRef(obj["boardPHID"]), fromcol, tocol))
        return res
    metrics = {}
    days = 60 * 60 * 24
    datapoints = deque()
    insert_stmt = None

    for taskid, t in transactions.items():
        st = sorted(t, key=itemgetter("dateCreated"))
        task_metrics = MetricsFor(taskid)
        metrics[taskid] = task_metrics

        for record in st:
            ts = int(record["dateCreated"])
            trnsid = record["transactionID"]
            for row in mapper.run(record):
                what, project, old, new = row
                task_metrics.set_if_earlier("start", ts)
                task_metrics.set_if_later("end", ts)

                if what == "projects":
                    if project_phid in new:
                        task_metrics.start(project_phid, ts)
                        for project in new:
                            if project not in old:
                                datapoints.append(
                                    (trnsid, ts, project, None, taskid, what, 1)
                                )
                    elif project_phid in old:
                        task_metrics.end(ts)
                        for project in old:
                            if project not in new:
                                datapoints.append(
                                    (trnsid, ts, project, None, taskid, what, -1)
                                )
                elif what == "columns":
                    if old:
                        datapoints.append((trnsid, ts, project, old, taskid, what, -1))
                    if new:
                        datapoints.append((trnsid, ts, project, new, taskid, what, 1))
                        task_metrics.start(new, ts)
                elif what == "status":
                    if (old == "open" or old == "stalled") and not (
                        new == "open" or new == "stalled"
                    ):
                        if task_metrics.last_column:
                            datapoints.append(
                                (
                                    trnsid,
                                    ts,
                                    project_phid,
                                    task_metrics.last_column,
                                    taskid,
                                    what,
                                    -1,
                                )
                            )
                        task_metrics.end(ts)
                    elif new == "open" and task_metrics.last_column:
                        datapoints.append(
                            (
                                trnsid,
                                ts,
                                project_phid,
                                task_metrics.last_column,
                                taskid,
                                what,
                                1,
                            )
                        )

                event = {
                    "ts": ts,
                    "task": taskid,
                    "user": record["authorPHID"],
                    "event": what,
                    "project": project,
                    "old": old,
                    "new": new,
                }
                if insert_stmt is None:
                    insert_stmt = sqlite_insert_statement("events", event, replace=True)
                con.execute(insert_stmt, event)
    con.commit()
    return datapoints
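The decorated handlers above are the whole mapping layer: each one matches on fields of a raw Phabricator transaction and reduces it to a (what, project, old, new) row. A stripped-down sketch of that pattern, using an invented sample transaction and assuming mapper.run() yields the rows returned by every matching handler (which is what the loop above relies on):

# Sketch of the PropertyMatcher pattern; the sample transaction values are invented.
from ddd.data import PropertyMatcher

mapper = PropertyMatcher()

@mapper("transactionType=status")
def status(t):
    # distill a status transaction down to (what, scope, old, new)
    return [("status", "global", t["oldValue"], t["newValue"])]

sample = {
    "transactionType": "status",
    "taskID": "T123",
    "oldValue": "open",
    "newValue": "resolved",
}

for what, scope, old, new in mapper.run(sample):
    print(what, old, "->", new)   # status open -> resolved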
from ddd.boardmetrics_mapper import maptransactions
thisdir = pathlib.Path(__file__).parent
mock_default = thisdir / ".." / "test" / "transactions.json"
@@ -345,6 +83,7 @@ def main(project, mock, db, dump, table, cache_columns):
"""Gather workboard metrics from Phabricator"""
console = Console(stderr=True)
db_path = db
metrics = {}
table = [t for t in all_tables if t != "all"] if table == "all" else [table]
@@ -360,16 +99,17 @@ def main(project, mock, db, dump, table, cache_columns):
    # project_phid = "PHID-PROJ-q7wvv5i67p7tbg2kuret"
    project_phid = project

    column_keys = (
        "project",
        "phid",
        "name",
        "status",
        "proxyPHID",
        "dateCreated",
        "dateModified",
    )
    column_names = ",".join(column_keys)
    column_keys = {
        "project_name": "project.name",
        "column_name": "name",
        "project_phid": "project.phid",
        "column_phid": "phid",
        "status": "status",
        "proxyPHID": "proxyPHID",
        "dateCreated": "dateCreated",
        "dateModified": "dateModified",
    }
    column_names = ",".join(column_keys.keys())

    console.log(f"Opening db: {db_path}")
    con = sqlite_connect(db_path)
@@ -377,16 +117,15 @@ def main(project, mock, db, dump, table, cache_columns):
console.log("Creating db schema.")
# sqlite db schema:
con.executescript(
f"""
schema = f"""
--sql
CREATE TABLE IF NOT EXISTS column_metrics (trnsid, ts, project phid, column phid, task, type, value);
--sql
CREATE TABLE IF NOT EXISTS columns({column_names});
CREATE INDEX IF NOT EXISTS column_ts on column_metrics(column, ts);
--sql
CREATE INDEX IF NOT EXISTS ts_column_value on column_metrics(column, task, ts, value);
CREATE UNIQUE INDEX IF NOT EXISTS ts_column_task on column_metrics(ts, column, task);
--sql
CREATE UNIQUE INDEX IF NOT EXISTS trnsid on column_metrics(ts, column, task, value);
CREATE TABLE IF NOT EXISTS columns({column_names}, PRIMARY KEY(column_phid));
--sql
CREATE TABLE IF NOT EXISTS task_metrics(task, metric phid, next_metric phid, ts, ts2, duration);
--sql
@@ -417,7 +156,11 @@ def main(project, mock, db, dump, table, cache_columns):
        ORDER BY
            c.column, c.ts;
    """
    )
    try:
        con.executescript(schema)
    except sqlite3.IntegrityError:
        console.log("IntegrityError while initializing database schema.")
        raise
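Because each column_metrics row written later carries value = +1 when a task enters a column and -1 when it leaves, the table lends itself to running-sum queries. A rough sketch, reusing the con and project_phid already in scope here and assuming a SQLite build with window function support; this query is illustrative, not taken from the repository:

# Illustrative query: running count of tasks per column over time.
cur = con.execute(
    """
    SELECT `column`, ts,
           SUM(value) OVER (PARTITION BY `column` ORDER BY ts) AS tasks_in_column
    FROM column_metrics
    WHERE project = ?
    ORDER BY `column`, ts
    """,
    (project_phid,),
)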
    if cache_columns:
        console.log(f"Cache columns for project {project_phid}")
@@ -426,10 +169,13 @@ def main(project, mock, db, dump, table, cache_columns):
cur = con.execute("select distinct(column) from column_metrics;")
all_columns = [str(c[0]) for c in cur.fetchall()]
r = phab.project_columns(column_phids=all_columns)
r = phab.project_columns(project=project_phid)
r.fetch_all()
data = r.data
rows = [[str(col[key]) for key in column_keys] for col in data]
rows = [
[str(getattr(col, key)) for key in column_keys.values()] for col in data
]
# console.log(rows)
cur = con.executemany(
f"REPLACE INTO columns ({column_names}) values ({placeholders})", rows
@@ -456,7 +202,9 @@ def main(project, mock, db, dump, table, cache_columns):
    sts = console.status("[bold green]Processing column_metrics...")
    with con, sts:
        datapoints = maptransactions(project_phid, transactions, con, console, phab)
        datapoints, metrics = maptransactions(
            project_phid, transactions, con, console, phab, metrics
        )

        console.log("Inserting workboard column metrics")
        count = 0
@@ -490,7 +238,7 @@ def main(project, mock, db, dump, table, cache_columns):
    cur = con.execute(
        """--sql
        SELECT
            c.name,
            c.column_name,
            m.trnsid,
            m.ts,
            m.project,
@@ -499,7 +247,7 @@ def main(project, mock, db, dump, table, cache_columns):
            m.type,
            m.value
        FROM column_metrics m
        JOIN columns c ON m.`column`=c.phid
        JOIN columns c ON m.`column`=c.column_phid
        where m.project=? and m.column != m.project and m.value > 0
        order by m.column, ts
        """,
@@ -509,8 +257,8 @@ def main(project, mock, db, dump, table, cache_columns):
    cur = con.execute(
        """
        SELECT m.*, c.name from task_metrics m
        LEFT JOIN columns c on m.metric=c.phid
        SELECT m.*, c.column_name from task_metrics m
        LEFT JOIN columns c on m.metric=c.column_phid
        """
    )
    task_metrics = [{k: row[k] for k in row.keys()} for row in cur.fetchall()]
from collections import deque
from operator import itemgetter
from ddd.phobjects import PHID, PHIDRef, sqlite_insert_statement
from typing import Mapping
from ddd.data import PropertyMatcher
from ddd.phab import Conduit
import sqlite3
from rich.console import Console
class ObjectMetrics(object):
    """Used to track how long a task exists in each workboard column"""

    phid: PHID
    metrics: dict = {}

    def __init__(self, id):
        self.id = id
        self.metrics = {}
        self.started = None
        self.last_column = None

    def metric(self, metric_name: str):
        metric_name = str(metric_name)
        if metric_name not in self.metrics:
            self.metrics[metric_name] = {"start": 0, "end": 0}
        return self.metrics[metric_name]

    def start(self, metric: str, value: int):
        if self.started:
            if metric == self.started:
                return
            record = self.metric(self.started)
            record["next"] = metric
            self.end(value)
        record = self.metric(metric)
        record["start"] = value
        self.set_if_earlier("start", value)
        self.started = metric
        self.last_column = metric

    def end(self, value: int):
        if self.started is not None:
            metric = self.started
            record = self.metric(metric)
            record["end"] = value
            record["duration"] = value - record["start"]
            self.set_if_later("end", value)
            self.started = None

    def set_if_later(self, metric: str, value: int):
        record = self.metric(metric)
        if value > record["end"]:
            record["end"] = value
        if record["start"] == 0:
            record["start"] = record["end"]

    def set_if_earlier(self, metric: str, value: int):
        record = self.metric(metric)
        if value < record["start"]:
            record["start"] = value
        if record["end"] == 0:
            record["end"] = record["start"]

    def duration(self, metric):
        """Calculate the duration of the metric"""
        record = self.metric(metric)
        start = record["start"] if "start" in record else self.metrics["start"]
        end = record["end"] if "end" in record else self.metrics["end"]
        return end - start

    def lead(self):
        return self.metrics["start"] - self.metrics["start"]

    def all(self):
        total = 0
        for metric, record in self.metrics.items():
            if isinstance(record, dict) and "duration" in record:
                total += record["duration"]
                next_metric = record["next"] if "next" in record else None
                yield [
                    self.id,
                    metric,
                    record["start"],
                    record["end"],
                    record["duration"],
                    next_metric,
                ]
def maptransactions(
    project_phid,
    transactions: Mapping,
    con: sqlite3.Connection,
    console: Console,
    phab: Conduit,
    metrics: dict,
):
    mapper = PropertyMatcher()

    def MetricsFor(id) -> ObjectMetrics:
        if id not in metrics:
            metrics[id] = ObjectMetrics(id)
        return metrics[id]

    # functions decorated with @ttype will take a transaction object and distill
    # it down to just the information we care about.
    # add new functions to match other transaction types.

    # ids = [id for id in tasks.keys()]

    @mapper("transactionType=core:edge", "meta.edge:type=41")
    def edge(t):
        """
        edge transactions point to related objects such as subtasks,
        mentioned tasks and project tags.

        The type of relationship is specified by an integer in
        meta['edge:type']. Edge type constants are defined in
        the enum `ddd.phobjects.EdgeType`
        """
        oldValue = [p for p in t["oldValue"]]
        newValue = [p for p in t["newValue"]]
        move = ["projects", "global", oldValue, newValue]
        return [move]

    @mapper("transactionType=core:edge", "meta.edge:type=51")
    def mention_this(t):
        return [
            ["mention-this", None, t["taskID"], PHIDRef(taskid)]
            for taskid in t["newValue"]
        ]

    @mapper("transactionType=core:edge", "meta.edge:type=52")
    def mention_task(t):
        return [
            ["mention", None, t["taskID"], PHIDRef(taskid)] for taskid in t["newValue"]
        ]

    @mapper("transactionType=core:edge", "meta.edge:type=3")
    def subtask(t):
        removed = [p for p in t["oldValue"] if p not in t["newValue"]]
        added = [p for p in t["newValue"] if p not in t["oldValue"]]
        res = []
        for r in removed:
            res.append(["removed_subtask", None, t["taskID"], PHIDRef(r)])
        for a in added:
            res.append(["added_subtask", None, t["taskID"], PHIDRef(a)])
        return res

    # @mapper("transactionType=unblock")
    def unblock(t):
        pass

    @mapper("transactionType=status")
    def status(t):
        return [("status", "global", t["oldValue"], t["newValue"])]

    # @mapper("transactionType=core:create")
    def create(t):
        return [("status", "global", None, "open")]

    @mapper("transactionType=core:columns")
    def columns(t):
        newv = t["newValue"]
        res = []
        for obj in newv:
            # if obj["boardPHID"] != project_phid:
            #     continue
            tocol = str(obj["columnPHID"])
            if obj["fromColumnPHIDs"]:
                [fromcol] = obj["fromColumnPHIDs"]
                fromcol = str(fromcol)
            else:
                fromcol = None
            res.append(("columns", PHIDRef(obj["boardPHID"]), fromcol, tocol))
        return res

    metrics = {}
    days = 60 * 60 * 24
    datapoints = deque()
    insert_stmt = None

    for taskid, t in transactions.items():
        st = sorted(t, key=itemgetter("dateCreated"))
        task_metrics = MetricsFor(taskid)
        metrics[taskid] = task_metrics

        for record in st:
            ts = int(record["dateCreated"])
            trnsid = record["transactionID"]
            for row in mapper.run(record):
                what, project, old, new = row
                task_metrics.set_if_earlier("start", ts)
                task_metrics.set_if_later("end", ts)

                if what == "projects":
                    if project_phid in new:
                        task_metrics.start(project_phid, ts)
                        for project in new:
                            if project not in old:
                                datapoints.append(
                                    (trnsid, ts, project, None, taskid, what, 1)
                                )
                    elif project_phid in old:
                        task_metrics.end(ts)
                        for project in old:
                            if project not in new:
                                datapoints.append(
                                    (trnsid, ts, project, None, taskid, what, -1)
                                )
                elif what == "columns":
                    if old:
                        datapoints.append((trnsid, ts, project, old, taskid, what, -1))
                    if new:
                        datapoints.append((trnsid, ts, project, new, taskid, what, 1))
                        task_metrics.start(new, ts)