launchpad: Reimplement lister using new Lister API

Port launchpad lister to the swh.lister.pattern.Lister API.

The last update date of each listed git repository is now sent to the scheduler.

The lister can work in incremental mode; in that case, only repositories
modified since the last listing operation are returned.

Closes T2992
This commit is contained in:
Antoine Lambert 2021-01-27 18:58:54 +01:00
parent ae17b6b9a0
commit f862004700
11 changed files with 307 additions and 191 deletions

View file

@ -20,6 +20,12 @@ ignore_missing_imports = True
[mypy-iso8601.*]
ignore_missing_imports = True
[mypy-launchpadlib.*]
ignore_missing_imports = True
[mypy-lazr.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True

View file

@ -1,14 +1,12 @@
# Copyright (C) 2020 the Software Heritage developers
# Copyright (C) 2020-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
def register():
    """Describe the Launchpad lister plugin.

    Returns:
        A dict holding the lister class under ``"lister"`` and the dotted
        path of the Celery task module under ``"task_modules"``.
    """
    # No "models" entry and no ``.models`` import: the models module of this
    # package is deleted, so importing LaunchpadModel would raise ImportError.
    from .lister import LaunchpadLister

    return {
        "lister": LaunchpadLister,
        "task_modules": ["%s.tasks" % __name__],
    }

View file

@ -1,128 +1,127 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
from itertools import count
from typing import Any, Dict, List, Optional, Tuple, Union
from dataclasses import asdict, dataclass
from datetime import datetime
import logging
from typing import Any, Dict, Iterator, Optional
from launchpadlib.launchpad import Launchpad # type: ignore
from lazr.restfulclient.resource import Collection, Entry # type: ignore
from sqlalchemy import func
import iso8601
from launchpadlib.launchpad import Launchpad
from lazr.restfulclient.resource import Collection
from swh.lister.core.lister_base import ListerBase
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .models import LaunchpadModel
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
LaunchpadPageType = Iterator[Collection]
class LaunchpadLister(ListerBase):
MODEL = LaunchpadModel
@dataclass
class LaunchpadListerState:
    """Persisted state of the Launchpad lister."""

    # Modification date of the last updated repository seen by the previous
    # listing; None when no date has been recorded.
    date_last_modified: Optional[datetime] = None
class LaunchpadLister(Lister[LaunchpadListerState, LaunchpadPageType]):
"""
List git repositories from Launchpad.
Args:
scheduler: instance of SchedulerInterface
incremental: defines if incremental listing should be used, in that case
only modified or new repositories since last incremental listing operation
will be returned
"""
LISTER_NAME = "launchpad"
instance = "launchpad"
flush_packet_db = 20
def __init__(self, override_config=None):
super().__init__(override_config=override_config)
def __init__(
    self,
    scheduler: SchedulerInterface,
    incremental: bool = False,
    credentials: CredentialsType = None,
):
    """Create a lister targeting https://launchpad.net/.

    Args:
        scheduler: scheduler instance, forwarded to the base ``Lister``
        incremental: when true, listing is restricted to repositories
            modified since the date stored in the lister state
        credentials: forwarded unchanged to the base ``Lister``
    """
    base_kwargs = dict(
        scheduler=scheduler,
        url="https://launchpad.net/",
        instance="launchpad",
        credentials=credentials,
    )
    super().__init__(**base_kwargs)
    # Modification date of the last repository seen during the current run;
    # it stays None until a repository has been processed.
    self.date_last_modified = None
    self.incremental = incremental
def state_from_dict(self, d: Dict[str, Any]) -> LaunchpadListerState:
    """Build a lister state from its dict form.

    Args:
        d: dict form of the state; when its ``date_last_modified`` value is
            not None it is parsed with ``iso8601.parse_date``. The dict
            itself is left untouched.

    Returns:
        The ``LaunchpadListerState`` built from the entries of ``d``.
    """
    # Work on a shallow copy so the caller's dict entry is not replaced by a
    # datetime as a side effect of deserialization.
    state_kwargs = dict(d)
    date_last_modified = state_kwargs.get("date_last_modified")
    if date_last_modified is not None:
        state_kwargs["date_last_modified"] = iso8601.parse_date(date_last_modified)
    return LaunchpadListerState(**state_kwargs)
def state_to_dict(self, state: LaunchpadListerState) -> Dict[str, Any]:
    """Convert a lister state to a dict, rendering its date with ``isoformat()``.

    A ``date_last_modified`` of None is kept as None.
    """
    serialized = asdict(state)
    last_modified = serialized.get("date_last_modified")
    if last_modified is None:
        return serialized
    serialized["date_last_modified"] = last_modified.isoformat()
    return serialized
def get_pages(self) -> Iterator[LaunchpadPageType]:
"""
Yields an iterator on all git repositories hosted on Launchpad sorted
by last modification date in ascending order.
"""
launchpad = Launchpad.login_anonymously(
"softwareheritage", "production", version="devel"
)
self.get_repos = launchpad.git_repositories.getRepositories
def get_model_from_repo(self, repo: Entry) -> Dict[str, Union[str, datetime]]:
return {
"uid": repo.unique_name,
"name": repo.name,
"full_name": repo.name,
"origin_url": repo.git_https_url,
"html_url": repo.web_link,
"origin_type": "git",
"date_last_modified": repo.date_last_modified,
}
def lib_response_simplified(
self, response: Collection
) -> List[Dict[str, Union[str, datetime]]]:
return [
self.get_model_from_repo(repo) for repo in response[: len(response.entries)]
]
def db_last_threshold(self) -> Optional[datetime]:
t = self.db_session.query(func.max(self.MODEL.date_last_modified)).first()
if t:
return t[0]
else:
return None
def ingest_data_lp(
self, identifier: Optional[datetime], checks: bool = False
) -> Tuple[Collection, dict]:
""" The core data fetch sequence. Request launchpadlib endpoint. Simplify and
filter response list of repositories. Inject repo information into
local db. Queue loader tasks for linked repositories.
Args:
identifier: Resource identifier.
checks: Additional checks required
"""
response = self.get_repos(
order_by="most neglected first", modified_since_date=identifier
date_last_modified = None
if self.incremental:
date_last_modified = self.state.date_last_modified
get_repos = launchpad.git_repositories.getRepositories
yield get_repos(
order_by="most neglected first", modified_since_date=date_last_modified
)
models_list = self.lib_response_simplified(response)
models_list = self.filter_before_inject(models_list)
if checks:
models_list = self.do_additional_checks(models_list)
if not models_list:
return response, {}
# inject into local db
injected = self.inject_repo_data_into_db(models_list)
# queue workers
self.schedule_missing_tasks(models_list, injected)
return response, injected
def run(self, max_bound: Optional[datetime] = None) -> Dict[str, Any]:
""" Main entry function. Sequentially fetches repository data
from the service according to the basic outline in the class
docstring, continually fetching sublists until either there
is no next index reference given or the given next index is greater
than the desired max_bound.
Args:
max_bound : optional date to start at
Returns:
Dict containing listing status
def get_origins_from_page(self, page: LaunchpadPageType) -> Iterator[ListedOrigin]:
"""
status = "uneventful"
Iterate on all git repositories and yield ListedOrigin instances.
"""
assert self.lister_obj.id is not None
def ingest_git_repos():
threshold = max_bound
for i in count(1):
response, injected_repos = self.ingest_data_lp(threshold)
if not response and not injected_repos:
return
for repo in page:
# batch is empty
if len(response.entries) == 0:
return
origin_url = repo.git_https_url
first: datetime = response[0].date_last_modified
last: datetime = response[len(response.entries) - 1].date_last_modified
# filter out origins with invalid URL
if not origin_url.startswith("https://"):
continue
next_date = last - timedelta(seconds=15)
last_update = repo.date_last_modified
if next_date <= first:
delta = last - first
next_date = last - delta / 2
self.date_last_modified = last_update
threshold = next_date
yield i
logger.debug("Found origin %s last updated on %s", origin_url, last_update)
for i in ingest_git_repos():
if (i % self.flush_packet_db) == 0:
self.db_session.commit()
self.db_session = self.mk_session()
status = "eventful"
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type="git",
url=origin_url,
last_update=last_update,
)
self.db_session.commit()
self.db_session = self.mk_session()
return {"status": status}
def finalize(self) -> None:
    """Store the modification date seen during the run and flag the update.

    Nothing happens when the run recorded no modification date. Otherwise
    ``updated`` is set to True; in incremental mode the date held in the
    state is additionally replaced when none is stored yet or when the
    recorded date is more recent.
    """
    latest = self.date_last_modified
    if latest is None:
        return

    if self.incremental:
        stored = self.state.date_last_modified
        if stored is None or latest > stored:
            self.state.date_last_modified = latest

    self.updated = True

View file

@ -1,16 +0,0 @@
# Copyright (C) 2017-2020 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from sqlalchemy import Column, Date, String
from swh.lister.core.models import ModelBase
class LaunchpadModel(ModelBase):
"""a Launchpad repository"""
__tablename__ = "launchpad_repo"
uid = Column(String, primary_key=True)
date_last_modified = Column(Date, index=True)

View file

@ -1,4 +1,4 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@ -8,27 +8,20 @@ from celery import shared_task
from .lister import LaunchpadLister
@shared_task(name=__name__ + ".ping")
def ping():
    """Trivial task that always answers "OK"."""
    return "OK"
@shared_task(name=__name__ + ".FullLaunchpadLister")
def list_launchpad_full(**lister_args):
    """Full listing of git repositories hosted on Launchpad.

    ``lister_args`` are forwarded to ``LaunchpadLister.from_configfile``; the
    value returned is ``run()``'s result converted with ``.dict()``.
    """
    full_lister = LaunchpadLister.from_configfile(**lister_args)
    listing_stats = full_lister.run()
    return listing_stats.dict()
@shared_task(name=__name__ + ".IncrementalLaunchpadLister")
def list_launchpad_incremental(threshold, **lister_args):
"""Incremental update
"""
lister = LaunchpadLister(**lister_args)
return lister.run(max_bound=threshold)
@shared_task(name=__name__ + ".FullLaunchpadLister", bind=True)
def list_launchpad_full(self, **lister_args):
"""Full update of Launchpad
"""
self.log.debug("%s OK, spawned full task" % (self.name))
return list_launchpad_incremental(threshold=None, **lister_args)
@shared_task(name=__name__ + ".NewLaunchpadLister", bind=True)
def list_launchpad_new(self, **lister_args):
"""Update new entries of Launchpad
"""
lister = LaunchpadLister(**lister_args)
threshold = lister.db_last_threshold()
self.log.debug("%s OK, spawned new task" % (self.name))
return list_launchpad_incremental(threshold=threshold, **lister_args)
def list_launchpad_incremental(**lister_args):
"""Incremental listing of git repositories hosted on Launchpad"""
lister = LaunchpadLister.from_configfile(**lister_args, incremental=True)
return lister.run().dict()

View file

@ -1 +0,0 @@
[]

View file

@ -1,30 +1,156 @@
# Copyright (C) 2020 The Software Heritage developers
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import json
from pathlib import Path
from typing import List
def test_launchpad_lister(lister_launchpad, datadir):
lister_launchpad.run()
import pytest
assert len(lister_launchpad.get_repos.mock_calls) == 3
from ..lister import LaunchpadLister
r = lister_launchpad.scheduler.search_tasks(task_type="load-git")
assert len(r) == 30
for row in r:
assert row["type"] == "load-git"
# arguments check
args = row["arguments"]["args"]
assert len(args) == 0
class _Repo:
def __init__(self, d: dict):
for key in d.keys():
if key == "date_last_modified":
setattr(self, key, datetime.fromisoformat(d[key]))
else:
setattr(self, key, d[key])
# kwargs
kwargs = row["arguments"]["kwargs"]
assert set(kwargs.keys()) == {"url"}
url = kwargs["url"]
assert url.startswith("https://git.launchpad.net")
class _Collection:
entries: List[_Repo] = []
assert row["policy"] == "recurring"
assert row["priority"] is None
assert row["retries_left"] == 3
def __init__(self, file):
self.entries = [_Repo(r) for r in file]
def __getitem__(self, key):
return self.entries[key]
def __len__(self):
return len(self.entries)
def _launchpad_response(datadir, datafile):
    """Load the JSON file ``datafile`` from ``datadir`` into a ``_Collection``."""
    # Decode explicitly as UTF-8 rather than with the platform's default
    # encoding, so the JSON data is read the same way on every system.
    raw_json = Path(datadir, datafile).read_text(encoding="utf-8")
    return _Collection(json.loads(raw_json))
@pytest.fixture
def launchpad_response1(datadir):
    """Collection loaded from the ``launchpad_response1.json`` data file."""
    data_file = "launchpad_response1.json"
    return _launchpad_response(datadir, data_file)
@pytest.fixture
def launchpad_response2(datadir):
    """Collection loaded from the ``launchpad_response2.json`` data file."""
    data_file = "launchpad_response2.json"
    return _launchpad_response(datadir, data_file)
def _mock_getRepositories(mocker, launchpad_response):
    """Patch ``Launchpad`` in the lister module to serve ``launchpad_response``.

    Returns:
        The mocked ``getRepositories`` callable, so that callers can assert
        on the arguments it was called with.
    """
    launchpad_mock = mocker.patch("swh.lister.launchpad.lister.Launchpad")
    # login_anonymously() hands back the patched object itself, so the
    # git_repositories attribute configured below is the one that gets used.
    launchpad_mock.login_anonymously.return_value = launchpad_mock
    get_repositories = launchpad_mock.git_repositories.getRepositories
    get_repositories.return_value = launchpad_response
    return get_repositories
def _check_listed_origins(scheduler_origins, launchpad_response):
for origin in launchpad_response:
filtered_origins = [
o for o in scheduler_origins if o.url == origin.git_https_url
]
assert len(filtered_origins) == 1
assert filtered_origins[0].last_update == origin.date_last_modified
def test_lister_from_configfile(swh_scheduler_config, mocker):
    """``from_configfile()`` yields a lister with scheduler and credentials set."""
    config = {
        "scheduler": {"cls": "local", **swh_scheduler_config},
        "credentials": {},
    }
    # The configuration is served by the patched loader.
    mocker.patch("swh.lister.pattern.load_from_envvar").return_value = config

    lister = LaunchpadLister.from_configfile()

    assert lister.scheduler is not None
    assert lister.credentials is not None
def test_launchpad_full_lister(swh_scheduler, mocker, launchpad_response1):
    """A full listing returns one page and records every repository."""
    get_repositories = _mock_getRepositories(mocker, launchpad_response1)
    expected_count = len(launchpad_response1)

    lister = LaunchpadLister(scheduler=swh_scheduler)
    stats = lister.run()

    assert not lister.incremental
    assert lister.updated
    assert stats.pages == 1
    assert stats.origins == expected_count

    # A full listing never restricts on the modification date.
    get_repositories.assert_called_once_with(
        order_by="most neglected first", modified_since_date=None
    )

    listed = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
    assert len(listed) == expected_count
    _check_listed_origins(listed, launchpad_response1)
def test_launchpad_incremental_lister(
    swh_scheduler, mocker, launchpad_response1, launchpad_response2
):
    """Two incremental runs: the second one starts from the stored date."""
    # First run: no restriction on the modification date.
    first_mock = _mock_getRepositories(mocker, launchpad_response1)
    first_lister = LaunchpadLister(scheduler=swh_scheduler, incremental=True)
    first_stats = first_lister.run()

    assert first_lister.incremental
    assert first_lister.updated
    assert first_stats.pages == 1
    assert first_stats.origins == len(launchpad_response1)
    first_mock.assert_called_once_with(
        order_by="most neglected first", modified_since_date=None
    )

    # The stored state carries the date of the last repository of the response.
    lister_state = first_lister.get_state_from_scheduler()
    assert lister_state.date_last_modified == launchpad_response1[-1].date_last_modified

    # Second run: the listing is queried from the stored date.
    second_mock = _mock_getRepositories(mocker, launchpad_response2)
    second_lister = LaunchpadLister(scheduler=swh_scheduler, incremental=True)
    second_stats = second_lister.run()

    assert second_lister.incremental
    assert second_lister.updated
    assert second_stats.pages == 1
    assert second_stats.origins == len(launchpad_response2)
    second_mock.assert_called_once_with(
        order_by="most neglected first",
        modified_since_date=lister_state.date_last_modified,
    )

    # Origins of both runs are recorded in the scheduler.
    scheduler_origins = swh_scheduler.get_listed_origins(
        second_lister.lister_obj.id
    ).results
    expected_total = len(launchpad_response1) + len(launchpad_response2)
    assert len(scheduler_origins) == expected_total
    _check_listed_origins(scheduler_origins, launchpad_response1)
    _check_listed_origins(scheduler_origins, launchpad_response2)
def test_launchpad_lister_invalid_url_filtering(
    swh_scheduler, mocker,
):
    """A repository whose URL is not https:// is not recorded as an origin."""
    bad_repo = _Repo({"git_https_url": "tag:launchpad.net:2008:redacted"})
    _mock_getRepositories(mocker, [bad_repo])

    lister = LaunchpadLister(scheduler=swh_scheduler)
    stats = lister.run()

    # The page is still counted, but it yields no origin and no update.
    assert not lister.updated
    assert stats.pages == 1
    assert stats.origins == 0

View file

@ -1,34 +1,26 @@
# Copyright (C) 2020 The Software Heritage developers
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from unittest.mock import patch
from swh.lister.pattern import ListerStats
@patch("swh.lister.launchpad.tasks.LaunchpadLister")
def test_new(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
# setup the mocked LaunchpadLister
lister.return_value = lister
lister.run.return_value = None
res = swh_scheduler_celery_app.send_task(
"swh.lister.launchpad.tasks.NewLaunchpadLister"
)
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.launchpad.tasks.ping")
assert res
res.wait()
assert res.successful()
assert lister.call_count == 2
lister.db_last_threshold.assert_called_once()
lister.run.assert_called_once()
assert res.result == "OK"
@patch("swh.lister.launchpad.tasks.LaunchpadLister")
def test_full(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
# setup the mocked LaunchpadLister
lister.return_value = lister
lister.run.return_value = None
def test_launchpad_full_listing_task(
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
):
lister = mocker.patch("swh.lister.launchpad.tasks.LaunchpadLister")
lister.from_configfile.return_value = lister
stats = ListerStats(pages=1, origins=28000)
lister.run.return_value = stats
res = swh_scheduler_celery_app.send_task(
"swh.lister.launchpad.tasks.FullLaunchpadLister"
@ -36,7 +28,27 @@ def test_full(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
assert res
res.wait()
assert res.successful()
assert res.result == stats.dict()
lister.assert_called_once()
lister.db_last_threshold.assert_not_called()
lister.run.assert_called_once_with(max_bound=None)
lister.from_configfile.assert_called_once_with()
lister.run.assert_called_once_with()
def test_launchpad_incremental_listing_task(
    swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
):
    """The incremental task builds the lister with ``incremental=True``."""
    expected_stats = ListerStats(pages=1, origins=200)
    lister = mocker.patch("swh.lister.launchpad.tasks.LaunchpadLister")
    # from_configfile() returns the patched class itself, so calls made on
    # the built lister are recorded on the same mock.
    lister.from_configfile.return_value = lister
    lister.run.return_value = expected_stats

    task_name = "swh.lister.launchpad.tasks.IncrementalLaunchpadLister"
    res = swh_scheduler_celery_app.send_task(task_name)
    assert res
    res.wait()
    assert res.successful()
    assert res.result == expected_stats.dict()

    lister.from_configfile.assert_called_once_with(incremental=True)
    lister.run.assert_called_once_with()

View file

@ -36,8 +36,7 @@ def test_get_lister(swh_scheduler_config):
db_url = init_db().url()
# Drop launchpad lister from the lister to check, its test setup is more involved
# than the other listers and it's not currently done here
supported_listers = set(SUPPORTED_LISTERS) - {"launchpad"}
for lister_name in supported_listers:
for lister_name in SUPPORTED_LISTERS:
lst = get_lister(
lister_name,
db_url,