Commit 2eb8cf02 authored by 20after4's avatar 20after4
Browse files

Initial commit of ddd aka data cubed.

parents
# D³
`ddd` or `d³` is a toolkit for accessing APIs and processing data from disperate
systems.
## Status
This tool an supporting libraries are in early stages of experimentation and
development. The APIs are not yet stable and the featureset is not yet decoded
let alone completely implemented. Stay tuned or get involved.
## Currently supported data sources:
* Phabricator's conduit API.
## Coming soon:
* Elastic ELK
* Wikimedia SAL
* Gerrit's rest API
# Usage
The most useful bits of code that can be found in this repo are demonstrated
with the following code examples:
# Example:
```python
from ddd.phab import Conduit
phab = Conduit()
# Call phabricator's meniphest.search api and retrieve all results
r = phab.request('maniphest.search', {'queryKey': "KpRagEN3fCBC",
"limit": "40",
"attachments": {
"projects": True,
"columns": True
}})
# This fetches every page of results, note the API limits a single request to
# fetching at most 100 results (controlled by the limit argument)
# But fetch_all will request each page from the server until all available
# records have been retrieved.
r.fetch_all()
```
import pprint
from enum import Enum
from collections import deque
import sqlite3
con = sqlite3.connect(':memory:')
class DataIterator(object):
""" DataIterator iterates over a list of raw data, returning each record
wrapped in a Data instance.
"""
data = None
def __init__(self, data):
self.data = data.__iter__()
def __iter__(self):
return self
def __next__(self):
return Data(next(self.data))
class Data(object):
data = None
def __init__(self, data):
# print(type(data))
self.data = data
def __getattr__(self, attr):
#print(f"get:{attr}:{self.data[attr]}")
return self.__getitem__(attr)
def __getitem__(self, item):
itemdata = self.data[item]
if isinstance(itemdata, list):
return DataList(itemdata)
elif isinstance(itemdata, dict):
return Data(itemdata)
else:
return itemdata
def __dir__(self):
return self.data.keys()
def __iter__(self):
return DataIterator(self.data)
def __len__(self):
return len(self.data)
def __contains__(self, item):
return item in self.data
def __repr__(self):
return pprint.pformat(self.data, indent=2)
class DataList(Data):
def __getattr__(self, attr):
raise AttributeError()
def __dir__(self):
return dir(self.__dict__)
class Token(Enum):
ATTR = 1
ITEM = 2
class QueryBuilder(object):
def __init__(self, data):
self.data = data
self.query = deque()
def __getitem__(self, key):
self.query.append(Token.ITEM)
self.query.append(key)
return self
def __getattr__(self, key):
self.query.append(Token.ATTR)
self.query.append(key)
return self
import requests
import sys
import json
from pprint import pprint
from ddd.phab import Conduit
phab = Conduit()
r = phab.request('maniphest.search', {'queryKey': "KpRagEN3fCBC",
"limit": "40",
"attachments": {
"projects": True,
"columns": True
}})
r.fetch_all()
ids = [f"T{obj.id}" for obj in r]
print(ids)
ids = []
url = 'https://gerrit.wikimedia.org/r/changes/'
for tid in ids:
query = {'q': f"bug:{tid}"}
res = requests.get(url, params=query)
jsontxt = res.text[4:]
objs = json.loads(jsontxt)
for obj in objs:
print(f"{tid},{obj['change_id']},-{obj['deletions']},+{obj['insertions']}")
if __name__ == "__main__":
pass
from builtins import str
from collections import deque
import json
import os
# todo: remove dependency on requests
import requests
from ddd.data import DataIterator, Data
class Conduit(object):
phab_url = 'https://phabricator.wikimedia.org/api/'
def __init__(self, phab_url: str = None):
if phab_url:
self.phab_url = phab_url
self.conduit_token = self._get_token()
if self.conduit_token is None:
err = "Unable to find a conduit token in ~/.arcrc or environment"
raise ConduitException(self, None, err)
def _get_token(self):
"""
Use the $CONDUIT_TOKEN envvar, fallback to whatever is in ~/.arcrc
"""
token = None
token_path = os.path.expanduser('~/.arcrc')
if os.path.exists(token_path):
with open(token_path) as f:
arcrc = json.load(f)
if (self.phab_url in arcrc['hosts']):
token = arcrc['hosts'][self.phab_url]['token']
return os.environ.get('CONDUIT_TOKEN', token)
def request(self, method: str, args: dict, raw: bool = False):
"""
Helper method to call a phabricator api and return a ConduitResult
which can be used to iterate over all of the resulting records.
"""
data = flatten_for_post(args)
data['api.token'] = self.conduit_token
r = requests.post(f"{self.phab_url}{method}", data=data)
if raw:
return r
return ConduitResult(conduit=self, res=r, method=method, args=args)
class ConduitResult(object):
"""
ConduitResult handles fetching multiple pages of records from the conduit
api so that the results can be treated as a single collection of records.
"""
conduit = None
result = None
method = None
args = None
data = None
cursor = {}
def __init__(self, conduit: Conduit, res: requests.Response,
method: str, args: dict):
self.conduit = conduit
self.method = method
self.args = args
self.handle_result(res)
def retry(self):
pass
def next_page(self):
"""
Load the next page of results from conduit, using the cursor that was
returned by the most recently fetched page to specify the starting
point. This is specified by an "after" argument added to the request.
"""
after = self.cursor.get('after', None)
if after is None:
raise ConduitException(self.conduit, self,
'Cannot fetch pages beyond the last.')
args = self.args
args['after'] = after
res = self.conduit.request(method=self.method, args=args, raw=True)
self.handle_result(res)
def fetch_all(self):
while self.has_more():
self.next_page()
def handle_result(self, res):
"""
Process the result from a conduit call and store the records, along
with a cursor for fetching further pages when the result exceeds the
limit for a single request. The default and maximum limit is 100.
"""
json = res.json()
if json['error_info'] is not None:
raise ConduitException(self.conduit, self, json['error_info'])
self.result = json['result']
if "cursor" in self.result:
self.cursor = self.result['cursor']
if "data" in self.result:
# Modern conduit methods return a result[data] and result{cursor}
if self.data is None:
self.data = deque()
self.data.extend(self.result['data'])
elif self.data is None:
# Older methods just return a result:
self.data = self.result
def has_more(self):
after = self.cursor.get('after', None)
return after is not None
def __iter__(self):
return DataIterator(self.data)
def __getitem__(self, item):
return Data(self.data[item])
def __len__(self):
return len(self.data)
def __contains__(self, item):
return item in self.data
class ConduitException(Exception):
def __init__(self, conduit: Conduit, result: ConduitResult, message: str):
self.conduit = conduit
self.result = result
self.message = message
def PHIDType(phid):
_, phidtype, phidhash = phid.split('-', 3)
return phidtype
def isPHID(value):
return isinstance(value, str) and str.startswith("PHID-")
def flatten_for_post(h, result=None, kk=None):
"""
Since phab expects x-url-encoded form post data (meaning each
individual list element is named). AND because, evidently, requests
can't do this for me, I found a solution via stackoverflow.
See also:
<https://secure.phabricator.com/T12447>
<https://stackoverflow.com/questions/26266664/requests-form-urlencoded-data/36411923>
"""
if result is None:
result = {}
if isinstance(h, str) or isinstance(h, bool):
result[kk] = h
elif isinstance(h, list) or isinstance(h, tuple):
for i, v1 in enumerate(h):
flatten_for_post(v1, result, '%s[%d]' % (kk, i))
elif isinstance(h, dict):
for (k, v) in h.items():
key = k if kk is None else "%s[%s]" % (kk, k)
if isinstance(v, dict):
for i, v1 in v.items():
flatten_for_post(v1, result, '%s[%s]' % (key, i))
else:
flatten_for_post(v, result, key)
return result
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment