plugins: add support for scheduler's task-type declaration

Add a new register-task-types cli that will create missing task-type entries in the
scheduler according to:

- only create missing task-types (do not update them), but check that the
  backend_name field is consistent,
- each SWHTask-based task declared in a module listed in the 'task_modules'
  plugin registry field will be checked and added if needed; tasks which name
  start wit an underscore will not be added,
- added task-type will have:
  - the 'type' field is derived from the task's function name (with underscores
    replaced with dashes),
  - the description field is the first line of that function's docstring,
  - default values as provided by the swh.lister.cli.DEFAULT_TASK_TYPE (with
    a simple pattern matching to have decent default values for full/incremental
    tasks),
  - these default values can be overloaded via the 'task_type' plugin registry
    entry.

For this, we had to rename all tasks names (eg. `cran_lister` -> `list_cran`).

Comes with some tests.
This commit is contained in:
David Douard 2019-09-03 14:39:06 +02:00
parent e3c0ea9d90
commit 8d9deeb8f8
14 changed files with 233 additions and 41 deletions

View file

@ -17,20 +17,21 @@ def new_lister(api_baseurl='https://api.bitbucket.org/2.0', per_page=100):
@app.task(name=__name__ + '.IncrementalBitBucketLister')
def incremental_bitbucket_lister(**lister_args):
def list_bitbucket_incremental(**lister_args):
'''Incremental update of the BitBucket forge'''
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeBitBucketLister')
def range_bitbucket_lister(start, end, **lister_args):
def _range_bitbucket_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullBitBucketRelister', bind=True)
def full_bitbucket_relister(self, split=None, **lister_args):
"""Relist from the beginning of what's already been listed.
def list_bitbucket_full(self, split=None, **lister_args):
"""Full update of the BitBucket forge
It's not to be called for an initial listing.
@ -42,7 +43,7 @@ def full_bitbucket_relister(self, split=None, **lister_args):
return
random.shuffle(ranges)
promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args)
promise = group(_range_bitbucket_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)', (self.name, len(ranges)))
try:
@ -53,5 +54,5 @@ def full_bitbucket_relister(self, split=None, **lister_args):
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -8,10 +8,11 @@ from .lister import CGitLister
@app.task(name=__name__ + '.CGitListerTask')
def cgit_lister(**lister_args):
def list_cgit(**lister_args):
'''Lister task for CGit instances'''
CGitLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -3,14 +3,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import logging
import pkg_resources
from copy import deepcopy
from importlib import import_module
import click
from sqlalchemy import create_engine
from swh.core.cli import CONTEXT_SETTINGS
from swh.scheduler import get_scheduler
from swh.scheduler.task import SWHTask
from swh.lister.core.models import initialize
@ -21,6 +25,25 @@ LISTERS = {entry_point.name.split('.', 1)[1]: entry_point
if entry_point.name.split('.', 1)[0] == 'lister'}
SUPPORTED_LISTERS = list(LISTERS)
# the key in this dict is the suffix used to match new task-type to be added.
# For example for a task which function name is "list_gitlab_full', the default
# value used when inserting a new task-type in the scheduler db will be the one
# under the 'full' key below (because it matches xxx_full).
DEFAULT_TASK_TYPE = {
'full': { # for tasks like 'list_xxx_full()'
'default_interval': '90 days',
'min_interval': '90 days',
'max_interval': '90 days',
'backoff_factor': 1
},
'*': { # value if not suffix matches
'default_interval': '1 day',
'min_interval': '1 day',
'max_interval': '1 day',
'backoff_factor': 1
},
}
def get_lister(lister_name, db_url=None, **conf):
"""Instantiate a lister given its name.
@ -66,6 +89,8 @@ def lister(ctx, config_file, db_url):
'cls': 'local',
'args': {'db': db_url}
}
if not config_file:
config_file = os.environ.get('SWH_CONFIG_FILENAME')
conf = config.read(config_file, override_conf)
ctx.obj['config'] = conf
ctx.obj['override_conf'] = override_conf
@ -89,20 +114,97 @@ def db_init(ctx, drop_tables):
db_url = lister_cfg['args']['db']
db_engine = create_engine(db_url)
registry = {}
for lister, entrypoint in LISTERS.items():
logger.info('Loading lister %s', lister)
registry_entry = entrypoint.load()()
registry[lister] = entrypoint.load()()
logger.info('Initializing database')
initialize(db_engine, drop_tables)
for lister, entrypoint in LISTERS.items():
registry_entry = registry[lister]
init_hook = registry_entry.get('init')
if callable(init_hook):
logger.info('Calling init hook for %s', lister)
init_hook(db_engine)
@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS)
@click.option('--lister', '-l', 'listers', multiple=True,
default=('all', ), show_default=True,
help='Only registers task-types for these listers',
type=click.Choice(['all'] + SUPPORTED_LISTERS))
@click.pass_context
def register_task_types(ctx, listers):
"""Insert missing task-type entries in the scheduler
According to declared tasks in each loaded lister plugin.
"""
cfg = ctx.obj['config']
scheduler = get_scheduler(**cfg['scheduler'])
for lister, entrypoint in LISTERS.items():
if 'all' not in listers and lister not in listers:
continue
logger.info('Loading lister %s', lister)
registry_entry = entrypoint.load()()
for task_module in registry_entry['task_modules']:
mod = import_module(task_module)
for task_name in (x for x in dir(mod) if not x.startswith('_')):
taskobj = getattr(mod, task_name)
if isinstance(taskobj, SWHTask):
task_type = task_name.replace('_', '-')
task_cfg = registry_entry.get('task_types', {}).get(
task_type, {})
ensure_task_type(task_type, taskobj, task_cfg, scheduler)
def ensure_task_type(task_type, swhtask, task_config, scheduler):
"""Ensure a task-type is known by the scheduler
Args:
task_type (str): the type of the task to check/insert (correspond to
the 'type' field in the db)
swhtask (SWHTask): the SWHTask instance the task-type correspond to
task_config (dict): a dict with specific/overloaded values for the
task-type to be created
scheduler: the scheduler object used to access the scheduler db
"""
for suffix, defaults in DEFAULT_TASK_TYPE.items():
if task_type.endswith('-' + suffix):
task_type_dict = defaults.copy()
break
else:
task_type_dict = DEFAULT_TASK_TYPE['*'].copy()
task_type_dict['type'] = task_type
task_type_dict['backend_name'] = swhtask.name
if swhtask.__doc__:
task_type_dict['description'] = swhtask.__doc__.splitlines()[0]
task_type_dict.update(task_config)
current_task_type = scheduler.get_task_type(task_type)
if current_task_type:
# check some stuff
if current_task_type['backend_name'] != task_type_dict['backend_name']:
logger.warning('Existing task type %s for lister %s has a '
'different backend name than current '
'code version provides (%s vs. %s)',
task_type,
lister,
current_task_type['backend_name'],
task_type_dict['backend_name'],
)
else:
logger.info('Create task type %s in scheduler', task_type)
logger.debug(' %s', task_type_dict)
scheduler.create_task_type(task_type_dict)
@lister.command(name='run', context_settings=CONTEXT_SETTINGS,
help='Trigger a full listing run for a particular forge '
'instance. The output of this listing results in '

View file

@ -8,10 +8,11 @@ from swh.lister.cran.lister import CRANLister
@app.task(name=__name__ + '.CRANListerTask')
def cran_lister(**lister_args):
def list_cran(**lister_args):
'''Lister task for the CRAN registry'''
CRANLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -8,10 +8,11 @@ from .lister import DebianLister
@app.task(name=__name__ + '.DebianListerTask')
def debian_lister(distribution, **lister_args):
def list_debian_distribution(distribution, **lister_args):
'''List a Debian distribution'''
DebianLister(**lister_args).run(distribution)
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -17,20 +17,21 @@ def new_lister(api_baseurl='https://api.github.com', **kw):
@app.task(name=__name__ + '.IncrementalGitHubLister')
def incremental_github_lister(**lister_args):
def list_github_incremental(**lister_args):
'Incremental update of GitHub'
lister = new_lister(**lister_args)
lister.run(min_bound=lister.db_last_index(), max_bound=None)
@app.task(name=__name__ + '.RangeGitHubLister')
def range_github_lister(start, end, **lister_args):
def _range_github_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitHubRelister', bind=True)
def full_github_relister(self, split=None, **lister_args):
"""Relist from the beginning of what's already been listed.
def list_github_full(self, split=None, **lister_args):
"""Full update of GitHub
It's not to be called for an initial listing.
@ -41,7 +42,7 @@ def full_github_relister(self, split=None, **lister_args):
self.log.info('Nothing to list')
return
random.shuffle(ranges)
promise = group(range_github_lister.s(minv, maxv, **lister_args)
promise = group(_range_github_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@ -52,5 +53,5 @@ def full_github_relister(self, split=None, **lister_args):
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -22,7 +22,8 @@ def new_lister(api_baseurl='https://gitlab.com/api/v4',
@app.task(name=__name__ + '.IncrementalGitLabLister')
def incremental_gitlab_lister(**lister_args):
def list_gitlab_incremental(**lister_args):
"""Incremental update of a GitLab instance"""
lister_args['sort'] = 'desc'
lister = new_lister(**lister_args)
total_pages = lister.get_pages_information()[1]
@ -31,23 +32,19 @@ def incremental_gitlab_lister(**lister_args):
@app.task(name=__name__ + '.RangeGitLabLister')
def range_gitlab_lister(start, end, **lister_args):
def _range_gitlab_lister(start, end, **lister_args):
lister = new_lister(**lister_args)
lister.run(min_bound=start, max_bound=end)
@app.task(name=__name__ + '.FullGitLabRelister', bind=True)
def full_gitlab_relister(self, **lister_args):
"""Full lister
This should be renamed as such.
"""
def list_gitlab_full(self, **lister_args):
"""Full update of a GitLab instance"""
lister = new_lister(**lister_args)
_, total_pages, _ = lister.get_pages_information()
ranges = list(utils.split_range(total_pages, NBPAGES))
random.shuffle(ranges)
promise = group(range_gitlab_lister.s(minv, maxv, **lister_args)
promise = group(_range_gitlab_lister.s(minv, maxv, **lister_args)
for minv, maxv in ranges)()
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
try:
@ -58,5 +55,5 @@ def full_gitlab_relister(self, **lister_args):
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -8,10 +8,11 @@ from .lister import GNULister
@app.task(name=__name__ + '.GNUListerTask')
def gnu_lister(**lister_args):
def list_gnu_full(**lister_args):
'List lister for the GNU source code archive'
GNULister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -10,4 +10,11 @@ def register():
return {'models': [NpmVisitModel, NpmModel],
'lister': NpmLister,
'task_modules': ['%s.tasks' % __name__],
'task_types': {
'list-npm-full': {
'default_interval': '7 days',
'min_interval': '7 days',
'max_interval': '7 days',
},
},
}

View file

@ -41,14 +41,16 @@ def get_last_update_seq(lister):
@app.task(name=__name__ + '.NpmListerTask')
def npm_lister(**lister_args):
def list_npm_full(**lister_args):
'Full lister for the npm (javascript) registry'
lister = NpmLister(**lister_args)
with save_registry_state(lister):
lister.run()
@app.task(name=__name__ + '.NpmIncrementalListerTask')
def npm_incremental_lister(**lister_args):
def list_npm_incremental(**lister_args):
'Incremental lister for the npm (javascript) registry'
lister = NpmIncrementalLister(**lister_args)
update_seq_start = get_last_update_seq(lister)
with save_registry_state(lister):
@ -56,5 +58,5 @@ def npm_incremental_lister(**lister_args):
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -8,10 +8,11 @@ from .lister import PackagistLister
@app.task(name=__name__ + '.PackagistListerTask')
def packagist_lister(**lister_args):
def list_packagist(**lister_args):
'List the packagist (php) registry'
PackagistLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -7,10 +7,11 @@ from swh.lister.phabricator.lister import PhabricatorLister
@app.task(name=__name__ + '.FullPhabricatorLister')
def full_phabricator_lister(**lister_args):
def list_phabricator_full(**lister_args):
'Full update of a Phabricator instance'
PhabricatorLister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -8,10 +8,11 @@ from .lister import PyPILister
@app.task(name=__name__ + '.PyPIListerTask')
def pypi_lister(**lister_args):
def list_pypi(**lister_args):
'Full update of the PyPI (python) registry'
PyPILister(**lister_args).run()
@app.task(name=__name__ + '.ping')
def ping():
def _ping():
return 'OK'

View file

@ -3,12 +3,43 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import glob
import pytest
import traceback
from datetime import timedelta
import yaml
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests.conftest import DUMP_FILES
from swh.lister.core.lister_base import ListerBase
from swh.lister.cli import get_lister, SUPPORTED_LISTERS
from swh.lister.cli import lister as cli, get_lister, SUPPORTED_LISTERS
from .test_utils import init_db
from click.testing import CliRunner
@pytest.fixture
def swh_scheduler_config(request, postgresql_proc, postgresql):
scheduler_config = {
'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
host=postgresql_proc.host,
port=postgresql_proc.port,
user='postgres',
dbname='tests')
}
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
for fname in all_dump_files:
with open(fname) as fobj:
cursor.execute(fobj.read())
postgresql.commit()
return scheduler_config
def test_get_lister_wrong_input():
@ -64,3 +95,47 @@ def test_get_lister_override():
assert url_key not in lst.config
assert 'priority' not in lst.config
assert 'oneshot' not in lst.config
def test_task_types(swh_scheduler_config, tmp_path):
db_url = init_db().url()
configfile = tmp_path / 'config.yml'
configfile.write_text(yaml.dump({'scheduler': {
'cls': 'local',
'args': swh_scheduler_config}}))
runner = CliRunner()
result = runner.invoke(cli, [
'--db-url', db_url,
'--config-file', configfile.as_posix(),
'register-task-types'])
assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
scheduler = get_scheduler(cls='local', args=swh_scheduler_config)
all_tasks = [
'list-bitbucket-full', 'list-bitbucket-incremental',
'list-cran',
'list-cgit',
'list-debian-distribution',
'list-gitlab-full', 'list-gitlab-incremental',
'list-github-full', 'list-github-incremental',
'list-gnu-full',
'list-npm-full', 'list-npm-incremental',
'list-phabricator-full',
'list-packagist',
'list-pypi',
]
for task in all_tasks:
task_type_desc = scheduler.get_task_type(task)
assert task_type_desc
assert task_type_desc['type'] == task
assert task_type_desc['backoff_factor'] == 1
if task == 'list-npm-full':
delay = timedelta(days=7) # overloaded in the plugin registry
elif task.endswith('-full'):
delay = timedelta(days=90) # default value for 'full' lister tasks
else:
delay = timedelta(days=1) # default value for other lister tasks
assert task_type_desc['default_interval'] == delay, task