core.lister_base: Batch create origins (storage) & tasks (scheduler)
This commit is contained in:
parent
b272a36237
commit
8b2ee221ac
3 changed files with 32 additions and 8 deletions
|
@ -5,6 +5,7 @@
|
|||
import abc
|
||||
import datetime
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
@ -378,7 +379,7 @@ class SWHListerBase(abc.ABC, config.SWHConfig):
|
|||
|
||||
return sql_repo
|
||||
|
||||
def origin_dict(self, origin_type, origin_url):
|
||||
def origin_dict(self, origin_type, origin_url, **kwargs):
|
||||
"""Return special dict format for the origins list
|
||||
|
||||
Args:
|
||||
|
@ -458,16 +459,37 @@ class SWHListerBase(abc.ABC, config.SWHConfig):
|
|||
Returns:
|
||||
Nothing. Modifies injected_repos.
|
||||
"""
|
||||
origins = {}
|
||||
tasks = {}
|
||||
|
||||
def _origin_key(m):
|
||||
_type = m.get('origin_type', m.get('type'))
|
||||
_url = m.get('origin_url', m.get('url'))
|
||||
return '%s-%s' % (_type, _url)
|
||||
|
||||
def _task_key(m):
|
||||
return '%s-%s' % (m['type'], json.dumps(m['arguments']))
|
||||
|
||||
for m in models_list:
|
||||
ir = injected_repos[m['uid']]
|
||||
if not ir.origin_id:
|
||||
ir.origin_id = self.storage.origin_add_one(
|
||||
self.origin_dict(m['origin_type'], m['origin_url'])
|
||||
)
|
||||
origin_dict = self.origin_dict(**m)
|
||||
origins[_origin_key(m)] = (ir, m, origin_dict)
|
||||
if not ir.task_id:
|
||||
ir.task_id = self.scheduler.create_tasks(
|
||||
[self.task_dict(**m)]
|
||||
)[0]['id']
|
||||
task_dict = self.task_dict(**m)
|
||||
tasks[_task_key(task_dict)] = (ir, m, task_dict)
|
||||
|
||||
new_origins = self.storage.origin_add(
|
||||
(origin_dicts for (_, _, origin_dicts) in origins.values()))
|
||||
for origin in new_origins:
|
||||
ir, m, _ = origins[_origin_key(origin)]
|
||||
ir.origin_id = origin['id']
|
||||
|
||||
new_tasks = self.scheduler.create_tasks(
|
||||
(task_dicts for (_, _, task_dicts) in tasks.values()))
|
||||
for task in new_tasks:
|
||||
ir, m, _ = tasks[_task_key(task)]
|
||||
ir.task_id = task['id']
|
||||
|
||||
def ingest_data(self, identifier, checks=False):
|
||||
"""The core data fetch sequence. Request server endpoint. Simplify and
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue