Add tests for github tasks
in order to be able to run unit tests using celery pytest fixtures, we use a dedicated swh_app fixture that ensure the "main" celery app is the test app (otherwise subtasks won't work).
This commit is contained in:
parent
dd104759ae
commit
b7139619fd
4 changed files with 118 additions and 2 deletions
23
swh/lister/core/tests/conftest.py
Normal file
23
swh/lister/core/tests/conftest.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_enable_logging():
|
||||
return True
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def celery_includes():
|
||||
return [
|
||||
'swh.lister.github.tasks',
|
||||
]
|
||||
|
||||
|
||||
# override the celery_session_app fixture to monkeypatch the 'main'
|
||||
# swh.scheduler.celery_backend.config.app Celery application
|
||||
# with the test application.
|
||||
@pytest.fixture(scope='session')
|
||||
def swh_app(celery_session_app):
|
||||
import swh.scheduler.celery_backend.config
|
||||
swh.scheduler.celery_backend.config.app = celery_session_app
|
||||
yield celery_session_app
|
|
@ -46,9 +46,11 @@ def full_github_relister(self, split=None, **lister_args):
|
|||
lister = new_lister(**lister_args)
|
||||
ranges = lister.db_partition_indices(split or GROUP_SPLIT)
|
||||
random.shuffle(ranges)
|
||||
group(range_github_lister.s(minv, maxv, **lister_args)
|
||||
for minv, maxv in ranges)()
|
||||
promise = group(range_github_lister.s(minv, maxv, **lister_args)
|
||||
for minv, maxv in ranges)()
|
||||
self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
|
||||
promise.save() # so that we can restore the GroupResult in tests
|
||||
return promise.id
|
||||
|
||||
|
||||
@app.task(name='swh.lister.github.tasks.ping',
|
||||
|
|
1
swh/lister/github/tests/conftest.py
Normal file
1
swh/lister/github/tests/conftest.py
Normal file
|
@ -0,0 +1 @@
|
|||
from swh.lister.core.tests.conftest import * # noqa
|
90
swh/lister/github/tests/test_tasks.py
Normal file
90
swh/lister/github/tests/test_tasks.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
from time import sleep
|
||||
from celery.result import GroupResult
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
def test_ping(swh_app, celery_session_worker):
|
||||
res = swh_app.send_task(
|
||||
'swh.lister.github.tasks.ping')
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
assert res.result == 'OK'
|
||||
|
||||
|
||||
@patch('swh.lister.github.tasks.GitHubLister')
|
||||
def test_incremental(lister, swh_app, celery_session_worker):
|
||||
# setup the mocked GitHubLister
|
||||
lister.return_value = lister
|
||||
lister.db_last_index.return_value = 42
|
||||
lister.run.return_value = None
|
||||
|
||||
res = swh_app.send_task(
|
||||
'swh.lister.github.tasks.IncrementalGitHubLister')
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
lister.assert_called_once_with(api_baseurl='https://api.github.com')
|
||||
lister.db_last_index.assert_called_once_with()
|
||||
lister.run.assert_called_once_with(min_bound=42, max_bound=None)
|
||||
|
||||
|
||||
@patch('swh.lister.github.tasks.GitHubLister')
|
||||
def test_range(lister, swh_app, celery_session_worker):
|
||||
# setup the mocked GitHubLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
|
||||
res = swh_app.send_task(
|
||||
'swh.lister.github.tasks.RangeGitHubLister',
|
||||
kwargs=dict(start=12, end=42))
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
lister.assert_called_once_with(api_baseurl='https://api.github.com')
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.run.assert_called_once_with(min_bound=12, max_bound=42)
|
||||
|
||||
|
||||
@patch('swh.lister.github.tasks.GitHubLister')
|
||||
def test_relister(lister, swh_app, celery_session_worker):
|
||||
# setup the mocked GitHubLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.db_partition_indices.return_value = [
|
||||
(i, i+9) for i in range(0, 50, 10)]
|
||||
|
||||
res = swh_app.send_task(
|
||||
'swh.lister.github.tasks.FullGitHubRelister')
|
||||
assert res
|
||||
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
|
||||
# retrieve the GroupResult for this task and wait for all the subtasks
|
||||
# to complete
|
||||
promise_id = res.result
|
||||
assert promise_id
|
||||
promise = GroupResult.restore(promise_id, app=swh_app)
|
||||
for i in range(5):
|
||||
if promise.ready():
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
lister.assert_called_with(api_baseurl='https://api.github.com')
|
||||
|
||||
# one by the FullGitHubRelister task
|
||||
# + 5 for the RangeGitHubLister subtasks
|
||||
assert lister.call_count == 6
|
||||
|
||||
lister.db_last_index.assert_not_called()
|
||||
lister.db_partition_indices.assert_called_once_with(10000)
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for i in range(5):
|
||||
# XXX inconsistent behavior: max_bound is INCLUDED here
|
||||
assert (dict(min_bound=10*i, max_bound=10*i + 9),) \
|
||||
in lister.run.call_args_list
|
Loading…
Add table
Add a link
Reference in a new issue