lister_base: Split the tasks into chunks prior to creation
This results in smaller transactions which won't time out. Related to T2160.
This commit is contained in:
parent
5ab9d67d67
commit
4b9f0e0553
1 changed files with 7 additions and 5 deletions
|
@ -16,6 +16,7 @@ from sqlalchemy.orm import sessionmaker
|
|||
from typing import Any, Dict, Type, Union
|
||||
|
||||
from swh.core import config
|
||||
from swh.core.utils import grouper
|
||||
from swh.scheduler import get_scheduler, utils
|
||||
|
||||
from .abstractattribute import AbstractAttribute
|
||||
|
@ -468,11 +469,12 @@ class ListerBase(abc.ABC, config.SWHConfig):
|
|||
task_dict = self.task_dict(**m)
|
||||
tasks[_task_key(task_dict)] = (ir, m, task_dict)
|
||||
|
||||
new_tasks = self.scheduler.create_tasks(
|
||||
(task_dicts for (_, _, task_dicts) in tasks.values()))
|
||||
for task in new_tasks:
|
||||
ir, m, _ = tasks[_task_key(task)]
|
||||
ir.task_id = task['id']
|
||||
gen_tasks = (task_dicts for (_, _, task_dicts) in tasks.values())
|
||||
for grouped_tasks in grouper(gen_tasks, n=1000):
|
||||
new_tasks = self.scheduler.create_tasks(list(grouped_tasks))
|
||||
for task in new_tasks:
|
||||
ir, m, _ = tasks[_task_key(task)]
|
||||
ir.task_id = task['id']
|
||||
|
||||
def ingest_data(self, identifier, checks=False):
|
||||
"""The core data fetch sequence. Request server endpoint. Simplify and
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue