Heavy refactor of the task system

Get rid of the class based task definition in favor of decorator-based
task declarations.

Doing so, we can get rid of core/tasks.py

Task names are explicitely set to keep compatibility with task
definitions in schedulers' database.

This also add debug statements at the beginning and end of each lister
task.
This commit is contained in:
David Douard 2018-12-20 16:07:28 +01:00
parent 94c1eaf402
commit 2d1f0643ff
7 changed files with 183 additions and 245 deletions

View file

@ -2,26 +2,51 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
RangeListerTask,
IndexingRefreshListerTask, ListerTaskBase)
import random
from celery import group
from swh.scheduler.celery_backend.config import app
from swh.scheduler.task import SWHTask
from .lister import BitBucketLister
class BitBucketListerTask(ListerTaskBase):
def new_lister(self, *, api_baseurl='https://api.bitbucket.org/2.0'):
return BitBucketLister(api_baseurl=api_baseurl)
GROUP_SPLIT = 10000
class IncrementalBitBucketLister(BitBucketListerTask,
IndexingDiscoveryListerTask):
task_queue = 'swh_lister_bitbucket_discover'
def new_lister(api_baseurl='https://api.bitbucket.org/2.0'):
return BitBucketLister(api_baseurl=api_baseurl)
class RangeBitBucketLister(BitBucketListerTask, RangeListerTask):
task_queue = 'swh_lister_bitbucket_refresh'
@app.task(name='swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
base=SWHTask, bind=True)
def incremental_bitbucket_lister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
self.log.debug('%s OK' % (self.name))
@app.task(name='swh.lister.bitbucket.tasks.RangeBitBucketLister',
base=SWHTask, bind=True)
def range_bitbucket_lister(self, start, end, **lister_args):
self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
self.name, start, end, lister_args))
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
self.log.debug('%s OK' % (self.name))
@app.task(name='swh.lister.bitbucket.tasks.FullBitBucketRelister',
base=SWHTask, bind=True)
def full_bitbucket_relister(self, split=None, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = new_lister(**lister_args)
ranges = lister.db_partition_indices(split or GROUP_SPLIT)
random.shuffle(ranges)
group(range_bitbucket_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
class FullBitBucketRelister(BitBucketListerTask, IndexingRefreshListerTask):
task_queue = 'swh_lister_bitbucket_refresh'

View file

@ -1,95 +0,0 @@
# Copyright (C) 2017-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import random
from celery import group
from swh.scheduler.task import Task
from .abstractattribute import AbstractAttribute
class AbstractTaskMeta(abc.ABCMeta):
pass
class ListerTaskBase(Task, metaclass=AbstractTaskMeta):
"""Lister Tasks define the process of periodically requesting batches of
repository information from source code hosting services. They
instantiate Listers to do batches of work at periodic intervals.
There are two main kinds of lister tasks:
1. Discovering new repositories.
2. Refreshing the list of already discovered repositories.
If the hosting service is indexable (according to the requirements of
:class:`SWHIndexingLister`), then we can optionally partition the
set of known repositories into sub-sets to distribute the work.
This means that there is a third possible Task type for Indexing
Listers:
3. Discover or refresh a specific range of indices.
"""
task_queue = AbstractAttribute('Celery Task queue name')
@abc.abstractmethod
def new_lister(self, **lister_args):
"""Return a new lister of the appropriate type.
"""
pass
@abc.abstractmethod
def run_task(self, *, lister_args=None):
pass
# Paging/Indexing lister tasks derivatives
# (cf. {github/bitbucket/gitlab}/tasks)
class RangeListerTask(ListerTaskBase):
"""Range lister task.
"""
def run_task(self, start, end, lister_args=None):
if lister_args is None:
lister_args = {}
lister = self.new_lister(**lister_args)
return lister.run(min_bound=start, max_bound=end)
# Indexing Lister tasks derivatives (cf. {github/bitbucket}/tasks)
class IndexingDiscoveryListerTask(ListerTaskBase):
"""Incremental indexing lister task.
"""
def run_task(self, *, lister_args=None):
if lister_args is None:
lister_args = {}
lister = self.new_lister(**lister_args)
return lister.run(min_bound=lister.db_last_index(), max_bound=None)
class IndexingRefreshListerTask(ListerTaskBase):
"""Full indexing lister task.
"""
GROUP_SPLIT = 10000
def run_task(self, *, lister_args=None):
if lister_args is None:
lister_args = {}
lister = self.new_lister(**lister_args)
ranges = lister.db_partition_indices(self.GROUP_SPLIT)
random.shuffle(ranges)
range_task = RangeListerTask()
group(range_task.s(minv, maxv, lister_args)
for minv, maxv in ranges)()

View file

@ -2,17 +2,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.lister.core.tasks import ListerTaskBase
from swh.scheduler.celery_backend.config import app
from swh.scheduler.task import SWHTask
from .lister import DebianLister
class DebianListerTask(ListerTaskBase):
task_queue = 'swh_lister_debian'
def new_lister(self):
return DebianLister()
def run_task(self, distribution):
lister = self.new_lister()
return lister.run(distribution)
@app.task(name='swh.lister.debian.tasks.DebianListerTask',
base=SWHTask, bind=True)
def debian_lister(self, distribution, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
DebianLister(**lister_args).run(distribution)
self.log.debug('%s OK' % (self.name))

View file

@ -2,25 +2,52 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
RangeListerTask,
IndexingRefreshListerTask, ListerTaskBase)
import random
from .lister import GitHubLister
from celery import group
from swh.scheduler.celery_backend.config import app
from swh.scheduler.task import SWHTask
from swh.lister.github.lister import GitHubLister
GROUP_SPLIT = 10000
class GitHubListerTask(ListerTaskBase):
def new_lister(self, *, api_baseurl='https://api.github.com'):
return GitHubLister(api_baseurl=api_baseurl)
def new_lister(api_baseurl='https://api.github.com', **kw):
return GitHubLister(api_baseurl=api_baseurl, **kw)
class IncrementalGitHubLister(GitHubListerTask, IndexingDiscoveryListerTask):
task_queue = 'swh_lister_github_discover'
@app.task(name='swh.lister.github.tasks.IncrementalGitHubLister',
base=SWHTask, bind=True)
def incremental_github_lister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
self.log.debug('%s OK' % (self.name))
class RangeGitHubLister(GitHubListerTask, RangeListerTask):
task_queue = 'swh_lister_github_refresh'
@app.task(name='swh.lister.github.tasks.RangeGitHubLister',
base=SWHTask, bind=True)
def range_github_lister(self, start, end, **lister_args):
self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
self.name, start, end, lister_args))
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
self.log.debug('%s OK' % (self.name))
@app.task(name='swh.lister.github.tasks.FullGitHubRelister',
base=SWHTask, bind=True)
def full_github_relister(self, split=None, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = new_lister(**lister_args)
ranges = lister.db_partition_indices(split or GROUP_SPLIT)
random.shuffle(ranges)
group(range_github_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
class FullGitHubRelister(GitHubListerTask, IndexingRefreshListerTask):
task_queue = 'swh_lister_github_refresh'

View file

@ -6,58 +6,54 @@ import random
from celery import group
from swh.scheduler.celery_backend.config import app
from swh.scheduler.task import SWHTask
from .. import utils
from ..core.tasks import ListerTaskBase, RangeListerTask
from .lister import GitLabLister
class GitLabListerTask(ListerTaskBase):
def new_lister(self, *, api_baseurl='https://gitlab.com/api/v4',
instance='gitlab', sort='asc', per_page=20):
return GitLabLister(
api_baseurl=api_baseurl, instance=instance, sort=sort)
NBPAGES = 10
class RangeGitLabLister(GitLabListerTask, RangeListerTask):
"""Range GitLab lister (list available origins on specified range)
"""
task_queue = 'swh_lister_gitlab_refresh'
def new_lister(api_baseurl='https://gitlab.com/api/v4',
instance='gitlab', sort='asc', per_page=20):
return GitLabLister(
api_baseurl=api_baseurl, instance=instance, sort=sort)
class FullGitLabRelister(GitLabListerTask):
"""Full GitLab lister (list all available origins from the api).
"""
task_queue = 'swh_lister_gitlab_refresh'
# nb pages
nb_pages = 10
def run_task(self, lister_args=None):
if lister_args is None:
lister_args = {}
lister = self.new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
ranges = list(utils.split_range(total_pages, self.nb_pages))
random.shuffle(ranges)
range_task = RangeGitLabLister()
group(range_task.s(minv, maxv, lister_args=lister_args)
for minv, maxv in ranges)()
@app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister',
base=SWHTask, bind=True)
def incremental_gitlab_lister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister_args['sort'] = 'desc'
lister = new_lister(**lister_args)
total_pages = lister.get_pages_information()[1]
# stopping as soon as existing origins for that instance are detected
lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
self.log.debug('%s OK' % (self.name))
class IncrementalGitLabLister(GitLabListerTask):
"""Incremental GitLab lister (list only new available origins).
@app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister',
base=SWHTask, bind=True)
def range_gitlab_lister(self, start, end, **lister_args):
self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
self.name, start, end, lister_args))
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
self.log.debug('%s OK' % (self.name))
"""
task_queue = 'swh_lister_gitlab_discover'
def run_task(self, lister_args=None):
if lister_args is None:
lister_args = {}
lister_args['sort'] = 'desc'
lister = self.new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
# stopping as soon as existing origins for that instance are detected
return lister.run(min_bound=1, max_bound=total_pages,
check_existence=True)
@app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister',
base=SWHTask, bind=True)
def full_gitlab_relister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
ranges = list(utils.split_range(total_pages, NBPAGES))
random.shuffle(ranges)
group(range_gitlab_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))

View file

@ -3,75 +3,62 @@
# See top-level LICENSE file for more information
from datetime import datetime
from contextlib import contextmanager
from swh.scheduler.celery_backend.config import app
from swh.scheduler.task import SWHTask
from swh.lister.core.tasks import ListerTaskBase
from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
from swh.lister.npm.models import NpmVisitModel
class _NpmListerTaskBase(ListerTaskBase):
@contextmanager
def save_registry_state(lister):
params = {'headers': lister.request_headers()}
registry_state = lister.session.get(lister.api_baseurl, **params)
registry_state = registry_state.json()
keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
'disk_size', 'data_size', 'committed_update_seq',
'compacted_seq')
task_queue = 'swh_lister_npm_refresh'
def _save_registry_state(self):
"""Query the root endpoint from the npm registry and
backup values of interest for future listing
"""
params = {'headers': self.lister.request_headers()}
registry_state = \
self.lister.session.get(self.lister.api_baseurl, **params)
registry_state = registry_state.json()
self.registry_state = {
'visit_date': datetime.now(),
}
for key in ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
'disk_size', 'data_size', 'committed_update_seq',
'compacted_seq'):
self.registry_state[key] = registry_state[key]
def _store_registry_state(self):
"""Store the backup npm registry state to database.
"""
npm_visit = NpmVisitModel(**self.registry_state)
self.lister.db_session.add(npm_visit)
self.lister.db_session.commit()
state = {key: registry_state[key] for key in keys}
state['visit_date'] = datetime.now()
yield
npm_visit = NpmVisitModel(state)
lister.db_session.add(npm_visit)
lister.db_session.commit()
class NpmListerTask(_NpmListerTaskBase):
"""Full npm lister (list all available packages from the npm registry)
def get_last_update_seq(lister):
"""Get latest ``update_seq`` value for listing only updated packages.
"""
def new_lister(self):
return NpmLister()
def run_task(self):
self.lister = self.new_lister()
self._save_registry_state()
self.lister.run()
self._store_registry_state()
query = lister.db_session.query(NpmVisitModel.update_seq)
row = query.order_by(NpmVisitModel.uid.desc()).first()
if not row:
raise ValueError('No npm registry listing previously performed ! '
'This is required prior to the execution of an '
'incremental listing.')
return row[0]
class NpmIncrementalListerTask(_NpmListerTaskBase):
"""Incremental npm lister (list all updated packages since the last listing)
"""
@app.task(name='swh.lister.npm.tasks.NpmListerTask',
base=SWHTask, bind=True)
def npm_lister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = NpmLister(**lister_args)
with save_registry_state(lister):
lister.run()
self.log.debug('%s OK' % (self.name))
def new_lister(self):
return NpmIncrementalLister()
def run_task(self):
self.lister = self.new_lister()
update_seq_start = self._get_last_update_seq()
self._save_registry_state()
self.lister.run(min_bound=update_seq_start)
self._store_registry_state()
def _get_last_update_seq(self):
"""Get latest ``update_seq`` value for listing only updated packages.
"""
query = self.lister.db_session.query(NpmVisitModel.update_seq)
row = query.order_by(NpmVisitModel.uid.desc()).first()
if not row:
raise ValueError('No npm registry listing previously performed ! '
'This is required prior to the execution of an '
'incremental listing.')
return row[0]
@app.task(name='swh.lister.npm.tasks.NpmIncrementalListerTask',
base=SWHTask, bind=True)
def npm_incremental_lister(self, **lister_args):
self.log.debug('%s, lister_args=%s' % (
self.name, lister_args))
lister = NpmIncrementalLister(**lister_args)
update_seq_start = get_last_update_seq(lister)
with save_registry_state(lister):
lister.run(min_bound=update_seq_start)
self.log.debug('%s OK' % (self.name))

View file

@ -2,19 +2,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from ..core.tasks import ListerTaskBase
from swh.scheduler.task import SWHTask
from swh.scheduler.celery_backend.config import app
from .lister import PyPILister
class PyPIListerTask(ListerTaskBase):
"""Full PyPI lister (list all available origins from the api).
@app.task(name='swh.lister.pypi.tasks.PyPIListerTask',
base=SWHTask, bind=True)
def pypi_lister(self, **lister_args):
self.log.debug('%s(), lister_args=%s' % (
self.name, lister_args))
PyPILister(**lister_args).run()
self.log.debug('%s OK' % (self.name))
"""
task_queue = 'swh_lister_pypi_refresh'
def new_lister(self):
return PyPILister()
def run_task(self):
lister = self.new_lister()
lister.run()