cran: Use pyreadr instead of rpy2 to read a RDS file from Python
The CRAN lister improvements introduced in 91e4e33
originally used pyreadr
to read a RDS file from Python instead of rpy2.
As swh-lister was still packaged for debian at the time, the choice of using
rpy2 instead was made, as a debian package was available for it while none
existed for pyreadr.
Now that debian packaging has been dropped for swh-lister, we can reinstate the
pyreadr-based implementation, which has the advantages of being faster and of not
depending on the R language runtime.
Related to swh/meta#1709.
This commit is contained in:
parent
42d8e24d7e
commit
4aee4da784
4 changed files with 47 additions and 53 deletions
6
mypy.ini
6
mypy.ini
|
@ -31,6 +31,9 @@ ignore_missing_imports = True
|
|||
[mypy-pkg_resources.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-pyreadr.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-pytest.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
|
@ -40,9 +43,6 @@ ignore_missing_imports = True
|
|||
[mypy-requests_mock.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-rpy2.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-urllib3.util.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
|
|
|
@ -5,10 +5,10 @@ iso8601
|
|||
launchpadlib
|
||||
lxml
|
||||
psycopg2
|
||||
pyreadr
|
||||
python_debian
|
||||
repomd
|
||||
requests
|
||||
rpy2
|
||||
setuptools
|
||||
tenacity >= 6.2
|
||||
testing.postgresql
|
||||
|
|
|
@ -3,14 +3,14 @@
|
|||
# See top-level LICENSE file for more information
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from rpy2 import robjects
|
||||
import iso8601
|
||||
import pyreadr
|
||||
|
||||
from swh.lister.pattern import CredentialsType, StatelessLister
|
||||
from swh.scheduler.interface import SchedulerInterface
|
||||
|
@ -64,23 +64,14 @@ class CRANLister(StatelessLister[PageType]):
|
|||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
package_artifacts: Dict[str, Dict[str, Any]] = defaultdict(dict)
|
||||
dest_path = os.path.join(tmpdir, os.path.basename(CRAN_INFO_DB_URL))
|
||||
|
||||
response = self.http_request(CRAN_INFO_DB_URL, stream=True)
|
||||
with open(dest_path, "wb") as rds_file:
|
||||
for chunk in response.iter_content(chunk_size=1024):
|
||||
rds_file.write(chunk)
|
||||
|
||||
logger.debug("Fetching %s file to %s", CRAN_INFO_DB_URL, dest_path)
|
||||
dest_path = pyreadr.download_file(CRAN_INFO_DB_URL, dest_path)
|
||||
logger.debug("Parsing %s file", dest_path)
|
||||
robjects.r(f"cran_info_db_df <- readRDS('{dest_path}')")
|
||||
r_df = robjects.r["cran_info_db_df"]
|
||||
colnames = list(r_df.colnames)
|
||||
|
||||
def _get_col_value(row, colname):
|
||||
return r_df[colnames.index(colname)][row]
|
||||
cran_db_df = pyreadr.read_r(dest_path)[None]
|
||||
|
||||
logger.debug("Processing CRAN packages")
|
||||
for i in range(r_df.nrow):
|
||||
tarball_path = r_df.rownames[i]
|
||||
for package_artifact_metadata in cran_db_df.itertuples():
|
||||
tarball_path = package_artifact_metadata[0]
|
||||
package_info = tarball_path.split("/")[-1].replace(".tar.gz", "")
|
||||
if "_" not in package_info and "-" not in package_info:
|
||||
# skip package artifact with no version
|
||||
|
@ -98,11 +89,9 @@ class CRANLister(StatelessLister[PageType]):
|
|||
),
|
||||
"version": package_version,
|
||||
"package": package_name,
|
||||
"checksums": {"length": int(_get_col_value(i, "size"))},
|
||||
"mtime": (
|
||||
datetime.fromtimestamp(
|
||||
_get_col_value(i, "mtime"), tz=timezone.utc
|
||||
)
|
||||
"checksums": {"length": int(package_artifact_metadata.size)},
|
||||
"mtime": iso8601.parse_date(
|
||||
package_artifact_metadata.mtime.isoformat()
|
||||
),
|
||||
}
|
||||
|
||||
|
|
|
@ -6,18 +6,10 @@
|
|||
from os import path
|
||||
|
||||
import pandas
|
||||
import pyreadr
|
||||
import pytest
|
||||
from rpy2 import robjects
|
||||
from rpy2.robjects import pandas2ri
|
||||
from rpy2.robjects.conversion import localconverter
|
||||
|
||||
try:
|
||||
from rpy2.robjects.conversion import py2rpy
|
||||
except ImportError:
|
||||
# for old rpy2 versions (fix debian buster package build)
|
||||
from rpy2.robjects.pandas2ri import py2ri as py2rpy # noqa
|
||||
|
||||
from swh.lister.cran.lister import CRAN_INFO_DB_URL, CRAN_MIRROR_URL, CRANLister
|
||||
from swh.lister.cran.lister import CRAN_MIRROR_URL, CRANLister
|
||||
|
||||
CRAN_INFO_DB_DATA = {
|
||||
"/srv/ftp/pub/R/src/contrib/Archive/zooimage/zooimage_3.0-3.tar.gz": {
|
||||
|
@ -103,31 +95,46 @@ def cran_info_db_rds_path(tmp_path):
|
|||
orient="index",
|
||||
)
|
||||
rds_path = path.join(tmp_path, "cran_info_db.rds")
|
||||
# Convert pandas dataframe to R dataframe
|
||||
with localconverter(robjects.default_converter + pandas2ri.converter):
|
||||
r_df = py2rpy(df)
|
||||
robjects.r.assign("cran_info_db_df", r_df)
|
||||
robjects.r(f"saveRDS(cran_info_db_df, file='{rds_path}')")
|
||||
pyreadr.write_rds(rds_path, df)
|
||||
return rds_path
|
||||
|
||||
|
||||
def test_cran_lister_cran(swh_scheduler, requests_mock, cran_info_db_rds_path):
|
||||
def test_cran_lister_cran(swh_scheduler, mocker, cran_info_db_rds_path):
|
||||
lister = CRANLister(swh_scheduler)
|
||||
|
||||
with open(cran_info_db_rds_path, "rb") as cran_db_rds:
|
||||
mock_download_file = mocker.patch("swh.lister.cran.lister.pyreadr.download_file")
|
||||
mock_download_file.return_value = cran_info_db_rds_path
|
||||
|
||||
requests_mock.get(CRAN_INFO_DB_URL, body=cran_db_rds)
|
||||
read_r = pyreadr.read_r
|
||||
|
||||
lister = CRANLister(swh_scheduler)
|
||||
def read_r_restore_data_lost_by_write_r(*args, **kwargs):
|
||||
result = read_r(*args, **kwargs)
|
||||
# DataFrame index is lost when calling pyreadr.write_rds so recreate
|
||||
# the same one as in original cran_info_db.rds file
|
||||
# https://github.com/ofajardo/pyreadr/issues/68
|
||||
result[None]["rownames"] = list(CRAN_INFO_DB_DATA.keys())
|
||||
result[None].set_index("rownames", inplace=True)
|
||||
# pyreadr.write_rds serializes datetime to string so restore datetime type
|
||||
# as in original cran_info_db.rds file
|
||||
for dt_column in ("mtime", "ctime", "atime"):
|
||||
result[None][dt_column] = pandas.to_datetime(
|
||||
result[None][dt_column], utc=True
|
||||
)
|
||||
return result
|
||||
|
||||
stats = lister.run()
|
||||
mocker.patch(
|
||||
"swh.lister.cran.lister.pyreadr.read_r",
|
||||
wraps=read_r_restore_data_lost_by_write_r,
|
||||
)
|
||||
|
||||
assert stats.pages == 1
|
||||
assert stats.origins == 2
|
||||
stats = lister.run()
|
||||
|
||||
scheduler_origins = {
|
||||
o.url: o
|
||||
for o in swh_scheduler.get_listed_origins(lister.lister_obj.id).results
|
||||
}
|
||||
assert stats.pages == 1
|
||||
assert stats.origins == 2
|
||||
|
||||
scheduler_origins = {
|
||||
o.url: o for o in swh_scheduler.get_listed_origins(lister.lister_obj.id).results
|
||||
}
|
||||
|
||||
assert set(scheduler_origins.keys()) == {
|
||||
f"{CRAN_MIRROR_URL}/package=zooimage",
|
||||
|
@ -189,7 +196,6 @@ def test_cran_lister_cran(swh_scheduler, requests_mock, cran_info_db_rds_path):
|
|||
def test_lister_cran_instantiation_with_credentials(
|
||||
credentials, expected_credentials, swh_scheduler
|
||||
):
|
||||
|
||||
lister = CRANLister(swh_scheduler, credentials=credentials)
|
||||
|
||||
# Credentials are allowed in constructor
|
||||
|
@ -197,7 +203,6 @@ def test_lister_cran_instantiation_with_credentials(
|
|||
|
||||
|
||||
def test_lister_cran_from_configfile(swh_scheduler_config, mocker):
|
||||
|
||||
load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
|
||||
load_from_envvar.return_value = {
|
||||
"scheduler": {"cls": "local", **swh_scheduler_config},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue