phabricator/lister: Prevent erroneous disabling of scheduler tasks
Previously, the Phabricator lister disabled some loading tasks that it was not supposed to disable. More precisely, because an invalid index was passed to a database query, the most recently created scheduler task was disabled each time the Phabricator API returned a new page of results to the lister. Moreover, database queries were not filtered by Phabricator instance, so scheduler tasks belonging to other instances could be disabled as well. Closes T2000.
This commit is contained in:
parent
e83902c2a3
commit
4c8d7baf75
6 changed files with 4826 additions and 35 deletions
|
@ -3,6 +3,7 @@
|
|||
# See top-level LICENSE file for more information
|
||||
|
||||
import abc
|
||||
import datetime
|
||||
import time
|
||||
from unittest import TestCase
|
||||
from unittest.mock import Mock, patch
|
||||
|
@ -51,6 +52,7 @@ class HttpListerTesterBase(abc.ABC):
|
|||
self.response = None
|
||||
self.fl = None
|
||||
self.helper = None
|
||||
self.scheduler_tasks = []
|
||||
if self.__class__ != HttpListerTesterBase:
|
||||
self.run = TestCase.run.__get__(self, self.__class__)
|
||||
else:
|
||||
|
@ -82,11 +84,38 @@ class HttpListerTesterBase(abc.ABC):
|
|||
self.fl.INITIAL_BACKOFF = 1
|
||||
|
||||
self.fl.reset_backoff()
|
||||
self.scheduler_tasks = []
|
||||
return self.fl
|
||||
|
||||
def disable_scheduler(self, fl):
    """Swap ``fl.schedule_missing_tasks`` for a mock that returns ``None``.

    Args:
        fl: the lister object whose task scheduling should be neutralised
    """
    noop_scheduling = Mock(return_value=None)
    fl.schedule_missing_tasks = noop_scheduling
|
||||
|
||||
def mock_scheduler(self, fl):
    """Install in-memory stand-ins for the lister's scheduler calls.

    ``fl.scheduler.create_tasks`` and ``fl.scheduler.disable_tasks`` are
    replaced by mocks wrapping local functions that record their effects in
    ``self.scheduler_tasks``, so tests can inspect created tasks and their
    status afterwards.

    Args:
        fl: the lister object whose ``scheduler`` attribute gets patched
    """
    def _create_tasks(tasks):
        # Remember where the new entries start so only those are returned.
        first_new = len(self.scheduler_tasks)
        # Keep numbering after the most recently recorded task, if any.
        next_id = self.scheduler_tasks[-1]['id'] + 1 if first_new > 0 else 0
        for offset, task in enumerate(tasks):
            # Copy the submitted task and add the scheduler-side fields.
            recorded = dict(
                task,
                status='next_run_not_scheduled',
                retries_left=0,
                priority=None,
                id=next_id + offset,
                current_interval=datetime.timedelta(days=64),
            )
            self.scheduler_tasks.append(recorded)
        return self.scheduler_tasks[first_new:]

    def _disable_tasks(task_ids):
        # Ids assigned above are used directly as positions in the list.
        for position in task_ids:
            self.scheduler_tasks[position]['status'] = 'disabled'

    scheduler = fl.scheduler
    scheduler.create_tasks = Mock(wraps=_create_tasks)
    scheduler.disable_tasks = Mock(wraps=_disable_tasks)
|
||||
|
||||
def disable_db(self, fl):
|
||||
fl.winnow_models = Mock(return_value=[])
|
||||
fl.db_inject_repo = Mock(return_value=fl.MODEL())
|
||||
|
@ -176,7 +205,7 @@ class HttpListerTester(HttpListerTesterBase, abc.ABC):
|
|||
fl.db = db
|
||||
self.init_db(db, fl.MODEL)
|
||||
|
||||
self.disable_scheduler(fl)
|
||||
self.mock_scheduler(fl)
|
||||
return fl
|
||||
|
||||
@requests_mock.Mocker()
|
||||
|
@ -186,6 +215,7 @@ class HttpListerTester(HttpListerTesterBase, abc.ABC):
|
|||
fl.run(min_bound=self.first_index)
|
||||
|
||||
self.assertEqual(fl.db_last_index(), self.last_index)
|
||||
|
||||
partitions = fl.db_partition_indices(5)
|
||||
self.assertGreater(len(partitions), 0)
|
||||
for k in partitions:
|
||||
|
|
|
@ -7,9 +7,13 @@ import random
|
|||
|
||||
import urllib.parse
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from sqlalchemy import func
|
||||
|
||||
from swh.lister.core.indexing_lister import IndexingHttpLister
|
||||
from swh.lister.phabricator.models import PhabricatorModel
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -26,14 +30,6 @@ class PhabricatorLister(IndexingHttpLister):
|
|||
instance = urllib.parse.urlparse(self.url).hostname
|
||||
self.instance = instance
|
||||
|
||||
@property
|
||||
def default_min_bound(self):
|
||||
"""Starting boundary when `min_bound` is not defined (db empty). This
|
||||
is used within the fn:`run` call.
|
||||
|
||||
"""
|
||||
return self._bootstrap_repositories_listing()
|
||||
|
||||
def request_params(self, identifier):
|
||||
"""Override the default params behavior to retrieve the api token
|
||||
|
||||
|
@ -99,29 +95,62 @@ class PhabricatorLister(IndexingHttpLister):
|
|||
models_list = [m for m in models_list if m is not None]
|
||||
return super().filter_before_inject(models_list)
|
||||
|
||||
def _bootstrap_repositories_listing(self):
|
||||
def disable_deleted_repo_tasks(self, index, next_index, keep_these):
|
||||
"""
|
||||
Method called when no min_bound value has been provided
|
||||
to the lister. Its purpose is to:
|
||||
(Overrides) Fix provided index value to avoid:
|
||||
|
||||
1. get the first repository data hosted on the Phabricator
|
||||
instance
|
||||
- database query error
|
||||
- erroneously disabling some scheduler tasks
|
||||
"""
|
||||
# First call to the Phabricator API uses an empty 'after' parameter,
|
||||
# so set the index to 0 to avoid database query error
|
||||
if index == '':
|
||||
index = 0
|
||||
# Next listed repository ids are strictly greater than the 'after'
|
||||
# parameter, so increment the index to avoid disabling the latest
|
||||
# created task when processing a new repositories page returned by
|
||||
# the Phabricator API
|
||||
else:
|
||||
index = index + 1
|
||||
return super().disable_deleted_repo_tasks(index, next_index,
|
||||
keep_these)
|
||||
|
||||
2. inject them into the lister database
|
||||
|
||||
3. return the first repository index to start the listing
|
||||
after that value
|
||||
def db_first_index(self):
|
||||
"""
|
||||
(Overrides) Filter results by Phabricator instance
|
||||
|
||||
Returns:
|
||||
int: The first repository index
|
||||
the smallest indexable value of all repos in the db
|
||||
"""
|
||||
params = '&order=oldest&limit=1'
|
||||
response = self.safely_issue_request(params)
|
||||
models_list = self.transport_response_simplified(response)
|
||||
models_list = self.filter_before_inject(models_list)
|
||||
injected = self.inject_repo_data_into_db(models_list)
|
||||
self.schedule_missing_tasks(models_list, injected)
|
||||
return models_list[0]['indexable']
|
||||
t = self.db_session.query(func.min(self.MODEL.indexable))
|
||||
t = t.filter(self.MODEL.instance == self.instance).first()
|
||||
if t:
|
||||
return t[0]
|
||||
|
||||
def db_last_index(self):
    """
    (Overrides) Restrict the lookup to the current Phabricator instance

    Returns:
        the largest indexable value of all instance repos in the db
        (None when the query result is falsy)
    """
    row = (
        self.db_session
        .query(func.max(self.MODEL.indexable))
        .filter(self.MODEL.instance == self.instance)
        .first()
    )
    # The aggregate value is the single column of the returned row.
    return row[0] if row else None
|
||||
|
||||
def db_query_range(self, start, end):
    """
    (Overrides) Narrow the parent's range query to the current Phabricator
    instance, so that loading tasks for repositories hosted on a
    different instance are not disabled.

    Args:
        start: lower bound handed to the parent implementation
        end: upper bound handed to the parent implementation

    Returns:
        a list of sqlalchemy.ext.declarative.declarative_base objects
        with indexable values within the given range for the instance
    """
    in_range = super().db_query_range(start, end)
    same_instance = self.MODEL.instance == self.instance
    return in_range.filter(same_instance)
|
||||
|
||||
|
||||
def get_repo_url(attachments):
|
||||
|
|
2336
swh/lister/phabricator/tests/api_first_response_other_instance.json
Normal file
2336
swh/lister/phabricator/tests/api_first_response_other_instance.json
Normal file
File diff suppressed because it is too large
Load diff
2354
swh/lister/phabricator/tests/api_next_response.json
Normal file
2354
swh/lister/phabricator/tests/api_next_response.json
Normal file
File diff suppressed because it is too large
Load diff
|
@ -18,7 +18,7 @@ class PhabricatorListerTester(HttpListerTester, unittest.TestCase):
|
|||
# first request will have the after parameter empty
|
||||
test_re = re.compile(r'\&after=([^?&]*)')
|
||||
lister_subdir = 'phabricator'
|
||||
good_api_response_file = 'api_response.json'
|
||||
good_api_response_file = 'api_first_response.json'
|
||||
good_api_response_undefined_protocol = 'api_response_undefined_'\
|
||||
'protocol.json'
|
||||
bad_api_response_file = 'api_empty_response.json'
|
||||
|
@ -35,7 +35,7 @@ class PhabricatorListerTester(HttpListerTester, unittest.TestCase):
|
|||
"""
|
||||
m = self.test_re.search(request.path_url)
|
||||
idx = m.group(1)
|
||||
if idx == str(self.last_index):
|
||||
if idx not in ('', 'None'):
|
||||
return int(idx)
|
||||
|
||||
def get_fl(self, override_config=None):
|
||||
|
@ -86,12 +86,54 @@ class PhabricatorListerTester(HttpListerTester, unittest.TestCase):
|
|||
self.assertEqual(len(ingested_repos), self.entries_per_page)
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_range_listing(self, http_mocker):
|
||||
def test_scheduled_tasks(self, http_mocker):
|
||||
fl = self.create_fl_with_db(http_mocker)
|
||||
|
||||
fl.run(max_bound=self.last_index - 1)
|
||||
# process first page of repositories listing
|
||||
fl.run()
|
||||
|
||||
self.assertEqual(fl.db_last_index(), self.last_index - 1)
|
||||
ingested_repos = list(fl.db_query_range(self.first_index,
|
||||
self.last_index))
|
||||
self.assertEqual(len(ingested_repos), self.entries_per_page - 1)
|
||||
# process second page of repositories listing
|
||||
prev_last_index = self.last_index
|
||||
self.first_index = self.last_index
|
||||
self.last_index = 23
|
||||
self.good_api_response_file = 'api_next_response.json'
|
||||
fl.run(min_bound=prev_last_index)
|
||||
|
||||
# check expected number of ingested repos and loading tasks
|
||||
ingested_repos = list(fl.db_query_range(0, self.last_index))
|
||||
self.assertEqual(len(ingested_repos), len(self.scheduler_tasks))
|
||||
self.assertEqual(len(ingested_repos), 2 * self.entries_per_page)
|
||||
|
||||
# check tasks are not disabled
|
||||
for task in self.scheduler_tasks:
|
||||
self.assertTrue(task['status'] != 'disabled')
|
||||
|
||||
@requests_mock.Mocker()
def test_scheduled_tasks_multiple_instances(self, http_mocker):
    # Lists two Phabricator instances in a row with the same lister and
    # checks that listing the second one neither skips task creation nor
    # disables any previously created task.

    fl = self.create_fl_with_db(http_mocker)

    # list first Phabricator instance
    fl.run()

    # point the lister at a second instance with its own credentials
    fl.instance = 'other_fake'
    fl.config['credentials'] = {
        'phabricator': {
            'other_fake': [{
                'password': 'foo'
            }]
        }
    }

    # list second Phabricator instance hosting repositories having
    # same ids as those listed from the first instance
    self.good_api_response_file = 'api_first_response_other_instance.json'
    self.last_index = 13
    fl.run()

    # check expected number of loading tasks
    self.assertEqual(len(self.scheduler_tasks), 2 * self.entries_per_page)

    # check tasks are not disabled; assertNotEqual reports the offending
    # status on failure, unlike assertTrue on a pre-computed boolean
    for task in self.scheduler_tasks:
        self.assertNotEqual(task['status'], 'disabled')
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue