Move GitHubSession from github/lister.py to github/utils.py
So it can be reused by other packages without importing lister.py itself
This commit is contained in:
parent
9ee4a99f15
commit
2d04244cc9
3 changed files with 177 additions and 164 deletions
|
@ -1,4 +1,4 @@
|
|||
# Copyright (C) 2020 The Software Heritage developers
|
||||
# Copyright (C) 2020-2022 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
@ -6,178 +6,20 @@
|
|||
from dataclasses import asdict, dataclass
|
||||
import datetime
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from typing import Any, Dict, Iterator, List, Optional, Set
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
import iso8601
|
||||
import requests
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_any,
|
||||
retry_if_exception_type,
|
||||
retry_if_result,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from swh.scheduler.interface import SchedulerInterface
|
||||
from swh.scheduler.model import ListedOrigin
|
||||
|
||||
from .. import USER_AGENT
|
||||
from ..pattern import CredentialsType, Lister
|
||||
from .utils import GitHubSession, MissingRateLimitReset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RateLimited(Exception):
|
||||
def __init__(self, response):
|
||||
self.reset_time: Optional[int]
|
||||
|
||||
# Figure out how long we need to sleep because of that rate limit
|
||||
ratelimit_reset = response.headers.get("X-Ratelimit-Reset")
|
||||
retry_after = response.headers.get("Retry-After")
|
||||
if ratelimit_reset is not None:
|
||||
self.reset_time = int(ratelimit_reset)
|
||||
elif retry_after is not None:
|
||||
self.reset_time = int(time.time()) + int(retry_after) + 1
|
||||
else:
|
||||
logger.warning(
|
||||
"Received a rate-limit-like status code %s, but no rate-limit "
|
||||
"headers set. Response content: %s",
|
||||
response.status_code,
|
||||
response.content,
|
||||
)
|
||||
self.reset_time = None
|
||||
self.response = response
|
||||
|
||||
|
||||
class MissingRateLimitReset(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class GitHubSession:
|
||||
"""Manages a :class:`requests.Session` with (optionally) multiple credentials,
|
||||
and cycles through them when reaching rate-limits."""
|
||||
|
||||
def __init__(self, credentials: Optional[List[Dict[str, str]]] = None) -> None:
|
||||
"""Initialize a requests session with the proper headers for requests to
|
||||
GitHub."""
|
||||
self.credentials = credentials
|
||||
if self.credentials:
|
||||
random.shuffle(self.credentials)
|
||||
|
||||
self.session = requests.Session()
|
||||
|
||||
self.session.headers.update(
|
||||
{"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
|
||||
)
|
||||
|
||||
self.anonymous = not self.credentials
|
||||
|
||||
if self.anonymous:
|
||||
logger.warning("No tokens set in configuration, using anonymous mode")
|
||||
|
||||
self.token_index = -1
|
||||
self.current_user: Optional[str] = None
|
||||
|
||||
if not self.anonymous:
|
||||
# Initialize the first token value in the session headers
|
||||
self.set_next_session_token()
|
||||
|
||||
def set_next_session_token(self) -> None:
|
||||
"""Update the current authentication token with the next one in line."""
|
||||
|
||||
assert self.credentials
|
||||
|
||||
self.token_index = (self.token_index + 1) % len(self.credentials)
|
||||
|
||||
auth = self.credentials[self.token_index]
|
||||
|
||||
self.current_user = auth["username"]
|
||||
logger.debug("Using authentication token for user %s", self.current_user)
|
||||
|
||||
if "password" in auth:
|
||||
token = auth["password"]
|
||||
else:
|
||||
token = auth["token"]
|
||||
|
||||
self.session.headers.update({"Authorization": f"token {token}"})
|
||||
|
||||
@retry(
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||
retry=retry_any(
|
||||
# ChunkedEncodingErrors happen when the TLS connection gets reset, e.g.
|
||||
# when running the lister on a connection with high latency
|
||||
retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
|
||||
# 502 status codes happen for a Server Error, sometimes
|
||||
retry_if_result(lambda r: r.status_code == 502),
|
||||
),
|
||||
)
|
||||
def _request(self, url: str) -> requests.Response:
|
||||
response = self.session.get(url)
|
||||
|
||||
if (
|
||||
# GitHub returns inconsistent status codes between unauthenticated
|
||||
# rate limit and authenticated rate limits. Handle both.
|
||||
response.status_code == 429
|
||||
or (self.anonymous and response.status_code == 403)
|
||||
):
|
||||
raise RateLimited(response)
|
||||
|
||||
return response
|
||||
|
||||
def request(self, url) -> requests.Response:
|
||||
"""Repeatedly requests the given URL, cycling through credentials and sleeping
|
||||
if necessary; until either a successful response or :exc:`MissingRateLimitReset`
|
||||
"""
|
||||
# The following for/else loop handles rate limiting; if successful,
|
||||
# it provides the rest of the function with a `response` object.
|
||||
#
|
||||
# If all tokens are rate-limited, we sleep until the reset time,
|
||||
# then `continue` into another iteration of the outer while loop,
|
||||
# attempting to get data from the same URL again.
|
||||
|
||||
while True:
|
||||
max_attempts = len(self.credentials) if self.credentials else 1
|
||||
reset_times: Dict[int, int] = {} # token index -> time
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
return self._request(url)
|
||||
except RateLimited as e:
|
||||
reset_info = "(unknown reset)"
|
||||
if e.reset_time is not None:
|
||||
reset_times[self.token_index] = e.reset_time
|
||||
reset_info = "(resetting in %ss)" % (e.reset_time - time.time())
|
||||
|
||||
if not self.anonymous:
|
||||
logger.info(
|
||||
"Rate limit exhausted for current user %s %s",
|
||||
self.current_user,
|
||||
reset_info,
|
||||
)
|
||||
# Use next token in line
|
||||
self.set_next_session_token()
|
||||
# Wait one second to avoid triggering GitHub's abuse rate limits
|
||||
time.sleep(1)
|
||||
|
||||
# All tokens have been rate-limited. What do we do?
|
||||
|
||||
if not reset_times:
|
||||
logger.warning(
|
||||
"No X-Ratelimit-Reset value found in responses for any token; "
|
||||
"Giving up."
|
||||
)
|
||||
raise MissingRateLimitReset()
|
||||
|
||||
sleep_time = max(reset_times.values()) - time.time() + 1
|
||||
logger.info(
|
||||
"Rate limits exhausted for all tokens. Sleeping for %f seconds.",
|
||||
sleep_time,
|
||||
)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitHubListerState:
|
||||
"""State of the GitHub lister"""
|
||||
|
|
|
@ -5,12 +5,13 @@
|
|||
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Dict, Iterator, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
import requests_mock
|
||||
|
||||
from swh.lister.github.lister import GitHubLister, time
|
||||
from swh.lister.github.lister import GitHubLister
|
||||
from swh.lister.pattern import CredentialsType, ListerStats
|
||||
from swh.scheduler.interface import SchedulerInterface
|
||||
from swh.scheduler.model import Lister
|
||||
|
@ -267,7 +268,7 @@ def requests_ratelimited(
|
|||
|
||||
|
||||
def test_anonymous_ratelimit(swh_scheduler, caplog, requests_ratelimited) -> None:
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.lister")
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.utils")
|
||||
|
||||
lister = GitHubLister(scheduler=swh_scheduler)
|
||||
assert lister.github_session.anonymous
|
||||
|
@ -357,7 +358,7 @@ def test_ratelimit_once_recovery(
|
|||
lister_credentials,
|
||||
):
|
||||
"""Check that the lister recovers from hitting the rate-limit once"""
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.lister")
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.utils")
|
||||
|
||||
lister = GitHubLister(scheduler=swh_scheduler, credentials=lister_credentials)
|
||||
|
||||
|
@ -395,7 +396,7 @@ def test_ratelimit_reset_sleep(
|
|||
):
|
||||
"""Check that the lister properly handles rate-limiting when providing it with
|
||||
authentication tokens"""
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.lister")
|
||||
caplog.set_level(logging.DEBUG, "swh.lister.github.utils")
|
||||
|
||||
lister = GitHubLister(scheduler=swh_scheduler, credentials=lister_credentials)
|
||||
|
||||
|
|
170
swh/lister/github/utils.py
Normal file
170
swh/lister/github/utils.py
Normal file
|
@ -0,0 +1,170 @@
|
|||
# Copyright (C) 2020-2022 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import requests
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_any,
|
||||
retry_if_exception_type,
|
||||
retry_if_result,
|
||||
wait_exponential,
|
||||
)
|
||||
|
||||
from .. import USER_AGENT
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RateLimited(Exception):
|
||||
def __init__(self, response):
|
||||
self.reset_time: Optional[int]
|
||||
|
||||
# Figure out how long we need to sleep because of that rate limit
|
||||
ratelimit_reset = response.headers.get("X-Ratelimit-Reset")
|
||||
retry_after = response.headers.get("Retry-After")
|
||||
if ratelimit_reset is not None:
|
||||
self.reset_time = int(ratelimit_reset)
|
||||
elif retry_after is not None:
|
||||
self.reset_time = int(time.time()) + int(retry_after) + 1
|
||||
else:
|
||||
logger.warning(
|
||||
"Received a rate-limit-like status code %s, but no rate-limit "
|
||||
"headers set. Response content: %s",
|
||||
response.status_code,
|
||||
response.content,
|
||||
)
|
||||
self.reset_time = None
|
||||
self.response = response
|
||||
|
||||
|
||||
class MissingRateLimitReset(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class GitHubSession:
|
||||
"""Manages a :class:`requests.Session` with (optionally) multiple credentials,
|
||||
and cycles through them when reaching rate-limits."""
|
||||
|
||||
def __init__(self, credentials: Optional[List[Dict[str, str]]] = None) -> None:
|
||||
"""Initialize a requests session with the proper headers for requests to
|
||||
GitHub."""
|
||||
self.credentials = credentials
|
||||
if self.credentials:
|
||||
random.shuffle(self.credentials)
|
||||
|
||||
self.session = requests.Session()
|
||||
|
||||
self.session.headers.update(
|
||||
{"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
|
||||
)
|
||||
|
||||
self.anonymous = not self.credentials
|
||||
|
||||
if self.anonymous:
|
||||
logger.warning("No tokens set in configuration, using anonymous mode")
|
||||
|
||||
self.token_index = -1
|
||||
self.current_user: Optional[str] = None
|
||||
|
||||
if not self.anonymous:
|
||||
# Initialize the first token value in the session headers
|
||||
self.set_next_session_token()
|
||||
|
||||
def set_next_session_token(self) -> None:
|
||||
"""Update the current authentication token with the next one in line."""
|
||||
|
||||
assert self.credentials
|
||||
|
||||
self.token_index = (self.token_index + 1) % len(self.credentials)
|
||||
|
||||
auth = self.credentials[self.token_index]
|
||||
|
||||
self.current_user = auth["username"]
|
||||
logger.debug("Using authentication token for user %s", self.current_user)
|
||||
|
||||
if "password" in auth:
|
||||
token = auth["password"]
|
||||
else:
|
||||
token = auth["token"]
|
||||
|
||||
self.session.headers.update({"Authorization": f"token {token}"})
|
||||
|
||||
@retry(
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||
retry=retry_any(
|
||||
# ChunkedEncodingErrors happen when the TLS connection gets reset, e.g.
|
||||
# when running the lister on a connection with high latency
|
||||
retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
|
||||
# 502 status codes happen for a Server Error, sometimes
|
||||
retry_if_result(lambda r: r.status_code == 502),
|
||||
),
|
||||
)
|
||||
def _request(self, url: str) -> requests.Response:
|
||||
response = self.session.get(url)
|
||||
|
||||
if (
|
||||
# GitHub returns inconsistent status codes between unauthenticated
|
||||
# rate limit and authenticated rate limits. Handle both.
|
||||
response.status_code == 429
|
||||
or (self.anonymous and response.status_code == 403)
|
||||
):
|
||||
raise RateLimited(response)
|
||||
|
||||
return response
|
||||
|
||||
def request(self, url) -> requests.Response:
|
||||
"""Repeatedly requests the given URL, cycling through credentials and sleeping
|
||||
if necessary; until either a successful response or :exc:`MissingRateLimitReset`
|
||||
"""
|
||||
# The following for/else loop handles rate limiting; if successful,
|
||||
# it provides the rest of the function with a `response` object.
|
||||
#
|
||||
# If all tokens are rate-limited, we sleep until the reset time,
|
||||
# then `continue` into another iteration of the outer while loop,
|
||||
# attempting to get data from the same URL again.
|
||||
|
||||
while True:
|
||||
max_attempts = len(self.credentials) if self.credentials else 1
|
||||
reset_times: Dict[int, int] = {} # token index -> time
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
return self._request(url)
|
||||
except RateLimited as e:
|
||||
reset_info = "(unknown reset)"
|
||||
if e.reset_time is not None:
|
||||
reset_times[self.token_index] = e.reset_time
|
||||
reset_info = "(resetting in %ss)" % (e.reset_time - time.time())
|
||||
|
||||
if not self.anonymous:
|
||||
logger.info(
|
||||
"Rate limit exhausted for current user %s %s",
|
||||
self.current_user,
|
||||
reset_info,
|
||||
)
|
||||
# Use next token in line
|
||||
self.set_next_session_token()
|
||||
# Wait one second to avoid triggering GitHub's abuse rate limits
|
||||
time.sleep(1)
|
||||
|
||||
# All tokens have been rate-limited. What do we do?
|
||||
|
||||
if not reset_times:
|
||||
logger.warning(
|
||||
"No X-Ratelimit-Reset value found in responses for any token; "
|
||||
"Giving up."
|
||||
)
|
||||
raise MissingRateLimitReset()
|
||||
|
||||
sleep_time = max(reset_times.values()) - time.time() + 1
|
||||
logger.info(
|
||||
"Rate limits exhausted for all tokens. Sleeping for %f seconds.",
|
||||
sleep_time,
|
||||
)
|
||||
time.sleep(sleep_time)
|
Loading…
Add table
Add a link
Reference in a new issue