From 4b9f0e0553cdc69f919352a1ec20b0d101e293b9 Mon Sep 17 00:00:00 2001 From: "Antoine R. Dumont (@ardumont)" Date: Thu, 19 Dec 2019 10:49:10 +0100 Subject: [PATCH] lister_base: Split into chunks the tasks prior to creation This results in smaller transactions, which won't time out Related to T2160 --- swh/lister/core/lister_base.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py index e4253af..fb7ecf1 100644 --- a/swh/lister/core/lister_base.py +++ b/swh/lister/core/lister_base.py @@ -16,6 +16,7 @@ from sqlalchemy.orm import sessionmaker from typing import Any, Dict, Type, Union from swh.core import config +from swh.core.utils import grouper from swh.scheduler import get_scheduler, utils from .abstractattribute import AbstractAttribute @@ -468,11 +469,12 @@ class ListerBase(abc.ABC, config.SWHConfig): task_dict = self.task_dict(**m) tasks[_task_key(task_dict)] = (ir, m, task_dict) - new_tasks = self.scheduler.create_tasks( - (task_dicts for (_, _, task_dicts) in tasks.values())) - for task in new_tasks: - ir, m, _ = tasks[_task_key(task)] - ir.task_id = task['id'] + gen_tasks = (task_dicts for (_, _, task_dicts) in tasks.values()) + for grouped_tasks in grouper(gen_tasks, n=1000): + new_tasks = self.scheduler.create_tasks(list(grouped_tasks)) + for task in new_tasks: + ir, m, _ = tasks[_task_key(task)] + ir.task_id = task['id'] def ingest_data(self, identifier, checks=False): """The core data fetch sequence. Request server endpoint. Simplify and