pattern: Store termination date to scheduler database at end of listing
This makes it possible to track the date of the last lister execution, which will be used to schedule first visits with high priority for listed origins. Related to swh/devel/swh-scheduler#4687.
This commit is contained in:
parent
927aebbd0b
commit
7609ebf7e1
5 changed files with 35 additions and 13 deletions
|
@ -1,2 +1,2 @@
|
|||
swh.core[db] >= 3.4.0
|
||||
swh.scheduler >= 2.4.0
|
||||
swh.scheduler >= 2.5.0
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright (C) 2022 The Software Heritage developers
|
||||
# Copyright (C) 2022-2024 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
@ -133,7 +133,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
|
|||
lister = HackageLister(scheduler=swh_scheduler)
|
||||
# force lister.last_listing_date to not being 'now'
|
||||
lister.state.last_listing_date = iso8601.parse_date("2022-08-26T02:27:45.073759Z")
|
||||
lister.set_state_in_scheduler()
|
||||
lister.set_state_in_scheduler(force=True)
|
||||
assert lister.get_state_from_scheduler() == HackageListerState(
|
||||
last_listing_date=iso8601.parse_date("2022-08-26T02:27:45.073759Z")
|
||||
)
|
||||
|
@ -157,7 +157,7 @@ def test_hackage_lister_incremental(swh_scheduler, requests_mock, datadir):
|
|||
lister.state.last_listing_date = iso8601.parse_date(
|
||||
"2022-09-30T08:00:34.348551203Z"
|
||||
)
|
||||
lister.set_state_in_scheduler()
|
||||
lister.set_state_in_scheduler(force=True)
|
||||
assert lister.get_state_from_scheduler() == HackageListerState(
|
||||
last_listing_date=iso8601.parse_date("2022-09-30T08:00:34.348551203Z")
|
||||
)
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
# Copyright (C) 2020-2023 The Software Heritage developers
|
||||
# Copyright (C) 2020-2024 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, TypeVar
|
||||
|
@ -20,6 +21,7 @@ from swh.core.retry import http_retry
|
|||
from swh.core.utils import grouper
|
||||
from swh.scheduler import get_scheduler, model
|
||||
from swh.scheduler.interface import SchedulerInterface
|
||||
from swh.scheduler.utils import utcnow
|
||||
|
||||
from . import USER_AGENT_TEMPLATE
|
||||
from .utils import is_valid_origin_url
|
||||
|
@ -247,8 +249,7 @@ class Lister(Generic[StateType, PageType]):
|
|||
break
|
||||
finally:
|
||||
self.finalize()
|
||||
if self.updated:
|
||||
self.set_state_in_scheduler()
|
||||
self.set_state_in_scheduler()
|
||||
|
||||
return full_stats
|
||||
|
||||
|
@ -262,19 +263,26 @@ class Lister(Generic[StateType, PageType]):
|
|||
the state retrieved from the scheduler backend
|
||||
"""
|
||||
self.lister_obj = self.scheduler.get_or_create_lister(
|
||||
name=self.LISTER_NAME, instance_name=self.instance
|
||||
name=self.LISTER_NAME,
|
||||
instance_name=self.instance,
|
||||
)
|
||||
return self.state_from_dict(self.lister_obj.current_state)
|
||||
return self.state_from_dict(copy.deepcopy(self.lister_obj.current_state))
|
||||
|
||||
def set_state_in_scheduler(self) -> None:
|
||||
def set_state_in_scheduler(self, force: bool = False) -> None:
|
||||
"""Update the state in the scheduler backend from the state of the current
|
||||
instance.
|
||||
|
||||
Args:
|
||||
force: Update lister state even when lister has ``updated`` attribute
|
||||
set to :const:`False`, this is useful for tests
|
||||
|
||||
Raises:
|
||||
swh.scheduler.exc.StaleData: in case of a race condition between
|
||||
concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`).
|
||||
"""
|
||||
self.lister_obj.current_state = self.state_to_dict(self.state)
|
||||
if self.updated or force:
|
||||
self.lister_obj.current_state = self.state_to_dict(self.state)
|
||||
self.lister_obj.last_listing_finished_at = utcnow()
|
||||
self.lister_obj = self.scheduler.update_lister(self.lister_obj)
|
||||
|
||||
# State management to/from the scheduler
|
||||
|
|
|
@ -413,4 +413,5 @@ class SaveBulkLister(Lister[SaveBulkListerState, SaveBulkListerPage]):
|
|||
# update scheduler state at each rejected origin to get feedback
|
||||
# using Web API before end of listing
|
||||
self.state.rejected_origins = list(self.rejected_origins)
|
||||
self.updated = True
|
||||
self.set_state_in_scheduler()
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright (C) 2020-2021 The Software Heritage developers
|
||||
# Copyright (C) 2020-2024 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
@ -151,21 +151,34 @@ def test_run(swh_scheduler):
|
|||
|
||||
update_date = lister.lister_obj.updated
|
||||
|
||||
assert lister.lister_obj.last_listing_finished_at is None
|
||||
|
||||
run_result = lister.run()
|
||||
|
||||
assert run_result.pages == 2
|
||||
assert run_result.origins == 20
|
||||
|
||||
stored_lister = swh_scheduler.get_or_create_lister(
|
||||
name="test-pattern-lister", instance_name="example.com"
|
||||
name=lister.lister_obj.name, instance_name=lister.lister_obj.instance_name
|
||||
)
|
||||
|
||||
# Check that the finalize operation happened
|
||||
assert stored_lister.updated > update_date
|
||||
assert stored_lister.current_state["updated"] == "yes"
|
||||
assert stored_lister.last_listing_finished_at is not None
|
||||
|
||||
last_listing_finished_at = stored_lister.last_listing_finished_at
|
||||
|
||||
check_listed_origins(swh_scheduler, lister, stored_lister)
|
||||
|
||||
lister.run()
|
||||
|
||||
stored_lister = swh_scheduler.get_or_create_lister(
|
||||
name=lister.lister_obj.name, instance_name=lister.lister_obj.instance_name
|
||||
)
|
||||
|
||||
assert stored_lister.last_listing_finished_at > last_listing_finished_at
|
||||
|
||||
|
||||
class InstantiableStatelessLister(pattern.StatelessLister[PageType]):
|
||||
LISTER_NAME = "test-stateless-lister"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue