utils.split_range: Split into non-overlapping ranges
Existing listers use the `is_within_bound` [1] method from the base lister. This method treats both boundaries as inclusive in all cases. Some "range" task listers [2] [3] use the `split_range` function, which generates ranges where the upper bound of one range equals the lower bound of the next. With inclusive boundaries these ranges overlap, which can cause concurrent insert issues down the line [4]. This commit adapts the `split_range` function so that the generated ranges no longer overlap. [1] https://forge.softwareheritage.org/source/swh-lister/browse/master/swh/lister/core/lister_base.py$194-199 [2] https://forge.softwareheritage.org/source/swh-lister/browse/master/swh/lister/gitlab/tasks.py$37-41 [3] https://forge.softwareheritage.org/source/swh-lister/browse/master/swh/lister/gitea/tasks.py$36-41 Related to T2577
This commit is contained in:
parent
725c1fe4ad
commit
e3c856b5ee
4 changed files with 54 additions and 44 deletions
|
@ -4,9 +4,12 @@
|
|||
# See top-level LICENSE file for more information
|
||||
|
||||
from time import sleep
|
||||
from celery.result import GroupResult
|
||||
|
||||
from unittest.mock import patch
|
||||
from celery.result import GroupResult
|
||||
from unittest.mock import patch, call
|
||||
|
||||
from swh.lister.gitea.tasks import NBPAGES
|
||||
from swh.lister.utils import split_range
|
||||
|
||||
|
||||
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
|
@ -57,13 +60,11 @@ def test_range(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
|||
|
||||
@patch("swh.lister.gitea.tasks.GiteaLister")
|
||||
def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
total_pages = 85
|
||||
# setup the mocked GiteaLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, 85, None)
|
||||
lister.db_partition_indices.return_value = [
|
||||
(i, i + 9) for i in range(0, 80, 10)
|
||||
] + [(80, 85)]
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task("swh.lister.gitea.tasks.FullGiteaRelister")
|
||||
assert res
|
||||
|
@ -92,25 +93,21 @@ def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker)
|
|||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for i in range(8):
|
||||
# XXX inconsistent behavior: max_bound is EXCLUDED here
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
dict(min_bound=10 * i, max_bound=10 * i + 10),
|
||||
) in lister.run.call_args_list
|
||||
assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
|
||||
|
||||
@patch("swh.lister.gitea.tasks.GiteaLister")
|
||||
def test_relister_instance(
|
||||
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker
|
||||
):
|
||||
total_pages = 85
|
||||
# setup the mocked GiteaLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, 85, None)
|
||||
lister.db_partition_indices.return_value = [
|
||||
(i, i + 9) for i in range(0, 80, 10)
|
||||
] + [(80, 85)]
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitea.tasks.FullGiteaRelister",
|
||||
|
@ -142,9 +139,7 @@ def test_relister_instance(
|
|||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for i in range(8):
|
||||
# XXX inconsistent behavior: max_bound is EXCLUDED here
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
dict(min_bound=10 * i, max_bound=10 * i + 10),
|
||||
) in lister.run.call_args_list
|
||||
assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
|
|
|
@ -4,9 +4,12 @@
|
|||
# See top-level LICENSE file for more information
|
||||
|
||||
from time import sleep
|
||||
from celery.result import GroupResult
|
||||
|
||||
from unittest.mock import patch
|
||||
from celery.result import GroupResult
|
||||
from unittest.mock import patch, call
|
||||
|
||||
from swh.lister.gitea.tasks import NBPAGES
|
||||
from swh.lister.utils import split_range
|
||||
|
||||
|
||||
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
|
@ -57,13 +60,11 @@ def test_range(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
|||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
|
||||
total_pages = 85
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, 85, None)
|
||||
lister.db_partition_indices.return_value = [
|
||||
(i, i + 9) for i in range(0, 80, 10)
|
||||
] + [(80, 85)]
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.FullGitLabRelister"
|
||||
|
@ -94,25 +95,21 @@ def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker)
|
|||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for i in range(8):
|
||||
# XXX inconsistent behavior: max_bound is EXCLUDED here
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
dict(min_bound=10 * i, max_bound=10 * i + 10),
|
||||
) in lister.run.call_args_list
|
||||
assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
|
||||
|
||||
@patch("swh.lister.gitlab.tasks.GitLabLister")
|
||||
def test_relister_instance(
|
||||
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker
|
||||
):
|
||||
total_pages = 85
|
||||
# setup the mocked GitlabLister
|
||||
lister.return_value = lister
|
||||
lister.run.return_value = None
|
||||
lister.get_pages_information.return_value = (None, 85, None)
|
||||
lister.db_partition_indices.return_value = [
|
||||
(i, i + 9) for i in range(0, 80, 10)
|
||||
] + [(80, 85)]
|
||||
lister.get_pages_information.return_value = (None, total_pages, None)
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.gitlab.tasks.FullGitLabRelister",
|
||||
|
@ -144,9 +141,7 @@ def test_relister_instance(
|
|||
lister.get_pages_information.assert_called_once_with()
|
||||
|
||||
# lister.run should have been called once per partition interval
|
||||
for i in range(8):
|
||||
# XXX inconsistent behavior: max_bound is EXCLUDED here
|
||||
for min_bound, max_bound in split_range(total_pages, NBPAGES):
|
||||
assert (
|
||||
dict(min_bound=10 * i, max_bound=10 * i + 10),
|
||||
) in lister.run.call_args_list
|
||||
assert (dict(min_bound=80, max_bound=85),) in lister.run.call_args_list
|
||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list
|
||||
)
|
||||
|
|
|
@ -11,7 +11,12 @@ from swh.lister import utils
|
|||
|
||||
@pytest.mark.parametrize(
|
||||
"total_pages,nb_pages,expected_ranges",
|
||||
[(14, 5, [(0, 5), (5, 10), (10, 14)]), (19, 10, [(0, 10), (10, 19)])],
|
||||
[
|
||||
(14, 5, [(0, 4), (5, 9), (10, 14)]),
|
||||
(19, 10, [(0, 9), (10, 19)]),
|
||||
(20, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]),
|
||||
(21, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21),],),
|
||||
],
|
||||
)
|
||||
def test_split_range(total_pages, nb_pages, expected_ranges):
|
||||
actual_ranges = list(utils.split_range(total_pages, nb_pages))
|
||||
|
|
|
@ -1,13 +1,28 @@
|
|||
# Copyright (C) 2018 the Software Heritage developers
|
||||
# Copyright (C) 2018-2020 the Software Heritage developers
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from typing import Iterator, Tuple
|
||||
|
||||
def split_range(total_pages, nb_pages):
|
||||
|
||||
def split_range(total_pages: int, nb_pages: int) -> Iterator[Tuple[int, int]]:
|
||||
"""Split `total_pages` into mostly `nb_pages` ranges. In some cases, the last range can
|
||||
have one more element.
|
||||
|
||||
>>> split_range(19, 10)
|
||||
[(0, 9), (10, 19)]
|
||||
|
||||
>>> split_range(20, 3)
|
||||
[(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]
|
||||
|
||||
>>> split_range(21, 3)
|
||||
[(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21)]
|
||||
|
||||
"""
|
||||
prev_index = None
|
||||
for index in range(0, total_pages, nb_pages):
|
||||
if index is not None and prev_index is not None:
|
||||
yield prev_index, index
|
||||
yield prev_index, index - 1
|
||||
prev_index = index
|
||||
|
||||
if index != total_pages:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue