pypi.lister: Add integration test which checks the scheduled tasks
Related T2032
This commit is contained in:
parent
8d50e0d941
commit
599af25ad6
3 changed files with 23 additions and 55 deletions
|
@ -1,4 +1,5 @@
|
|||
# Copyright (C) 2018-2019 the Software Heritage developers
|
||||
# Copyright (C) 2018-2019 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
|
|
|
@ -1,64 +1,31 @@
|
|||
# Copyright (C) 2019 the Software Heritage developers
|
||||
# Copyright (C) 2019 The Software Heritage developers
|
||||
# See the AUTHORS file at the top-level directory of this distribution
|
||||
# License: GNU General Public License version 3, or any later version
|
||||
# See top-level LICENSE file for more information
|
||||
|
||||
import requests_mock
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
from swh.lister.pypi.lister import PyPILister
|
||||
from swh.lister.core.tests.test_lister import HttpSimpleListerTester
|
||||
|
||||
lister = PyPILister()
|
||||
def test_pypi_lister_(swh_listers, requests_mock_datadir):
|
||||
lister = swh_listers['pypi']
|
||||
|
||||
expected_packages = ['0lever-so', '0lever-utils', '0-orchestrator', '0wned']
|
||||
lister.run()
|
||||
|
||||
expected_model = {
|
||||
'uid': 'arrow',
|
||||
'name': 'arrow',
|
||||
'full_name': 'arrow',
|
||||
'html_url': 'https://pypi.org/pypi/arrow/json',
|
||||
'origin_url': 'https://pypi.org/project/arrow/',
|
||||
'origin_type': 'pypi',
|
||||
}
|
||||
r = lister.scheduler.search_tasks(task_type='load-pypi')
|
||||
assert len(r) == 4
|
||||
|
||||
for row in r:
|
||||
assert row['type'] == 'load-pypi'
|
||||
# arguments check
|
||||
args = row['arguments']['args']
|
||||
assert len(args) == 2
|
||||
|
||||
class PyPIListerTester(HttpSimpleListerTester, unittest.TestCase):
|
||||
Lister = PyPILister
|
||||
PAGE = 'https://pypi.org/simple/'
|
||||
lister_subdir = 'pypi'
|
||||
good_api_response_file = 'api_response.html'
|
||||
entries = 4
|
||||
project = args[0]
|
||||
url = args[1]
|
||||
assert url == 'https://pypi.org/project/%s/' % project
|
||||
|
||||
@requests_mock.Mocker()
|
||||
def test_list_packages(self, http_mocker):
|
||||
"""List packages from simple api page should retrieve all packages within
|
||||
# kwargs
|
||||
kwargs = row['arguments']['kwargs']
|
||||
meta_url = kwargs['project_metadata_url']
|
||||
assert meta_url == 'https://pypi.org/pypi/%s/json' % project
|
||||
|
||||
"""
|
||||
http_mocker.get(self.PAGE, text=self.mock_response)
|
||||
fl = self.get_fl()
|
||||
packages = fl.list_packages(self.get_api_response(0))
|
||||
|
||||
for package in expected_packages:
|
||||
assert package in packages
|
||||
|
||||
def test_transport_response_simplified(self):
|
||||
"""Test model created by the lister
|
||||
|
||||
"""
|
||||
fl = self.get_fl()
|
||||
model = fl.transport_response_simplified(['arrow'])
|
||||
assert len(model) == 1
|
||||
for key, values in model[0].items():
|
||||
assert values == expected_model[key]
|
||||
|
||||
def test_task_dict(self):
|
||||
"""Test the task creation of lister
|
||||
|
||||
"""
|
||||
with patch('swh.lister.pypi.lister.utils.create_task_dict') as mock_create_tasks: # noqa
|
||||
lister.task_dict(origin_type='pypi', origin_url='https://abc',
|
||||
name='test_pack', html_url='https://def')
|
||||
|
||||
mock_create_tasks.assert_called_once_with(
|
||||
'load-pypi', 'recurring', 'test_pack', 'https://abc',
|
||||
project_metadata_url='https://def')
|
||||
assert row['policy'] == 'recurring'
|
||||
assert row['priority'] is None
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue