github: Refactor rate-limiting out of the GitHubLister class
This will allow the GitHub Metadata Fetcher to reuse the logic by importing the GitHubSession class.
This commit is contained in:
parent
334c54091e
commit
9ee4a99f15
2 changed files with 130 additions and 116 deletions
|
@ -30,19 +30,6 @@ from ..pattern import CredentialsType, Lister
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def init_session(session: Optional[requests.Session] = None) -> requests.Session:
|
||||
"""Initialize a requests session with the proper headers for requests to
|
||||
GitHub."""
|
||||
if not session:
|
||||
session = requests.Session()
|
||||
|
||||
session.headers.update(
|
||||
{"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
|
||||
)
|
||||
|
||||
return session
|
||||
|
||||
|
||||
class RateLimited(Exception):
|
||||
def __init__(self, response):
|
||||
self.reset_time: Optional[int]
|
||||
|
@ -65,30 +52,130 @@ class RateLimited(Exception):
|
|||
self.response = response
|
||||
|
||||
|
||||
@retry(
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||
retry=retry_any(
|
||||
# ChunkedEncodingErrors happen when the TLS connection gets reset, e.g.
|
||||
# when running the lister on a connection with high latency
|
||||
retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
|
||||
# 502 status codes happen for a Server Error, sometimes
|
||||
retry_if_result(lambda r: r.status_code == 502),
|
||||
),
|
||||
)
|
||||
def github_request(url: str, session: requests.Session) -> requests.Response:
|
||||
response = session.get(url)
|
||||
class MissingRateLimitReset(Exception):
    """Signal that rate-limited responses carried no usable reset time."""
|
||||
|
||||
anonymous = "Authorization" not in session.headers
|
||||
|
||||
if (
|
||||
# GitHub returns inconsistent status codes between unauthenticated
|
||||
# rate limit and authenticated rate limits. Handle both.
|
||||
response.status_code == 429
|
||||
or (anonymous and response.status_code == 403)
|
||||
):
|
||||
raise RateLimited(response)
|
||||
class GitHubSession:
|
||||
"""Manages a :class:`requests.Session` with (optionally) multiple credentials,
|
||||
and cycles through them when reaching rate-limits."""
|
||||
|
||||
return response
|
||||
def __init__(self, credentials: Optional[List[Dict[str, str]]] = None) -> None:
    """Set up a requests session with GitHub headers and optional credentials.

    Args:
        credentials: optional list of credential dicts. Each dict is read by
            :meth:`set_next_session_token`, which looks up a ``username`` key
            and either a ``password`` or a ``token`` key. When the list is
            missing or empty, the session runs in anonymous mode.
    """
    # Shuffle a private copy: random.shuffle() works in place, and reordering
    # the list object owned by the caller would be a surprising side effect.
    self.credentials = list(credentials) if credentials else credentials
    if self.credentials:
        random.shuffle(self.credentials)

    self.session = requests.Session()
    self.session.headers.update(
        {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
    )

    self.anonymous = not self.credentials
    if self.anonymous:
        logger.warning("No tokens set in configuration, using anonymous mode")

    # Start at -1 so the first set_next_session_token() call selects index 0.
    self.token_index = -1
    self.current_user: Optional[str] = None

    if not self.anonymous:
        # Initialize the first token value in the session headers
        self.set_next_session_token()
|
||||
|
||||
def set_next_session_token(self) -> None:
    """Rotate to the next credential and install it in the session headers.

    The index wraps around, so calling this after the last credential
    selects the first one again.
    """
    assert self.credentials

    next_index = (self.token_index + 1) % len(self.credentials)
    self.token_index = next_index
    credential = self.credentials[next_index]

    self.current_user = credential["username"]
    logger.debug("Using authentication token for user %s", self.current_user)

    # A "password" entry takes precedence over a "token" entry.
    token = credential["password"] if "password" in credential else credential["token"]

    self.session.headers.update({"Authorization": f"token {token}"})
|
||||
|
||||
@retry(
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_any(
        # A reset TLS connection (e.g. when running on a high-latency link)
        # surfaces as a ChunkedEncodingError
        retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
        # GitHub sometimes answers 502 on a server error
        retry_if_result(lambda r: r.status_code == 502),
    ),
)
def _request(self, url: str) -> requests.Response:
    """Perform one GET on ``url`` and turn rate-limit answers into exceptions.

    Args:
        url: URL to fetch with the session's current headers.

    Returns:
        The response, when it is not recognised as a rate-limit answer.

    Raises:
        RateLimited: on a 429 status, or on a 403 status in anonymous mode.
    """
    response = self.session.get(url)
    status = response.status_code

    # GitHub does not use the same status code for unauthenticated and
    # authenticated rate limits, so both shapes are checked.
    if status == 429:
        raise RateLimited(response)
    if self.anonymous and status == 403:
        raise RateLimited(response)

    return response
|
||||
|
||||
def request(self, url: str) -> requests.Response:
    """Request ``url`` until it is answered or no reset time is known.

    Each round tries the request once per configured credential (once when
    there are none). After a rate-limited attempt in authenticated mode, the
    next credential is selected. When a whole round has been rate-limited,
    the method sleeps until the latest known reset time and starts over.

    Args:
        url: URL handed to :meth:`_request`.

    Returns:
        The first response that :meth:`_request` returns without raising
        :exc:`RateLimited`.

    Raises:
        MissingRateLimitReset: if a whole round was rate-limited and none of
            the :exc:`RateLimited` exceptions carried a reset time.
    """
    while True:
        max_attempts = len(self.credentials) if self.credentials else 1
        reset_times: Dict[int, int] = {}  # token index -> time
        for _ in range(max_attempts):
            try:
                return self._request(url)
            except RateLimited as e:
                reset_info = "(unknown reset)"
                if e.reset_time is not None:
                    reset_times[self.token_index] = e.reset_time
                    reset_info = "(resetting in %ss)" % (e.reset_time - time.time())

                if not self.anonymous:
                    logger.info(
                        "Rate limit exhausted for current user %s %s",
                        self.current_user,
                        reset_info,
                    )
                    # Use next token in line
                    self.set_next_session_token()
                    # Wait one second to avoid triggering GitHub's abuse rate limits
                    time.sleep(1)

        # Every attempt of this round has been rate-limited.

        if not reset_times:
            logger.warning(
                "No X-Ratelimit-Reset value found in responses for any token; "
                "Giving up."
            )
            raise MissingRateLimitReset()

        # The latest reset may already lie in the past (the per-token pauses
        # above take time); time.sleep() rejects negative values, so clamp.
        sleep_time = max(0.0, max(reset_times.values()) - time.time() + 1)
        logger.info(
            "Rate limits exhausted for all tokens. Sleeping for %f seconds.",
            sleep_time,
        )
        time.sleep(sleep_time)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -156,37 +243,7 @@ class GitHubLister(Lister[GitHubListerState, List[Dict[str, Any]]]):
|
|||
|
||||
self.relisting = self.first_id is not None or self.last_id is not None
|
||||
|
||||
self.session = init_session()
|
||||
|
||||
random.shuffle(self.credentials)
|
||||
|
||||
self.anonymous = not self.credentials
|
||||
|
||||
if self.anonymous:
|
||||
logger.warning("No tokens set in configuration, using anonymous mode")
|
||||
|
||||
self.token_index = -1
|
||||
self.current_user: Optional[str] = None
|
||||
|
||||
if not self.anonymous:
|
||||
# Initialize the first token value in the session headers
|
||||
self.set_next_session_token()
|
||||
|
||||
def set_next_session_token(self) -> None:
|
||||
"""Update the current authentication token with the next one in line."""
|
||||
|
||||
self.token_index = (self.token_index + 1) % len(self.credentials)
|
||||
|
||||
auth = self.credentials[self.token_index]
|
||||
if "password" in auth:
|
||||
token = auth["password"]
|
||||
else:
|
||||
token = auth["token"]
|
||||
|
||||
self.current_user = auth["username"]
|
||||
logger.debug("Using authentication token for user %s", self.current_user)
|
||||
|
||||
self.session.headers.update({"Authorization": f"token {token}"})
|
||||
self.github_session = GitHubSession(credentials=self.credentials)
|
||||
|
||||
def state_from_dict(self, d: Dict[str, Any]) -> GitHubListerState:
    """Build a :class:`GitHubListerState` using the items of ``d`` as keywords."""
    state = GitHubListerState(**d)
    return state
|
||||
|
@ -206,54 +263,11 @@ class GitHubLister(Lister[GitHubListerState, List[Dict[str, Any]]]):
|
|||
while self.last_id is None or current_id < self.last_id:
|
||||
logger.debug("Getting page %s", current_url)
|
||||
|
||||
# The following for/else loop handles rate limiting; if successful,
|
||||
# it provides the rest of the function with a `response` object.
|
||||
#
|
||||
# If all tokens are rate-limited, we sleep until the reset time,
|
||||
# then `continue` into another iteration of the outer while loop,
|
||||
# attempting to get data from the same URL again.
|
||||
|
||||
max_attempts = 1 if self.anonymous else len(self.credentials)
|
||||
reset_times: Dict[int, int] = {} # token index -> time
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
response = github_request(current_url, session=self.session)
|
||||
break
|
||||
except RateLimited as e:
|
||||
reset_info = "(unknown reset)"
|
||||
if e.reset_time is not None:
|
||||
reset_times[self.token_index] = e.reset_time
|
||||
reset_info = "(resetting in %ss)" % (e.reset_time - time.time())
|
||||
|
||||
if not self.anonymous:
|
||||
logger.info(
|
||||
"Rate limit exhausted for current user %s %s",
|
||||
self.current_user,
|
||||
reset_info,
|
||||
)
|
||||
# Use next token in line
|
||||
self.set_next_session_token()
|
||||
# Wait one second to avoid triggering GitHub's abuse rate limits
|
||||
time.sleep(1)
|
||||
else:
|
||||
# All tokens have been rate-limited. What do we do?
|
||||
|
||||
if not reset_times:
|
||||
logger.warning(
|
||||
"No X-Ratelimit-Reset value found in responses for any token; "
|
||||
"Giving up."
|
||||
)
|
||||
break
|
||||
|
||||
sleep_time = max(reset_times.values()) - time.time() + 1
|
||||
logger.info(
|
||||
"Rate limits exhausted for all tokens. Sleeping for %f seconds.",
|
||||
sleep_time,
|
||||
)
|
||||
time.sleep(sleep_time)
|
||||
# This goes back to the outer page-by-page loop, doing one more
|
||||
# iteration on the same page
|
||||
continue
|
||||
try:
|
||||
response = self.github_session.request(current_url)
|
||||
except MissingRateLimitReset:
|
||||
# Give up
|
||||
break
|
||||
|
||||
# We've successfully retrieved a (non-ratelimited) `response`. We
|
||||
# still need to check it for validity.
|
||||
|
|
|
@ -270,7 +270,7 @@ def test_anonymous_ratelimit(swh_scheduler, caplog, requests_ratelimited) -> Non
|
|||
caplog.set_level(logging.DEBUG, "swh.lister.github.lister")
|
||||
|
||||
lister = GitHubLister(scheduler=swh_scheduler)
|
||||
assert lister.anonymous
|
||||
assert lister.github_session.anonymous
|
||||
assert "using anonymous mode" in caplog.records[-1].message
|
||||
caplog.clear()
|
||||
|
||||
|
@ -315,9 +315,9 @@ def test_authenticated_credentials(
|
|||
caplog.set_level(logging.DEBUG, "swh.lister.github.lister")
|
||||
|
||||
lister = GitHubLister(scheduler=swh_scheduler, credentials=lister_credentials)
|
||||
assert lister.token_index == 0
|
||||
assert lister.github_session.token_index == 0
|
||||
assert sorted(lister.credentials, key=lambda t: t["username"]) == github_credentials
|
||||
assert lister.session.headers["Authorization"] in [
|
||||
assert lister.github_session.session.headers["Authorization"] in [
|
||||
"token %s" % t for t in all_tokens
|
||||
]
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue