Add incremental function to Golang Lister
This commit is contained in:
parent
60405e78ae
commit
c6ce862d32
4 changed files with 185 additions and 12 deletions
|
@ -3,6 +3,7 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import json
|
||||
import logging
|
||||
|
@ -17,14 +18,22 @@ from swh.scheduler.interface import SchedulerInterface
|
|||
from swh.scheduler.model import ListedOrigin
|
||||
|
||||
from .. import USER_AGENT
|
||||
from ..pattern import CredentialsType, StatelessLister
|
||||
from ..pattern import CredentialsType, Lister
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GolangStateType:
|
||||
last_seen: Optional[datetime] = None
|
||||
"""Last timestamp of a package version we have saved.
|
||||
Used as a starting point for an incremental listing."""
|
||||
|
||||
|
||||
GolangPageType = List[Dict[str, Any]]
|
||||
|
||||
|
||||
class GolangLister(StatelessLister[GolangPageType]):
|
||||
class GolangLister(Lister[GolangStateType, GolangPageType]):
|
||||
"""
|
||||
List all Golang modules and send associated origins to scheduler.
|
||||
|
||||
|
@ -38,7 +47,10 @@ class GolangLister(StatelessLister[GolangPageType]):
|
|||
LISTER_NAME = "Golang"
|
||||
|
||||
def __init__(
|
||||
self, scheduler: SchedulerInterface, credentials: CredentialsType = None,
|
||||
self,
|
||||
scheduler: SchedulerInterface,
|
||||
incremental: bool = False,
|
||||
credentials: CredentialsType = None,
|
||||
):
|
||||
super().__init__(
|
||||
scheduler=scheduler,
|
||||
|
@ -51,6 +63,29 @@ class GolangLister(StatelessLister[GolangPageType]):
|
|||
self.session.headers.update(
|
||||
{"Accept": "application/json", "User-Agent": USER_AGENT}
|
||||
)
|
||||
self.incremental = incremental
|
||||
|
||||
def state_from_dict(self, d: Dict[str, Any]) -> GolangStateType:
|
||||
as_string = d.get("last_seen")
|
||||
last_seen = iso8601.parse_date(as_string) if as_string is not None else None
|
||||
return GolangStateType(last_seen=last_seen)
|
||||
|
||||
def state_to_dict(self, state: GolangStateType) -> Dict[str, Any]:
|
||||
return {
|
||||
"last_seen": state.last_seen.isoformat()
|
||||
if state.last_seen is not None
|
||||
else None
|
||||
}
|
||||
|
||||
def finalize(self):
|
||||
if self.incremental and self.state.last_seen is not None:
|
||||
scheduler_state = self.get_state_from_scheduler()
|
||||
|
||||
if (
|
||||
scheduler_state.last_seen is None
|
||||
or self.state.last_seen > scheduler_state.last_seen
|
||||
):
|
||||
self.updated = True
|
||||
|
||||
@throttling_retry(
|
||||
retry=retry_policy_generic,
|
||||
|
@ -108,17 +143,25 @@ class GolangLister(StatelessLister[GolangPageType]):
|
|||
return page, since
|
||||
|
||||
def get_pages(self) -> Iterator[GolangPageType]:
|
||||
page, since = self.get_single_page()
|
||||
last_since = since
|
||||
since = None
|
||||
if self.incremental:
|
||||
since = self.state.last_seen
|
||||
page, since = self.get_single_page(since=since)
|
||||
if since == self.state.last_seen:
|
||||
# The index returns packages whose timestamp are greater or
|
||||
# equal to the date provided as parameter, which will create
|
||||
# an infinite loop if not stopped here.
|
||||
return [], since
|
||||
if since is not None:
|
||||
self.state.last_seen = since
|
||||
|
||||
while page:
|
||||
yield page
|
||||
page, since = self.get_single_page(since=since)
|
||||
if last_since == since:
|
||||
# The index returns packages whose timestamp are greater or
|
||||
# equal to the date provided as parameter, which will create
|
||||
# an infinite loop if not stopped here.
|
||||
return []
|
||||
last_since = since
|
||||
if since == self.state.last_seen:
|
||||
return [], since
|
||||
if since is not None:
|
||||
self.state.last_seen = since
|
||||
|
||||
def get_origins_from_page(self, page: GolangPageType) -> Iterator[ListedOrigin]:
|
||||
"""
|
||||
|
|
|
@ -13,6 +13,13 @@ def list_golang(**lister_args):
|
|||
return GolangLister.from_configfile(**lister_args).run().dict()
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".IncrementalGolangLister")
|
||||
def list_golang_incremental(**lister_args):
|
||||
"""Incremental update of Golang packages"""
|
||||
lister = GolangLister.from_configfile(incremental=True, **lister_args)
|
||||
return lister.run().dict()
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".ping")
|
||||
def _ping():
|
||||
return "OK"
|
||||
|
|
|
@ -3,11 +3,12 @@
|
|||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import iso8601
|
||||
|
||||
from swh.lister.golang.lister import GolangLister
|
||||
from swh.lister.golang.lister import GolangLister, GolangStateType
|
||||
from swh.lister.tests.test_utils import assert_sleep_calls
|
||||
from swh.lister.utils import WAIT_EXP_BASE
|
||||
|
||||
|
@ -88,3 +89,105 @@ def test_golang_lister(swh_scheduler, mocker, requests_mock, datadir):
|
|||
|
||||
assert stats.pages == 3
|
||||
assert stats.origins == 18
|
||||
|
||||
|
||||
def test_golang_lister_incremental(swh_scheduler, requests_mock, datadir, mocker):
|
||||
# first listing, should return one origin per package
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
|
||||
responses = [
|
||||
{"text": Path(datadir, "page-1.txt").read_text(), "status_code": 200},
|
||||
]
|
||||
requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
|
||||
|
||||
stats = lister.run()
|
||||
|
||||
page1_last_timestamp = datetime.datetime(
|
||||
2019, 4, 11, 18, 47, 29, 390564, tzinfo=datetime.timezone.utc
|
||||
)
|
||||
page2_last_timestamp = datetime.datetime(
|
||||
2019, 4, 15, 13, 54, 35, 250835, tzinfo=datetime.timezone.utc
|
||||
)
|
||||
page3_last_timestamp = datetime.datetime(
|
||||
2019, 4, 18, 2, 7, 41, 336899, tzinfo=datetime.timezone.utc
|
||||
)
|
||||
mock.assert_has_calls(
|
||||
[
|
||||
# First call has no state
|
||||
mocker.call(since=None),
|
||||
# Second call is the last timestamp in the listed page
|
||||
mocker.call(since=page1_last_timestamp),
|
||||
]
|
||||
)
|
||||
|
||||
assert lister.get_state_from_scheduler() == GolangStateType(
|
||||
last_seen=page1_last_timestamp
|
||||
)
|
||||
|
||||
assert stats.pages == 1
|
||||
assert stats.origins == 5
|
||||
|
||||
# Incremental should list nothing
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
stats = lister.run()
|
||||
mock.assert_has_calls([mocker.call(since=page1_last_timestamp)])
|
||||
assert stats.pages == 0
|
||||
assert stats.origins == 0
|
||||
|
||||
# Add more responses
|
||||
responses = [
|
||||
{"text": Path(datadir, "page-2.txt").read_text(), "status_code": 200},
|
||||
]
|
||||
|
||||
requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
|
||||
|
||||
# Incremental should list new page
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
stats = lister.run()
|
||||
mock.assert_has_calls(
|
||||
[
|
||||
mocker.call(since=page1_last_timestamp),
|
||||
mocker.call(since=page2_last_timestamp),
|
||||
]
|
||||
)
|
||||
assert stats.pages == 1
|
||||
assert stats.origins == 4
|
||||
|
||||
# Incremental should list nothing again
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
stats = lister.run()
|
||||
assert stats.pages == 0
|
||||
assert stats.origins == 0
|
||||
mock.assert_has_calls([mocker.call(since=page2_last_timestamp)])
|
||||
|
||||
# Add yet more responses
|
||||
responses = [
|
||||
{"text": Path(datadir, "page-3.txt").read_text(), "status_code": 200},
|
||||
]
|
||||
|
||||
requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
|
||||
|
||||
# Incremental should list new page again
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
stats = lister.run()
|
||||
assert stats.pages == 1
|
||||
assert stats.origins == 9
|
||||
mock.assert_has_calls(
|
||||
[
|
||||
mocker.call(since=page2_last_timestamp),
|
||||
mocker.call(since=page3_last_timestamp),
|
||||
]
|
||||
)
|
||||
|
||||
# Incremental should list nothing one last time
|
||||
lister = GolangLister(scheduler=swh_scheduler, incremental=True)
|
||||
mock = mocker.spy(lister, "get_single_page")
|
||||
stats = lister.run()
|
||||
assert stats.pages == 0
|
||||
assert stats.origins == 0
|
||||
mock.assert_has_calls([mocker.call(since=page3_last_timestamp)])
|
||||
|
|
|
@ -30,3 +30,23 @@ def test_golang_full_listing_task(
|
|||
|
||||
lister.from_configfile.assert_called_once_with()
|
||||
lister.run.assert_called_once_with()
|
||||
|
||||
|
||||
def test_golang_incremental_listing_task(
|
||||
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
|
||||
):
|
||||
lister = mocker.patch("swh.lister.golang.tasks.GolangLister")
|
||||
lister.from_configfile.return_value = lister
|
||||
stats = ListerStats(pages=1, origins=28000)
|
||||
lister.run.return_value = stats
|
||||
|
||||
res = swh_scheduler_celery_app.send_task(
|
||||
"swh.lister.golang.tasks.IncrementalGolangLister"
|
||||
)
|
||||
assert res
|
||||
res.wait()
|
||||
assert res.successful()
|
||||
assert res.result == stats.dict()
|
||||
|
||||
lister.from_configfile.assert_called_once_with(incremental=True)
|
||||
lister.run.assert_called_once_with()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue