lister.cli: Remove task type register cli
It's now defined in swh.scheduler
This commit is contained in:
parent
8d02458686
commit
484377cc13
2 changed files with 1 addition and 154 deletions
|
@ -6,14 +6,11 @@
|
|||
import os
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from importlib import import_module
|
||||
|
||||
import celery.app.task
|
||||
import click
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from swh.core.cli import CONTEXT_SETTINGS
|
||||
from swh.scheduler import get_scheduler
|
||||
from swh.lister import get_lister, SUPPORTED_LISTERS, LISTERS
|
||||
from swh.lister.core.models import initialize
|
||||
|
||||
|
@ -99,81 +96,6 @@ def db_init(ctx, drop_tables):
|
|||
init_hook(db_engine)
|
||||
|
||||
|
||||
@lister.command(name='register-task-types', context_settings=CONTEXT_SETTINGS)
@click.option('--lister', '-l', 'listers', multiple=True,
              default=('all', ), show_default=True,
              help='Only registers task-types for these listers',
              type=click.Choice(['all'] + SUPPORTED_LISTERS))
@click.pass_context
def register_task_types(ctx, listers):
    """Insert missing task-type entries in the scheduler

    According to declared tasks in each loaded lister plugin.
    """
    # NOTE(review): the docstring above is left untouched on purpose; click
    # normally exposes a command's docstring as its help text.
    scheduler = get_scheduler(**ctx.obj['config']['scheduler'])

    # 'all' selects every registered lister, whatever else was requested.
    register_all = 'all' in listers

    for lister_name, entrypoint in LISTERS.items():
        if not register_all and lister_name not in listers:
            continue
        logger.info('Loading lister %s', lister_name)

        # The entry point loads to a callable returning the registry entry.
        registry_entry = entrypoint.load()()
        for module_name in registry_entry['task_modules']:
            module = import_module(module_name)
            for attr_name in dir(module):
                if attr_name.startswith('_'):
                    continue
                candidate = getattr(module, attr_name)
                # Only celery task instances are registered as task-types.
                if not isinstance(candidate, celery.app.task.Task):
                    continue
                # Task-type names are the attribute names with '_' -> '-'.
                task_type = attr_name.replace('_', '-')
                declared = registry_entry.get('task_types', {})
                overrides = declared.get(task_type, {})
                ensure_task_type(task_type, candidate, overrides, scheduler)
|
||||
|
||||
|
||||
def ensure_task_type(task_type, swhtask, task_config, scheduler):
    """Ensure a task-type is known by the scheduler

    The task-type description starts from a copy of the first
    ``DEFAULT_TASK_TYPE`` entry whose key is a ``-<key>`` ending of
    ``task_type`` (falling back to the ``'*'`` entry). It is then completed
    with the task-type name, the task's ``name`` and the first line of its
    docstring, and finally updated with ``task_config``.

    If the scheduler already has an entry for ``task_type``, nothing is
    written: a warning is logged when its backend name differs from the one
    computed here. Otherwise the task-type is created.

    Args:
        task_type (str): the type of the task to check/insert (correspond to
            the 'type' field in the db)
        swhtask (SWHTask): the SWHTask instance the task-type correspond to
        task_config (dict): a dict with specific/overloaded values for the
            task-type to be created
        scheduler: the scheduler object used to access the scheduler db

    """
    for suffix, defaults in DEFAULT_TASK_TYPE.items():
        if task_type.endswith('-' + suffix):
            task_type_dict = defaults.copy()
            break
    else:
        # No suffix-specific defaults matched: use the catch-all entry.
        task_type_dict = DEFAULT_TASK_TYPE['*'].copy()

    task_type_dict['type'] = task_type
    task_type_dict['backend_name'] = swhtask.name
    if swhtask.__doc__:
        task_type_dict['description'] = swhtask.__doc__.splitlines()[0]

    # Values supplied by the caller win over the computed ones.
    task_type_dict.update(task_config)

    current_task_type = scheduler.get_task_type(task_type)
    if current_task_type:
        # check some stuff
        if current_task_type['backend_name'] != task_type_dict['backend_name']:
            # The lister name is not available in this function: a bare
            # `lister` here would resolve to the module-level command group,
            # not to a lister name, so it is left out of the message.
            logger.warning('Existing task type %s has a '
                           'different backend name than current '
                           'code version provides (%s vs. %s)',
                           task_type,
                           current_task_type['backend_name'],
                           task_type_dict['backend_name'],
                           )
    else:
        logger.info('Create task type %s in scheduler', task_type)
        logger.debug(' %s', task_type_dict)
        scheduler.create_task_type(task_type_dict)
|
||||
|
||||
|
||||
@lister.command(name='run', context_settings=CONTEXT_SETTINGS,
|
||||
help='Trigger a full listing run for a particular forge '
|
||||
'instance. The output of this listing results in '
|
||||
|
|
|
@ -3,43 +3,13 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import glob
|
||||
import pytest
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
|
||||
import yaml
|
||||
|
||||
from swh.core.utils import numfile_sortkey as sortkey
|
||||
from swh.scheduler import get_scheduler
|
||||
from swh.scheduler.tests.conftest import DUMP_FILES
|
||||
|
||||
from swh.lister.core.lister_base import ListerBase
|
||||
from swh.lister.cli import lister as cli, get_lister, SUPPORTED_LISTERS
|
||||
from swh.lister.cli import get_lister, SUPPORTED_LISTERS
|
||||
|
||||
from .test_utils import init_db
|
||||
from click.testing import CliRunner
|
||||
|
||||
|
||||
@pytest.fixture
def swh_scheduler_config(request, postgresql_proc, postgresql):
    """Load the scheduler SQL dump files and return the scheduler db config.

    Every file matching ``DUMP_FILES`` is executed, in ``sortkey`` order,
    through a cursor of the ``postgresql`` connection, then the connection is
    committed.

    Returns:
        dict: a mapping with a single 'db' key holding the connection URL
        built from the host and port of ``postgresql_proc``.

    """
    db_url = 'postgresql://{user}@{host}:{port}/{dbname}'.format(
        host=postgresql_proc.host,
        port=postgresql_proc.port,
        user='postgres',
        dbname='tests',
    )

    dump_paths = glob.glob(DUMP_FILES)
    dump_paths.sort(key=sortkey)

    cursor = postgresql.cursor()
    for dump_path in dump_paths:
        with open(dump_path) as dump_file:
            sql = dump_file.read()
        cursor.execute(sql)
    postgresql.commit()

    return {'db': db_url}
|
||||
|
||||
|
||||
def test_get_lister_wrong_input():
|
||||
|
@ -95,48 +65,3 @@ def test_get_lister_override():
|
|||
assert 'priority' not in lst.config
|
||||
assert 'oneshot' not in lst.config
|
||||
assert lst.url == lst.DEFAULT_URL
|
||||
|
||||
|
||||
def test_task_types(swh_scheduler_config, tmp_path):
    """Check the 'register-task-types' command fills the scheduler.

    The command is run against a configuration file pointing at the test
    scheduler, then each expected task-type is looked up and its type,
    backoff factor and default interval are checked.
    """
    scheduler_cfg = {
        'cls': 'local',
        'args': swh_scheduler_config,
    }
    config = {'scheduler': scheduler_cfg}

    configfile = tmp_path / 'config.yml'
    configfile.write_text(yaml.dump(config))

    cli_args = [
        '--config-file', configfile.as_posix(),
        'register-task-types',
    ]
    result = CliRunner().invoke(cli, cli_args)

    assert result.exit_code == 0, traceback.print_exception(*result.exc_info)

    scheduler = get_scheduler(**scheduler_cfg)
    expected_task_types = [
        'list-bitbucket-full', 'list-bitbucket-incremental',
        'list-cran',
        'list-cgit',
        'list-debian-distribution',
        'list-gitlab-full', 'list-gitlab-incremental',
        'list-github-full', 'list-github-incremental',
        'list-gnu-full',
        'list-npm-full', 'list-npm-incremental',
        'list-phabricator-full',
        'list-packagist',
        'list-pypi',
    ]
    for task in expected_task_types:
        task_type_desc = scheduler.get_task_type(task)
        assert task_type_desc
        assert task_type_desc['type'] == task
        assert task_type_desc['backoff_factor'] == 1

        if task == 'list-npm-full':
            days = 7  # overloaded in the plugin registry
        elif task.endswith('-full'):
            days = 90  # default value for 'full' lister tasks
        else:
            days = 1  # default value for other lister tasks
        delay = timedelta(days=days)
        assert task_type_desc['default_interval'] == delay, task
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue