Refactor and deduplicate HTTP requests code in listers

Numerous listers were using the same page_request method or equivalent
in their implementation so prefer to deduplicate that code by adding
an http_request method in base lister class: swh.lister.pattern.Lister.

That method simply wraps a call to requests.Session.request and logs
some useful info for debugging and error reporting, also an HTTPError
will be raised if a request ends up with an error.

All listers using that new method now benefit of requests retry when
an HTTP error occurs thanks to the use of the http_retry decorator.
This commit is contained in:
Antoine Lambert 2022-09-21 19:53:22 +02:00
parent 9c55acd286
commit db6ce12e9e
28 changed files with 174 additions and 449 deletions

View file

@ -1,4 +1,4 @@
# Copyright (C) 2021 The Software Heritage developers
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@ -8,14 +8,9 @@ import logging
from typing import Any, Dict, Iterator, List
from urllib.parse import urljoin
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import http_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
@ -65,11 +60,7 @@ class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]):
instance=instance,
)
self.session = requests.Session()
# Declare the USER_AGENT is more sysadm-friendly for the forge we list
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
self.session.headers.update({"Accept": "application/json"})
def state_from_dict(self, d: Dict[str, Any]) -> NewForgeListerState:
return NewForgeListerState(**d)
@ -77,32 +68,6 @@ class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]):
def state_to_dict(self, state: NewForgeListerState) -> Dict[str, Any]:
return asdict(state)
@http_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url, params) -> requests.Response:
# Do the network resource request under a retrying decorator
# to handle rate limiting and transient errors up to a limit.
# `http_retry` by default use the `requests` library to check
# only for rate-limit and a base-10 exponential waiting strategy.
# This can be customized by passed waiting, retrying and logging strategies
# as functions. See the `tenacity` library documentation.
# Log listed URL to ease debugging
logger.debug("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
# Log response content to ease debugging
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
# The lister must fail on blocking errors
response.raise_for_status()
return response
def get_pages(self) -> Iterator[NewForgeListerPage]:
# The algorithm depends on the service, but should request data reliably,
# following pagination if relevant and yielding pages in a streaming fashion.
@ -120,7 +85,7 @@ class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]):
while current is not None:
# Parametrize the request for incremental listing
body = self.page_request(url, {"current": current}).json()
body = self.http_request(url, params={"current": current}).json()
# Simplify the page if possible to only the necessary elements
# and yield it