lister.core: Stop creating origins when scheduling tasks

Prior to this commit, the lister also created origins in the archive. Now, it
only schedules new origins for ingestion.
This commit is contained in:
Antoine R. Dumont (@ardumont) 2019-06-13 11:00:19 +02:00
parent a9a37a85bf
commit 64a9bc691d
No known key found for this signature in database
GPG key ID: 52E2E9840D10C3B8
5 changed files with 24 additions and 49 deletions

View file

@ -384,20 +384,6 @@ class SWHListerBase(abc.ABC, config.SWHConfig):
return sql_repo
def origin_dict(self, origin_type, origin_url, **kwargs):
"""Return special dict format for the origins list
Args:
origin_type (string)
origin_url (string)
Returns:
the same information in a different form
"""
return {
'type': origin_type,
'url': origin_url,
}
def task_dict(self, origin_type, origin_url, **kwargs):
"""Return special dict format for the tasks list
@ -452,45 +438,34 @@ class SWHListerBase(abc.ABC, config.SWHConfig):
injected_repos[m['uid']] = self.db_inject_repo(m)
return injected_repos
def create_missing_origins_and_tasks(self, models_list, injected_repos):
"""Find any newly created db entries that don't yet have tasks or
origin objects assigned.
def schedule_missing_tasks(self, models_list, injected_repos):
"""Find any newly created db entries that have not been scheduled
yet.
Args:
models_list: a list of dicts mapping keys in the db model for
each repo
injected_repos: dict of uid:sql_repo pairs that have just
been created
models_list ([Model]): List of dicts mapping keys in the db model
for each repo
injected_repos ([dict]): Dict of uid:sql_repo pairs that have just
been created
Returns:
Nothing. Modifies injected_repos.
"""
origins = {}
tasks = {}
def _origin_key(m):
_type = m.get('origin_type', m.get('type'))
_url = m.get('origin_url', m.get('url'))
return '%s-%s' % (_type, _url)
def _task_key(m):
return '%s-%s' % (m['type'],
json.dumps(m['arguments'], sort_keys=True))
return '%s-%s' % (
m['type'],
json.dumps(m['arguments'], sort_keys=True)
)
for m in models_list:
ir = injected_repos[m['uid']]
if not ir.origin_id:
origin_dict = self.origin_dict(**m)
origins[_origin_key(m)] = (ir, m, origin_dict)
if not ir.task_id:
task_dict = self.task_dict(**m)
tasks[_task_key(task_dict)] = (ir, m, task_dict)
new_origins = self.storage.origin_add(
(origin_dicts for (_, _, origin_dicts) in origins.values()))
for origin in new_origins:
ir, m, _ = origins[_origin_key(origin)]
ir.origin_id = origin['id']
new_tasks = self.scheduler.create_tasks(
(task_dicts for (_, _, task_dicts) in tasks.values()))
for task in new_tasks:
@ -519,7 +494,7 @@ class SWHListerBase(abc.ABC, config.SWHConfig):
# inject into local db
injected = self.inject_repo_data_into_db(models_list)
# queue workers
self.create_missing_origins_and_tasks(models_list, injected)
self.schedule_missing_tasks(models_list, injected)
return response, injected
def save_response(self, response):

View file

@ -51,7 +51,7 @@ class SimpleLister(SWHListerBase):
# inject into local db
injected = self.inject_repo_data_into_db(models)
# queue workers
self.create_missing_origins_and_tasks(models, injected)
self.schedule_missing_tasks(models, injected)
all_injected.append(injected)
# flush
self.db_session.commit()

View file

@ -164,8 +164,8 @@ class HttpListerTesterBase(abc.ABC):
if k not in ['last_seen', 'task_id', 'origin_id', 'id']:
self.assertIn(k, di)
def disable_storage_and_scheduler(self, fl):
fl.create_missing_origins_and_tasks = Mock(return_value=None)
def disable_scheduler(self, fl):
fl.schedule_missing_tasks = Mock(return_value=None)
def disable_db(self, fl):
fl.winnow_models = Mock(return_value=[])
@ -176,7 +176,7 @@ class HttpListerTesterBase(abc.ABC):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_storage_and_scheduler(fl)
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=1, max_bound=1) # stores no results
@ -185,7 +185,7 @@ class HttpListerTesterBase(abc.ABC):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_storage_and_scheduler(fl)
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=self.first_index, max_bound=self.first_index)
@ -194,7 +194,7 @@ class HttpListerTesterBase(abc.ABC):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_storage_and_scheduler(fl)
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=self.first_index)
@ -222,7 +222,7 @@ class HttpListerTester(HttpListerTesterBase, abc.ABC):
})
self.init_db(db, fl.MODEL)
self.disable_storage_and_scheduler(fl)
self.disable_scheduler(fl)
fl.run(min_bound=self.first_index)

View file

@ -119,7 +119,7 @@ class DebianLister(SWHListerHttpTransport, SWHListerBase):
"""Generate the Package entries that didn't previously exist.
Contrary to SWHListerBase, we don't actually insert the data in
database. `create_missing_origins_and_tasks` does it once we have the
database. `schedule_missing_tasks` does it once we have the
origin and task identifiers.
"""
by_name_version = {}
@ -173,7 +173,7 @@ class DebianLister(SWHListerHttpTransport, SWHListerBase):
self.db_session.add_all(added_packages)
return added_packages
def create_missing_origins_and_tasks(self, models_list, added_packages):
def schedule_missing_tasks(self, models_list, added_packages):
"""We create tasks at the end of the full snapshot processing"""
return

View file

@ -97,7 +97,7 @@ class PhabricatorLister(SWHIndexingHttpLister):
self.max_index = models_list[0]['indexable']
models_list = self.filter_before_inject(models_list)
injected = self.inject_repo_data_into_db(models_list)
self.create_missing_origins_and_tasks(models_list, injected)
self.schedule_missing_tasks(models_list, injected)
return self.max_index
def run(self, min_bound=None, max_bound=None):