# Reconstructed from swh/lister/github/cache.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import redis

# Module-level redis client; must be populated via init_cache() before
# any of the get_*/set_* helpers are used.
cache = None


def init_cache(url):
    """Initialize the module-level Redis client from a redis:// URL.

    decode_responses=True makes the client return str instead of bytes,
    so cached uuids compare directly against str uuids.
    """
    global cache
    cache = redis.StrictRedis.from_url(url, decode_responses=True)


def _user_key(id):
    """Redis key holding the entity uuid for GitHub user `id`."""
    return 'github:user:%d:uuid' % id


def _repo_key(id):
    """Redis key holding the entity uuid for GitHub repo `id`."""
    return 'github:repo:%d:uuid' % id


def get_user(id):
    """Get the cache value for user `id`"""
    return cache.get(_user_key(id))


def set_user(id, uuid):
    """Set the cache value for user `id`"""
    # FIX: return the redis reply, for consistency with set_repo() below
    # (the original returned None here but the reply there).
    return cache.set(_user_key(id), uuid)


def get_repo(id):
    """Get the cache value for repo `id`"""
    return cache.get(_repo_key(id))


def set_repo(id, uuid):
    """Set the cache value for repo `id`"""
    return cache.set(_repo_key(id), uuid)


# Reconstructed from swh/lister/github/constants.py (deleted by this patch).
# Well-known entity uuids in the Software Heritage archive.
GITHUB_ORGS_UUID = '9f7b34d9-aa98-44d4-8907-b332c1036bc3'
GITHUB_USERS_UUID = 'ad6df473-c1d2-4f40-bc58-2b091d4a750e'
GITHUB_LISTER_UUID = '34bd6b1b-463f-43e5-a697-785107f598e4'
# Reconstructed from swh/lister/github/converters.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import datetime
from email.utils import format_datetime

from dateutil.parser import parse as parse_datetime

from . import cache, constants


def utcnow():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(tz=datetime.timezone.utc)


def updated_at_to_last_modified(updated_at):
    """Convert a GitHub `updated_at` timestamp to an HTTP Last-Modified
    string (RFC 2822, GMT), suitable for an If-Modified-Since header.

    Returns None when `updated_at` is empty or None.
    """
    if not updated_at:
        return None

    dt = parse_datetime(updated_at).astimezone(datetime.timezone.utc)
    return format_datetime(dt, usegmt=True)


def repository_to_entity(orig_entity, repo):
    """Convert a GitHub repository dict to an entity.

    Raises:
        ValueError: if the repo owner's uuid is not in the cache yet —
            owner entities must be created before their repositories.
    """
    entity = copy.deepcopy(orig_entity)

    owner_uuid = cache.get_user(repo['owner']['id'])
    if not owner_uuid:
        raise ValueError("Owner %s (id=%d) not in cache" % (
            repo['owner']['login'], repo['owner']['id']))

    entity['parent'] = owner_uuid
    entity['name'] = repo['full_name']
    entity['type'] = 'project'
    entity['description'] = repo['description']
    if 'homepage' in repo:
        entity['homepage'] = repo['homepage']
    entity['active'] = True
    entity['generated'] = True

    entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID
    entity['lister_metadata']['type'] = 'repository'
    entity['lister_metadata']['id'] = repo['id']
    entity['lister_metadata']['fork'] = repo['fork']

    if 'updated_at' in repo:
        entity['lister_metadata']['updated_at'] = repo['updated_at']

    entity['validity'] = [utcnow()]

    return entity


def user_to_entity(orig_entity, user):
    """Convert a GitHub user to an entity.

    Raises:
        ValueError: for GitHub account types other than 'User' or
            'Organization'.
    """
    entity = copy.deepcopy(orig_entity)

    # Renamed from `type` to avoid shadowing the builtin.
    if user['type'] == 'User':
        parent = constants.GITHUB_USERS_UUID
        entity_type = 'person'
    elif user['type'] == 'Organization':
        parent = constants.GITHUB_ORGS_UUID
        entity_type = 'group_of_persons'
    else:
        raise ValueError("Unknown GitHub user type %s" % user['type'])

    entity['parent'] = parent

    if 'name' in user:
        entity['name'] = user['name']

    # GitHub may report a null/empty display name; fall back to the login.
    if not entity.get('name'):
        entity['name'] = user['login']

    entity['type'] = entity_type
    entity['active'] = True
    entity['generated'] = True

    entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID
    entity['lister_metadata']['type'] = 'user'
    entity['lister_metadata']['id'] = user['id']
    entity['lister_metadata']['login'] = user['login']

    if 'updated_at' in user:
        entity['lister_metadata']['updated_at'] = user['updated_at']

    # FIX: was a naive datetime.datetime.now(); use the aware UTC helper
    # for consistency with repository_to_entity().
    entity['validity'] = [utcnow()]

    return entity
# Reconstructed from swh/lister/github/generate_tasks.py (added by this patch).
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os  # FIX: used by generate_tasks() (rename/exists/unlink) but never imported
import pickle


def list_imported_repos(swh_db):
    """List all the repositories that have been successfully imported in
    Software Heritage.

    Args:
        swh_db: an open DB-API connection to the softwareheritage database.

    Returns:
        a set of 'owner/name' strings, extracted from the origin URLs.
    """
    query = """
    select o.url
    from origin o
    left join fetch_history fh
    on o.id = fh.origin
    where
    fh.status = true and
    o.url ~~ 'https://github.com/%'
    """

    cur = swh_db.cursor()
    cur.execute(query)
    res = cur.fetchall()
    cur.close()
    # Keep only the trailing owner/name components of each URL.
    return set('/'.join(repo.rsplit('/', 2)[-2:]) for (repo,) in res)


def list_fetched_repos(ghlister_db):
    """List all the repositories that have been successfully fetched from
    GitHub.

    Args:
        ghlister_db: an open DB-API connection to the lister database.

    Returns:
        a set of 'owner/name' full names (forks excluded).
    """
    query = """
    select r.full_name
    from crawl_history ch
    left join repos r
    on ch.repo = r.id
    where
    ch.status = true and
    r.fork = false
    """

    cur = ghlister_db.cursor()
    cur.execute(query)
    res = cur.fetchall()
    cur.close()
    return set(repo for (repo,) in res)


def list_missing_repos():
    """List all the repositories that have not yet been imported
    successfully."""
    # Local import: keeps the module importable without psycopg2, matching
    # the local celery imports in generate_tasks() below.
    import psycopg2

    swh_db = psycopg2.connect('service=softwareheritage')
    imported_repos = list_imported_repos(swh_db)
    swh_db.close()

    ghlister_db = psycopg2.connect('service=lister-github')
    fetched_repos = list_fetched_repos(ghlister_db)
    ghlister_db.close()

    return fetched_repos - imported_repos


def generate_tasks(checkpoint_file='repos', checkpoint_every=100000):
    """Generate the Celery tasks to fetch all the missing repositories.

    Checkpoint the missing repositories every checkpoint_every tasks sent,
    in a pickle file called checkpoint_file.

    If the checkpoint file exists, we do not call the database again but
    load from the file.
    """
    import swh.loader.git.tasks
    from swh.core.worker import app  # flake8: noqa for side effects

    def checkpoint_repos(repos, checkpoint=checkpoint_file):
        # Write to a temp file then rename, so the checkpoint is replaced
        # atomically and never left half-written.
        tmp = '.%s.tmp' % checkpoint
        with open(tmp, 'wb') as f:
            pickle.dump(repos, f)

        os.rename(tmp, checkpoint)

    def fetch_checkpoint_repos(checkpoint=checkpoint_file):
        with open(checkpoint, 'rb') as f:
            return pickle.load(f)

    repos = set()

    if not os.path.exists(checkpoint_file):
        repos = list_missing_repos()
        checkpoint_repos(repos)
    else:
        repos = fetch_checkpoint_repos()

    task = app.tasks['swh.loader.git.tasks.LoadGitHubRepository']

    ctr = 0
    while True:
        try:
            repo = repos.pop()
        except KeyError:
            # The set is exhausted: all tasks have been sent.
            break

        task.delay(repo)

        ctr += 1
        if ctr >= checkpoint_every:
            ctr = 0
            checkpoint_repos(repos)

    os.unlink(checkpoint)
# Reconstructed from swh/lister/github/github_api.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# see https://developer.github.com/v3/ for the GitHub API documentation

import time
from urllib.parse import urljoin

import requests


GITHUB_API_BASE = 'https://api.github.com/'


def github_api_request(url, last_modified=None, etag=None, session=None,
                       credentials=None):
    """Make a request to the GitHub API at 'url'.

    Args:
        url: the URL of the GitHub API endpoint to request
        last_modified: the last time that URL was requested
        etag: the etag for the last answer at this URL (overrides
            last_modified)
        session: a requests session
        credentials: a list of dicts for GitHub credentials with keys:
            login: the GitHub login for the credential
            token: the API token for the credential
            x_ratelimit_*: the rate-limit info for the given credential

    Returns:
        a dict with the following keys:
            credential_used: the login for the credential used
            x_ratelimit_*: GitHub rate-limiting information

    Raises:
        ValueError: when every credential is currently rate-limited.
    """
    print("Requesting url %s" % url)
    if not session:
        # The requests module itself exposes get(), so it can stand in
        # for a session (without connection pooling).
        session = requests

    headers = {
        'Accept': 'application/vnd.github.v3+json',
    }

    # ETag takes precedence over Last-Modified for conditional requests.
    if etag:
        headers['If-None-Match'] = etag
    else:
        if last_modified:
            headers['If-Modified-Since'] = last_modified

    if not credentials:
        # Single anonymous credential.
        credentials = {None: {}}

    reply = None
    ret = {}
    for login, creds in credentials.items():
        # Skip credentials known to be rate-limited until their reset time.
        remaining = creds.get('x_ratelimit_remaining')
        reset = creds.get('x_ratelimit_reset')
        if remaining == 0 and reset and reset > time.time():
            continue

        kwargs = {}
        # FIX: was creds['token'], which raised KeyError for a credential
        # entry without a token.
        token = creds.get('token')
        if login and token:
            kwargs['auth'] = (login, token)

        reply = session.get(url, headers=headers, **kwargs)

        # Normalize X-RateLimit-* headers to x_ratelimit_* int values.
        ratelimit = {
            key.lower().replace('-', '_'): int(value)
            for key, value in reply.headers.items()
            if key.lower().startswith('x-ratelimit')
        }

        ret.update(ratelimit)
        creds.update(ratelimit)

        if not(reply.status_code == 403 and
               ratelimit.get('x_ratelimit_remaining') == 0):
            # Request successful, let's get out of here
            break
    else:
        # we broke out of the loop
        raise ValueError('All out of credentials', credentials)

    etag = reply.headers.get('ETag')
    if etag and etag.startswith(('w/', 'W/')):
        # Strip down reference to "weak" etags
        etag = etag[2:]

    ret.update({
        'url': url,
        'code': reply.status_code,
        # 304 Not Modified has no body worth parsing.
        'data': reply.json() if reply.status_code != 304 else None,
        'etag': etag,
        'last_modified': reply.headers.get('Last-Modified'),
        'links': reply.links,
        'login': login,
    })

    return ret


def repositories(since=0, url=None, session=None, credentials=None):
    """Request the list of public repositories with id greater than `since`"""
    if not url:
        url = urljoin(GITHUB_API_BASE, 'repositories?since=%s' % since)

    req = github_api_request(url, session=session, credentials=credentials)

    return req


def repository(id, session=None, credentials=None, last_modified=None):
    """Request the information on the repository with the given id"""
    url = urljoin(GITHUB_API_BASE, 'repositories/%d' % id)
    req = github_api_request(url, session=session, credentials=credentials,
                             last_modified=last_modified)

    return req


def forks(id, page, session=None, credentials=None):
    """Request the forks of the repository with the given id, oldest first."""
    url = urljoin(GITHUB_API_BASE,
                  'repositories/%d/forks?sort=oldest&page=%d' % (id, page))
    req = github_api_request(url, session=session, credentials=credentials)

    return req


def user(id, session=None, credentials=None, last_modified=None):
    """Request the information on the user with the given id"""
    url = urljoin(GITHUB_API_BASE, 'user/%d' % id)
    req = github_api_request(url, session=session, credentials=credentials,
                             last_modified=last_modified)

    return req
# Reconstructed from swh/lister/github/processors.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from math import ceil
import time

from . import github_api, req_queue, storage_utils


class ProcessError(ValueError):
    """Raised when a queue item cannot be processed successfully."""
    pass


def repositories(item, queue, session, credentials, storage):
    """Process one page of the paginated /repositories listing:
    queue the next page, the owners and the non-fork repos, then persist."""
    print('Processing scrolling repositories %s' % item['url'])
    repos = github_api.repositories(url=item['url'], session=session,
                                    credentials=credentials)
    if not repos['code'] == 200:
        raise ProcessError(item)

    if 'next' in repos['links']:
        req_queue.push(queue, {
            'type': 'repositories',
            'url': repos['links']['next']['url'],
        })

    # Deduplicate owners appearing several times on the page.
    users = {}
    for repo in repos['data']:
        users[repo['owner']['id']] = repo['owner']

    for id, user in users.items():
        # Stable jid prevents queuing the same user twice.
        jid = 'user-%d' % id
        if not queue.client.jobs[jid]:
            req_queue.push(queue, {
                'type': 'user',
                'user_login': user['login'],
                'user_id': id,
            }, jid=jid)

    for repo in repos['data']:
        if not repo['fork']:
            req_queue.push(queue, {
                'type': 'repository',
                'repo_name': repo['full_name'],
                'repo_id': repo['id'],
            })

    storage_utils.update_repo_entities(storage, repos['data'])


def repository(item, queue, session, credentials, storage):
    """Process a single repository: refresh it if modified and queue
    the pages of its forks."""
    print('Processing repository %s (%s)' % (item['repo_name'],
                                             item['repo_id']))

    last_modified = storage_utils.repo_last_modified(storage, item['repo_id'])
    data = github_api.repository(item['repo_id'], session, credentials,
                                 last_modified)

    print(last_modified, '/', data['last_modified'])
    if data['code'] == 304:
        print('not modified')
        # Not modified
        # XXX: add validity
        return
    elif data['code'] == 200:
        print('modified')
        storage_utils.update_repo_entities(storage, [data['data']])
        if data['data']['forks']:
            # The forks endpoint returns 30 entries per page.
            npages = ceil(data['data']['forks']/30)
            for page in range(1, npages + 1):
                req_queue.push(queue, {
                    'type': 'forks',
                    'repo_id': item['repo_id'],
                    'repo_name': item['repo_name'],
                    'forks_page': page,
                    'check_next': page == npages,
                })
        return
    else:
        print('Could not get reply for repository %s' % item['repo_name'])
        print(data)


def forks(item, queue, session, credentials, storage):
    """Process one page of a repository's forks: queue the owners, chase
    the next page past the expected last one, then persist."""
    print('Processing forks for repository %s (%s, page %s)' % (
        item['repo_name'], item['repo_id'], item['forks_page']))

    forks = github_api.forks(item['repo_id'], item['forks_page'], session,
                             credentials)

    users = {}
    for repo in forks['data']:
        users[repo['owner']['id']] = repo['owner']

    for id, user in users.items():
        jid = 'user-%d' % id
        if not queue.client.jobs[jid]:
            req_queue.push(queue, {
                'type': 'user',
                'user_login': user['login'],
                'user_id': id,
            }, jid=jid)

    # Only the page flagged as last checks for more pages, to avoid
    # queuing every next page several times.
    if item['check_next'] and 'next' in forks['links']:
        req_queue.push(queue, {
            'type': 'forks',
            'repo_id': item['repo_id'],
            'repo_name': item['repo_name'],
            'forks_page': item['forks_page'] + 1,
            'check_next': True,
        })

    storage_utils.update_repo_entities(storage, forks['data'])


def user(item, queue, session, credentials, storage):
    """Process a single user: refresh the entity if modified on GitHub."""
    print('Processing user %s (%s)' % (item['user_login'], item['user_id']))

    last_modified = storage_utils.user_last_modified(storage, item['user_id'])

    data = github_api.user(item['user_id'], session, credentials,
                           last_modified)

    print(last_modified, '/', data['last_modified'])
    if data['code'] == 304:
        print('not modified')
        # Not modified
        # XXX: add validity
        return
    elif data['code'] == 200:
        print('modified')
        storage_utils.update_user_entities(storage, [data['data']])
        return
    else:
        print('Could not get reply for user %s' % item['user_login'])
        print(data)

# Dispatch table from queue item type to processor function.
PROCESSORS = {
    'repositories': repositories,
    'repository': repository,
    'forks': forks,
    'user': user,
}


def process_one_item(queue, session, credentials, storage):
    """Pop one job from the queue (busy-waiting until one is available),
    dispatch it to its processor, and acknowledge it on success only."""
    job = None

    while True:
        job = req_queue.pop(queue)
        if job:
            break
        time.sleep(0.1)

    # FIX: removed a no-op `except Exception: raise` wrapper; behavior is
    # identical — the job is completed only when the processor returns
    # without raising.
    PROCESSORS[job.klass_name](job.data, queue, session, credentials,
                               storage)
    job.complete()
# Reconstructed from swh/lister/github/req_queue.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import qless


# Higher value = processed first; forks expansion is the most urgent,
# full-listing pagination the least.
PRIORITIES = {
    'forks': 100,
    'repository': 75,
    'user': 50,
    'repositories': 40,
}


def from_url(url):
    """Return the 'github-lister' qless queue backed by the given redis URL."""
    return qless.Client(url).queues['github-lister']


def push(queue, item, **kwargs):
    """Push `item` onto `queue`; the popped 'type' key selects the job class
    and its priority. Extra kwargs (e.g. jid) go to qless' put()."""
    print("Push %s to %s" % (item, queue.name))
    # Renamed from `type` to avoid shadowing the builtin.
    job_type = item.pop('type')
    queue.put(job_type, item, priority=PRIORITIES.get(job_type, 0), **kwargs)


def pop(queue):
    """Pop the next job from `queue` (None when empty)."""
    return queue.pop()


def empty(queue):
    """Return True when `queue` has no jobs left."""
    return not len(queue)
# Reconstructed from swh/lister/github/storage_utils.py (deleted by this patch).
# Copyright © 2016 The Software Heritage Developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import uuid

from . import cache, constants, converters


def update_user_entities(storage, users):
    """Update entities for several users in storage. Returns the new
    entities.
    """
    users = list(sorted(users, key=lambda u: u['id']))

    query = [{
        'lister': constants.GITHUB_LISTER_UUID,
        'type': 'user',
        'id': user['id'],
    } for user in users]

    # NOTE: zip() below assumes storage returns entities in query order
    # (i.e. sorted by user id, like `users`).
    entities = list(storage.entity_get_from_lister_metadata(query))

    new_entities = []

    for user, entity in zip(users, entities):
        if not entity['uuid']:
            # Unknown entity: mint a fresh uuid and an empty skeleton.
            entity = {
                'uuid': uuid.uuid4(),
                'doap': {},
                'lister_metadata': {},
            }
            new_entity = converters.user_to_entity(entity, user)
            cache.set_user(user['id'], new_entity['uuid'])
            new_entities.append(new_entity)

    storage.entity_add(new_entities)

    return new_entities


def update_repo_entities(storage, repos):
    """Update entities for several repositories in storage. Returns the new
    entities."""
    repos = list(sorted(repos, key=lambda r: r['id']))

    # Owner entities must exist before their repositories
    # (repository_to_entity looks the owner up in the cache).
    users = {}
    for repo in repos:
        if not cache.get_user(repo['owner']['id']):
            users[repo['owner']['id']] = repo['owner']

    if users:
        update_user_entities(storage, users.values())

    query = [{
        'lister': constants.GITHUB_LISTER_UUID,
        'type': 'repository',
        'id': repo['id'],
    } for repo in repos]

    # Same ordering assumption as in update_user_entities().
    entities = list(storage.entity_get_from_lister_metadata(query))

    new_entities = []

    for repo, entity in zip(repos, entities):
        if not entity['uuid']:
            entity = {
                'uuid': uuid.uuid4(),
                'doap': {},
                'lister_metadata': {},
            }
            new_entities.append(converters.repository_to_entity(entity, repo))

    storage.entity_add(new_entities)

    return new_entities


def _entity_last_modified(storage, type, id, cache_get, cache_set):
    """Shared lookup behind repo_last_modified / user_last_modified:
    resolve the entity (through the uuid cache when possible) and convert
    its lister metadata 'updated_at' to an HTTP Last-Modified string."""
    entity_id = cache_get(id)

    if entity_id:
        entity = storage.entity_get_one(entity_id)
    else:
        entity = list(storage.entity_get_from_lister_metadata([{
            'lister': constants.GITHUB_LISTER_UUID,
            'type': type,
            'id': id,
        }]))[0]

    if entity['uuid']:
        cache_set(id, entity['uuid'])

    updated_at = entity.get('lister_metadata', {}).get('updated_at')

    return converters.updated_at_to_last_modified(updated_at)


def repo_last_modified(storage, id):
    """Last-Modified value for repo `id`, or None when unknown."""
    # FIX: was a copy-paste duplicate of user_last_modified; both now share
    # _entity_last_modified.
    return _entity_last_modified(storage, 'repository', id,
                                 cache.get_repo, cache.set_repo)


def user_last_modified(storage, id):
    """Last-Modified value for user `id`, or None when unknown."""
    return _entity_last_modified(storage, 'user', id,
                                 cache.get_user, cache.set_user)