From 6fbabbe586e1474d3a2db8b514c6f91c12667798 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont Date: Thu, 17 Mar 2016 17:50:03 +0100 Subject: [PATCH] req_queue: use qless instead of a handmade queue --- requirements.txt | 1 + swh/lister/github/lister.py | 23 ++++++---------- swh/lister/github/processors.py | 17 ++++++++++-- swh/lister/github/req_queue.py | 49 ++++++++++----------------------- 4 files changed, 38 insertions(+), 52 deletions(-) diff --git a/requirements.txt b/requirements.txt index b5e90dd..cea9e6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ dateutil +qless requests redis diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 6ee474d..1bf12f9 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -6,13 +6,13 @@ import os import requests -from swh.core.config import load_named_config, prepare_folders +from swh.core.config import load_named_config from swh.storage import get_storage from . import req_queue, processors, cache DEFAULT_CONFIG = { - 'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'), + 'queue_url': ('str', 'redis://localhost'), 'cache_url': ('str', 'redis://localhost'), 'storage_class': ('str', 'local_storage'), 'storage_args': ('list[str]', ['dbname=softwareheritage-dev', @@ -28,15 +28,14 @@ def run_from_queue(): cache.init_cache(config['cache_url']) - queue_file = os.path.expanduser(config['queue_file']) - prepare_folders(os.path.dirname(queue_file)) + queue_url = os.path.expanduser(config['queue_url']) credentials = {} for credential in config['credentials']: login, token = credential.split(':') credentials[login] = {'token': token} - queue = req_queue.restore_from_file(queue_file) + queue = req_queue.from_url(queue_url) if req_queue.empty(queue): req_queue.push(queue, {'type': 'repositories', 'url': None}) @@ -44,15 +43,11 @@ def run_from_queue(): session = requests.Session() storage = get_storage(config['storage_class'], config['storage_args']) - try: - while not req_queue.empty(queue): - processors.process_one_item( - queue, session=session, credentials=credentials, - storage=storage - ) - - finally: - req_queue.dump_to_file(queue, queue_file) + while not req_queue.empty(queue): + processors.process_one_item( + queue, session=session, credentials=credentials, + storage=storage + ) if __name__ == '__main__': run_from_queue() diff --git a/swh/lister/github/processors.py b/swh/lister/github/processors.py index b3a6d9f..4ab6abc 100644 --- a/swh/lister/github/processors.py +++ b/swh/lister/github/processors.py @@ -3,6 +3,7 @@ # See top-level LICENSE file for more information from math import ceil +import time from . import github_api, req_queue, storage_utils @@ -118,9 +119,19 @@ PROCESSORS = { def process_one_item(queue, session, credentials, storage): - item = req_queue.pop(queue) + + job = None + + while True: + job = req_queue.pop(queue) + if job: + break + time.sleep(0.1) + try: - PROCESSORS[item['type']](item, queue, session, credentials, storage) + PROCESSORS[job.klass_name](job.data, queue, session, credentials, + storage) except Exception: - req_queue.push_front(queue, item) raise + else: + job.complete() diff --git a/swh/lister/github/req_queue.py b/swh/lister/github/req_queue.py index cc32c3b..6c9472f 100644 --- a/swh/lister/github/req_queue.py +++ b/swh/lister/github/req_queue.py @@ -2,51 +2,30 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict, deque -import os -import pickle -import tempfile +import qless + PRIORITIES = { - 'forks': 10, - 'user': 15, - 'repository': 20, - 'repositories': 30, + 'forks': 100, + 'repository': 75, + 'user': 50, + 'repositories': 40, } -def restore_from_file(file): - if not os.path.exists(file): - return defaultdict(deque) - - with open(file, 'rb') as f: - return pickle.load(f) +def from_url(url): + return qless.Client(url).queues['github-lister'] -def dump_to_file(queue, file): - fd, filename = tempfile.mkstemp(dir=os.path.dirname(file)) - with open(fd, 'wb') as f: - pickle.dump(queue, f) - f.flush() - os.fsync(fd) - os.rename(filename, file) - - -def push(queue, item): - queue[item['type']].append(item) - - -def push_front(queue, item): - queue[item['type']].appendleft(item) +def push(queue, item, **kwargs): + print("Push %s to %s" % (item, queue.name)) + type = item.pop('type') + queue.put(type, item, priority=PRIORITIES.get(type, 0), **kwargs) def pop(queue): - for type in sorted(queue, key=lambda t: PRIORITIES.get(t, 1000)): - if queue[type]: - return queue[type].popleft() - - raise IndexError("No items to pop") + return queue.pop() def empty(queue): - return not queue + return not len(queue)