From 2a62db682745c3a2bcd7caa65bfa034392c52f29 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont Date: Tue, 13 Sep 2016 14:57:26 +0200 Subject: [PATCH] Revert to the pre-qless refactoring version --- README | 24 +++-- TODO | 8 ++ bin/batch | 41 ++++++++ bin/ghlister | 135 +++++++++++++++++++++++++ bin/reset.sh | 9 ++ bin/status | 18 ++++ etc/crontab | 5 + requirements.txt | 7 +- sql/crawler.sql | 106 ++++++++++++++++++++ sql/pimp_db.sql | 36 +++++++ swh/lister/github/db_utils.py | 18 ++++ swh/lister/github/lister.py | 180 +++++++++++++++++++++++++++------- swh/lister/github/models.py | 44 +++++++++ 13 files changed, 582 insertions(+), 49 deletions(-) create mode 100755 bin/batch create mode 100755 bin/ghlister create mode 100644 bin/reset.sh create mode 100755 bin/status create mode 100644 etc/crontab create mode 100644 sql/crawler.sql create mode 100644 sql/pimp_db.sql create mode 100644 swh/lister/github/db_utils.py create mode 100644 swh/lister/github/models.py diff --git a/README b/README index 04baf88..ae7b7ed 100644 --- a/README +++ b/README @@ -18,20 +18,30 @@ along with this program. Dependencies ============ -See requirements.txt +- python3 +- python3-psycopg2 +- python3-requests +- python3-sqlalchemy + Deployment ========== -The github lister can be run standalone by using `python3 -m swh.lister.github.lister`. +1. git clone under $GHLISTER_ROOT (of your choosing) +2. mkdir ~/.config/swh/ ~/.cache/swh/lister-github/ +3. edit $GHLISTER_ROOT/etc/crontab and customize GHLISTER_ROOT +4. crontab $GHLISTER_ROOT/etc/crontab +5. create configuration file ~/.config/swh/lister-github.ini Sample configuration file ------------------------- -cat ~/.config/swh/lister/github.ini +cat ~/.config/swh/lister-github.ini [main] - storage_class = local_storage - storage_args = dbname=softwareheritage-dev, /srv/softwareheritage/objects - queue_file = ~/.cache/swh/lister-github/queue.pickle - credentials = olasd:olasd_github_token, zacchiro:zacchiro_github_token \ No newline at end of file + db_url = postgres:///github + # see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls + cache_dir = /home/zack/.cache/swh/lister-github + log_dir = /home/zack/.cache/swh/lister-github + username = foobar # github username + password = quux # github password diff --git a/TODO b/TODO index c10c02f..46a84f3 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,13 @@ # -*- mode: org -*- +* TODO SQL: rework repo_history/repo_creations to use last_seen +* TODO cache dir: split json data from other HTTP info + for easier further processing of additional API data + +* TODO cache dir: split in subdirs + to avoid hitting too hard on the filesystem due to the large amount of files + (200k+) + * TODO network-level traceback Traceback (most recent call last): File "/usr/lib/python3/dist-packages/urllib3/response.py", line 186, in read diff --git a/bin/batch b/bin/batch new file mode 100755 index 0000000..9796387 --- /dev/null +++ b/bin/batch @@ -0,0 +1,41 @@ +#!/bin/bash + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +export https_proxy="127.0.0.1:8118" # use Tor +export PYTHONPATH=`pwd` + +DBNAME=github +DBCONN="-p 5433" + +psql="psql $DBCONN --no-psqlrc --pset t --pset format=unaligned ${DBNAME}" + +BATCH_NO="$1" +shift +if [ -z "$BATCH_NO" ] ; then + echo "Usage: batch MILLION_NO [ MIN_ID | continue ]" + exit 2 +fi + +MIN_ID="$1" +shift + +min_id=$[ ($BATCH_NO - 1) * 1000000 + 1 ] +max_id=$[ $BATCH_NO * 1000000 ] + +# allow min_id override on the command line +if [ "$MIN_ID" = "continue" ] ; then + last_id=$(echo "select max(id) from repos where ${min_id} <= id and id <= ${max_id}" | $psql) + if [ "$last_id" -eq "$last_id" ] 2> /dev/null ; then # is an integer? + echo "Continuing from last known id ${last_id}" + min_id=$last_id + fi +elif [ -n "$MIN_ID" ] ; then + min_id=$[ $MIN_ID > $min_id ? $MIN_ID : $min_id ] +fi + +cmd="bin/ghlister list ${min_id}-${max_id}" +echo Running $cmd ... +$cmd diff --git a/bin/ghlister b/bin/ghlister new file mode 100755 index 0000000..da4c5d9 --- /dev/null +++ b/bin/ghlister @@ -0,0 +1,135 @@ +#!/usr/bin/python3 + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import argparse +import configparser +import logging +import os +import sys + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from swh.lister.github import lister, models +from swh.lister.github.db_utils import session_scope + + +DEFAULT_CONF = { + 'cache_dir': './cache', + 'log_dir': './log', + 'cache_json': 'False', +} + + +def db_connect(db_url): + engine = create_engine(db_url) + session = sessionmaker(bind=engine) + + return (engine, session) + + +def int_interval(s): + """parse an "N-M" string as an interval. + + Return an (N,M) int (or None) pair + + """ + def not_an_interval(): + raise argparse.ArgumentTypeError('not an interval: ' + s) + + def parse_int(s): + if s: + return int(s) + else: + return None + + if '-' not in s: + not_an_interval() + parts = s.split('-') + if len(parts) > 2: + not_an_interval() + return tuple([parse_int(p) for p in parts]) + + +def parse_args(): + cli = argparse.ArgumentParser( + description='list GitHub repositories and load them into a DB') + cli.add_argument('--db-url', '-d', metavar='SQLALCHEMY_URL', + help='SQLAlchemy DB URL (override conffile); see ' + '') # NOQA + cli.add_argument('--verbose', '-v', action='store_true', + help='be verbose') + + subcli = cli.add_subparsers(dest='action') + subcli.add_parser('createdb', help='initialize DB') + subcli.add_parser('dropdb', help='destroy DB') + + list_cli = subcli.add_parser('list', help='list repositories') + list_cli.add_argument('interval', + type=int_interval, + help='interval of repository IDs to list, ' + 'in N-M format; either N or M can be omitted.') + + list_cli = subcli.add_parser('catchup', + help='catchup with new repos since last time') + + args = cli.parse_args() + + if not args.action: + cli.error('no action given') + + return args + + +def read_conf(args): + config = configparser.ConfigParser(defaults=DEFAULT_CONF) + config.read(os.path.expanduser('~/.config/swh/lister-github.ini')) + + conf = config._sections['main'] + + # overrides + if args.db_url: + conf['db_url'] = args.db_url + + # typing + if 'cache_json' in conf and conf['cache_json'].lower() == 'true': + conf['cache_json'] = True + else: + conf['cache_json'] = False + + return conf + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) # XXX + + args = parse_args() + conf = read_conf(args) + + db_engine, mk_session = db_connect(conf['db_url']) + + if args.action == 'createdb': + models.SQLBase.metadata.create_all(db_engine) + elif args.action == 'dropdb': + models.SQLBase.metadata.drop_all(db_engine) + elif args.action == 'list': + lister.fetch(conf, + mk_session, + min_id=args.interval[0], + max_id=args.interval[1]) + elif args.action == 'catchup': + with session_scope(mk_session) as db_session: + last_known_id = lister.last_repo_id(db_session) + if last_known_id is not None: + logging.info('catching up from last known repo id: %d' % + last_known_id) + lister.fetch(conf, + mk_session, + min_id=last_known_id + 1, + max_id=None) + else: + logging.error('Cannot catchup: no last known id found. Abort.') + sys.exit(2) diff --git a/bin/reset.sh b/bin/reset.sh new file mode 100644 index 0000000..f5bf69b --- /dev/null +++ b/bin/reset.sh @@ -0,0 +1,9 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +export PYTHONPATH=`pwd` +dropdb github +createdb github +bin/ghlister createdb +rm cache/* diff --git a/bin/status b/bin/status new file mode 100755 index 0000000..8a3105f --- /dev/null +++ b/bin/status @@ -0,0 +1,18 @@ +#!/bin/bash + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# intended usage: watch -n 60 bin/status + +DBNAME="github" +DBCONN="-p 5433" + +psql="psql $DBCONN --no-psqlrc" + +ps auxw | grep bin/batch +echo "SELECT COUNT(*), MAX(id) FROM repos" | $psql "$DBNAME" +echo "\\l+ ${DBNAME}" | $psql "$DBNAME" +du -sh cache/ +zgrep -i --color=auto "'X-RateLimit-Remaining'" cache/$(ls -t cache/ | head -n 4 | tail -n 1) diff --git a/etc/crontab b/etc/crontab new file mode 100644 index 0000000..4ebb2d1 --- /dev/null +++ b/etc/crontab @@ -0,0 +1,5 @@ +SHELL=/bin/bash +GHLISTER_ROOT=/home/zack/src/swh-lister-github + +# m h dom mon dow command + 0 8 * * * PYTHONPATH=$GHLISTER_ROOT $GHLISTER_ROOT/bin/ghlister catchup >> ~/.cache/swh/lister-github/$(date +\%Y\%m\%d).log 2>&1 diff --git a/requirements.txt b/requirements.txt index 66b87b7..833d5e7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,2 @@ -python-dateutil -qless-py requests -redis - -swh.core -swh.storage +SQLAlchemy diff --git a/sql/crawler.sql b/sql/crawler.sql new file mode 100644 index 0000000..0a30b54 --- /dev/null +++ b/sql/crawler.sql @@ -0,0 +1,106 @@ + +-- -- return a random sample of repos, containing %percent repositories +-- create or replace function repos_random_sample_array(percent real) +-- returns setof repos as $$ +-- declare +-- samples integer; +-- repo repos%rowtype; +-- ids integer[]; +-- begin +-- select floor(count(*) / 100 * percent) into samples from repos; +-- ids := array(select id from repos order by id); +-- for i in 1 .. samples loop +-- select * into repo +-- from repos +-- where id = ids[round(random() * samples)]; +-- return next repo; +-- end loop; +-- return; +-- end +-- $$ +-- language plpgsql; + +-- return a random sample of repositories +create or replace function repos_random_sample(percent real) +returns setof repos as $$ +declare + sample_size integer; +begin + select floor(count(*) / 100 * percent) into sample_size from repos; + return query + select * from repos + order by random() + limit sample_size; + return; +end +$$ +language plpgsql; + +-- -- return a random sample of repositories +-- create or replace function random_sample_sequence(percent real) +-- returns setof repos as $$ +-- declare +-- sample_size integer; +-- seq_size integer; +-- min_id integer; +-- max_id integer; +-- begin +-- select floor(count(*) / 100 * percent) into sample_size from repos; +-- select min(id) into min_id from repos; +-- select max(id) into max_id from repos; +-- seq_size := sample_size * 3; -- IDs are sparse, generate a larger sequence +-- -- to have enough of them +-- return query +-- select * from repos +-- where id in +-- (select floor(random() * (max_id - min_id + 1))::integer +-- + min_id +-- from generate_series(1, seq_size)) +-- order by random() limit sample_size; +-- return; +-- end +-- $$ +-- language plpgsql; + +create or replace function repos_well_known() +returns setof repos as $$ +begin + return query + select * from repos + where full_name like 'apache/%' + or full_name like 'eclipse/%' + or full_name like 'mozilla/%' + or full_name = 'torvalds/linux' + or full_name = 'gcc-mirror/gcc'; + return; +end +$$ +language plpgsql; + +create table crawl_history ( + id bigserial primary key, + repo integer references repos(id), + task_id uuid, -- celery task id + date timestamptz not null, + duration interval, + status boolean, + result json, + stdout text, + stderr text +); + +create index on crawl_history (repo); + +create view missing_orig_repos AS + select * + from orig_repos as repos + where not exists + (select 1 from crawl_history as history + where history.repo = repos.id); + +create view missing_fork_repos AS + select * + from fork_repos as repos + where not exists + (select 1 from crawl_history as history + where history.repo = repos.id); diff --git a/sql/pimp_db.sql b/sql/pimp_db.sql new file mode 100644 index 0000000..2cc9cef --- /dev/null +++ b/sql/pimp_db.sql @@ -0,0 +1,36 @@ + +create view orig_repos as + select id, name, full_name, html_url, description, last_seen + from repos + where not fork; + +create view fork_repos as + select id, name, full_name, html_url, description, last_seen + from repos + where fork + +create extension pg_trgm; + +create index ix_trgm_repos_description on + repos using gin (description gin_trgm_ops); + +create index ix_trgm_repos_full_name on + repos using gin (full_name gin_trgm_ops); + +create table repos_history ( + ts timestamp default current_timestamp, + repos integer not null, + fork_repos integer, + orig_repos integer +); + +create view repo_creations as + select today.ts :: date as date, + today.repos - yesterday.repos as repos, + today.fork_repos - yesterday.fork_repos as fork_repos, + today.orig_repos - yesterday.orig_repos as orig_repos + from repos_history today + join repos_history yesterday on + (yesterday.ts = (select max(ts) + from repos_history + where ts < today.ts)); diff --git a/swh/lister/github/db_utils.py b/swh/lister/github/db_utils.py new file mode 100644 index 0000000..0563036 --- /dev/null +++ b/swh/lister/github/db_utils.py @@ -0,0 +1,18 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import contextmanager + + +@contextmanager +def session_scope(mk_session): + session = mk_session() + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 1bf12f9..9bbde6a 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,53 +1,161 @@ -# Copyright © 2016 The Software Heritage Developers +# Copyright (C) 2015 Stefano Zacchiroli # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +# see https://developer.github.com/v3/ for GitHub API documentation + +import gzip +import logging import os - +import re import requests +import time -from swh.core.config import load_named_config -from swh.storage import get_storage +from pprint import pformat +from sqlalchemy import func -from . import req_queue, processors, cache - -DEFAULT_CONFIG = { - 'queue_url': ('str', 'redis://localhost'), - 'cache_url': ('str', 'redis://localhost'), - 'storage_class': ('str', 'local_storage'), - 'storage_args': ('list[str]', ['dbname=softwareheritage-dev', - '/srv/softwareheritage/objects']), - 'credentials': ('list[str]', []), - -} -CONFIG_NAME = 'lister/github.ini' +from swh.lister.github.db_utils import session_scope +from swh.lister.github.models import Repository -def run_from_queue(): - config = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) +GH_API_URL = 'https://api.github.com' +MAX_RETRIES = 7 +MAX_SLEEP = 3600 # 1 hour +CONN_SLEEP = 10 - cache.init_cache(config['cache_url']) +REPO_API_URL_RE = re.compile(r'^.*/repositories\?since=(\d+)') - queue_url = os.path.expanduser(config['queue_url']) - credentials = {} - for credential in config['credentials']: - login, token = credential.split(':') - credentials[login] = {'token': token} +def save_http_response(r, cache_dir): + def escape_url_path(p): + return p.replace('/', '__') - queue = req_queue.from_url(queue_url) + fname = os.path.join(cache_dir, + escape_url_path(r.request.path_url) + '.gz') + with gzip.open(fname, 'w') as f: + def emit(s): + f.write(bytes(s, 'UTF-8')) + emit(pformat(r.request.path_url)) + emit('\n#\n') + emit(pformat(r.status_code)) + emit('\n#\n') + emit(pformat(r.headers)) + emit('\n#\n') + emit(pformat(r.json())) - if req_queue.empty(queue): - req_queue.push(queue, {'type': 'repositories', 'url': None}) - session = requests.Session() - storage = get_storage(config['storage_class'], config['storage_args']) +def gh_api_request(path, username=None, password=None, headers={}): + params = {} + if 'Accept' not in headers: # request version 3 of the API + headers['Accept'] = 'application/vnd.github.v3+json' + params['headers'] = headers + if username is not None and password is not None: + params['auth'] = (username, password) - while not req_queue.empty(queue): - processors.process_one_item( - queue, session=session, credentials=credentials, - storage=storage - ) + retries_left = MAX_RETRIES + while retries_left > 0: + logging.debug('sending API request: %s' % path) + try: + r = requests.get(GH_API_URL + path, **params) + except requests.exceptions.ConnectionError: + # network-level connection error, try again + logging.warn('connection error upon %s: sleep for %d seconds' % + (path, CONN_SLEEP)) + time.sleep(CONN_SLEEP) + retries_left -= 1 + continue -if __name__ == '__main__': - run_from_queue() + if r.ok: # all went well, do not retry + break + + # detect throttling + if r.status_code == 403 and \ + int(r.headers['X-RateLimit-Remaining']) == 0: + delay = int(r.headers['X-RateLimit-Reset']) - time.time() + delay = min(delay, MAX_SLEEP) + logging.warn('rate limited upon %s: sleep for %d seconds' % + (path, int(delay))) + time.sleep(delay) + else: # unexpected error, abort + break + + retries_left -= 1 + + if not retries_left: + logging.warn('giving up on %s: max retries exceed' % path) + + return r + + +def lookup_repo(db_session, repo_id): + return db_session.query(Repository) \ + .filter(Repository.id == repo_id) \ + .first() + + +def last_repo_id(db_session): + t = db_session.query(func.max(Repository.id)) \ + .first() + if t is not None: + return t[0] + # else: return None + + +INJECT_KEYS = ['id', 'name', 'full_name', 'html_url', 'description', 'fork'] + + +def inject_repo(db_session, repo): + logging.debug('injecting repo %d' % repo['id']) + if lookup_repo(db_session, repo['id']): + logging.info('not injecting already present repo %d' % repo['id']) + return + kwargs = {k: repo[k] for k in INJECT_KEYS if k in repo} + sql_repo = Repository(**kwargs) + db_session.add(sql_repo) + + +class FetchError(RuntimeError): + + def __init__(self, response): + self.response = response + + def __str__(self): + return repr(self.response) + + +def fetch(conf, mk_session, min_id=None, max_id=None): + if min_id is None: + min_id = 1 + if max_id is None: + max_id = float('inf') + next_id = min_id + + cred = {} + for key in ['username', 'password']: + if key in conf: + cred[key] = conf[key] + + while min_id <= next_id <= max_id: + logging.info('listing repos starting at %d' % next_id) + since = next_id - 1 # github API ?since=... is '>' strict, not '>=' + repos_res = gh_api_request('/repositories?since=%d' % since, **cred) + + if 'cache_dir' in conf and conf['cache_json']: + save_http_response(repos_res, conf['cache_dir']) + if not repos_res.ok: + raise FetchError(repos_res) + + repos = repos_res.json() + for repo in repos: + if repo['id'] > max_id: # do not overstep max_id + break + with session_scope(mk_session) as db_session: + inject_repo(db_session, repo) + + if 'next' in repos_res.links: + next_url = repos_res.links['next']['url'] + m = REPO_API_URL_RE.match(next_url) # parse next_id + next_id = int(m.group(1)) + 1 + else: + logging.info('stopping after id %d, no next link found' % next_id) + break diff --git a/swh/lister/github/models.py b/swh/lister/github/models.py new file mode 100644 index 0000000..c4cb027 --- /dev/null +++ b/swh/lister/github/models.py @@ -0,0 +1,44 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime + +from sqlalchemy import Column +from sqlalchemy import Boolean, DateTime, Integer, String +from sqlalchemy.ext.declarative import declarative_base + + +SQLBase = declarative_base() + + +class Repository(SQLBase): + + """a GitHub repository""" + + __tablename__ = 'repos' + + id = Column(Integer, primary_key=True) + + name = Column(String, index=True) + full_name = Column(String, index=True) + html_url = Column(String) + description = Column(String) + fork = Column(Boolean, index=True) + + last_seen = Column(DateTime, nullable=False) + + def __init__(self, id, name=None, full_name=None, html_url=None, + description=None, fork=None): + self.id = id + self.last_seen = datetime.now() + if name is not None: + self.name = name + if full_name is not None: + self.full_name = full_name + if html_url is not None: + self.html_url = html_url + if description is not None: + self.description = description + if fork is not None: + self.fork = fork