gitlab: Adapt celery task implementations to the new lister api
Related to T2987
This commit is contained in:
parent
e4a590fc7f
commit
02871f16c9
2 changed files with 32 additions and 158 deletions
|
@ -1,50 +1,24 @@
|
|||
# Copyright (C) 2018 the Software Heritage developers
|
||||
# Copyright (C) 2018-2021 the Software Heritage developers
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import random
|
||||
from celery import shared_task
|
||||
|
||||
from celery import group, shared_task
|
||||
|
||||
from .. import utils
|
||||
from .lister import GitLabLister
|
||||
|
||||
NBPAGES = 10
|
||||
from swh.lister.gitlab.lister import GitLabLister
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".IncrementalGitLabLister")
|
||||
def list_gitlab_incremental(**lister_args):
|
||||
"""Incremental update of a GitLab instance"""
|
||||
lister_args["sort"] = "desc"
|
||||
lister = GitLabLister(**lister_args)
|
||||
total_pages = lister.get_pages_information()[1]
|
||||
# stopping as soon as existing origins for that instance are detected
|
||||
return lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
|
||||
lister = GitLabLister.from_configfile(incremental=True, **lister_args)
|
||||
return lister.run().dict()
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".RangeGitLabLister")
|
||||
def _range_gitlab_lister(start, end, **lister_args):
|
||||
lister = GitLabLister(**lister_args)
|
||||
return lister.run(min_bound=start, max_bound=end)
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".FullGitLabRelister", bind=True)
|
||||
def list_gitlab_full(self, **lister_args):
|
||||
@shared_task(name=__name__ + ".FullGitLabRelister")
|
||||
def list_gitlab_full(**lister_args):
|
||||
"""Full update of a GitLab instance"""
|
||||
lister = GitLabLister(**lister_args)
|
||||
_, total_pages, _ = lister.get_pages_information()
|
||||
ranges = list(utils.split_range(total_pages, NBPAGES))
|
||||
random.shuffle(ranges)
|
||||
promise = group(
|
||||
_range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges
|
||||
)()
|
||||
self.log.debug("%s OK (spawned %s subtasks)" % (self.name, len(ranges)))
|
||||
try:
|
||||
promise.save()
|
||||
except (NotImplementedError, AttributeError):
|
||||
self.log.info("Unable to call save_group with current result backend.")
|
||||
# FIXME: what to do in terms of return here?
|
||||
return promise.id
|
||||
lister = GitLabLister.from_configfile(incremental=False, **lister_args)
|
||||
return lister.run().dict()
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".ping")
|
||||
|
|
|
@ -3,13 +3,9 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from time import sleep
|
||||
from unittest.mock import call, patch
|
||||
import pytest
|
||||
|
||||
from celery.result import GroupResult
|
||||
|
||||
from swh.lister.gitlab.tasks import NBPAGES
|
||||
from swh.lister.utils import split_range
|
||||
from swh.lister.pattern import ListerStats
|
||||
|
||||
|
||||
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
|
@ -20,128 +16,32 @@ def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
|||
assert res.result == "OK"
|
||||
|
||||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_incremental(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, 10, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.IncrementalGitLabLister"
|
||||
)
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
lister.assert_called_once_with(sort="desc")
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.get_pages_information.assert_called_once_with()
|
||||
lister.run.assert_called_once_with(min_bound=1, max_bound=10, check_existence=True)
|
||||
|
||||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_range(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.RangeGitLabLister", kwargs=dict(start=12, end=42)
|
||||
)
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
lister.assert_called_once_with()
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.run.assert_called_once_with(min_bound=12, max_bound=42)
|
||||
|
||||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
total_pages = 85
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.FullGitLabRelister"
|
||||
)
|
||||
assert res
|
||||
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
# retrieve the GroupResult for this task and wait for all the subtasks
|
||||
# to complete
|
||||
promise_id = res.result
|
||||
assert promise_id
|
||||
promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app)
|
||||
for i in range(5):
|
||||
if promise.ready():
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
lister.assert_called_with()
|
||||
|
||||
# one by the FullGitlabRelister task
|
||||
# + 9 for the RangeGitlabLister subtasks
|
||||
assert lister.call_count == 10
|
||||
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.db_partition_indices.assert_not_called()
|
||||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
|
||||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_relister_instance(
|
||||
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker
|
||||
@pytest.mark.parametrize(
|
||||
"task_name,incremental",
|
||||
[("IncrementalGitLabLister", True), ("FullGitLabRelister", False)],
|
||||
)
|
||||
def test_task_lister_gitlab(
|
||||
task_name,
|
||||
incremental,
|
||||
swh_scheduler_celery_app,
|
||||
swh_scheduler_celery_worker,
|
||||
mocker,
|
||||
):
|
||||
total_pages = 85
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
stats = ListerStats(pages=10, origins=200)
|
||||
mock_lister = mocker.patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
mock_lister.from_configfile.return_value = mock_lister
|
||||
mock_lister.run.return_value = ListerStats(pages=10, origins=200)
|
||||
|
||||
kwargs = dict(url="https://gitweb.torproject.org/")
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.FullGitLabRelister",
|
||||
kwargs=dict(url="https://0xacab.org/api/v4"),
|
||||
f"swh.lister.gitlab.tasks.{task_name}", kwargs=kwargs,
|
||||
)
|
||||
assert res
|
||||
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
# retrieve the GroupResult for this task and wait for all the subtasks
|
||||
# to complete
|
||||
promise_id = res.result
|
||||
assert promise_id
|
||||
promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app)
|
||||
for i in range(5):
|
||||
if promise.ready():
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
lister.assert_called_with(url="https://0xacab.org/api/v4")
|
||||
|
||||
# one by the FullGitlabRelister task
|
||||
# + 9 for the RangeGitlabLister subtasks
|
||||
assert lister.call_count == 10
|
||||
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.db_partition_indices.assert_not_called()
|
||||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
mock_lister.from_configfile.assert_called_once_with(
|
||||
incremental=incremental, **kwargs
|
||||
)
|
||||
mock_lister.run.assert_called_once_with()
|
||||
assert res.result == stats.dict()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue