Stop binding tasks to a specific instance of the celery app
The celery.shared_task decorator allows late-binding of tasks to any celery app, which is well suited for our "task plugin" architecture.
This commit is contained in:
parent
a64ae9641d
commit
78105940ff
11 changed files with 41 additions and 44 deletions
|
@ -3,29 +3,27 @@
|
|||
# See top-level LICENSE file for more information
|
||||
|
||||
import random
|
||||
from celery import group
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import group, shared_task
|
||||
|
||||
from .lister import BitBucketLister
|
||||
|
||||
GROUP_SPLIT = 10000
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.IncrementalBitBucketLister')
|
||||
@shared_task(name=__name__ + '.IncrementalBitBucketLister')
|
||||
def list_bitbucket_incremental(**lister_args):
|
||||
'''Incremental update of the BitBucket forge'''
|
||||
lister = BitBucketLister(**lister_args)
|
||||
lister.run(min_bound=lister.db_last_index(), max_bound=None)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.RangeBitBucketLister')
|
||||
@shared_task(name=__name__ + '.RangeBitBucketLister')
|
||||
def _range_bitbucket_lister(start, end, **lister_args):
|
||||
lister = BitBucketLister(**lister_args)
|
||||
lister.run(min_bound=start, max_bound=end)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.FullBitBucketRelister', bind=True)
|
||||
@shared_task(name=__name__ + '.FullBitBucketRelister', bind=True)
|
||||
def list_bitbucket_full(self, split=None, **lister_args):
|
||||
"""Full update of the BitBucket forge
|
||||
|
||||
|
@ -49,6 +47,6 @@ def list_bitbucket_full(self, split=None, **lister_args):
|
|||
return promise.id
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import CGitLister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.CGitListerTask')
|
||||
@shared_task(name=__name__ + '.CGitListerTask')
|
||||
def list_cgit(**lister_args):
|
||||
'''Lister task for CGit instances'''
|
||||
CGitLister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from swh.lister.cran.lister import CRANLister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.CRANListerTask')
|
||||
@shared_task(name=__name__ + '.CRANListerTask')
|
||||
def list_cran(**lister_args):
|
||||
'''Lister task for the CRAN registry'''
|
||||
CRANLister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import DebianLister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.DebianListerTask')
|
||||
@shared_task(name=__name__ + '.DebianListerTask')
|
||||
def list_debian_distribution(distribution, **lister_args):
|
||||
'''List a Debian distribution'''
|
||||
DebianLister(**lister_args).run(distribution)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -4,28 +4,27 @@
|
|||
|
||||
import random
|
||||
|
||||
from celery import group
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import group, shared_task
|
||||
|
||||
from swh.lister.github.lister import GitHubLister
|
||||
|
||||
GROUP_SPLIT = 10000
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.IncrementalGitHubLister')
|
||||
@shared_task(name=__name__ + '.IncrementalGitHubLister')
|
||||
def list_github_incremental(**lister_args):
|
||||
'Incremental update of GitHub'
|
||||
lister = GitHubLister(**lister_args)
|
||||
lister.run(min_bound=lister.db_last_index(), max_bound=None)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.RangeGitHubLister')
|
||||
@shared_task(name=__name__ + '.RangeGitHubLister')
|
||||
def _range_github_lister(start, end, **lister_args):
|
||||
lister = GitHubLister(**lister_args)
|
||||
lister.run(min_bound=start, max_bound=end)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.FullGitHubRelister', bind=True)
|
||||
@shared_task(name=__name__ + '.FullGitHubRelister', bind=True)
|
||||
def list_github_full(self, split=None, **lister_args):
|
||||
"""Full update of GitHub
|
||||
|
||||
|
@ -48,6 +47,6 @@ def list_github_full(self, split=None, **lister_args):
|
|||
return promise.id
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -4,8 +4,7 @@
|
|||
|
||||
import random
|
||||
|
||||
from celery import group
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import group, shared_task
|
||||
|
||||
from .. import utils
|
||||
from .lister import GitLabLister
|
||||
|
@ -14,7 +13,7 @@ from .lister import GitLabLister
|
|||
NBPAGES = 10
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.IncrementalGitLabLister')
|
||||
@shared_task(name=__name__ + '.IncrementalGitLabLister')
|
||||
def list_gitlab_incremental(**lister_args):
|
||||
"""Incremental update of a GitLab instance"""
|
||||
lister_args['sort'] = 'desc'
|
||||
|
@ -24,13 +23,13 @@ def list_gitlab_incremental(**lister_args):
|
|||
lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.RangeGitLabLister')
|
||||
@shared_task(name=__name__ + '.RangeGitLabLister')
|
||||
def _range_gitlab_lister(start, end, **lister_args):
|
||||
lister = GitLabLister(**lister_args)
|
||||
lister.run(min_bound=start, max_bound=end)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.FullGitLabRelister', bind=True)
|
||||
@shared_task(name=__name__ + '.FullGitLabRelister', bind=True)
|
||||
def list_gitlab_full(self, **lister_args):
|
||||
"""Full update of a GitLab instance"""
|
||||
lister = GitLabLister(**lister_args)
|
||||
|
@ -47,6 +46,6 @@ def list_gitlab_full(self, **lister_args):
|
|||
return promise.id
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import GNULister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.GNUListerTask')
|
||||
@shared_task(name=__name__ + '.GNUListerTask')
|
||||
def list_gnu_full(**lister_args):
|
||||
'List lister for the GNU source code archive'
|
||||
GNULister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
from datetime import datetime
|
||||
from contextlib import contextmanager
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
|
||||
from swh.lister.npm.models import NpmVisitModel
|
||||
|
@ -40,7 +40,7 @@ def get_last_update_seq(lister):
|
|||
return row[0]
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.NpmListerTask')
|
||||
@shared_task(name=__name__ + '.NpmListerTask')
|
||||
def list_npm_full(**lister_args):
|
||||
'Full lister for the npm (javascript) registry'
|
||||
lister = NpmLister(**lister_args)
|
||||
|
@ -48,7 +48,7 @@ def list_npm_full(**lister_args):
|
|||
lister.run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.NpmIncrementalListerTask')
|
||||
@shared_task(name=__name__ + '.NpmIncrementalListerTask')
|
||||
def list_npm_incremental(**lister_args):
|
||||
'Incremental lister for the npm (javascript) registry'
|
||||
lister = NpmIncrementalLister(**lister_args)
|
||||
|
@ -57,6 +57,6 @@ def list_npm_incremental(**lister_args):
|
|||
lister.run(min_bound=update_seq_start)
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import PackagistLister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.PackagistListerTask')
|
||||
@shared_task(name=__name__ + '.PackagistListerTask')
|
||||
def list_packagist(**lister_args):
|
||||
'List the packagist (php) registry'
|
||||
PackagistLister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,16 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from swh.lister.phabricator.lister import PhabricatorLister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.FullPhabricatorLister')
|
||||
@shared_task(name=__name__ + '.FullPhabricatorLister')
|
||||
def list_phabricator_full(**lister_args):
|
||||
'Full update of a Phabricator instance'
|
||||
PhabricatorLister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
|
@ -2,17 +2,17 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from swh.scheduler.celery_backend.config import app
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import PyPILister
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.PyPIListerTask')
|
||||
@shared_task(name=__name__ + '.PyPIListerTask')
|
||||
def list_pypi(**lister_args):
|
||||
'Full update of the PyPI (python) registry'
|
||||
PyPILister(**lister_args).run()
|
||||
|
||||
|
||||
@app.task(name=__name__ + '.ping')
|
||||
@shared_task(name=__name__ + '.ping')
|
||||
def _ping():
|
||||
return 'OK'
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue