Make PyPI lister incremental and complete with regard to last_update

This rewrites the current implementation to use PyPI's XML-RPC API, which allows
the listing to be incremental. It also makes it possible to fetch the last release
date per package, which in turn lets us fill the "last_update" field of the
ListedOrigin model.
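For reference, a minimal sketch of the two XML-RPC calls the new implementation relies on (the endpoint and method names are the ones used in the code below; the serial value passed to changelog_since_serial is illustrative only):

from xmlrpc.client import ServerProxy

client = ServerProxy("https://pypi.org/pypi")
# Highest serial currently known to PyPI, used as the upper bound of the listing
last_serial = client.changelog_last_serial()
# Changelog entries (package, version, release timestamp, description, serial)
# published after a given serial; this is what makes the listing incremental
events = client.changelog_since_serial(last_serial - 10)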

Related to T3399
Antoine R. Dumont (@ardumont) 2021-07-07 16:14:09 +02:00
parent 698be475e9
commit df46b22098
4 changed files with 337 additions and 97 deletions


@@ -3,32 +3,71 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
from typing import Iterator, List, Optional
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xmlrpc.client import Fault, ServerProxy
from bs4 import BeautifulSoup
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import retry_attempt, throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, StatelessLister
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
PackageListPage = List[str]
# Type returned by the XML-RPC changelog call:
# package, version, release timestamp, description, serial
ChangelogEntry = Tuple[str, str, int, str, int]
# Package update type manipulated by the lister, a subset of the information in
# ChangelogEntry: the package origin URL and its most recent release date
PackageUpdate = Tuple[str, datetime]
# Type returned by listing a page of results
PackageListPage = List[PackageUpdate]
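# For illustration (values borrowed from the test data set further below), a
# ChangelogEntry looks like
#   ("wordsmith", "0.1", 1465998123, "new release", 2168629)
# and the PackageUpdate derived from it pairs the package origin URL with the
# most recent release date seen for that package, e.g.
#   ("https://pypi.org/project/wordsmith/", datetime(..., tzinfo=timezone.utc))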
class PyPILister(StatelessLister[PackageListPage]):
@dataclass
class PyPIListerState:
"""State of PyPI lister"""
last_serial: Optional[int] = None
"""Last seen serial when visiting the pypi instance"""
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate to handle xmlrpc client error:
.. code::
xmlrpc.client.Fault: <Fault -32500: 'HTTPTooManyRequests: The action could not
be performed because there were too many requests by the client. Limit may reset
in 1 seconds.'>
"""
attempt = retry_attempt(retry_state)
return attempt.failed and isinstance(attempt.exception(), Fault)
def pypi_url(package_name: str) -> str:
"""Build pypi url out of a package name.
"""
return PyPILister.PACKAGE_URL.format(package_name=package_name)
class PyPILister(Lister[PyPIListerState, PackageListPage]):
"""List origins from PyPI.
"""
LISTER_NAME = "pypi"
INSTANCE = "pypi" # As of today only the main pypi.org is used
PACKAGE_LIST_URL = "https://pypi.org/simple/"
PACKAGE_LIST_URL = "https://pypi.org/pypi" # XML-RPC url
PACKAGE_URL = "https://pypi.org/project/{package_name}/"
def __init__(
@@ -43,35 +82,100 @@ class PyPILister(StatelessLister[PackageListPage]):
credentials=credentials,
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/html", "User-Agent": USER_AGENT}
)
# Used as the termination condition and, when relevant, becomes the new state once
# the visit is done
self.last_processed_serial: Optional[int] = None
def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState:
return PyPIListerState(last_serial=d.get("last_serial"))
def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_last_serial(self, client: ServerProxy) -> int:
"""Internal detail to allow throttling when calling the changelog last entry"""
serial = client.changelog_last_serial()
assert isinstance(serial, int)
return serial
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_since_serial(
self, client: ServerProxy, serial: int
) -> List[ChangelogEntry]:
"""Internal detail to allow throttling when calling the changelog listing"""
sleep(1) # to avoid the initial warning about throttling
return client.changelog_since_serial(serial) # type: ignore
def get_pages(self) -> Iterator[PackageListPage]:
"""Iterate other changelog events per package, determine the max release date for that
package and use that max release date as last_update. When the execution is
done, this will also set the self.last_processed_serial attribute so we can
finalize the state of the lister for the next visit.
response = self.session.get(self.PACKAGE_LIST_URL)
Yields:
List of tuples (package origin URL, most recent release date)
response.raise_for_status()
"""
client = ServerProxy(self.url)
page = BeautifulSoup(response.content, features="html.parser")
last_processed_serial = -1
if self.state.last_serial is not None:
last_processed_serial = self.state.last_serial
upstream_last_serial = self._changelog_last_serial(client)
page_results = [p.text for p in page.find_all("a")]
# Paginate through the PyPI changelog until everything has been read
while last_processed_serial < upstream_last_serial:
updated_packages = defaultdict(list)
yield page_results
for package, _, release_date, _, serial in self._changelog_since_serial(
client, last_processed_serial
):
updated_packages[package].append(release_date)
# Compute the max serial so we can stop when done
last_processed_serial = max(last_processed_serial, serial)
# Yield pages of results so they get flushed regularly
yield [
(
pypi_url(package),
datetime.fromtimestamp(max(release_dates)).replace(
tzinfo=timezone.utc
),
)
for package, release_dates in updated_packages.items()
]
self.last_processed_serial = upstream_last_serial
def get_origins_from_page(
self, packages_name: PackageListPage
self, packages: PackageListPage
) -> Iterator[ListedOrigin]:
"""Convert a page of PyPI repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for package_name in packages_name:
package_url = self.PACKAGE_URL.format(package_name=package_name)
for origin, last_update in packages:
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=package_url,
url=origin,
visit_type="pypi",
last_update=None, # available on PyPI JSON API
last_update=last_update,
)
def finalize(self):
"""Finalize the visit state by updating with the new last_serial if updates
actually happened.
"""
self.updated = (
self.state
and self.state.last_serial
and self.last_processed_serial
and self.state.last_serial < self.last_processed_serial
) or (not self.state.last_serial and self.last_processed_serial)
if self.updated:
self.state.last_serial = self.last_processed_serial
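For context, a minimal sketch of how the lister is typically driven; this mirrors the from_configfile() usage in the task and the tests below, and assumes the standard SWH configuration file is in place:

from swh.lister.pypi.lister import PyPILister

# Instantiate from the SWH configuration file (scheduler connection, credentials, ...)
lister = PyPILister.from_configfile()
# run() iterates over get_pages(), records the listed origins, then calls
# finalize(), which persists the new last_serial for the next visit
stats = lister.run()
print(stats.pages, stats.origins)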


@@ -1,4 +1,4 @@
# Copyright (C) 2018 the Software Heritage developers
# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,7 +7,7 @@ from celery import shared_task
from .lister import PyPILister
@shared_task(name=__name__ + ".PyPIListerTask")
@shared_task(name=f"{__name__}.PyPIListerTask")
def list_pypi():
"Full listing of the PyPI registry"
lister = PyPILister.from_configfile()


@@ -1,12 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Simple index</title>
</head>
<body>
<a href="/simple/0lever-so/">0lever-so</a>
<a href="/simple/0lever-utils/">0lever-utils</a>
<a href="/simple/0-orchestrator/">0-orchestrator</a>
<a href="/simple/0wned/">0wned</a>
</body>
</html>


@@ -1,26 +1,19 @@
# Copyright (C) 2019 The Software Heritage developers
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timezone
from typing import List
import pytest
import requests
from swh.lister.pypi.lister import PyPILister
from swh.lister.pypi.lister import ChangelogEntry, PyPILister, pypi_url
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
@pytest.fixture
def pypi_packages_testdata(datadir):
content = Path(datadir, "https_pypi.org", "simple").read_bytes()
names = ["0lever-so", "0lever-utils", "0-orchestrator", "0wned"]
urls = [PyPILister.PACKAGE_URL.format(package_name=n) for n in names]
return content, names, urls
def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]):
"""Asserts that the two collections have the same origin URLs"""
@@ -33,53 +26,6 @@ def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedO
assert l_url == s_origin.url
def test_pypi_list(swh_scheduler, requests_mock, mocker, pypi_packages_testdata):
t_content, t_names, t_urls = pypi_packages_testdata
requests_mock.get(PyPILister.PACKAGE_LIST_URL, content=t_content)
lister = PyPILister(scheduler=swh_scheduler)
lister.get_origins_from_page = mocker.spy(lister, "get_origins_from_page")
lister.session.get = mocker.spy(lister.session, "get")
stats = lister.run()
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
lister.session.get.assert_called_once_with(lister.PACKAGE_LIST_URL)
lister.get_origins_from_page.assert_called_once_with(t_names)
assert stats.pages == 1
assert stats.origins == 4
assert len(scheduler_origins) == 4
check_listed_origins(t_urls, scheduler_origins)
assert lister.get_state_from_scheduler() is None
@pytest.mark.parametrize("http_code", [400, 429, 500])
def test_pypi_list_http_error(swh_scheduler, requests_mock, mocker, http_code):
requests_mock.get(
PyPILister.PACKAGE_LIST_URL, [{"content": None, "status_code": http_code},],
)
lister = PyPILister(scheduler=swh_scheduler)
lister.session.get = mocker.spy(lister.session, "get")
with pytest.raises(requests.HTTPError):
lister.run()
lister.session.get.assert_called_once_with(lister.PACKAGE_LIST_URL)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == 0
@pytest.mark.parametrize(
"credentials, expected_credentials",
[
@@ -109,3 +55,205 @@ def test_lister_pypi_from_configfile(swh_scheduler_config, mocker):
lister = PyPILister.from_configfile()
assert lister.scheduler is not None
assert lister.credentials is not None
def to_serial(changelog_entry: ChangelogEntry) -> int:
"""Helper utility to read the serial entry in the tuple
Args:
changelog_entry: Changelog entry to read data from
Returns:
The serial from the entry
"""
return changelog_entry[4]
def configure_scheduler_state(
scheduler: SchedulerInterface, data: List[ChangelogEntry]
):
"""Allows to pre configure a last serial state for the lister consistent with the test
data set (the last_serial will be something inferior than the most minimal serial
in the data set).
Args:
scheduler: The actual scheduler instance used during test
data: The actual dataset used during test
"""
# Compute the lowest serial to make it a minimum state to store in the scheduler
lowest_serial = min(map(to_serial, data))
# We'll need to configure the scheduler's state
lister_obj = scheduler.get_or_create_lister(
name=PyPILister.LISTER_NAME, instance_name=PyPILister.INSTANCE
)
lister_obj.current_state = {"last_serial": lowest_serial - 10}
scheduler.update_lister(lister_obj)
@pytest.fixture
def mock_pypi_xmlrpc(mocker, swh_scheduler):
"""This setups a lister so it can actually fake the call to the rpc service executed
during an incremental listing.
To retrieve or update the faked data, open a python3 toplevel and execute the
following:
.. code:: python
from datetime import timezone, datetime, timedelta
from xmlrpc.client import ServerProxy
from swh.scheduler.utils import utcnow
RPC_URL = "https://pypi.org/pypi"
cli = ServerProxy(RPC_URL)
last_serial = cli.changelog_last_serial()
# 10854808
last_state_serial = 2168587
results = cli.changelog_since_serial(last_state_serial)
Returns:
A tuple (upstream last serial, changelog data set, mocked ServerProxy).
"""
data = [
["wordsmith", None, 1465998124, "add Owner DoublePlusAwks", 2168628],
["wordsmith", "0.1", 1465998123, "new release", 2168629],
["wordsmith", "0.1", 1465998131, "update classifiers", 2168630],
[
"UFx",
"1.0",
1465998207,
"update author_email, home_page, summary, description",
2168631,
],
["UFx", "1.0", 1465998236, "remove file UFx-1.0.tar.gz", 2168632],
["wordsmith", "0.1", 1465998309, "update classifiers", 2168633],
[
"wordsmith",
"0.1",
1465998406,
"update summary, description, classifiers",
2168634,
],
["property-manager", "2.0", 1465998436, "new release", 2168635],
[
"property-manager",
"2.0",
1465998439,
"add source file property-manager-2.0.tar.gz",
2168636,
],
["numtest", "2.0.0", 1465998446, "new release", 2168637],
["property-manager", "2.1", 1465998468, "new release", 2168638],
[
"property-manager",
"2.1",
1465998472,
"add source file property-manager-2.1.tar.gz",
2168639,
],
["kafka-utils", "0.2.0", 1465998477, "new release", 2168640],
[
"kafka-utils",
"0.2.0",
1465998480,
"add source file kafka-utils-0.2.0.tar.gz",
2168641,
],
["numtest", "2.0.1", 1465998520, "new release", 2168642],
["coala-bears", "0.3.0.dev20160615134909", 1465998552, "new release", 2168643],
[
"coala-bears",
"0.3.0.dev20160615134909",
1465998556,
"add py3 file coala_bears-0.3.0.dev20160615134909-py3-none-any.whl",
2168644,
],
["django_sphinxsearch", "0.4.0", 1465998571, "new release", 2168645],
[
"django_sphinxsearch",
"0.4.0",
1465998573,
"add source file django_sphinxsearch-0.4.0.tar.gz",
2168646,
],
[
"coala-bears",
"0.3.0.dev20160615134909",
1465998589,
"add source file coala-bears-0.3.0.dev20160615134909.tar.gz",
2168647,
],
]
highest_serial = max(map(to_serial, data))
def sleep(seconds):
pass
# Replace the module-level sleep so the test does not actually wait between calls
mocker.patch("swh.lister.pypi.lister.sleep", side_effect=sleep)
class FakeServerProxy:
"""Fake Server Proxy"""
def changelog_last_serial(self):
return highest_serial
def changelog_since_serial(self, serial):
return data
mock_serverproxy = mocker.patch("swh.lister.pypi.lister.ServerProxy")
mock_serverproxy.return_value = FakeServerProxy()
return highest_serial, data, mock_serverproxy
@pytest.mark.parametrize("configure_state", [True, False])
def test_lister_pypi_run(mock_pypi_xmlrpc, swh_scheduler, configure_state):
highest_serial, data, mock_serverproxy = mock_pypi_xmlrpc
if configure_state:
configure_scheduler_state(swh_scheduler, data)
updated_packages = defaultdict(list)
for [package, _, release_date, _, _] in data:
updated_packages[package].append(release_date)
assert len(updated_packages) > 0
expected_last_updates = {
pypi_url(package): datetime.fromtimestamp(max(releases)).replace(
tzinfo=timezone.utc
)
for package, releases in updated_packages.items()
}
expected_pypi_urls = [pypi_url(package_name) for package_name in updated_packages]
lister = PyPILister(scheduler=swh_scheduler)
stats = lister.run()
assert mock_serverproxy.called
assert stats.pages == 1
assert stats.origins == len(updated_packages)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == stats.origins
check_listed_origins(expected_pypi_urls, scheduler_origins)
actual_scheduler_state = lister.get_state_from_scheduler()
# This visit updated the scheduler state to the newly seen serial
assert actual_scheduler_state.last_serial == highest_serial
for listed_origin in scheduler_origins:
assert listed_origin.last_update is not None
assert listed_origin.last_update == expected_last_updates[listed_origin.url]
def test__if_rate_limited():
# TODO
pass