gitlab: Port to the new lister api

Related to T2987
This commit is contained in:
Antoine R. Dumont (@ardumont) 2021-01-23 15:18:20 +01:00
parent ff232f0d91
commit 1390a513f2
No known key found for this signature in database
GPG key ID: 52E2E9840D10C3B8
9 changed files with 241 additions and 181 deletions

View file

@ -1,14 +1,12 @@
# Copyright (C) 2019 the Software Heritage developers
# Copyright (C) 2019-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
def register():
from .lister import GitLabLister
from .models import GitLabModel
return {
"models": [GitLabModel],
"lister": GitLabLister,
"task_modules": ["%s.tasks" % __name__],
}

View file

@ -1,97 +1,202 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import time
from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union
from dataclasses import asdict, dataclass
import logging
from typing import Any, Dict, Iterator, Optional, Tuple
from urllib.parse import parse_qs, urlparse
from requests import Response
import requests
from requests.exceptions import HTTPError
from requests.status_codes import codes
from tenacity.before_sleep import before_sleep_log
from urllib3.util import parse_url
from ..core.page_by_page_lister import PageByPageHttpLister
from .models import GitLabModel
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, Lister
from swh.lister.utils import retry_attempt, throttling_retry
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
class GitLabLister(PageByPageHttpLister):
# Template path expecting an integer that represents the page id
PATH_TEMPLATE = "/projects?page=%d&order_by=id"
DEFAULT_URL = "https://gitlab.com/api/v4/"
MODEL = GitLabModel
@dataclass
class GitLabListerState:
"""State of the GitLabLister"""
last_seen_next_link: Optional[str] = None
"""Last link header (not visited yet) during an incremental pass
"""
Repository = Dict[str, Any]
@dataclass
class PageResult:
"""Result from a query to a gitlab project api page."""
repositories: Optional[Tuple[Repository, ...]] = None
next_page: Optional[str] = None
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate for handling HTTP responses with status code 403
with specific ratelimit header.
"""
attempt = retry_attempt(retry_state)
if attempt.failed:
exc = attempt.exception()
return (
isinstance(exc, HTTPError)
and exc.response.status_code == codes.forbidden
and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0
)
return False
def _parse_page_id(url: Optional[str]) -> Optional[int]:
"""Given an url, extract a return the 'page' query parameter associated value or None.
"""
if not url:
return None
# link: https://${project-api}/?...&page=2x...
query_data = parse_qs(urlparse(url).query)
page = query_data.get("page")
if page and len(page) > 0:
return int(page[0])
return None
class GitLabLister(Lister[GitLabListerState, PageResult]):
"""List origins for a gitlab instance.
By default, the lister runs in incremental mode: it lists all repositories,
starting with the `last_seen_next_link` stored in the scheduler backend.
"""
LISTER_NAME = "gitlab"
def __init__(
self, url=None, instance=None, override_config=None, sort="asc", per_page=20
):
super().__init__(url=url, override_config=override_config)
if instance is None:
instance = parse_url(self.url).host
self.instance = instance
self.PATH_TEMPLATE = "%s&sort=%s&per_page=%s" % (
self.PATH_TEMPLATE,
sort,
per_page,
)
def uid(self, repo: Dict[str, Any]) -> str:
return "%s/%s" % (self.instance, repo["path_with_namespace"])
def get_model_from_repo(self, repo: Dict[str, Any]) -> Dict[str, Any]:
return {
"instance": self.instance,
"uid": self.uid(repo),
"name": repo["name"],
"full_name": repo["path_with_namespace"],
"html_url": repo["web_url"],
"origin_url": repo["http_url_to_repo"],
"origin_type": "git",
}
def transport_quota_check(
self, response: Response
) -> Tuple[bool, Union[int, float]]:
"""Deal with rate limit if any.
"""
# not all gitlab instance have rate limit
if "RateLimit-Remaining" in response.headers:
reqs_remaining = int(response.headers["RateLimit-Remaining"])
if response.status_code == 403 and reqs_remaining == 0:
reset_at = int(response.headers["RateLimit-Reset"])
delay = min(reset_at - time.time(), 3600)
return True, delay
return False, 0
def _get_int(self, headers: MutableMapping[str, Any], key: str) -> Optional[int]:
_val = headers.get(key)
if _val:
return int(_val)
return None
def get_next_target_from_response(self, response: Response) -> Optional[int]:
"""Determine the next page identifier.
"""
return self._get_int(response.headers, "x-next-page")
def get_pages_information(
self,
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
"""Determine pages information.
scheduler,
url=None,
instance=None,
credentials: CredentialsType = None,
incremental: bool = False,
):
if instance is None:
instance = parse_url(url).host
super().__init__(
scheduler=scheduler,
credentials=None, # anonymous for now
url=url,
instance=instance,
)
self.incremental = incremental
"""
response = self.transport_head(identifier=1) # type: ignore
if not response.ok:
raise ValueError(
"Problem during information fetch: %s" % response.status_code
)
h = response.headers
return (
self._get_int(h, "x-total"),
self._get_int(h, "x-total-pages"),
self._get_int(h, "x-per-page"),
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]:
repos = response.json()
return [self.get_model_from_repo(repo) for repo in repos]
def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState:
return GitLabListerState(**d)
def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def get_page_result(self, url: str) -> PageResult:
logger.debug("Fetching URL %s", url)
response = self.session.get(url)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
repositories: Tuple[Repository, ...] = tuple(response.json())
if hasattr(response, "links") and response.links.get("next"):
next_page = response.links["next"]["url"]
else:
next_page = None
return PageResult(repositories, next_page)
def get_pages(self) -> Iterator[PageResult]:
next_page: Optional[str]
if self.incremental and self.state is not None:
next_page = self.state.last_seen_next_link
else:
next_page = f"{self.url}projects?page=1&order_by=id&sort=asc&per_page=20"
while next_page:
page_result = self.get_page_result(next_page)
yield page_result
next_page = page_result.next_page
def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
repositories = page_result.repositories if page_result.repositories else []
for repo in repositories:
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=repo["http_url_to_repo"],
visit_type="git",
# TODO: Support "last_activity_at" as update information
# last_update=repo["last_activity_at"],
)
def commit_page(self, page_result: PageResult) -> None:
"""Update currently stored state using the latest listed "next" page if relevant.
Relevancy is determined by the next_page link whose 'page' id must be strictly
superior to the currently stored one.
Note: this is a noop for full listing mode
"""
if self.incremental:
# link: https://${project-api}/?...&page=2x...
next_page = page_result.next_page
if next_page:
page_id = _parse_page_id(next_page)
previous_next_page = self.state.last_seen_next_link
previous_page_id = _parse_page_id(previous_next_page)
if previous_next_page is None or (
previous_page_id and page_id and previous_page_id < page_id
):
self.state.last_seen_next_link = next_page
def finalize(self) -> None:
"""finalize the lister state when relevant (see `fn:commit_page` for details)
Note: this is a noop for full listing mode
"""
next_page = self.state.last_seen_next_link
if self.incremental and next_page:
# link: https://${project-api}/?...&page=2x...
next_page_id = _parse_page_id(next_page)
scheduler_state = self.get_state_from_scheduler()
previous_next_page_id = _parse_page_id(scheduler_state.last_seen_next_link)
if (
previous_next_page_id
and next_page_id
and previous_next_page_id < next_page_id
):
self.updated = True

View file

@ -1,18 +0,0 @@
# Copyright (C) 2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from sqlalchemy import Column, String
from ..core.models import ModelBase
class GitLabModel(ModelBase):
"""a Gitlab repository from a gitlab instance
"""
__tablename__ = "gitlab_repo"
uid = Column(String, primary_key=True)
instance = Column(String, index=True)

View file

@ -1,26 +0,0 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
@pytest.fixture
def lister_under_test():
return "gitlab"
@pytest.fixture
def lister_gitlab(swh_lister):
for task_type in [
{
"type": "load-git",
"description": "Load git repository",
"backend_name": "swh.loader.git.tasks.UpdateGitRepository",
"default_interval": "1 day",
},
]:
swh_lister.scheduler.create_task_type(task_type)
return swh_lister

View file

@ -1 +0,0 @@
api_v4__projects,page=0,order_by=id,sort=asc,per_page=20

View file

@ -1,70 +1,74 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta
import logging
import re
import unittest
import pytest
from swh.lister.core.tests.test_lister import HttpListerTesterBase
from swh.lister.gitlab.lister import GitLabLister
from swh.lister.gitlab.lister import GitLabLister, _parse_page_id
from swh.lister.pattern import ListerStats
logger = logging.getLogger(__name__)
class GitLabListerTester(HttpListerTesterBase, unittest.TestCase):
Lister = GitLabLister
test_re = re.compile(r"^.*/projects.*page=(\d+).*")
lister_subdir = "gitlab"
good_api_response_file = "data/gitlab.com/api_response.json"
bad_api_response_file = "data/gitlab.com/api_empty_response.json"
first_index = 1
entries_per_page = 10
convert_type = int
def response_headers(self, request):
headers = {"RateLimit-Remaining": "1"}
if self.request_index(request) == self.first_index:
headers.update(
{"x-next-page": "3",}
)
return headers
def mock_rate_quota(self, n, request, context):
self.rate_limit += 1
context.status_code = 403
context.headers["RateLimit-Remaining"] = "0"
one_second = int((datetime.now() + timedelta(seconds=1.5)).timestamp())
context.headers["RateLimit-Reset"] = str(one_second)
return '{"error":"dummy"}'
@pytest.fixture
def lister_under_test():
return "gitlab"
def lister_gitlab(swh_scheduler):
url = "https://gitlab.com/api/v4/"
return GitLabLister(swh_scheduler, url=url)
# class GitLabListerTester(HttpListerTesterBase, unittest.TestCase):
# Lister = GitLabLister
# test_re = re.compile(r"^.*/projects.*page=(\d+).*")
# lister_subdir = "gitlab"
# good_api_response_file = "data/gitlab.com/api_response.json"
# bad_api_response_file = "data/gitlab.com/api_empty_response.json"
# first_index = 1
# entries_per_page = 10
# convert_type = int
# def response_headers(self, request):
# headers = {"RateLimit-Remaining": "1"}
# if self.request_index(request) == self.first_index:
# headers.update(
# {"x-next-page": "3",}
# )
# return headers
# def mock_rate_quota(self, n, request, context):
# self.rate_limit += 1
# context.status_code = 403
# context.headers["RateLimit-Remaining"] = "0"
# one_second = int((datetime.now() + timedelta(seconds=1.5)).timestamp())
# context.headers["RateLimit-Reset"] = str(one_second)
# return '{"error":"dummy"}'
def test_lister_gitlab(lister_gitlab, requests_mock_datadir):
lister_gitlab.run()
listed_result = lister_gitlab.run()
assert listed_result == ListerStats(pages=1, origins=10)
r = lister_gitlab.scheduler.search_tasks(task_type="load-git")
assert len(r) == 10
scheduler_origins = lister_gitlab.scheduler.get_listed_origins(
lister_gitlab.lister_obj.id
).origins
assert len(scheduler_origins) == 10
for row in r:
assert row["type"] == "load-git"
# arguments check
args = row["arguments"]["args"]
assert len(args) == 0
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith("https://gitlab.com")
# kwargs
kwargs = row["arguments"]["kwargs"]
url = kwargs["url"]
assert url.startswith("https://gitlab.com")
assert row["policy"] == "recurring"
assert row["priority"] is None
@pytest.mark.parametrize(
"url,expected_result",
[
(None, None),
("http://dummy/?query=1", None),
("http://dummy/?foo=bar&page=1&some=result", 1),
("http://dummy/?foo=bar&page=&some=result", None),
],
)
def test__parse_page_id(url, expected_result):
assert _parse_page_id(url) == expected_result

View file

@ -48,7 +48,6 @@ def test_get_lister_override():
db_url = init_db().url()
listers = {
"gitlab": "https://other.gitlab.uni/api/v4/",
"cgit": "https://some.where/cgit",
}