swh.lister.github: Refactor to use swh.storage instead of sqlalchemy

Nicolas Dandrimont 2016-03-09 19:03:35 +01:00
parent f13764ba36
commit 533f6fa1a3
20 changed files with 600 additions and 694 deletions

README

@@ -18,30 +18,20 @@ along with this program.
Dependencies
============
- python3
- python3-psycopg2
- python3-requests
- python3-sqlalchemy
See requirements.txt
Deployment
==========
1. git clone under $GHLISTER_ROOT (of your choosing)
2. mkdir ~/.config/swh/ ~/.cache/swh/lister-github/
3. edit $GHLISTER_ROOT/etc/crontab and customize GHLISTER_ROOT
4. crontab $GHLISTER_ROOT/etc/crontab
5. create configuration file ~/.config/swh/lister-github.ini
The github lister can be run standalone by using `python3 -m swh.lister.github.lister`.
Sample configuration file
-------------------------
cat ~/.config/swh/lister-github.ini
cat ~/.config/swh/lister/github.ini
[main]
db_url = postgres:///github
# see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
cache_dir = /home/zack/.cache/swh/lister-github
log_dir = /home/zack/.cache/swh/lister-github
username = foobar # github username
password = quux # github password
storage_class = local_storage
storage_args = dbname=softwareheritage-dev, /srv/softwareheritage/objects
queue_file = ~/.cache/swh/lister-github/queue.pickle
credentials = olasd:olasd_github_token, zacchiro:zacchiro_github_token
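
The configuration is read through swh.core.config. A minimal sketch of how the values above get consumed, mirroring the DEFAULT_CONFIG in swh.lister.github.lister (the logins and tokens are placeholders):

    from swh.core.config import load_named_config

    DEFAULT_CONFIG = {
        'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'),
        'storage_class': ('str', 'local_storage'),
        'storage_args': ('list[str]', ['dbname=softwareheritage-dev',
                                       '/srv/softwareheritage/objects']),
        'credentials': ('list[str]', []),
    }

    config = load_named_config('lister/github.ini', DEFAULT_CONFIG)
    # 'credentials' is parsed as a list of 'login:token' strings
    for credential in config['credentials']:
        login, token = credential.split(':')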

TODO

@@ -1,13 +1,5 @@
# -*- mode: org -*-
* TODO SQL: rework repo_history/repo_creations to use last_seen
* TODO cache dir: split json data from other HTTP info
for easier further processing of additional API data
* TODO cache dir: split in subdirs
to avoid hitting too hard on the filesystem due to the large amount of files
(200k+)
* TODO network-level traceback
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/urllib3/response.py", line 186, in read

bin/batch

@@ -1,41 +0,0 @@
#!/bin/bash
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
export https_proxy="127.0.0.1:8118" # use Tor
export PYTHONPATH=`pwd`
DBNAME=github
DBCONN="-p 5433"
psql="psql $DBCONN --no-psqlrc --pset t --pset format=unaligned ${DBNAME}"
BATCH_NO="$1"
shift
if [ -z "$BATCH_NO" ] ; then
echo "Usage: batch MILLION_NO [ MIN_ID | continue ]"
exit 2
fi
MIN_ID="$1"
shift
min_id=$[ ($BATCH_NO - 1) * 1000000 + 1 ]
max_id=$[ $BATCH_NO * 1000000 ]
# allow min_id override on the command line
if [ "$MIN_ID" = "continue" ] ; then
last_id=$(echo "select max(id) from repos where ${min_id} <= id and id <= ${max_id}" | $psql)
if [ "$last_id" -eq "$last_id" ] 2> /dev/null ; then # is an integer?
echo "Continuing from last known id ${last_id}"
min_id=$last_id
fi
elif [ -n "$MIN_ID" ] ; then
min_id=$[ $MIN_ID > $min_id ? $MIN_ID : $min_id ]
fi
cmd="bin/ghlister list ${min_id}-${max_id}"
echo Running $cmd ...
$cmd

bin/ghlister

@@ -1,135 +0,0 @@
#!/usr/bin/python3
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import argparse
import configparser
import logging
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from swh.lister.github import lister, models
from swh.lister.github.db_utils import session_scope
DEFAULT_CONF = {
'cache_dir': './cache',
'log_dir': './log',
'cache_json': 'False',
}
def db_connect(db_url):
engine = create_engine(db_url)
session = sessionmaker(bind=engine)
return (engine, session)
def int_interval(s):
"""parse an "N-M" string as an interval.
Return an (N,M) int (or None) pair
"""
def not_an_interval():
raise argparse.ArgumentTypeError('not an interval: ' + s)
def parse_int(s):
if s:
return int(s)
else:
return None
if '-' not in s:
not_an_interval()
parts = s.split('-')
if len(parts) > 2:
not_an_interval()
return tuple([parse_int(p) for p in parts])
def parse_args():
cli = argparse.ArgumentParser(
description='list GitHub repositories and load them into a DB')
cli.add_argument('--db-url', '-d', metavar='SQLALCHEMY_URL',
help='SQLAlchemy DB URL (override conffile); see '
'<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>') # NOQA
cli.add_argument('--verbose', '-v', action='store_true',
help='be verbose')
subcli = cli.add_subparsers(dest='action')
subcli.add_parser('createdb', help='initialize DB')
subcli.add_parser('dropdb', help='destroy DB')
list_cli = subcli.add_parser('list', help='list repositories')
list_cli.add_argument('interval',
type=int_interval,
help='interval of repository IDs to list, '
'in N-M format; either N or M can be omitted.')
list_cli = subcli.add_parser('catchup',
help='catchup with new repos since last time')
args = cli.parse_args()
if not args.action:
cli.error('no action given')
return args
def read_conf(args):
config = configparser.ConfigParser(defaults=DEFAULT_CONF)
config.read(os.path.expanduser('~/.config/swh/lister-github.ini'))
conf = config._sections['main']
# overrides
if args.db_url:
conf['db_url'] = args.db_url
# typing
if 'cache_json' in conf and conf['cache_json'].lower() == 'true':
conf['cache_json'] = True
else:
conf['cache_json'] = False
return conf
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO) # XXX
args = parse_args()
conf = read_conf(args)
db_engine, mk_session = db_connect(conf['db_url'])
if args.action == 'createdb':
models.SQLBase.metadata.create_all(db_engine)
elif args.action == 'dropdb':
models.SQLBase.metadata.drop_all(db_engine)
elif args.action == 'list':
lister.fetch(conf,
mk_session,
min_id=args.interval[0],
max_id=args.interval[1])
elif args.action == 'catchup':
with session_scope(mk_session) as db_session:
last_known_id = lister.last_repo_id(db_session)
if last_known_id is not None:
logging.info('catching up from last known repo id: %d' %
last_known_id)
lister.fetch(conf,
mk_session,
min_id=last_known_id + 1,
max_id=None)
else:
logging.error('Cannot catchup: no last known id found. Abort.')
sys.exit(2)

@@ -1,9 +0,0 @@
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
export PYTHONPATH=`pwd`
dropdb github
createdb github
bin/ghlister createdb
rm cache/*

bin/status

@@ -1,18 +0,0 @@
#!/bin/bash
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# intended usage: watch -n 60 bin/status
DBNAME="github"
DBCONN="-p 5433"
psql="psql $DBCONN --no-psqlrc"
ps auxw | grep bin/batch
echo "SELECT COUNT(*), MAX(id) FROM repos" | $psql "$DBNAME"
echo "\\l+ ${DBNAME}" | $psql "$DBNAME"
du -sh cache/
zgrep -i --color=auto "'X-RateLimit-Remaining'" cache/$(ls -t cache/ | head -n 4 | tail -n 1)

etc/crontab

@@ -1,5 +0,0 @@
SHELL=/bin/bash
GHLISTER_ROOT=/home/zack/src/swh-lister-github
# m h dom mon dow command
0 8 * * * PYTHONPATH=$GHLISTER_ROOT $GHLISTER_ROOT/bin/ghlister catchup >> ~/.cache/swh/lister-github/$(date +\%Y\%m\%d).log 2>&1

requirements.txt

@@ -1,2 +1,5 @@
dateutil
requests
SQLAlchemy
swh.core
swh.storage

@@ -1,106 +0,0 @@
-- -- return a random sample of repos, containing %percent repositories
-- create or replace function repos_random_sample_array(percent real)
-- returns setof repos as $$
-- declare
-- samples integer;
-- repo repos%rowtype;
-- ids integer[];
-- begin
-- select floor(count(*) / 100 * percent) into samples from repos;
-- ids := array(select id from repos order by id);
-- for i in 1 .. samples loop
-- select * into repo
-- from repos
-- where id = ids[round(random() * samples)];
-- return next repo;
-- end loop;
-- return;
-- end
-- $$
-- language plpgsql;
-- return a random sample of repositories
create or replace function repos_random_sample(percent real)
returns setof repos as $$
declare
sample_size integer;
begin
select floor(count(*) / 100 * percent) into sample_size from repos;
return query
select * from repos
order by random()
limit sample_size;
return;
end
$$
language plpgsql;
-- -- return a random sample of repositories
-- create or replace function random_sample_sequence(percent real)
-- returns setof repos as $$
-- declare
-- sample_size integer;
-- seq_size integer;
-- min_id integer;
-- max_id integer;
-- begin
-- select floor(count(*) / 100 * percent) into sample_size from repos;
-- select min(id) into min_id from repos;
-- select max(id) into max_id from repos;
-- seq_size := sample_size * 3; -- IDs are sparse, generate a larger sequence
-- -- to have enough of them
-- return query
-- select * from repos
-- where id in
-- (select floor(random() * (max_id - min_id + 1))::integer
-- + min_id
-- from generate_series(1, seq_size))
-- order by random() limit sample_size;
-- return;
-- end
-- $$
-- language plpgsql;
create or replace function repos_well_known()
returns setof repos as $$
begin
return query
select * from repos
where full_name like 'apache/%'
or full_name like 'eclipse/%'
or full_name like 'mozilla/%'
or full_name = 'torvalds/linux'
or full_name = 'gcc-mirror/gcc';
return;
end
$$
language plpgsql;
create table crawl_history (
id bigserial primary key,
repo integer references repos(id),
task_id uuid, -- celery task id
date timestamptz not null,
duration interval,
status boolean,
result json,
stdout text,
stderr text
);
create index on crawl_history (repo);
create view missing_orig_repos AS
select *
from orig_repos as repos
where not exists
(select 1 from crawl_history as history
where history.repo = repos.id);
create view missing_fork_repos AS
select *
from fork_repos as repos
where not exists
(select 1 from crawl_history as history
where history.repo = repos.id);

@@ -1,36 +0,0 @@
create view orig_repos as
select id, name, full_name, html_url, description, last_seen
from repos
where not fork;
create view fork_repos as
select id, name, full_name, html_url, description, last_seen
from repos
where fork;
create extension pg_trgm;
create index ix_trgm_repos_description on
repos using gin (description gin_trgm_ops);
create index ix_trgm_repos_full_name on
repos using gin (full_name gin_trgm_ops);
create table repos_history (
ts timestamp default current_timestamp,
repos integer not null,
fork_repos integer,
orig_repos integer
);
create view repo_creations as
select today.ts :: date as date,
today.repos - yesterday.repos as repos,
today.fork_repos - yesterday.fork_repos as fork_repos,
today.orig_repos - yesterday.orig_repos as orig_repos
from repos_history today
join repos_history yesterday on
(yesterday.ts = (select max(ts)
from repos_history
where ts < today.ts));

swh/lister/github/cache.py

@@ -0,0 +1,27 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
GITHUB_USER_UUID_CACHE = {}
GITHUB_REPO_UUID_CACHE = {}
def get_user(id):
"""Get the cache value for user `id`"""
return GITHUB_USER_UUID_CACHE.get(id)
def set_user(id, uuid):
"""Set the cache value for user `id`"""
GITHUB_USER_UUID_CACHE[id] = uuid
def get_repo(id):
"""Get the cache value for repo `id`"""
return GITHUB_REPO_UUID_CACHE.get(id)
def set_repo(id, uuid):
"""Set the cache value for repo `id`"""
GITHUB_REPO_UUID_CACHE[id] = uuid
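
These module-level dicts are a process-local memo of the entity UUID assigned to each GitHub numeric id. An illustrative use (the UUID is made up):

    from swh.lister.github import cache

    if cache.get_user(42) is None:
        cache.set_user(42, '9c261f29-0b62-4d55-9fd6-4d4b41ef9773')
    assert cache.get_user(42) == '9c261f29-0b62-4d55-9fd6-4d4b41ef9773'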

swh/lister/github/converters.py

@@ -0,0 +1,94 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
from email.utils import format_datetime
from dateutil.parser import parse as parse_datetime
from . import cache, storage_utils
def utcnow():
return datetime.datetime.now(tz=datetime.timezone.utc)
def updated_at_to_last_modified(updated_at):
if not updated_at:
return None
dt = parse_datetime(updated_at).astimezone(datetime.timezone.utc)
return format_datetime(dt, usegmt=True)
def repository_to_entity(orig_entity, repo):
"""Convert a repository to an entity"""
entity = copy.deepcopy(orig_entity)
owner_uuid = cache.get_user(repo['owner']['id'])
if not owner_uuid:
raise ValueError("Owner %s (id=%d) not in cache" % (
repo['owner']['login'], repo['owner']['id']))
entity['parent'] = owner_uuid
entity['name'] = repo['full_name']
entity['type'] = 'project'
entity['description'] = repo['description']
if 'homepage' in repo:
entity['homepage'] = repo['homepage']
entity['active'] = True
entity['generated'] = True
entity['lister_metadata']['lister'] = storage_utils.GITHUB_LISTER_UUID
entity['lister_metadata']['type'] = 'repository'
entity['lister_metadata']['id'] = repo['id']
entity['lister_metadata']['fork'] = repo['fork']
if 'updated_at' in repo:
entity['lister_metadata']['updated_at'] = repo['updated_at']
entity['validity'] = [utcnow()]
return entity
def user_to_entity(orig_entity, user):
"""Convert a GitHub user toan entity"""
entity = copy.deepcopy(orig_entity)
if user['type'] == 'User':
parent = storage_utils.GITHUB_USERS_UUID
type = 'person'
elif user['type'] == 'Organization':
parent = storage_utils.GITHUB_ORGS_UUID
type = 'group_of_persons'
else:
raise ValueError("Unknown GitHub user type %s" % user['type'])
entity['parent'] = parent
if 'name' in user:
entity['name'] = user['name']
if not entity.get('name'):
entity['name'] = user['login']
entity['type'] = type
entity['active'] = True
entity['generated'] = True
entity['lister_metadata']['lister'] = storage_utils.GITHUB_LISTER_UUID
entity['lister_metadata']['type'] = 'user'
entity['lister_metadata']['id'] = user['id']
entity['lister_metadata']['login'] = user['login']
if 'updated_at' in user:
entity['lister_metadata']['updated_at'] = user['updated_at']
entity['validity'] = [utcnow()]  # timezone-aware, like repository_to_entity
return entity
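
As an illustration, a sketch of user_to_entity on a made-up user payload, assuming the entity skeleton shape used in storage_utils:

    import uuid
    from swh.lister.github import converters

    entity = {'uuid': uuid.uuid4(), 'doap': {}, 'lister_metadata': {}}
    user = {'id': 5, 'login': 'octocat', 'type': 'User',
            'name': 'The Octocat', 'updated_at': '2016-03-01T12:00:00Z'}
    new_entity = converters.user_to_entity(entity, user)
    # a 'User' maps to parent GITHUB_USERS_UUID and type 'person';
    # an 'Organization' maps to GITHUB_ORGS_UUID and 'group_of_persons'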

swh/lister/github/db_utils.py

@@ -1,18 +0,0 @@
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from contextlib import contextmanager
@contextmanager
def session_scope(mk_session):
session = mk_session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()

@@ -1,111 +0,0 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pickle

import psycopg2
def list_imported_repos(swh_db):
"""List all the repositories that have been successfully imported in Software
Heritage.
"""
query = """
select o.url
from origin o
left join fetch_history fh
on o.id = fh.origin
where
fh.status = true and
o.url ~~ 'https://github.com/%'
"""
cur = swh_db.cursor()
cur.execute(query)
res = cur.fetchall()
cur.close()
return set('/'.join(repo.rsplit('/', 2)[-2:]) for (repo,) in res)
def list_fetched_repos(ghlister_db):
"""List all the repositories that have been successfully fetched from GitHub.
"""
query = """
select r.full_name
from crawl_history ch
left join repos r
on ch.repo = r.id
where
ch.status = true and
r.fork = false
"""
cur = ghlister_db.cursor()
cur.execute(query)
res = cur.fetchall()
cur.close()
return set(repo for (repo,) in res)
def list_missing_repos():
"""List all the repositories that have not yet been imported successfully."""
swh_db = psycopg2.connect('service=softwareheritage')
imported_repos = list_imported_repos(swh_db)
swh_db.close()
ghlister_db = psycopg2.connect('service=lister-github')
fetched_repos = list_fetched_repos(ghlister_db)
ghlister_db.close()
return fetched_repos - imported_repos
def generate_tasks(checkpoint_file='repos', checkpoint_every=100000):
"""Generate the Celery tasks to fetch all the missing repositories.
Checkpoint the missing repositories every checkpoint_every tasks sent, in a
pickle file called checkpoint_file.
If the checkpoint file exists, we do not call the database again but load
from the file.
"""
import swh.loader.git.tasks
from swh.core.worker import app # flake8: noqa for side effects
def checkpoint_repos(repos, checkpoint=checkpoint_file):
tmp = '.%s.tmp' % checkpoint
with open(tmp, 'wb') as f:
pickle.dump(repos, f)
os.rename(tmp, checkpoint)
def fetch_checkpoint_repos(checkpoint=checkpoint_file):
with open(checkpoint, 'rb') as f:
return pickle.load(f)
repos = set()
if not os.path.exists(checkpoint_file):
repos = list_missing_repos()
checkpoint_repos(repos)
else:
repos = fetch_checkpoint_repos()
task = app.tasks['swh.loader.git.tasks.LoadGitHubRepository']
ctr = 0
while True:
try:
repo = repos.pop()
except KeyError:
break
task.delay(repo)
ctr += 1
if ctr >= checkpoint_every:
ctr = 0
checkpoint_repos(repos)
os.unlink(checkpoint)

swh/lister/github/github_api.py

@@ -0,0 +1,137 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# see https://developer.github.com/v3/ for the GitHub API documentation
import time
from urllib.parse import urljoin
import requests
GITHUB_API_BASE = 'https://api.github.com/'
def github_api_request(url, last_modified=None, etag=None, session=None,
credentials=None):
"""Make a request to the GitHub API at 'url'.
Args:
url: the URL of the GitHub API endpoint to request
last_modified: the last time that URL was requested
etag: the etag for the last answer at this URL (overrides
last_modified)
session: a requests session
credentials: a list of dicts for GitHub credentials with keys:
login: the GitHub login for the credential
token: the API token for the credential
x_ratelimit_*: the rate-limit info for the given credential
Returns:
a dict with the following keys:
url: the requested URL
code: the HTTP status code of the reply
data: the JSON-decoded reply body (None on a 304 reply)
etag: the (strong) ETag of the reply
last_modified: the Last-Modified header of the reply
links: the parsed Link headers
login: the login of the credential used
x_ratelimit_*: GitHub rate-limiting information
"""
print("Requesting url %s" % url)
if not session:
session = requests
headers = {
'Accept': 'application/vnd.github.v3+json',
}
if etag:
headers['If-None-Match'] = etag
else:
if last_modified:
headers['If-Modified-Since'] = last_modified
if not credentials:
credentials = {None: {}}
reply = None
ret = {}
for login, creds in credentials.items():
# Handle rate-limiting
remaining = creds.get('x_ratelimit_remaining')
reset = creds.get('x_ratelimit_reset')
if remaining == 0 and reset and reset > time.time():
continue
kwargs = {}
if login and creds['token']:
kwargs['auth'] = (login, creds['token'])
reply = session.get(url, headers=headers, **kwargs)
ratelimit = {
key.lower().replace('-', '_'): int(value)
for key, value in reply.headers.items()
if key.lower().startswith('x-ratelimit')
}
ret.update(ratelimit)
creds.update(ratelimit)
if not(reply.status_code == 403 and
ratelimit.get('x_ratelimit_remaining') == 0):
# Request successful, let's get out of here
break
else:
# we broke out of the loop
raise ValueError('All out of credentials', credentials)
etag = reply.headers.get('ETag')
if etag and etag.startswith(('w/', 'W/')):
# Strip down reference to "weak" etags
etag = etag[2:]
ret.update({
'url': url,
'code': reply.status_code,
'data': reply.json() if reply.status_code != 304 else None,
'etag': etag,
'last_modified': reply.headers.get('Last-Modified'),
'links': reply.links,
'login': login,
})
return ret
def repositories(since=0, url=None, session=None, credentials=None):
"""Request the list of public repositories with id greater than `since`"""
if not url:
url = urljoin(GITHUB_API_BASE, 'repositories?since=%s' % since)
req = github_api_request(url, session=session, credentials=credentials)
return req
def repository(id, session=None, credentials=None, last_modified=None):
"""Request the information on the repository with the given id"""
url = urljoin(GITHUB_API_BASE, 'repositories/%d' % id)
req = github_api_request(url, session=session, credentials=credentials,
last_modified=last_modified)
return req
def forks(id, page, session=None, credentials=None):
"""Request the information on the repository with the given id"""
url = urljoin(GITHUB_API_BASE,
'repositories/%d/forks?sort=oldest&page=%d' % (id, page))
req = github_api_request(url, session=session, credentials=credentials)
return req
def user(id, session=None, credentials=None, last_modified=None):
"""Request the information on the user with the given id"""
url = urljoin(GITHUB_API_BASE, 'user/%d' % id)
req = github_api_request(url, session=session, credentials=credentials,
last_modified=last_modified)
return req
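
A hedged usage sketch for this module (login and token are placeholders): walk the public-repository listing with one HTTP session, letting github_api_request skip credentials that are rate-limited.

    import requests
    from swh.lister.github import github_api

    session = requests.Session()
    credentials = {'some-login': {'token': 'some-api-token'}}

    page = github_api.repositories(since=0, session=session,
                                   credentials=credentials)
    while page['code'] == 200 and 'next' in page['links']:
        page = github_api.repositories(url=page['links']['next']['url'],
                                       session=session,
                                       credentials=credentials)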

swh/lister/github/lister.py

@@ -1,161 +1,52 @@
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# see https://developer.github.com/v3/ for GitHub API documentation
import gzip
import logging
import os
import re
import requests
import time
from pprint import pformat
from sqlalchemy import func
from swh.core.config import load_named_config, prepare_folders
from swh.storage import get_storage
from swh.lister.github.db_utils import session_scope
from swh.lister.github.models import Repository
from . import req_queue, processors
DEFAULT_CONFIG = {
'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'),
'storage_class': ('str', 'local_storage'),
'storage_args': ('list[str]', ['dbname=softwareheritage-dev',
'/srv/softwareheritage/objects']),
'credentials': ('list[str]', []),
}
CONFIG_NAME = 'lister/github.ini'
GH_API_URL = 'https://api.github.com'
MAX_RETRIES = 7
MAX_SLEEP = 3600 # 1 hour
CONN_SLEEP = 10
def run_from_queue():
config = load_named_config(CONFIG_NAME, DEFAULT_CONFIG)
REPO_API_URL_RE = re.compile(r'^.*/repositories\?since=(\d+)')
queue_file = os.path.expanduser(config['queue_file'])
prepare_folders(os.path.dirname(queue_file))
credentials = {}
for credential in config['credentials']:
login, token = credential.split(':')
credentials[login] = {'token': token}
def save_http_response(r, cache_dir):
def escape_url_path(p):
return p.replace('/', '__')
queue = req_queue.restore_from_file(queue_file)
fname = os.path.join(cache_dir,
escape_url_path(r.request.path_url) + '.gz')
with gzip.open(fname, 'w') as f:
def emit(s):
f.write(bytes(s, 'UTF-8'))
emit(pformat(r.request.path_url))
emit('\n#\n')
emit(pformat(r.status_code))
emit('\n#\n')
emit(pformat(r.headers))
emit('\n#\n')
emit(pformat(r.json()))
if req_queue.empty(queue):
req_queue.push(queue, {'type': 'repositories', 'url': None})
session = requests.Session()
storage = get_storage(config['storage_class'], config['storage_args'])
def gh_api_request(path, username=None, password=None, headers={}):
params = {}
if 'Accept' not in headers: # request version 3 of the API
headers['Accept'] = 'application/vnd.github.v3+json'
params['headers'] = headers
if username is not None and password is not None:
params['auth'] = (username, password)
try:
while not req_queue.empty(queue):
processors.process_one_item(
queue, session=session, credentials=credentials,
storage=storage
)
retries_left = MAX_RETRIES
while retries_left > 0:
logging.debug('sending API request: %s' % path)
try:
r = requests.get(GH_API_URL + path, **params)
except requests.exceptions.ConnectionError:
# network-level connection error, try again
logging.warn('connection error upon %s: sleep for %d seconds' %
(path, CONN_SLEEP))
time.sleep(CONN_SLEEP)
retries_left -= 1
continue
if r.ok: # all went well, do not retry
break
# detect throttling
if r.status_code == 403 and \
int(r.headers['X-RateLimit-Remaining']) == 0:
delay = int(r.headers['X-RateLimit-Reset']) - time.time()
delay = min(delay, MAX_SLEEP)
logging.warn('rate limited upon %s: sleep for %d seconds' %
(path, int(delay)))
time.sleep(delay)
else: # unexpected error, abort
break
retries_left -= 1
if not retries_left:
logging.warn('giving up on %s: max retries exceeded' % path)
return r
def lookup_repo(db_session, repo_id):
return db_session.query(Repository) \
.filter(Repository.id == repo_id) \
.first()
def last_repo_id(db_session):
t = db_session.query(func.max(Repository.id)) \
.first()
if t is not None:
return t[0]
# else: return None
INJECT_KEYS = ['id', 'name', 'full_name', 'html_url', 'description', 'fork']
def inject_repo(db_session, repo):
logging.debug('injecting repo %d' % repo['id'])
if lookup_repo(db_session, repo['id']):
logging.info('not injecting already present repo %d' % repo['id'])
return
kwargs = {k: repo[k] for k in INJECT_KEYS if k in repo}
sql_repo = Repository(**kwargs)
db_session.add(sql_repo)
class FetchError(RuntimeError):
def __init__(self, response):
self.response = response
def __str__(self):
return repr(self.response)
def fetch(conf, mk_session, min_id=None, max_id=None):
if min_id is None:
min_id = 1
if max_id is None:
max_id = float('inf')
next_id = min_id
cred = {}
for key in ['username', 'password']:
if key in conf:
cred[key] = conf[key]
while min_id <= next_id <= max_id:
logging.info('listing repos starting at %d' % next_id)
since = next_id - 1 # github API ?since=... is '>' strict, not '>='
repos_res = gh_api_request('/repositories?since=%d' % since, **cred)
if 'cache_dir' in conf and conf['cache_json']:
save_http_response(repos_res, conf['cache_dir'])
if not repos_res.ok:
raise FetchError(repos_res)
repos = repos_res.json()
for repo in repos:
if repo['id'] > max_id: # do not overstep max_id
break
with session_scope(mk_session) as db_session:
inject_repo(db_session, repo)
if 'next' in repos_res.links:
next_url = repos_res.links['next']['url']
m = REPO_API_URL_RE.match(next_url) # parse next_id
next_id = int(m.group(1)) + 1
else:
logging.info('stopping after id %d, no next link found' % next_id)
break
finally:
req_queue.dump_to_file(queue, queue_file)

swh/lister/github/models.py

@@ -1,44 +0,0 @@
# Copyright (C) 2015 Stefano Zacchiroli <zack@upsilon.cc>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
from sqlalchemy import Column
from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base
SQLBase = declarative_base()
class Repository(SQLBase):
"""a GitHub repository"""
__tablename__ = 'repos'
id = Column(Integer, primary_key=True)
name = Column(String, index=True)
full_name = Column(String, index=True)
html_url = Column(String)
description = Column(String)
fork = Column(Boolean, index=True)
last_seen = Column(DateTime, nullable=False)
def __init__(self, id, name=None, full_name=None, html_url=None,
description=None, fork=None):
self.id = id
self.last_seen = datetime.now()
if name is not None:
self.name = name
if full_name is not None:
self.full_name = full_name
if html_url is not None:
self.html_url = html_url
if description is not None:
self.description = description
if fork is not None:
self.fork = fork

swh/lister/github/processors.py

@@ -0,0 +1,121 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from . import github_api, req_queue, storage_utils
class ProcessError(ValueError):
pass
def repositories(item, queue, session, credentials, storage):
print('Processing scrolling repositories %s' % item['url'])
repos = github_api.repositories(url=item['url'], session=session,
credentials=credentials)
if repos['code'] != 200:
raise ProcessError(item)
storage_utils.update_repo_entities(storage, repos['data'])
for repo in repos['data']:
if not repo['fork']:
req_queue.push(queue, {
'type': 'repository',
'repo_name': repo['full_name'],
'repo_id': repo['id'],
})
if 'next' in repos['links']:
req_queue.push(queue, {
'type': 'repositories',
'url': repos['links']['next']['url'],
})
def repository(item, queue, session, credentials, storage):
print('Processing repository %s (%s)' % (item['repo_name'],
item['repo_id']))
last_modified = storage_utils.repo_last_modified(storage, item['repo_id'])
data = github_api.repository(item['repo_id'], session, credentials,
last_modified)
print(last_modified, '/', data['last_modified'])
if data['code'] == 304:
print('not modified')
# Not modified
# XXX: add validity
return
elif data['code'] == 200:
print('modified')
storage_utils.update_repo_entities(storage, [data['data']])
if data['data']['forks']:
req_queue.push(queue, {
'type': 'forks',
'repo_id': item['repo_id'],
'repo_name': item['repo_name'],
'forks_page': 1,
})
return
else:
print('Could not get reply for repository %s' % item['repo_name'])
print(data)
def forks(item, queue, session, credentials, storage):
print('Processing forks for repository %s (%s, page %s)' % (
item['repo_name'], item['repo_id'], item['forks_page']))
forks = github_api.forks(item['repo_id'], item['forks_page'], session,
credentials)
storage_utils.update_repo_entities(storage, forks['data'])
if 'next' in forks['links']:
req_queue.push(queue, {
'type': 'forks',
'repo_id': item['repo_id'],
'repo_name': item['repo_name'],
'forks_page': item['forks_page'] + 1,
})
def user(item, queue, session, credentials, storage):
print('Processing user %s (%s)' % (item['user_login'], item['user_id']))
last_modified = storage_utils.user_last_modified(storage, item['user_id'])
data = github_api.user(item['user_id'], session, credentials,
last_modified)
print(last_modified, '/', data['last_modified'])
if data['code'] == 304:
print('not modified')
# Not modified
# XXX: add validity
return
elif data['code'] == 200:
print('modified')
storage_utils.update_user_entities(storage, [data['data']])
return
else:
print('Could not get reply for user %s' % item['user_login'])
print(data)
PROCESSORS = {
'repositories': repositories,
'repository': repository,
'forks': forks,
'user': user,
}
def process_one_item(queue, session, credentials, storage):
item = req_queue.pop(queue)
try:
PROCESSORS[item['type']](item, queue, session, credentials, storage)
except Exception:
req_queue.push_front(queue, item)
raise
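
A minimal driver sketch for process_one_item, mirroring what lister.run_from_queue does (the storage arguments are the defaults from the sample configuration; credentials are placeholders):

    from collections import defaultdict, deque
    import requests
    from swh.storage import get_storage
    from swh.lister.github import processors, req_queue

    queue = defaultdict(deque)
    req_queue.push(queue, {'type': 'repositories', 'url': None})
    session = requests.Session()
    credentials = {'some-login': {'token': 'some-api-token'}}
    storage = get_storage('local_storage', ['dbname=softwareheritage-dev',
                                            '/srv/softwareheritage/objects'])

    # drain the queue; a failed item is pushed back to the front and re-raised
    while not req_queue.empty(queue):
        processors.process_one_item(queue, session=session,
                                    credentials=credentials, storage=storage)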

swh/lister/github/req_queue.py

@@ -0,0 +1,52 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict, deque
import os
import pickle
import tempfile
PRIORITIES = {
'forks': 10,
'user': 15,
'repository': 20,
'repositories': 30,
}
def restore_from_file(file):
if not os.path.exists(file):
return defaultdict(deque)
with open(file, 'rb') as f:
return pickle.load(f)
def dump_to_file(queue, file):
fd, filename = tempfile.mkstemp(dir=os.path.dirname(file))
with open(fd, 'wb') as f:
pickle.dump(queue, f)
f.flush()
os.fsync(fd)
os.rename(filename, file)
def push(queue, item):
queue[item['type']].append(item)
def push_front(queue, item):
queue[item['type']].appendleft(item)
def pop(queue):
for type in sorted(queue, key=lambda t: PRIORITIES.get(t, 1000)):
if queue[type]:
return queue[type].popleft()
raise IndexError("No items to pop")
def empty(queue):
# true only when every per-type deque is drained
return not any(queue.values())
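
Items pop in PRIORITIES order, so pending 'forks' and 'user' lookups drain before new 'repositories' scroll pages are fetched. A self-contained sketch:

    from collections import defaultdict, deque
    from swh.lister.github import req_queue

    queue = defaultdict(deque)
    req_queue.push(queue, {'type': 'repositories', 'url': None})
    req_queue.push(queue, {'type': 'forks', 'repo_id': 1,
                           'repo_name': 'octocat/hello', 'forks_page': 1})
    assert req_queue.pop(queue)['type'] == 'forks'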

swh/lister/github/storage_utils.py

@@ -0,0 +1,122 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import uuid
from . import cache, converters
GITHUB_ORGS_UUID = '9f7b34d9-aa98-44d4-8907-b332c1036bc3'
GITHUB_USERS_UUID = 'ad6df473-c1d2-4f40-bc58-2b091d4a750e'
GITHUB_LISTER_UUID = '34bd6b1b-463f-43e5-a697-785107f598e4'
def update_user_entities(storage, users):
"""Update entities for several users in storage. Returns the new entities.
"""
users = list(sorted(users, key=lambda u: u['id']))
query = [{
'lister': GITHUB_LISTER_UUID,
'type': 'user',
'id': user['id'],
} for user in users]
entities = list(storage.entity_get_from_lister_metadata(query))
new_entities = []
for user, entity in zip(users, entities):
if not entity['uuid']:
entity = {
'uuid': uuid.uuid4(),
'doap': {},
'lister_metadata': {},
}
new_entity = converters.user_to_entity(entity, user)
cache.set_user(user['id'], new_entity['uuid'])
new_entities.append(new_entity)
storage.entity_add(new_entities)
return new_entities
def update_repo_entities(storage, repos):
"""Update entities for several repositories in storage. Returns the new
entities."""
repos = list(sorted(repos, key=lambda r: r['id']))
users = {}
for repo in repos:
if not cache.get_user(repo['owner']['id']):
users[repo['owner']['id']] = repo['owner']
if users:
update_user_entities(storage, users.values())
query = [{
'lister': GITHUB_LISTER_UUID,
'type': 'repository',
'id': repo['id'],
} for repo in repos]
entities = list(storage.entity_get_from_lister_metadata(query))
new_entities = []
for repo, entity in zip(repos, entities):
if not entity['uuid']:
entity = {
'uuid': uuid.uuid4(),
'doap': {},
'lister_metadata': {},
}
new_entities.append(converters.repository_to_entity(entity, repo))
storage.entity_add(new_entities)
return new_entities
def repo_last_modified(storage, id):
entity_id = cache.get_repo(id)
if entity_id:
entity = storage.entity_get_one(entity_id)
else:
entity = list(storage.entity_get_from_lister_metadata([{
'lister': GITHUB_LISTER_UUID,
'type': 'repository',
'id': id,
}]))[0]
if entity['uuid']:
cache.set_repo(id, entity['uuid'])
updated_at = entity.get('lister_metadata', {}).get('updated_at')
return converters.updated_at_to_last_modified(updated_at)
def user_last_modified(storage, id):
entity_id = cache.get_user(id)
if entity_id:
entity = storage.entity_get_one(entity_id)
else:
entity = list(storage.entity_get_from_lister_metadata([{
'lister': GITHUB_LISTER_UUID,
'type': 'user',
'id': id,
}]))[0]
if entity['uuid']:
cache.set_user(id, entity['uuid'])
updated_at = entity.get('lister_metadata', {}).get('updated_at')
return converters.updated_at_to_last_modified(updated_at)
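
Finally, a hedged end-to-end example for this module, pushing one made-up repository (and, transitively, its owner) into a local storage instance:

    from swh.storage import get_storage
    from swh.lister.github import storage_utils

    storage = get_storage('local_storage', ['dbname=softwareheritage-dev',
                                            '/srv/softwareheritage/objects'])
    repo = {'id': 1296269, 'full_name': 'octocat/Hello-World',
            'description': 'My first repository on GitHub!', 'fork': False,
            'updated_at': '2016-03-01T12:00:00Z',
            'owner': {'id': 583231, 'login': 'octocat', 'type': 'User'}}
    storage_utils.update_repo_entities(storage, [repo])
    print(storage_utils.repo_last_modified(storage, 1296269))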