You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

388 lines
15 KiB

11 months ago
import os
from collections import namedtuple
import pytest
from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common import repofileutils, rhsm
from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked
from leapp.libraries.stdlib import api, CalledProcessError
from leapp.models import RepositoryData, RepositoryFile
from leapp.utils.report import is_inhibitor
# Named tuple with `repoid` and `file` fields
Repository = namedtuple('Repository', 'repoid file')

# Separator placed before every item when a list of repoids is rendered in a message
LIST_SEPARATOR = '\n - '

# External commands called by the RHSM library
_RHSM_BINARY = 'subscription-manager'
CMD_RHSM_LIST_CONSUMED = (_RHSM_BINARY, 'list', '--consumed')
CMD_RHSM_STATUS = (_RHSM_BINARY, 'status')
CMD_RHSM_RELEASE = (_RHSM_BINARY, 'release')
CMD_RHSM_LIST_ENABLED_REPOS = (_RHSM_BINARY, 'repos', '--list-enabled')
# Mocked `subscription-manager status` output that does not mention Simple Content Access
RHSM_STATUS_OUTPUT_NOSCA = (
    '\n'
    '+-------------------------------------------+\n'
    'System Status Details\n'
    '+-------------------------------------------+\n'
    'Overall Status: Current\n'
    'System Purpose Status: Not Specified\n'
)

# Mocked `subscription-manager status` output that reports Simple Content Access
RHSM_STATUS_OUTPUT_SCA = (
    '\n'
    '+-------------------------------------------+\n'
    'System Status Details\n'
    '+-------------------------------------------+\n'
    'Overall Status: Current\n'
    'System Purpose Status: Matched\n'
    'Content Access Mode is set to Simple Content Access\n'
)
# Used to simulate realistic output of RHSM, therefore carries more information than `Repository` namedtuple
RHSMRepositoryEntry = namedtuple('RHSMRepositoryEntry', ['id', 'name', 'url', 'enabled'])  # For clarity purposes

# Every mocked repository shares the same URL and is enabled; only the ID and the name differ
RHSM_ENABLED_REPOS = [
    RHSMRepositoryEntry(id=repo_id, name=repo_name, url='some_url', enabled='1')
    for repo_id, repo_name in (
        ('rhel-8-for-x86_64-appstream-rpms', 'Appstream'),
        ('satellite-tools-6.6-for-rhel-8-x86_64-rpms', 'Satellite'),
        ('rhel-8-for-x86_64-baseos-rpms', 'Base'),
    )
]
class IsolatedActionsMocked(object):
    """
    Mocked context object handed to the RHSM library functions under test.

    Every command passed to `call` is recorded in `commands_called`. The answer is either the
    output registered for that exact command, or the default result built from `call_stdout`.
    """

    def __init__(self, call_stdout=None, raise_err=False):
        """
        :param call_stdout: stdout used in the default result of `call`
        :param raise_err: when true, `call` delegates to `raise_call_error` instead of returning
        """
        # A map from called commands to their mocked output
        self.mocked_command_call_outputs = {}
        self.raise_err = raise_err
        self.call_return = dict(stdout=call_stdout, stderr=None)
        self.commands_called = []

    def call(self, cmd, *args, **dummy_kwargs):
        """Record `cmd` and return its mocked result (or the default one if none is registered)."""
        self.commands_called.append(cmd)
        if self.raise_err:
            raise_call_error(cmd)

        # Lists are not hashable, so the lookup key is always a tuple
        command_key = tuple(cmd)
        if command_key in self.mocked_command_call_outputs:
            return self.mocked_command_call_outputs[command_key]
        return self.call_return

    def add_mocked_command_call_with_stdout(self, cmd, stdout):
        """Register `stdout` as the output returned when `cmd` is called."""
        # Lists are not hashable, so the command is stored as a tuple
        mocked_result = dict(stdout=stdout, stderr=None)
        self.mocked_command_call_outputs[tuple(cmd)] = mocked_result

    def full_path(self, path):
        """Return `path` unchanged."""
        return path
@pytest.fixture
def actor_mocked(monkeypatch):
    """
    Fixture returning a `CurrentActorMocked` that is already installed as `api.current_actor`.

    Exists so that the individual tests do not have to repeat the monkeypatching themselves.
    """
    mocked_actor = CurrentActorMocked()
    monkeypatch.setattr(api, 'current_actor', mocked_actor)
    return mocked_actor
@pytest.fixture
def context_mocked():
    """Fixture returning an `IsolatedActionsMocked` created with its default arguments."""
    mocked_context = IsolatedActionsMocked()
    return mocked_context
def raise_call_error(args=None, exit_code=1):
    """
    Raise a `CalledProcessError` describing a failed execution of the `args` command.

    :param args: the command reported as failed
    :param exit_code: the exit code stored in the error message and in the result
    :raises CalledProcessError: always
    """
    failure_message = 'Command {0} failed with exit code {1}.'.format(str(args), exit_code)
    failed_result = {
        'signal': None,
        'exit_code': exit_code,
        'pid': 0,
        'stdout': 'fake out',
        'stderr': 'fake err',
    }
    raise CalledProcessError(message=failure_message, command=args, result=failed_result)
def _gen_repo(repoid):
    """Create a `RepositoryData` with the given repoid and a name derived from it."""
    repo_name = 'name {}'.format(repoid)
    return RepositoryData(repoid=repoid, name=repo_name)
def _gen_repofile(rfile, data=None):
    """
    Create a `RepositoryFile` for the `rfile` path.

    When `data` is None, three repositories are generated, with repoids built from the
    last path component of `rfile` and an index 0-2.
    """
    if data is not None:
        return RepositoryFile(file=rfile, data=data)

    # Last '/'-separated component of the path (the whole string when there is no '/')
    file_name = rfile.rpartition("/")[2]
    generated_repos = [_gen_repo("{}-{}".format(file_name, idx)) for idx in range(3)]
    return RepositoryFile(file=rfile, data=generated_repos)
@pytest.mark.parametrize('other_repofiles', [
    [],
    [_gen_repofile("foo")],
    [_gen_repofile("foo"), _gen_repofile("bar")],
])
@pytest.mark.parametrize('rhsm_repofile', [
    None,
    _gen_repofile(rhsm._DEFAULT_RHSM_REPOFILE, []),
    _gen_repofile(rhsm._DEFAULT_RHSM_REPOFILE, [_gen_repo("rh-0")]),
    _gen_repofile(rhsm._DEFAULT_RHSM_REPOFILE),
])
def test_get_available_repo_ids(monkeypatch, other_repofiles, rhsm_repofile):
    """Tests that only the repoids from the RHSM repofile are returned (sorted) and logged."""
    context_mocked = IsolatedActionsMocked()

    # Copy the parametrized list so that appending does not modify the shared parameter value
    repos = list(other_repofiles)
    expected_repoids = []
    if rhsm_repofile:
        repos.append(rhsm_repofile)
        expected_repoids = sorted(repo.repoid for repo in rhsm_repofile.data)

    monkeypatch.setattr(api, 'current_logger', logger_mocked())
    monkeypatch.setattr(rhsm, '_inhibit_on_duplicate_repos', lambda x: None)
    monkeypatch.setattr(repofileutils, 'get_parsed_repofiles', lambda x: repos)

    result = rhsm.get_available_repo_ids(context_mocked)

    assert context_mocked.commands_called == [['yum', 'clean', 'all']]
    assert result == expected_repoids

    if not result:
        assert 'There are no repos available through RHSM.' in api.current_logger.infomsg
        return

    expected_message = (
        'The following repoids are available through RHSM:{0}{1}'
        .format(LIST_SEPARATOR, LIST_SEPARATOR.join(expected_repoids))
    )
    assert expected_message in api.current_logger.infomsg
def test_get_available_repo_ids_error():
    """Tests that a failing command call makes the library raise StopActorExecutionError."""
    # Every command executed through this context raises CalledProcessError
    context_mocked = IsolatedActionsMocked(raise_err=True)

    with pytest.raises(StopActorExecutionError) as err:
        rhsm.get_available_repo_ids(context_mocked)

    # Check the raised exception itself (`err.value`). `err` is pytest's ExceptionInfo wrapper
    # and its string form is not guaranteed to be the exception message across pytest versions.
    assert 'Unable to use yum' in str(err.value)
def test_inhibit_on_duplicate_repos(monkeypatch):
    """Tests that a repoid defined in two repofiles is logged as a warning and reported as an inhibitor."""
    monkeypatch.setattr(reporting, 'create_report', create_report_mocked())
    monkeypatch.setattr(api, 'current_logger', logger_mocked())

    # 'repoX' is present in both repofiles, 'repoY' only in the first one
    rhsm._inhibit_on_duplicate_repos([
        _gen_repofile("foo", [_gen_repo('repoX'), _gen_repo('repoY')]),
        _gen_repofile("bar", [_gen_repo('repoX')]),
    ])

    dups_listing = LIST_SEPARATOR.join(['repoX'])
    expected_warning = (
        'The following repoids are defined multiple times:{0}{1}'
        .format(LIST_SEPARATOR, dups_listing)
    )
    expected_summary = (
        'The following repositories are defined multiple times:{0}{1}'
        .format(LIST_SEPARATOR, dups_listing)
    )

    assert expected_warning in api.current_logger.warnmsg
    assert reporting.create_report.called == 1
    assert is_inhibitor(reporting.create_report.report_fields)
    assert reporting.create_report.report_fields['title'] == 'A YUM/DNF repository defined multiple times'
    assert expected_summary in reporting.create_report.report_fields['summary']
def test_inhibit_on_duplicate_repos_no_dups(monkeypatch):
    """Tests that nothing is logged as a warning or reported when no repoid is duplicated."""
    monkeypatch.setattr(reporting, 'create_report', create_report_mocked())
    monkeypatch.setattr(api, 'current_logger', logger_mocked())

    single_repofile = _gen_repofile("foo")
    rhsm._inhibit_on_duplicate_repos([single_repofile])

    assert not api.current_logger.warnmsg
    assert reporting.create_report.called == 0
def test_sku_listing(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the rhsm library can obtain used SKUs correctly."""
    expected_sku = '598339696910'
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_LIST_CONSUMED, 'SKU: 598339696910')

    attached_skus = rhsm.get_attached_skus(context_mocked)

    assert context_mocked.commands_called, 'Some calls to subscription-manager were expected.'

    sku_count = len(attached_skus)
    assert sku_count == 1, 'RHSM command reported 1 SKU, however {0} were detected.'.format(sku_count)

    sku_mismatch_description = 'The parsed SKU is different than the one contained in the mocked RHSM output.'
    assert attached_skus[0] == expected_sku, sku_mismatch_description
def test_scanrhsminfo_with_skip_rhsm(monkeypatch, context_mocked):
    """Tests whether the scan_rhsm_info respects the LEAPP_NO_RHSM environmental variable."""
    skip_rhsm_envars = {'LEAPP_NO_RHSM': '1'}
    monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(envars=skip_rhsm_envars))

    scan_result = rhsm.scan_rhsm_info(context_mocked)

    no_commands_description = 'No external shell commands should be executed when RHSM is skipped.'
    assert not context_mocked.commands_called, no_commands_description

    no_output_description = 'The `scan_rhsm_info` should not provide any output when RHSM is skipped.'
    assert scan_result is None, no_output_description
def test_get_release(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library correctly retrieves release from RHSM."""
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_RELEASE, 'Release: 7.9')

    detected_release = rhsm.get_release(context_mocked)

    missing_release_description = 'No release information detected (but valid release info was provided).'
    assert detected_release, missing_release_description
    assert detected_release == '7.9', 'Detected release is incorrect.'
def test_get_release_with_release_not_set(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library does not retrieve release information when the release is not set."""
    # Test whether no release is detected correctly too
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_RELEASE, 'Release not set')

    release = rhsm.get_release(context_mocked)

    # The message quotes the mocked output verbatim, so a failure points at the exact input used
    fail_description = 'The release information was obtained, even if "Release not set" was reported by rhsm.'
    assert not release, fail_description
def test_is_manifest_sca_on_nonsca_system(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library obtains the SCA information correctly from a non-SCA system."""
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_STATUS, RHSM_STATUS_OUTPUT_NOSCA)

    sca_detected = rhsm.is_manifest_sca(context_mocked)

    assert not sca_detected, 'SCA was detected on a non-SCA system.'
def test_is_manifest_sca_on_sca_system(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library obtains the SCA information from SCA system correctly."""
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_STATUS, RHSM_STATUS_OUTPUT_SCA)

    sca_detected = rhsm.is_manifest_sca(context_mocked)

    assert sca_detected, 'Failed to detected SCA on a SCA system.'
def test_get_enabled_repo_ids(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library retrieves correct information about enabled repositories."""
    # Prepare the (realistic) RHSM output: a header followed by one record per enabled repository
    output_header = (
        '\n'
        '+----------------------------------------------------------+\n'
        'Available Repositories in /etc/yum.repos.d/redhat.repo\n'
        '+----------------------------------------------------------+\n'
    )
    repo_records = [
        ''.join([
            'Repo ID: {0}\n'.format(repo.id),
            'Repo Name: {0}\n'.format(repo.name),
            'Repo URL: {0}\n'.format(repo.url),
            'Enabled: {0}\n'.format(repo.enabled),
            '\n',  # Records are separated by an empty line
        ])
        for repo in RHSM_ENABLED_REPOS
    ]
    rhsm_list_enabled_output = output_header + ''.join(repo_records)
    context_mocked.add_mocked_command_call_with_stdout(CMD_RHSM_LIST_ENABLED_REPOS, rhsm_list_enabled_output)

    enabled_repo_ids = rhsm.get_enabled_repo_ids(context_mocked)

    count_description = 'Failed to detected enabled repositories on the system.'
    assert len(enabled_repo_ids) == 3, count_description

    missing_id_description = 'Failed to retrieve repository ID provided in the RHSM output.'
    for expected_repo in RHSM_ENABLED_REPOS:
        assert expected_repo.id in enabled_repo_ids, missing_id_description
def test_get_existing_product_certificates(monkeypatch, actor_mocked, context_mocked):
    """Verifies that the library is able to correctly retrieve existing product certificates."""
    # Mocked filesystem: certificate directory -> names of the certificate files it contains
    CERT_DIRS_LAYOUT = {
        '/etc/pki/product': ['cert1', 'cert2'],
        '/etc/pki/product-default': ['cert3']
    }

    def mocked_isdir(path):
        # Only the known certificate directories may be inspected
        if path not in CERT_DIRS_LAYOUT:
            err_message = 'RHSM library should not gather info about additional dirs (attempted to isdir: {0}).'
            raise ValueError(err_message.format(path))
        return True

    def mocked_listdir(path):
        if path not in CERT_DIRS_LAYOUT:
            err_message = 'RHSM library should not listdir additional dirs (attempted to listdir: {0}).'
            raise ValueError(err_message.format(path))
        return CERT_DIRS_LAYOUT[path]

    def mocked_isfile(path):
        if path in CERT_DIRS_LAYOUT:
            # The certificate directories are not files
            return False

        parent_dir, file_name = os.path.split(path)
        if parent_dir not in CERT_DIRS_LAYOUT:
            err_message = 'RHSM library should not isfile additional paths (attempted to isfile: {0}).'
            raise ValueError(err_message.format(path))
        return file_name in CERT_DIRS_LAYOUT[parent_dir]

    monkeypatch.setattr(rhsm.os.path, 'isdir', mocked_isdir)
    monkeypatch.setattr(rhsm.os, 'listdir', mocked_listdir)
    monkeypatch.setattr(rhsm.os.path, 'isfile', mocked_isfile)

    existing_product_certificates = rhsm.get_existing_product_certificates(context_mocked)

    count_description = 'Retrieved different number of certificates than expected.'
    assert len(existing_product_certificates) == 3, count_description

    fail_description_bad_dir = 'Found certificate in unexpected path: {0}'
    fail_description_bad_cert_file = 'Found certificate file that was not provided by mocked output: {0}'
    for certificate_path in existing_product_certificates:
        cert_dir, cert_name = os.path.split(certificate_path)
        assert cert_dir in CERT_DIRS_LAYOUT, fail_description_bad_dir.format(certificate_path)
        assert cert_name in CERT_DIRS_LAYOUT[cert_dir], fail_description_bad_cert_file.format(certificate_path)
def test_get_existing_product_certificates_missing_cert_directory(monkeypatch, actor_mocked, context_mocked):
    """Tests whether the library is able to retrieve certificates even if /etc/pki/product is missing."""
    # Mocked isdir answers: /etc/pki/product is missing, /etc/pki/product-default exists
    known_dirs = {
        '/etc/pki/product': False,
        '/etc/pki/product-default': True,
    }

    def mocked_isdir(path):
        if path not in known_dirs:
            err_msg = 'Tried to isdir a path that is not a part of the mocked paths. Path: {0}'
            raise ValueError(err_msg.format(path))
        return known_dirs[path]

    def mocked_isfile(path):
        if path != '/etc/pki/product-default/cert':
            err_msg = 'Tried to use isfile on a path that is not a part of the mocked paths. Path: {0}'
            raise ValueError(err_msg.format(path))
        return True

    def mocked_listdir(path):
        if path != '/etc/pki/product-default':
            err_msg = 'Tried to use listdir on a path that is not a part of the mocked paths. Path: {0}'
            raise ValueError(err_msg.format(path))
        return ['cert']

    monkeypatch.setattr(rhsm.os.path, 'isdir', mocked_isdir)
    monkeypatch.setattr(rhsm.os, 'listdir', mocked_listdir)
    monkeypatch.setattr(rhsm.os.path, 'isfile', mocked_isfile)

    existing_product_certificates = rhsm.get_existing_product_certificates(context_mocked)

    count_description = 'Library identified more certificates than there are in mocked outputs.'
    assert len(existing_product_certificates) == 1, count_description

    path_description = 'Library failed to identify certificate from mocked outputs.'
    assert existing_product_certificates[0] == '/etc/pki/product-default/cert', path_description