req_queue: use qless instead of a handmade queue

Nicolas Dandrimont 2016-03-17 17:50:03 +01:00
parent 401d37190e
commit 6fbabbe586
4 changed files with 38 additions and 52 deletions

View file

@@ -1,4 +1,5 @@
 dateutil
+qless
 requests
 redis

View file

@@ -6,13 +6,13 @@ import os
 import requests
-from swh.core.config import load_named_config, prepare_folders
+from swh.core.config import load_named_config
 from swh.storage import get_storage
 from . import req_queue, processors, cache
 DEFAULT_CONFIG = {
-    'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'),
+    'queue_url': ('str', 'redis://localhost'),
     'cache_url': ('str', 'redis://localhost'),
     'storage_class': ('str', 'local_storage'),
     'storage_args': ('list[str]', ['dbname=softwareheritage-dev',
@@ -28,15 +28,14 @@ def run_from_queue():
     cache.init_cache(config['cache_url'])
-    queue_file = os.path.expanduser(config['queue_file'])
-    prepare_folders(os.path.dirname(queue_file))
+    queue_url = os.path.expanduser(config['queue_url'])
     credentials = {}
     for credential in config['credentials']:
         login, token = credential.split(':')
         credentials[login] = {'token': token}
-    queue = req_queue.restore_from_file(queue_file)
+    queue = req_queue.from_url(queue_url)
     if req_queue.empty(queue):
         req_queue.push(queue, {'type': 'repositories', 'url': None})
@@ -44,15 +43,11 @@ def run_from_queue():
     session = requests.Session()
     storage = get_storage(config['storage_class'], config['storage_args'])
-    try:
-        while not req_queue.empty(queue):
-            processors.process_one_item(
-                queue, session=session, credentials=credentials,
-                storage=storage
-            )
-    finally:
-        req_queue.dump_to_file(queue, queue_file)
+    while not req_queue.empty(queue):
+        processors.process_one_item(
+            queue, session=session, credentials=credentials,
+            storage=storage
+        )
 if __name__ == '__main__':
     run_from_queue()
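
The practical effect on the run loop: the pickled on-disk queue (queue_file with restore_from_file/dump_to_file) is replaced by a Redis-backed qless queue addressed by queue_url, so the try/finally that saved state on exit is gone. A minimal sketch of the resulting loop, reconstructed from the hunks above (not verbatim; the names match the diff, everything else is illustrative):

    queue = req_queue.from_url(queue_url)    # qless queue living in Redis
    if req_queue.empty(queue):               # seed the crawl on a fresh queue
        req_queue.push(queue, {'type': 'repositories', 'url': None})

    while not req_queue.empty(queue):
        processors.process_one_item(
            queue, session=session, credentials=credentials, storage=storage
        )
    # No dump_to_file on exit: pending jobs persist in Redis, so an
    # interrupted run picks up the same queue on the next start.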

View file

@@ -3,6 +3,7 @@
 # See top-level LICENSE file for more information
 from math import ceil
+import time
 from . import github_api, req_queue, storage_utils
@@ -118,9 +119,19 @@ PROCESSORS = {
 def process_one_item(queue, session, credentials, storage):
-    item = req_queue.pop(queue)
+    job = None
+    while True:
+        job = req_queue.pop(queue)
+        if job:
+            break
+        time.sleep(0.1)
     try:
-        PROCESSORS[item['type']](item, queue, session, credentials, storage)
+        PROCESSORS[job.klass_name](job.data, queue, session, credentials,
+                                   storage)
     except Exception:
-        req_queue.push_front(queue, item)
         raise
+    else:
+        job.complete()
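
For orientation: the object returned by req_queue.pop() is now a qless job, where klass_name is the type string the producer passed to queue.put() in req_queue.push(), and data is the rest of the item dict. A small consumer-side sketch of the lifecycle this code relies on (names taken from the hunks; what qless does with a job that raises before complete() is left to its own retry/expiry handling and is an assumption here):

    job = queue.pop()        # a qless job, or None if nothing is ready, hence the sleep loop
    if job is not None:
        handler = PROCESSORS[job.klass_name]    # e.g. 'repositories', 'repository', ...
        handler(job.data, queue, session, credentials, storage)
        job.complete()       # only reached on success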

View file

@@ -2,51 +2,30 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
-from collections import defaultdict, deque
-import os
-import pickle
-import tempfile
+import qless
 PRIORITIES = {
-    'forks': 10,
-    'user': 15,
-    'repository': 20,
-    'repositories': 30,
+    'forks': 100,
+    'repository': 75,
+    'user': 50,
+    'repositories': 40,
 }
-def restore_from_file(file):
-    if not os.path.exists(file):
-        return defaultdict(deque)
-    with open(file, 'rb') as f:
-        return pickle.load(f)
+def from_url(url):
+    return qless.Client(url).queues['github-lister']
-def dump_to_file(queue, file):
-    fd, filename = tempfile.mkstemp(dir=os.path.dirname(file))
-    with open(fd, 'wb') as f:
-        pickle.dump(queue, f)
-        f.flush()
-        os.fsync(fd)
-    os.rename(filename, file)
-def push(queue, item):
-    queue[item['type']].append(item)
-def push_front(queue, item):
-    queue[item['type']].appendleft(item)
+def push(queue, item, **kwargs):
+    print("Push %s to %s" % (item, queue.name))
+    type = item.pop('type')
+    queue.put(type, item, priority=PRIORITIES.get(type, 0), **kwargs)
 def pop(queue):
-    for type in sorted(queue, key=lambda t: PRIORITIES.get(t, 1000)):
-        if queue[type]:
-            return queue[type].popleft()
-    raise IndexError("No items to pop")
+    return queue.pop()
 def empty(queue):
-    return not queue
+    return not len(queue)
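
Taken together, the module now wraps a single qless queue named 'github-lister'. A hedged usage sketch, assuming a local Redis and the qless Python client (note that push() pops the 'type' key out of the item, and that the priority values are inverted relative to the old ascending sort, consistent with qless picking higher-priority jobs first):

    queue = from_url('redis://localhost')

    # Seed the crawl; 'repositories' maps to priority 40, 'forks' would get 100.
    push(queue, {'type': 'repositories', 'url': None})

    while not empty(queue):
        job = pop(queue)                  # a qless job, or None if nothing is available yet
        if job is None:
            continue
        print(job.klass_name, job.data)   # 'repositories', {'url': None}
        job.complete()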