Actually revert to the pre-qless version

Nicolas Dandrimont 2016-09-13 15:20:09 +02:00
parent 2a62db6827
commit 4d53974e0c
8 changed files with 111 additions and 590 deletions


@@ -1,40 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import redis
cache = None
def init_cache(url):
global cache
cache = redis.StrictRedis.from_url(url, decode_responses=True)
def _user_key(id):
return 'github:user:%d:uuid' % id
def _repo_key(id):
return 'github:repo:%d:uuid' % id
def get_user(id):
"""Get the cache value for user `id`"""
return cache.get(_user_key(id))
def set_user(id, uuid):
"""Set the cache value for user `id`"""
cache.set(_user_key(id), uuid)
def get_repo(id):
"""Get the cache value for repo `id`"""
return cache.get(_repo_key(id))
def set_repo(id, uuid):
"""Set the cache value for repo `id`"""
return cache.set(_repo_key(id), uuid)


@@ -1,7 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
GITHUB_ORGS_UUID = '9f7b34d9-aa98-44d4-8907-b332c1036bc3'
GITHUB_USERS_UUID = 'ad6df473-c1d2-4f40-bc58-2b091d4a750e'
GITHUB_LISTER_UUID = '34bd6b1b-463f-43e5-a697-785107f598e4'


@@ -1,95 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
from email.utils import format_datetime
from dateutil.parser import parse as parse_datetime
from . import cache, constants
def utcnow():
return datetime.datetime.now(tz=datetime.timezone.utc)
def updated_at_to_last_modified(updated_at):
if not updated_at:
return None
dt = parse_datetime(updated_at).astimezone(datetime.timezone.utc)
return format_datetime(dt, usegmt=True)
def repository_to_entity(orig_entity, repo):
"""Convert a repository to an entity"""
entity = copy.deepcopy(orig_entity)
owner_uuid = cache.get_user(repo['owner']['id'])
if not owner_uuid:
raise ValueError("Owner %s (id=%d) not in cache" % (
repo['owner']['login'], repo['owner']['id']))
entity['parent'] = owner_uuid
entity['name'] = repo['full_name']
entity['type'] = 'project'
entity['description'] = repo['description']
if 'homepage' in repo:
entity['homepage'] = repo['homepage']
entity['active'] = True
entity['generated'] = True
entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID
entity['lister_metadata']['type'] = 'repository'
entity['lister_metadata']['id'] = repo['id']
entity['lister_metadata']['fork'] = repo['fork']
if 'updated_at' in repo:
entity['lister_metadata']['updated_at'] = repo['updated_at']
entity['validity'] = [utcnow()]
return entity
def user_to_entity(orig_entity, user):
"""Convert a GitHub user toan entity"""
entity = copy.deepcopy(orig_entity)
if user['type'] == 'User':
parent = constants.GITHUB_USERS_UUID
type = 'person'
elif user['type'] == 'Organization':
parent = constants.GITHUB_ORGS_UUID
type = 'group_of_persons'
else:
raise ValueError("Unknown GitHub user type %s" % user['type'])
entity['parent'] = parent
if 'name' in user:
entity['name'] = user['name']
if not entity.get('name'):
entity['name'] = user['login']
entity['type'] = type
entity['active'] = True
entity['generated'] = True
entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID
entity['lister_metadata']['type'] = 'user'
entity['lister_metadata']['id'] = user['id']
entity['lister_metadata']['login'] = user['login']
if 'updated_at' in user:
entity['lister_metadata']['updated_at'] = user['updated_at']
entity['validity'] = [utcnow()]
return entity


@@ -0,0 +1,111 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pickle
import psycopg2
def list_imported_repos(swh_db):
"""List all the repositories that have been successfully imported in Software
Heritage.
"""
query = """
select o.url
from origin o
left join fetch_history fh
on o.id = fh.origin
where
fh.status = true and
o.url ~~ 'https://github.com/%'
"""
cur = swh_db.cursor()
cur.execute(query)
res = cur.fetchall()
cur.close()
return set('/'.join(repo.rsplit('/', 2)[-2:]) for (repo,) in res)
def list_fetched_repos(ghlister_db):
"""List all the repositories that have been successfully fetched from GitHub.
"""
query = """
select r.full_name
from crawl_history ch
left join repos r
on ch.repo = r.id
where
ch.status = true and
r.fork = false
"""
cur = ghlister_db.cursor()
cur.execute(query)
res = cur.fetchall()
cur.close()
return set(repo for (repo,) in res)
def list_missing_repos():
"""List all the repositories that have not yet been imported successfully."""
swh_db = psycopg2.connect('service=softwareheritage')
imported_repos = list_imported_repos(swh_db)
swh_db.close()
ghlister_db = psycopg2.connect('service=lister-github')
fetched_repos = list_fetched_repos(ghlister_db)
ghlister_db.close()
return fetched_repos - imported_repos
def generate_tasks(checkpoint_file='repos', checkpoint_every=100000):
"""Generate the Celery tasks to fetch all the missing repositories.
Checkpoint the missing repositories every checkpoint_every tasks sent, in a
pickle file called checkpoint_file.
If the checkpoint file already exists, the repository list is loaded from it
instead of querying the databases again.
"""
import swh.loader.git.tasks
from swh.core.worker import app # flake8: noqa for side effects
def checkpoint_repos(repos, checkpoint=checkpoint_file):
tmp = '.%s.tmp' % checkpoint
with open(tmp, 'wb') as f:
pickle.dump(repos, f)
os.rename(tmp, checkpoint)
def fetch_checkpoint_repos(checkpoint=checkpoint_file):
with open(checkpoint, 'rb') as f:
return pickle.load(f)
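# If a checkpoint file exists, resume from it; otherwise compute the missing
# repositories from the databases and checkpoint them right away.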
repos = set()
if not os.path.exists(checkpoint_file):
repos = list_missing_repos()
checkpoint_repos(repos)
else:
repos = fetch_checkpoint_repos()
task = app.tasks['swh.loader.git.tasks.LoadGitHubRepository']
ctr = 0
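# Drain the set: send one Celery task per repository, re-checkpointing the
# remaining repositories every checkpoint_every tasks so an interrupted run
# can resume where it left off.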
while True:
try:
repo = repos.pop()
except KeyError:
break
task.delay(repo)
ctr += 1
if ctr >= checkpoint_every:
ctr = 0
checkpoint_repos(repos)
os.unlink(checkpoint)


@@ -1,137 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# see https://developer.github.com/v3/ for the GitHub API documentation
import time
from urllib.parse import urljoin
import requests
GITHUB_API_BASE = 'https://api.github.com/'
def github_api_request(url, last_modified=None, etag=None, session=None,
credentials=None):
"""Make a request to the GitHub API at 'url'.
Args:
url: the URL of the GitHub API endpoint to request
last_modified: the Last-Modified value of the previous response for this URL (used as If-Modified-Since)
etag: the ETag of the previous response for this URL (takes precedence over last_modified)
session: a requests session
credentials: a dict mapping each GitHub login to a credential dict with keys:
token: the API token for the credential
x_ratelimit_*: the rate-limit info for the given credential
Returns:
a dict with the following keys:
login: the login of the credential that was used
x_ratelimit_*: GitHub rate-limiting information
url, code, data, etag, last_modified, links: details of the GitHub reply
"""
print("Requesting url %s" % url)
if not session:
session = requests
headers = {
'Accept': 'application/vnd.github.v3+json',
}
if etag:
headers['If-None-Match'] = etag
else:
if last_modified:
headers['If-Modified-Since'] = last_modified
if not credentials:
credentials = {None: {}}
reply = None
ret = {}
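# Try each credential in turn, skipping those whose rate limit is known to be
# exhausted, until one request is not rejected for rate-limiting.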
for login, creds in credentials.items():
# Handle rate-limiting
remaining = creds.get('x_ratelimit_remaining')
reset = creds.get('x_ratelimit_reset')
if remaining == 0 and reset and reset > time.time():
continue
kwargs = {}
if login and creds['token']:
kwargs['auth'] = (login, creds['token'])
reply = session.get(url, headers=headers, **kwargs)
ratelimit = {
key.lower().replace('-', '_'): int(value)
for key, value in reply.headers.items()
if key.lower().startswith('x-ratelimit')
}
ret.update(ratelimit)
creds.update(ratelimit)
if not(reply.status_code == 403 and
ratelimit.get('x_ratelimit_remaining') == 0):
# Request successful, let's get out of here
break
else:
# the loop completed without breaking: every credential was skipped or rate-limited
raise ValueError('All out of credentials', credentials)
etag = reply.headers.get('ETag')
if etag and etag.startswith(('w/', 'W/')):
# Strip down reference to "weak" etags
etag = etag[2:]
ret.update({
'url': url,
'code': reply.status_code,
'data': reply.json() if reply.status_code != 304 else None,
'etag': etag,
'last_modified': reply.headers.get('Last-Modified'),
'links': reply.links,
'login': login,
})
return ret
def repositories(since=0, url=None, session=None, credentials=None):
"""Request the list of public repositories with id greater than `since`"""
if not url:
url = urljoin(GITHUB_API_BASE, 'repositories?since=%s' % since)
req = github_api_request(url, session=session, credentials=credentials)
return req
def repository(id, session=None, credentials=None, last_modified=None):
"""Request the information on the repository with the given id"""
url = urljoin(GITHUB_API_BASE, 'repositories/%d' % id)
req = github_api_request(url, session=session, credentials=credentials,
last_modified=last_modified)
return req
def forks(id, page, session=None, credentials=None):
"""Request the information on the repository with the given id"""
url = urljoin(GITHUB_API_BASE,
'repositories/%d/forks?sort=oldest&page=%d' % (id, page))
req = github_api_request(url, session=session, credentials=credentials)
return req
def user(id, session=None, credentials=None, last_modified=None):
"""Request the information on the user with the given id"""
url = urljoin(GITHUB_API_BASE, 'user/%d' % id)
req = github_api_request(url, session=session, credentials=credentials,
last_modified=last_modified)
return req


@@ -1,163 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from math import ceil
import time
from . import github_api, req_queue, storage_utils
class ProcessError(ValueError):
pass
def repositories(item, queue, session, credentials, storage):
print('Processing scrolling repositories %s' % item['url'])
repos = github_api.repositories(url=item['url'], session=session,
credentials=credentials)
if not repos['code'] == 200:
raise ProcessError(item)
if 'next' in repos['links']:
req_queue.push(queue, {
'type': 'repositories',
'url': repos['links']['next']['url'],
})
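# Deduplicate the repository owners by id, then queue one 'user' job per owner
# that is not already queued.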
users = {}
for repo in repos['data']:
users[repo['owner']['id']] = repo['owner']
for id, user in users.items():
jid = 'user-%d' % id
if not queue.client.jobs[jid]:
req_queue.push(queue, {
'type': 'user',
'user_login': user['login'],
'user_id': id,
}, jid=jid)
for repo in repos['data']:
if not repo['fork']:
req_queue.push(queue, {
'type': 'repository',
'repo_name': repo['full_name'],
'repo_id': repo['id'],
})
storage_utils.update_repo_entities(storage, repos['data'])
def repository(item, queue, session, credentials, storage):
print('Processing repository %s (%s)' % (item['repo_name'],
item['repo_id']))
last_modified = storage_utils.repo_last_modified(storage, item['repo_id'])
data = github_api.repository(item['repo_id'], session, credentials,
last_modified)
print(last_modified, '/', data['last_modified'])
if data['code'] == 304:
print('not modified')
# Not modified
# XXX: add validity
return
elif data['code'] == 200:
print('modified')
storage_utils.update_repo_entities(storage, [data['data']])
if data['data']['forks']:
npages = ceil(data['data']['forks']/30)
for page in range(1, npages + 1):
req_queue.push(queue, {
'type': 'forks',
'repo_id': item['repo_id'],
'repo_name': item['repo_name'],
'forks_page': page,
'check_next': page == npages,
})
return
else:
print('Could not get reply for repository %s' % item['repo_name'])
print(data)
def forks(item, queue, session, credentials, storage):
print('Processing forks for repository %s (%s, page %s)' % (
item['repo_name'], item['repo_id'], item['forks_page']))
forks = github_api.forks(item['repo_id'], item['forks_page'], session,
credentials)
users = {}
for repo in forks['data']:
users[repo['owner']['id']] = repo['owner']
for id, user in users.items():
jid = 'user-%d' % id
if not queue.client.jobs[jid]:
req_queue.push(queue, {
'type': 'user',
'user_login': user['login'],
'user_id': id,
}, jid=jid)
if item['check_next'] and 'next' in forks['links']:
req_queue.push(queue, {
'type': 'forks',
'repo_id': item['repo_id'],
'repo_name': item['repo_name'],
'forks_page': item['forks_page'] + 1,
'check_next': True,
})
storage_utils.update_repo_entities(storage, forks['data'])
def user(item, queue, session, credentials, storage):
print('Processing user %s (%s)' % (item['user_login'], item['user_id']))
last_modified = storage_utils.user_last_modified(storage, item['user_id'])
data = github_api.user(item['user_id'], session, credentials,
last_modified)
print(last_modified, '/', data['last_modified'])
if data['code'] == 304:
print('not modified')
# Not modified
# XXX: add validity
return
elif data['code'] == 200:
print('modified')
storage_utils.update_user_entities(storage, [data['data']])
return
else:
print('Could not get reply for user %s' % item['user_login'])
print(data)
PROCESSORS = {
'repositories': repositories,
'repository': repository,
'forks': forks,
'user': user,
}
def process_one_item(queue, session, credentials, storage):
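# Poll the queue until a job is available, dispatch it to the matching
# processor, and mark it complete only if the processor did not raise.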
job = None
while True:
job = req_queue.pop(queue)
if job:
break
time.sleep(0.1)
try:
PROCESSORS[job.klass_name](job.data, queue, session, credentials,
storage)
except Exception:
raise
else:
job.complete()


@@ -1,31 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import qless
PRIORITIES = {
'forks': 100,
'repository': 75,
'user': 50,
'repositories': 40,
}
def from_url(url):
return qless.Client(url).queues['github-lister']
def push(queue, item, **kwargs):
print("Push %s to %s" % (item, queue.name))
type = item.pop('type')
queue.put(type, item, priority=PRIORITIES.get(type, 0), **kwargs)
def pop(queue):
return queue.pop()
def empty(queue):
return not len(queue)


@@ -1,117 +0,0 @@
# Copyright © 2016 The Software Heritage Developers <swh-devel@inria.fr>
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import uuid
from . import cache, constants, converters
def update_user_entities(storage, users):
"""Update entities for several users in storage. Returns the new entities.
"""
users = list(sorted(users, key=lambda u: u['id']))
query = [{
'lister': constants.GITHUB_LISTER_UUID,
'type': 'user',
'id': user['id'],
} for user in users]
entities = list(storage.entity_get_from_lister_metadata(query))
new_entities = []
for user, entity in zip(users, entities):
if not entity['uuid']:
entity = {
'uuid': uuid.uuid4(),
'doap': {},
'lister_metadata': {},
}
new_entity = converters.user_to_entity(entity, user)
cache.set_user(user['id'], new_entity['uuid'])
new_entities.append(new_entity)
storage.entity_add(new_entities)
return new_entities
def update_repo_entities(storage, repos):
"""Update entities for several repositories in storage. Returns the new
entities."""
repos = list(sorted(repos, key=lambda r: r['id']))
users = {}
for repo in repos:
if not cache.get_user(repo['owner']['id']):
users[repo['owner']['id']] = repo['owner']
if users:
update_user_entities(storage, users.values())
query = [{
'lister': constants.GITHUB_LISTER_UUID,
'type': 'repository',
'id': repo['id'],
} for repo in repos]
entities = list(storage.entity_get_from_lister_metadata(query))
new_entities = []
for repo, entity in zip(repos, entities):
if not entity['uuid']:
entity = {
'uuid': uuid.uuid4(),
'doap': {},
'lister_metadata': {},
}
new_entities.append(converters.repository_to_entity(entity, repo))
storage.entity_add(new_entities)
return new_entities
def repo_last_modified(storage, id):
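# Look up the repository entity (via the uuid cache when possible) and turn
# its recorded updated_at into an HTTP Last-Modified style timestamp.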
entity_id = cache.get_repo(id)
if entity_id:
entity = storage.entity_get_one(entity_id)
else:
entity = list(storage.entity_get_from_lister_metadata([{
'lister': constants.GITHUB_LISTER_UUID,
'type': 'repository',
'id': id,
}]))[0]
if entity['uuid']:
cache.set_repo(id, entity['uuid'])
updated_at = entity.get('lister_metadata', {}).get('updated_at')
return converters.updated_at_to_last_modified(updated_at)
def user_last_modified(storage, id):
entity_id = cache.get_user(id)
if entity_id:
entity = storage.entity_get_one(entity_id)
else:
entity = list(storage.entity_get_from_lister_metadata([{
'lister': constants.GITHUB_LISTER_UUID,
'type': 'user',
'id': id,
}]))[0]
if entity['uuid']:
cache.set_user(id, entity['uuid'])
updated_at = entity.get('lister_metadata', {}).get('updated_at')
return converters.updated_at_to_last_modified(updated_at)