#!/usr/bin/env python3
"""Verifies the IMA signatures of files packaged in an RPM.

The script loads a public key from a DER certificate, reads the per-file IMA
signatures from the RPM header, unpacks the package into a temporary directory
and checks every regular file against its signature. Results are written to
standard output in TAP format.
"""

import argparse
import contextlib
import functools
import os
import struct
import sys
import tempfile
import textwrap
from typing import Iterator, Optional, Tuple

import cryptography.exceptions
import cryptography.x509
import plumbum
import rpm
import yaml
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import (
    ECDSA,
    EllipticCurvePublicKey,
)


class NoIMASignatureError(Exception):
    """Raised when an RPM header does not carry one signature per file."""


class IMASignatureFormatError(Exception):
    """Raised when an IMA signature header is malformed or unsupported."""


def init_arg_parser() -> argparse.ArgumentParser:
    """
    Initializes a command line arguments parser.

    Returns:
        Command line arguments parser.
    """
    parser = argparse.ArgumentParser(
        prog='rpm-ima-inspector',
        description='Verifies RPM package IMA signatures'
    )
    parser.add_argument('-c', '--ima-cert', required=True,
                        help='public IMA certificate DER file path')
    parser.add_argument('rpm_file', metavar='RPM_FILE', help='RPM file path')
    return parser


def init_rpm_ts() -> rpm.TransactionSet:
    """
    Initializes an RPM transaction.

    The transaction's verification flags are set to
    ``rpm._RPMVSF_NOSIGNATURES``.

    Returns:
        RPM transaction.
    """
    ts = rpm.TransactionSet()
    ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
    return ts


def load_pub_key_from_der(cert_path: str) -> EllipticCurvePublicKey:
    """
    Loads a public key from a DER-formatted certificate file.

    Args:
        cert_path: Certificate file path.

    Returns:
        Public key.
    """
    with open(cert_path, 'rb') as fd:
        cert = cryptography.x509.load_der_x509_certificate(fd.read())
    return cert.public_key()


def get_pub_key_id(pub_key: EllipticCurvePublicKey) -> str:
    """
    Extracts a key ID from a public key.

    The ID is the hex representation of the last four bytes of the subject
    key identifier digest computed from the public key.

    Args:
        pub_key: Public key.

    Returns:
        Public key ID.
    """
    key_id = cryptography.x509.SubjectKeyIdentifier.from_public_key(pub_key)
    return key_id.digest[-4:].hex()


def read_rpm_header(rpm_path: str, ts: rpm.TransactionSet) -> rpm.hdr:
    """
    Reads a package header from an RPM file.

    Args:
        rpm_path: RPM file path.
        ts: RPM transaction set.

    Returns:
        RPM package header.
    """
    # An RPM is a binary file, so it must not be opened in text mode.
    with open(rpm_path, 'rb') as fd:
        return ts.hdrFromFdno(fd)


def get_rpm_ima_signatures(
        rpm_path: str, ts: rpm.TransactionSet
) -> Iterator[Tuple[str, str]]:
    """
    Iterates over an RPM package files and their signatures.

    Args:
        rpm_path: RPM file path.
        ts: RPM transaction set.

    Returns:
        Iterator over RPM package files and their signatures.

    Raises:
        NoIMASignatureError: If the number of signatures in the header does
            not match the number of files.
    """
    hdr = read_rpm_header(rpm_path, ts)
    # Treat a missing tag value as an empty list so that len() never fails.
    files = hdr[rpm.RPMTAG_FILENAMES] or []
    signatures = hdr[rpm.RPMTAG_FILESIGNATURES] or []
    if len(files) != len(signatures):
        raise NoIMASignatureError(f'there are no IMA signatures in {rpm_path}')
    return zip(files, signatures)


@contextlib.contextmanager
def unpack_rpm_to_tmp(rpm_path: str):
    """
    Unpacks an RPM package into a temporary directory.

    The package is extracted with an ``rpm2cpio | cpio`` pipeline executed
    inside the temporary directory. The directory is removed when the context
    exits.

    Notes:
        The process working directory stays switched to the temporary
        directory while the body of the ``with`` statement runs, so
        ``rpm_path`` should be absolute.

    Args:
        rpm_path: RPM file path.

    Yields:
        Path of the temporary directory holding the unpacked files.
    """
    rpm2cpio = plumbum.local['rpm2cpio']
    cpio = plumbum.local['cpio']
    with tempfile.TemporaryDirectory() as tmp_dir:
        with plumbum.local.cwd(tmp_dir):
            cmd = rpm2cpio[rpm_path] | cpio['-idmv', '--no-absolute-filenames']
            cmd()
            yield tmp_dir


def parse_ima_signature(sig_hdr: str) -> Tuple[str, bytes]:
    """
    Parses a hex-encoded IMA signature header.

    Args:
        sig_hdr: Hex-encoded IMA signature (header followed by the signature
            bytes).

    Notes:
        The constant values are taken from the imaevm.h file.

        See the signature_v2_hdr structure definition in the imaevm.h file
        for a signature header format description.

    Returns:
        Public key ID and a file signature.

    Raises:
        IMASignatureFormatError: If the header is not valid hex, is too
            short, is truncated, or uses an unsupported type, version or
            digest algorithm.
    """
    EVM_IMA_XATTR_DIGSIG = 3
    DIGSIG_VERSION_2 = 2
    PKEY_HASH_SHA256 = 4
    # type (1) + version (1) + hash algorithm (1) + key ID (4) + size (2)
    HEADER_SIZE = 9

    try:
        byte_sign = bytes.fromhex(sig_hdr)
    except ValueError as e:
        raise IMASignatureFormatError(
            f'signature is not a valid hex string: {e}'
        ) from e
    # Validate the length up front: indexing a short buffer would otherwise
    # fail with an unrelated IndexError / struct.error.
    if len(byte_sign) < HEADER_SIZE:
        raise IMASignatureFormatError(
            f'signature header is too short ({len(byte_sign)} bytes)'
        )
    if byte_sign[0] != EVM_IMA_XATTR_DIGSIG:
        raise IMASignatureFormatError(f'invalid signature type {byte_sign[0]}')
    if byte_sign[1] != DIGSIG_VERSION_2:
        raise IMASignatureFormatError('only V2 format signatures are supported')
    if byte_sign[2] != PKEY_HASH_SHA256:
        raise IMASignatureFormatError('only SHA256 digest algorithm is supported')

    pub_key_id = byte_sign[3:7].hex()
    # The signature size is stored as a big-endian unsigned 16-bit integer.
    sig_size = struct.unpack_from('>H', byte_sign, 7)[0]
    signature = byte_sign[HEADER_SIZE:HEADER_SIZE + sig_size]
    # A slice silently shortens when the data runs out, so check explicitly.
    if len(signature) != sig_size:
        raise IMASignatureFormatError(
            f'signature is truncated: expected {sig_size} bytes, '
            f'got {len(signature)}'
        )
    return pub_key_id, signature


class TapProducer:
    """
    Writes test results to standard output in TAP format.

    Every reporting method increments the internal test counter and prints
    one test line; ``finalize`` (or ``abort``) prints the plan line.
    """

    def __init__(self):
        # Number of the most recently reported test point.
        self.i = 0

    def counter(fn):
        """
        Decorates a reporting method so that each call bumps the test number.

        Args:
            fn: Reporting method to wrap.

        Returns:
            Wrapped method.
        """
        @functools.wraps(fn)
        def wrap(self, *args, **kwargs):
            self.i += 1
            return fn(self, *args, **kwargs)
        return wrap

    @counter
    def abort(self, description: Optional[str] = None,
              payload: Optional[dict] = None):
        """
        Reports a failed test followed by a plan line marked as fatal.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(False, description, payload)
        sys.stdout.write(f'1..{self.i} # fatal error\n')

    @counter
    def failed(self, description: Optional[str] = None,
               payload: Optional[dict] = None):
        """
        Reports a failed test.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(False, description, payload)

    def finalize(self):
        """Writes the plan line covering all tests reported so far."""
        sys.stdout.write(f'1..{self.i}\n')

    @counter
    def passed(self, description: Optional[str] = None,
               payload: Optional[dict] = None):
        """
        Reports a passed test.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
        """
        self._render(True, description, payload)

    @counter
    def skipped(self, description: Optional[str] = None,
                payload: Optional[dict] = None,
                reason: Optional[str] = None):
        """
        Reports a skipped test.

        Args:
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
            reason: Explanation appended to the SKIP directive.
        """
        self._render(True, description, payload, skip=True, reason=reason)

    def _render(self, success: bool, description: Optional[str] = None,
                payload: Optional[dict] = None, skip: bool = False,
                reason: Optional[str] = None):
        """
        Writes a single test line and its optional YAML diagnostics.

        Args:
            success: True for an "ok" line, False for a "not ok" line.
            description: Test description.
            payload: Extra diagnostics payload to be serialized as YAML.
            skip: Whether to append a SKIP directive.
            reason: Explanation appended to the SKIP directive.
        """
        status = 'ok' if success else 'not ok'
        sys.stdout.write(f'{status} {self.i}')
        if description:
            sys.stdout.write(f' - {description}')
        if skip:
            sys.stdout.write(' # SKIP')
            if reason is not None:
                sys.stdout.write(f' {reason}')
        # TODO: add todo directive support
        sys.stdout.write('\n')
        if payload:
            yaml_str = yaml.dump(payload, explicit_start=True,
                                 explicit_end=True, indent=2)
            yaml_str = textwrap.indent(yaml_str, ' ')
            sys.stdout.write(yaml_str)

    # ``counter`` is applied above as a plain function while the class body
    # executes; converting it afterwards prevents it from being bound as an
    # instance method when accessed through the class or an instance.
    counter = staticmethod(counter)


def normalize_path(path: str) -> str:
    """
    Returns an absolute path with all variables expanded.

    Args:
        path: path to be normalized.

    Returns:
        Normalized path.
    """
    return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))


def main() -> int:
    """
    Runs the IMA signature verification and reports the results as TAP.

    Returns:
        0 if every checked file has a valid signature made with the given
        key, 1 otherwise. The process exits with status 1 directly when the
        certificate or the RPM signatures cannot be read.
    """
    arg_parser = init_arg_parser()
    args = arg_parser.parse_args()
    rpm_file = normalize_path(args.rpm_file)
    tap = TapProducer()
    #
    step = 'load public IMA certificate'
    try:
        pub_key = load_pub_key_from_der(args.ima_cert)
        pub_key_id = get_pub_key_id(pub_key)
        tap.passed(f'{step} with {pub_key_id} key ID')
    except Exception as e:
        tap.abort(step, {'message': str(e), 'file': args.ima_cert})
        sys.exit(1)
    #
    rpm_ts = init_rpm_ts()
    step = 'read RPM IMA signatures'
    try:
        ima_sigs = get_rpm_ima_signatures(rpm_file, rpm_ts)
        tap.passed(step)
    except Exception as e:
        # Any failure here (missing file, unreadable header, no signatures)
        # is fatal; report it as TAP instead of crashing with a traceback.
        tap.abort(step, {'message': str(e), 'file': rpm_file})
        sys.exit(1)
    #
    failed = False
    with unpack_rpm_to_tmp(rpm_file) as rpm_dir:
        for rel_path, sig_hdr in ima_sigs:
            file_path = os.path.join(rpm_dir, os.path.relpath(rel_path, '/'))
            if not os.path.isfile(file_path) or os.path.islink(file_path):
                tap.skipped(f'{rel_path} is not a regular file')
                continue
            try:
                key_id, sig = parse_ima_signature(sig_hdr)
            except IMASignatureFormatError as e:
                tap.failed(f'{rel_path} signature is malformed',
                           {'message': str(e)})
                failed = True
                continue
            # An explicit check rather than ``assert``: asserts are stripped
            # under ``python -O`` and an AssertionError would abort the run
            # before the TAP plan line is written.
            if key_id != pub_key_id:
                tap.failed(
                    f'{rel_path} is signed with {key_id}, '
                    f'expected {pub_key_id}'
                )
                failed = True
                continue
            with open(file_path, 'rb') as fd:
                content = fd.read()
            try:
                pub_key.verify(sig, content, ECDSA(hashes.SHA256()))
            except cryptography.exceptions.InvalidSignature:
                tap.failed(f'{rel_path} signature is not valid')
                failed = True
            else:
                tap.passed(f'{rel_path} is signed with {key_id}')
    tap.finalize()
    return 1 if failed else 0


if __name__ == '__main__':
    sys.exit(main())