You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

283 lines
7.9 KiB

#!/usr/bin/env python3
import argparse
import functools
import sys
import textwrap
import yaml
import contextlib
import os
import struct
import tempfile
from typing import Iterator, Tuple
import plumbum
import cryptography.exceptions
import cryptography.x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey
import rpm
class NoIMASignatureError(Exception):
    """Raised when an RPM package does not carry IMA file signatures."""
class IMASignatureFormatError(Exception):
    """Raised when an IMA signature header has an invalid or unsupported format."""
def init_arg_parser() -> argparse.ArgumentParser:
    """
    Builds the command line interface of the tool.

    The parser accepts a mandatory ``-c/--ima-cert`` option and a single
    positional ``RPM_FILE`` argument.

    Returns:
        Command line arguments parser.
    """
    cli = argparse.ArgumentParser(
        description='Verifies RPM package IMA signatures',
        prog='rpm-ima-inspector',
    )
    cert_option = {
        'required': True,
        'help': 'public IMA certificate DER file path',
    }
    cli.add_argument('-c', '--ima-cert', **cert_option)
    cli.add_argument('rpm_file', help='RPM file path', metavar='RPM_FILE')
    return cli
def init_rpm_ts() -> rpm.TransactionSet:
    """
    Creates an RPM transaction set configured for reading package headers.

    Returns:
        RPM transaction set with ``rpm._RPMVSF_NOSIGNATURES`` applied as its
        verification flags.
    """
    transaction = rpm.TransactionSet()
    # NOTE(review): presumably this lets headers be read without the package
    # signing key being present in the RPM keyring - confirm against rpm docs.
    transaction.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
    return transaction
def load_pub_key_from_der(cert_path: str) -> EllipticCurvePublicKey:
    """
    Reads a DER-encoded X.509 certificate and returns its public key.

    Args:
        cert_path: Certificate file path.

    Returns:
        Public key stored in the certificate.
    """
    with open(cert_path, 'rb') as cert_file:
        der_bytes = cert_file.read()
    certificate = cryptography.x509.load_der_x509_certificate(der_bytes)
    return certificate.public_key()
def get_pub_key_id(pub_key: EllipticCurvePublicKey) -> str:
    """
    Computes a short key ID for a public key.

    The ID is the last four bytes of the key's subject key identifier
    digest, rendered as a lowercase hex string.

    Args:
        pub_key: Public key.

    Returns:
        Public key ID (8 hex characters).
    """
    subject_key_id = cryptography.x509.SubjectKeyIdentifier.from_public_key(
        pub_key
    )
    id_tail = subject_key_id.digest[-4:]
    return id_tail.hex()
def read_rpm_header(rpm_path: str, ts: rpm.TransactionSet) -> rpm.hdr:
    """
    Reads a package header from an RPM file.

    Args:
        rpm_path: RPM file path.
        ts: RPM transaction set used to read the header.

    Returns:
        RPM package header.
    """
    # An RPM package is binary data: open it in binary mode, the same way
    # get_rpm_ima_signatures() does, instead of wrapping it in a text decoder.
    with open(rpm_path, 'rb') as fd:
        return ts.hdrFromFdno(fd)
def get_rpm_ima_signatures(
    rpm_path: str, ts: rpm.TransactionSet
) -> Iterator[Tuple[str, str]]:
    """
    Pairs every file listed in an RPM package header with its signature.

    Args:
        rpm_path: RPM file path.
        ts: RPM transaction set.

    Returns:
        Iterator over (file name, file signature) pairs.

    Raises:
        NoIMASignatureError: If the number of file signatures in the header
            differs from the number of files.
    """
    with open(rpm_path, 'rb') as rpm_fd:
        header = ts.hdrFromFdno(rpm_fd)
        file_names = header[rpm.RPMTAG_FILENAMES]
        file_sigs = header[rpm.RPMTAG_FILESIGNATURES]
    # A mismatch in counts is treated as "the package is not IMA-signed".
    if len(file_names) == len(file_sigs):
        return zip(file_names, file_sigs)
    raise NoIMASignatureError(f'there are no IMA signatures in {rpm_path}')
@contextlib.contextmanager
def unpack_rpm_to_tmp(rpm_path: str):
    """
    Unpacks an RPM package payload into a temporary directory.

    The payload is extracted by piping ``rpm2cpio`` into ``cpio``. The
    temporary directory is removed when the context is left.

    Args:
        rpm_path: RPM file path. May be relative to the current working
            directory.

    Yields:
        Path of the temporary directory containing the unpacked files.
    """
    # The extraction runs with the temporary directory as the working
    # directory, so a relative path must be resolved before switching to it.
    abs_rpm_path = os.path.abspath(rpm_path)
    rpm2cpio = plumbum.local['rpm2cpio']
    cpio = plumbum.local['cpio']
    with tempfile.TemporaryDirectory() as tmp_dir:
        with plumbum.local.cwd(tmp_dir):
            # --no-absolute-filenames keeps the extracted files under tmp_dir.
            extract = (
                rpm2cpio[abs_rpm_path]
                | cpio['-idmv', '--no-absolute-filenames']
            )
            extract()
        # The previous working directory is restored before handing the
        # directory to the caller.
        yield tmp_dir
def parse_ima_signature(sig_hdr: str) -> Tuple[str, bytes]:
    """
    Parses a hex-encoded IMA signature into a key ID and raw signature bytes.

    Args:
        sig_hdr: Hex string holding one signature type byte, a V2 signature
            header and the signature itself.

    Notes:
        The constant values are taken from the imaevm.h file.
        See the signature_v2_hdr structure definition in the imaevm.h file
        for a signature header format description.

    Returns:
        Public key ID and a file signature.

    Raises:
        IMASignatureFormatError: If the value is not valid hex, is shorter
            than the fixed-size header, has an unsupported type, version or
            digest algorithm, or declares more signature bytes than present.
    """
    EVM_IMA_XATTR_DIGSIG = 3
    DIGSIG_VERSION_2 = 2
    PKEY_HASH_SHA256 = 4
    # type (1) + version (1) + hash algorithm (1) + key ID (4) + size (2)
    HEADER_SIZE = 9
    try:
        raw = bytes.fromhex(sig_hdr)
    except (TypeError, ValueError) as e:
        raise IMASignatureFormatError(
            f'signature is not a valid hex string: {e}'
        ) from e
    # Check the length up front so the indexing below cannot fail with a
    # low-level IndexError / struct.error on empty or truncated input.
    if len(raw) < HEADER_SIZE:
        raise IMASignatureFormatError(
            f'signature header is too short: {len(raw)} bytes'
        )
    if raw[0] != EVM_IMA_XATTR_DIGSIG:
        raise IMASignatureFormatError(f'invalid signature type {raw[0]}')
    if raw[1] != DIGSIG_VERSION_2:
        raise IMASignatureFormatError('only V2 format signatures are supported')
    if raw[2] != PKEY_HASH_SHA256:
        raise IMASignatureFormatError(
            'only SHA256 digest algorithm is supported'
        )
    pub_key_id = raw[3:7].hex()
    # The signature size is stored as a big-endian unsigned 16-bit integer.
    sig_size = struct.unpack_from('>H', raw, 7)[0]
    signature = raw[HEADER_SIZE:HEADER_SIZE + sig_size]
    # Slicing never fails, so a short payload must be detected explicitly
    # instead of silently returning a truncated signature.
    if len(signature) != sig_size:
        raise IMASignatureFormatError(
            f'signature is truncated: expected {sig_size} bytes, '
            f'got {len(signature)}'
        )
    return pub_key_id, signature
class TapProducer:
    """
    Minimal TAP (Test Anything Protocol) report writer.

    Every reported result is written to stdout with the next sequential
    test number. The plan line (``1..N``) is written by ``finalize`` or,
    on a fatal error, by ``abort``.
    """

    def __init__(self):
        # Number of test points reported so far.
        self.i = 0

    def counter(fn):
        """
        Decorator which increments the test counter before calling a
        reporting method.
        """
        @functools.wraps(fn)
        def numbered(self, *args, **kwargs):
            self.i += 1
            return fn(self, *args, **kwargs)
        return numbered

    @counter
    def passed(self, description: str = None, payload: dict = None):
        """
        Reports a passed test.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(True, description, payload)

    @counter
    def failed(self, description: str = None, payload: dict = None):
        """
        Reports a failed test.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(False, description, payload)

    @counter
    def skipped(self, description: str = None, payload: dict = None,
                reason: str = None):
        """
        Reports a skipped test (an ``ok`` line with a SKIP directive).

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
            reason: Optional text appended after the SKIP directive.
        """
        self._render(True, description, payload, skip=True, reason=reason)

    @counter
    def abort(self, description: str = None, payload: dict = None):
        """
        Reports a failed test and immediately writes the plan line marked
        as a fatal error.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(False, description, payload)
        sys.stdout.write(f'1..{self.i} # fatal error\n')

    def finalize(self):
        """
        Writes the plan line covering all tests reported so far.
        """
        sys.stdout.write(f'1..{self.i}\n')

    def _render(self, success: bool, description: str = None,
                payload: dict = None, skip: bool = False, reason: str = None):
        """
        Writes one result line followed by an optional YAML diagnostics block.

        Args:
            success: True for an ``ok`` line, False for ``not ok``.
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
            skip: Append a SKIP directive to the result line.
            reason: Text appended after the SKIP directive.
        """
        pieces = ['ok' if success else 'not ok', f' {self.i}']
        if description:
            pieces.append(f' - {description}')
        if skip:
            pieces.append(' # SKIP')
            if reason is not None:
                pieces.append(f' {reason}')
        # TODO: add todo directive support
        pieces.append('\n')
        sys.stdout.write(''.join(pieces))
        if not payload:
            return
        diagnostics = yaml.dump(payload, explicit_start=True,
                                explicit_end=True, indent=2)
        sys.stdout.write(textwrap.indent(diagnostics, ' '))

    # Keep the decorator reachable as a static method of the class.
    counter = staticmethod(counter)
def normalize_path(path: str) -> str:
    """
    Converts a path to its absolute form.

    Environment variables are expanded first, then a leading ``~`` is
    replaced with the user's home directory, and finally the result is made
    absolute.

    Args:
        path: path to be normalized.

    Returns:
        Normalized path.
    """
    with_vars = os.path.expandvars(path)
    with_home = os.path.expanduser(with_vars)
    return os.path.abspath(with_home)
def main():
    """
    Verifies the IMA signatures of every regular file in an RPM package and
    prints the results to stdout as a TAP report.

    Steps:
        1. Load the public key from the IMA certificate.
        2. Read the per-file IMA signatures from the RPM header.
        3. Unpack the package and verify each regular file's signature.

    A failure in step 1 or 2 aborts the report and exits with status 1.

    Returns:
        0 if every checked file is validly signed with the given key,
        1 otherwise.
    """
    arg_parser = init_arg_parser()
    args = arg_parser.parse_args()
    rpm_file = normalize_path(args.rpm_file)
    tap = TapProducer()
    #
    step = 'load public IMA certificate'
    try:
        pub_key = load_pub_key_from_der(args.ima_cert)
        pub_key_id = get_pub_key_id(pub_key)
    except Exception as e:
        tap.abort(step, {'message': str(e), 'file': args.ima_cert})
        sys.exit(1)
    tap.passed(f'{step} with {pub_key_id} key ID')
    #
    rpm_ts = init_rpm_ts()
    step = 'read RPM IMA signatures'
    try:
        ima_sigs = get_rpm_ima_signatures(rpm_file, rpm_ts)
    except Exception as e:
        # Like the certificate step, any failure here (unreadable file,
        # broken header, missing signatures) ends the report cleanly
        # instead of crashing with a traceback.
        tap.abort(step, {'message': str(e), 'file': rpm_file})
        sys.exit(1)
    tap.passed(step)
    #
    failed = False
    with unpack_rpm_to_tmp(rpm_file) as rpm_dir:
        for rel_path, sig_hdr in ima_sigs:
            file_path = os.path.join(rpm_dir, os.path.relpath(rel_path, '/'))
            if not os.path.isfile(file_path) or os.path.islink(file_path):
                tap.skipped(f'{rel_path} is not a regular file')
                continue
            try:
                key_id, sig = parse_ima_signature(sig_hdr)
            except (IMASignatureFormatError, IndexError, ValueError,
                    struct.error) as e:
                # A malformed signature is a failed check for this file,
                # not a reason to abandon the rest of the report.
                tap.failed(f'{rel_path} signature header is malformed',
                           {'message': str(e)})
                failed = True
                continue
            # Explicit comparison instead of assert: asserts are stripped
            # under "python -O" and would otherwise crash the report.
            if key_id != pub_key_id:
                tap.failed(f'{rel_path} is signed with {key_id}, '
                           f'expected {pub_key_id}')
                failed = True
                continue
            with open(file_path, 'rb') as fd:
                content = fd.read()
            try:
                pub_key.verify(sig, content, ECDSA(hashes.SHA256()))
            except cryptography.exceptions.InvalidSignature:
                tap.failed(f'{rel_path} signature is not valid')
                failed = True
            else:
                tap.passed(f'{rel_path} is signed with {key_id}')
    tap.finalize()
    return 1 if failed else 0
if __name__ == '__main__':
    # Script entry point: the value returned by main() becomes the process
    # exit status.
    raise SystemExit(main())