add fdroid lister
This commit is contained in:
parent
213a4a152f
commit
d5d56e3a16
4 changed files with 112 additions and 0 deletions
12
swh/lister/f_droid/__init__.py
Normal file
12
swh/lister/f_droid/__init__.py
Normal file
|
@ -0,0 +1,12 @@
|
|||
# Copyright (C) 2019-2021 the Software Heritage developers
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
|
||||
def register():
|
||||
from .lister import FDroidLister
|
||||
|
||||
return {
|
||||
"lister": FDroidLister,
|
||||
"task_modules": ["%s.tasks" % __name__],
|
||||
}
|
80
swh/lister/f_droid/lister.py
Normal file
80
swh/lister/f_droid/lister.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
# Copyright (C) 2021-2022 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, Iterator, List
|
||||
|
||||
from swh.scheduler.interface import SchedulerInterface
|
||||
from swh.scheduler.model import ListedOrigin
|
||||
|
||||
|
||||
from ..pattern import CredentialsType, StatelessLister
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
FDroidListerPage = List[Dict[str, Any]]
|
||||
|
||||
|
||||
class FDroidLister(StatelessLister[FDroidListerPage]):
|
||||
"""List origins from the FDroid."""
|
||||
|
||||
LISTER_NAME = "f_droid"
|
||||
INSTANCE = "f_droid"
|
||||
VISIT_TYPE = "f_droid"
|
||||
REPO_INDEX_V2_URL = "https://f-droid.org/repo/index-v2.json"
|
||||
PACKAGE_URL_TEMPLATE = "https://f-droid.org/packages/{package_id}"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
scheduler: SchedulerInterface,
|
||||
url: str = REPO_INDEX_V2_URL,
|
||||
instance: str = INSTANCE,
|
||||
credentials: CredentialsType = None,
|
||||
):
|
||||
super().__init__(
|
||||
scheduler=scheduler,
|
||||
credentials=credentials,
|
||||
url=url,
|
||||
instance=instance,
|
||||
)
|
||||
|
||||
self.session.headers.update({"Accept": "application/json"})
|
||||
|
||||
def get_pages(self) -> Iterator[FDroidListerPage]:
|
||||
data = self.http_request(self.url).json()
|
||||
|
||||
if "packages" not in data or not isinstance(data["packages"], dict):
|
||||
raise RuntimeError(
|
||||
f"Invalid response from {self.url}: missing 'packages' key"
|
||||
)
|
||||
|
||||
packages: Dict[str, Any] = data["packages"]
|
||||
return iter([[packages]])
|
||||
|
||||
def get_origins_from_page(self, page: FDroidListerPage) -> Iterator[ListedOrigin]:
|
||||
"""Convert a page of FDroidLister repositories into a list of ListedOrigins"""
|
||||
assert self.lister_obj.id is not None
|
||||
|
||||
for item in page:
|
||||
for id, value in item.items():
|
||||
metadata = value.get("metaData", {})
|
||||
versions = value.get("versions", {})
|
||||
last_updated = metadata.get("lastUpdated", None)
|
||||
if last_updated is not None:
|
||||
last_updated = datetime.fromtimestamp(last_updated)
|
||||
|
||||
yield ListedOrigin(
|
||||
lister_id=self.lister_obj.id,
|
||||
visit_type=self.VISIT_TYPE,
|
||||
url=self.PACKAGE_URL_TEMPLATE.format(
|
||||
package_id=id,
|
||||
),
|
||||
last_update=last_updated,
|
||||
extra_loader_arguments={
|
||||
"metadata": metadata,
|
||||
"versions": versions,
|
||||
},
|
||||
)
|
19
swh/lister/f_droid/tasks.py
Normal file
19
swh/lister/f_droid/tasks.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
# Copyright (C) 2018-2023 the Software Heritage developers
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
from celery import shared_task
|
||||
|
||||
from .lister import FDroidLister
|
||||
|
||||
|
||||
@shared_task(name=f"{__name__}.FDroidListerTask")
|
||||
def list_f_droid(**lister_args):
|
||||
"Full listing of the FDroid registry"
|
||||
lister = FDroidLister.from_configfile(**lister_args)
|
||||
return lister.run().dict()
|
||||
|
||||
|
||||
@shared_task(name=__name__ + ".ping")
|
||||
def _ping():
|
||||
return "OK"
|
Loading…
Add table
Add a link
Reference in a new issue