Implement phabricator lister using the new pattern class

This commit is contained in:
Nicolas Dandrimont 2020-07-22 12:05:56 +02:00
parent 734901747b
commit 9944295729
7 changed files with 152 additions and 283 deletions

View file

@ -5,10 +5,9 @@
def register():
from .lister import PhabricatorLister
from .models import PhabricatorModel
return {
"models": [PhabricatorModel],
"models": [],
"lister": PhabricatorLister,
"task_modules": ["%s.tasks" % __name__],
}

View file

@ -1,166 +1,147 @@
# Copyright (C) 2019 the Software Heritage developers
# Copyright (C) 2019-2020 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
import random
from typing import Any, Dict, List, Optional
import urllib.parse
from typing import Any, Dict, Iterator, List, Optional
from requests import Response
from sqlalchemy import func
import requests
from swh.lister.core.indexing_lister import IndexingHttpLister
from swh.lister.phabricator.models import PhabricatorModel
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
class PhabricatorLister(IndexingHttpLister):
PATH_TEMPLATE = "?order=oldest&attachments[uris]=1&after=%s"
DEFAULT_URL = "https://forge.softwareheritage.org/api/diffusion.repository.search"
MODEL = PhabricatorModel
PageType = List[Dict[str, Any]]
class PhabricatorLister(StatelessLister[PageType]):
LISTER_NAME = "phabricator"
def __init__(self, url=None, instance=None, override_config=None):
super().__init__(url=url, override_config=override_config)
if not instance:
instance = urllib.parse.urlparse(self.url).hostname
self.instance = instance
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: str,
api_token: Optional[str] = None,
credentials: CredentialsType = None,
):
super().__init__(scheduler, url, instance, credentials)
def request_params(self, identifier: str) -> Dict[str, Any]:
"""Override the default params behavior to retrieve the api token
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
Credentials are stored as:
credentials:
phabricator:
<instance>:
- username: <account>
password: <api-token>
"""
creds = self.request_instance_credentials()
if not creds:
raise ValueError(
"Phabricator forge needs authentication credential to list."
)
api_token = random.choice(creds)["password"]
return {
"headers": self.request_headers() or {},
"params": {"api.token": api_token},
}
def request_headers(self):
"""
(Override) Set requests headers to send when querying the
Phabricator API
"""
headers = super().request_headers()
headers["Accept"] = "application/json"
return headers
def get_model_from_repo(self, repo: Dict[str, Any]) -> Optional[Dict[str, Any]]:
url = get_repo_url(repo["attachments"]["uris"]["uris"])
if url is None:
return None
return {
"uid": url,
"indexable": repo["id"],
"name": repo["fields"]["shortName"],
"full_name": repo["fields"]["name"],
"html_url": url,
"origin_url": url,
"origin_type": repo["fields"]["vcs"],
"instance": self.instance,
}
def get_next_target_from_response(self, response: Response) -> Optional[int]:
body = response.json()["result"]["cursor"]
if body["after"] and body["after"] != "null":
return int(body["after"])
return None
def transport_response_simplified(
self, response: Response
) -> List[Optional[Dict[str, Any]]]:
repos = response.json()
if repos["result"] is None:
raise ValueError(
"Problem during information fetch: %s" % repos["error_code"]
)
repos = repos["result"]["data"]
return [self.get_model_from_repo(repo) for repo in repos]
def filter_before_inject(self, models_list):
"""
(Overrides) IndexingLister.filter_before_inject
Bounds query results by this Lister's set max_index.
"""
models_list = [m for m in models_list if m is not None]
return super().filter_before_inject(models_list)
def disable_deleted_repo_tasks(self, index: int, next_index: int, keep_these: str):
"""
(Overrides) Fix provided index value to avoid:
- database query error
- erroneously disabling some scheduler tasks
"""
# First call to the Phabricator API uses an empty 'after' parameter,
# so set the index to 0 to avoid database query error
if index == "":
index = 0
# Next listed repository ids are strictly greater than the 'after'
# parameter, so increment the index to avoid disabling the latest
# created task when processing a new repositories page returned by
# the Phabricator API
if api_token is not None:
self.api_token = api_token
else:
index = index + 1
return super().disable_deleted_repo_tasks(index, next_index, keep_these)
if not self.credentials:
raise ValueError(
f"No credentials found for phabricator instance {self.instance};"
" Please set them in the lister configuration file."
)
def db_first_index(self) -> Optional[int]:
"""
(Overrides) Filter results by Phabricator instance
self.api_token = random.choice(self.credentials)["password"]
Returns:
the smallest indexable value of all repos in the db
"""
t = self.db_session.query(func.min(self.MODEL.indexable))
t = t.filter(self.MODEL.instance == self.instance).first()
if t:
return t[0]
return None
def get_request_params(self, after: Optional[str]) -> Dict[str, str]:
"""Get the query parameters for the request."""
def db_last_index(self):
"""
(Overrides) Filter results by Phabricator instance
base_params = {
# Stable order
"order": "oldest",
# Add all URIs to the response
"attachments[uris]": "1",
# API token from stored credentials
"api.token": self.api_token,
}
Returns:
the largest indexable value of all instance repos in the db
"""
t = self.db_session.query(func.max(self.MODEL.indexable))
t = t.filter(self.MODEL.instance == self.instance).first()
if t:
return t[0]
if after is not None:
base_params["after"] = after
def db_query_range(self, start: int, end: int):
"""
(Overrides) Filter the results by the Phabricator instance to
avoid disabling loading tasks for repositories hosted on a
different instance.
return base_params
Returns:
a list of sqlalchemy.ext.declarative.declarative_base objects
with indexable values within the given range for the instance
"""
retlist = super().db_query_range(start, end)
return retlist.filter(self.MODEL.instance == self.instance)
@staticmethod
def filter_params(params: Dict[str, str]) -> Dict[str, str]:
"""Filter the parameters for debug purposes"""
return {
k: (v if k != "api.token" else "**redacted**") for k, v in params.items()
}
def get_pages(self) -> Iterator[PageType]:
after: Optional[str] = None
while True:
params = self.get_request_params(after)
logger.debug(
"Retrieving results on URI=%s, parameters %s",
self.url,
self.filter_params(params),
)
response = self.session.post(self.url, data=params)
if response.status_code != 200:
logger.warning(
"Got unexpected status_code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
break
response_data = response.json()
if response_data.get("result") is None:
logger.warning(
"Got unexpected response on %s: %s", response.url, response_data,
)
break
result = response_data["result"]
yield result["data"]
after = None
if "cursor" in result and "after" in result["cursor"]:
after = result["cursor"]["after"]
if not after:
logger.debug("Empty `after` cursor. All done")
break
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for repo in page:
url = get_repo_url(repo["attachments"]["uris"]["uris"])
if url is None:
short_name: Optional[str] = None
for field in "shortName", "name", "callsign":
short_name = repo["fields"].get(field)
if short_name:
break
logger.warning(
"No valid url for repository [%s] (phid=%s)",
short_name or repo["phid"],
repo["phid"],
)
continue
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=url,
visit_type=repo["fields"]["vcs"],
# The "dateUpdated" field returned by the Phabricator API only refers to
# the repository metadata; We can't use it for our purposes.
last_update=None,
)
def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[int]:
def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]:
"""
Return url for a hosted repository from its uris attachments according
to the following priority lists:

View file

@ -1,17 +0,0 @@
# Copyright (C) 2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from sqlalchemy import Column, Integer, String
from swh.lister.core.models import IndexingModelBase
class PhabricatorModel(IndexingModelBase):
"""a Phabricator repository"""
__tablename__ = "phabricator_repo"
uid = Column(String, primary_key=True)
indexable = Column(Integer, index=True)
instance = Column(String, index=True)

View file

@ -8,9 +8,9 @@ from swh.lister.phabricator.lister import PhabricatorLister
@shared_task(name=__name__ + ".FullPhabricatorLister")
def list_phabricator_full(**lister_args):
def list_phabricator_full(url: str, instance: str):
"""Full update of a Phabricator instance"""
return PhabricatorLister(**lister_args).run()
return PhabricatorLister.from_configfile(url=url, instance=instance).run()
@shared_task(name=__name__ + ".ping")

View file

@ -3,131 +3,27 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import importlib.resources
import json
import logging
import re
import unittest
import requests_mock
from swh.lister.core.tests.test_lister import HttpListerTester
from swh.lister.phabricator.lister import PhabricatorLister, get_repo_url
logger = logging.getLogger(__name__)
from swh.lister.phabricator.lister import get_repo_url
class PhabricatorListerTester(HttpListerTester, unittest.TestCase):
Lister = PhabricatorLister
# first request will have the after parameter empty
test_re = re.compile(r"\&after=([^?&]*)")
lister_subdir = "phabricator"
good_api_response_file = "data/api_first_response.json"
good_api_response_undefined_protocol = "data/api_response_undefined_protocol.json"
bad_api_response_file = "data/api_empty_response.json"
# first_index must be retrieved through a bootstrap process for Phabricator
first_index = None
last_index = 12
entries_per_page = 10
convert_type = int
def request_index(self, request):
"""(Override) This is needed to emulate the listing bootstrap
when no min_bound is provided to run
"""
m = self.test_re.search(request.path_url)
idx = m.group(1)
if idx not in ("", "None"):
return int(idx)
def get_fl(self, override_config=None):
"""(Override) Retrieve an instance of fake lister (fl).
"""
if override_config or self.fl is None:
credentials = {"phabricator": {"fake": [{"password": "toto"}]}}
override_config = dict(credentials=credentials, **(override_config or {}))
self.fl = self.Lister(
url="https://fakeurl", instance="fake", override_config=override_config
)
self.fl.INITIAL_BACKOFF = 1
self.fl.reset_backoff()
return self.fl
def test_get_repo_url(self):
f = open(
"swh/lister/%s/tests/%s" % (self.lister_subdir, self.good_api_response_file)
)
def test_get_repo_url():
with importlib.resources.open_text(
"swh.lister.phabricator.tests.data", "api_first_response.json"
) as f:
api_response = json.load(f)
repos = api_response["result"]["data"]
for repo in repos:
self.assertEqual(
"https://forge.softwareheritage.org/source/%s.git"
% (repo["fields"]["shortName"]),
get_repo_url(repo["attachments"]["uris"]["uris"]),
)
f = open(
"swh/lister/%s/tests/%s"
% (self.lister_subdir, self.good_api_response_undefined_protocol)
repos = api_response["result"]["data"]
for repo in repos:
expected_name = "https://forge.softwareheritage.org/source/%s.git" % (
repo["fields"]["shortName"]
)
assert get_repo_url(repo["attachments"]["uris"]["uris"]) == expected_name
with importlib.resources.open_text(
"swh.lister.phabricator.tests.data", "api_response_undefined_protocol.json",
) as f:
repo = json.load(f)
self.assertEqual(
"https://svn.blender.org/svnroot/bf-blender/",
get_repo_url(repo["attachments"]["uris"]["uris"]),
)
@requests_mock.Mocker()
def test_scheduled_tasks(self, http_mocker):
self.scheduled_tasks_test("data/api_next_response.json", 23, http_mocker)
@requests_mock.Mocker()
def test_scheduled_tasks_multiple_instances(self, http_mocker):
fl = self.create_fl_with_db(http_mocker)
# list first Phabricator instance
fl.run()
fl.instance = "other_fake"
fl.config["credentials"] = {
"phabricator": {"other_fake": [{"password": "foo"}]}
}
# list second Phabricator instance hosting repositories having
# same ids as those listed from the first instance
self.good_api_response_file = "data/api_first_response_other_instance.json"
self.last_index = 13
fl.run()
# check expected number of loading tasks
self.assertEqual(len(self.scheduler_tasks), 2 * self.entries_per_page)
# check tasks are not disabled
for task in self.scheduler_tasks:
self.assertTrue(task["status"] != "disabled")
def test_phabricator_lister(lister_phabricator, requests_mock_datadir):
lister = lister_phabricator
assert lister.url == lister.DEFAULT_URL
assert lister.instance == "forge.softwareheritage.org"
lister.run()
r = lister.scheduler.search_tasks(task_type="load-git")
assert len(r) == 10
for row in r:
assert row["type"] == "load-git"
# arguments check
args = row["arguments"]["args"]
assert len(args) == 0
# kwargs
kwargs = row["arguments"]["kwargs"]
url = kwargs["url"]
assert lister.instance in url
assert row["policy"] == "recurring"
assert row["priority"] is None
expected_name = "https://svn.blender.org/svnroot/bf-blender/"
assert get_repo_url(repo["attachments"]["uris"]["uris"]) == expected_name

View file

@ -9,6 +9,14 @@ from swh.lister.cli import SUPPORTED_LISTERS, get_lister
from .test_utils import init_db
lister_args = {
"phabricator": {
"instance": "softwareheritage",
"url": "https://forge.softwareheritage.org/api/diffusion.repository.search",
"api_token": "bogus",
},
}
def test_get_lister_wrong_input():
"""Unsupported lister should raise"""
@ -25,7 +33,10 @@ def test_get_lister(swh_scheduler_config):
db_url = init_db().url()
for lister_name in SUPPORTED_LISTERS:
lst = get_lister(
lister_name, db_url, scheduler={"cls": "local", **swh_scheduler_config}
lister_name,
db_url,
scheduler={"cls": "local", **swh_scheduler_config},
**lister_args.get(lister_name, {}),
)
assert hasattr(lst, "run")
@ -38,7 +49,6 @@ def test_get_lister_override():
listers = {
"gitlab": "https://other.gitlab.uni/api/v4/",
"phabricator": "https://somewhere.org/api/diffusion.repository.search",
"cgit": "https://some.where/cgit",
}