From 50795e488588537ceab508a0f7b284cea8bb7c5b Mon Sep 17 00:00:00 2001 From: Eugene Zamriy Date: Tue, 17 Oct 2023 00:55:53 +0300 Subject: [PATCH] Adds proof-of-concept rpm-ima-inspector implementation --- LICENSE | 2 +- rpm-ima-inspector.py | 269 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 270 insertions(+), 1 deletion(-) create mode 100755 rpm-ima-inspector.py diff --git a/LICENSE b/LICENSE index 2071b23..ed35c0d 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) +Copyright (c) 2023 Eugene Zamriy, msvsphere-os.ru. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/rpm-ima-inspector.py b/rpm-ima-inspector.py new file mode 100755 index 0000000..eb954ee --- /dev/null +++ b/rpm-ima-inspector.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 + +import argparse +import functools +import sys +import textwrap + +import yaml + +import contextlib +import os +import struct +import tempfile +from typing import Iterator, Tuple + +import plumbum + +import cryptography.exceptions +import cryptography.x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey +import rpm + + +class NoIMASignatureError(Exception): + + pass + + +class IMASignatureFormatError(Exception): + + pass + + +def init_arg_parser() -> argparse.ArgumentParser: + """ + Initializes a command line arguments parser. + + Returns: + Command line arguments parser. + """ + parser = argparse.ArgumentParser( + prog='rpm-ima-inspector', + description='Verifies RPM package IMA signatures' + ) + parser.add_argument('-c', '--ima-cert', required=True, + help='public IMA certificate DER file path') + parser.add_argument('rpm_file', metavar='RPM_FILE', help='RPM file path') + return parser + + +def init_rpm_ts() -> rpm.TransactionSet: + """ + Initializes an RPM transaction. + + Returns: + RPM transaction. + """ + ts = rpm.TransactionSet() + ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES) + return ts + + +def load_pub_key_from_der(cert_path: str) -> EllipticCurvePublicKey: + """ + Loads a public key from a DER-formatted certificate file. + + Args: + cert_path: Certificate file path. + + Returns: + Public key. + """ + with open(cert_path, 'rb') as fd: + cert = cryptography.x509.load_der_x509_certificate(fd.read()) + return cert.public_key() + + +def get_pub_key_id(pub_key: EllipticCurvePublicKey) -> str: + """ + Extracts a key ID from a public key. + + Args: + pub_key: Public key. + + Returns: + Public key ID. + """ + key_id = cryptography.x509.SubjectKeyIdentifier.from_public_key(pub_key) + return key_id.digest[-4:].hex() + + +def read_rpm_header(rpm_path: str, ts: rpm.TransactionSet) -> rpm.hdr: + with open(rpm_path, 'r') as fd: + return ts.hdrFromFdno(fd) + + +def get_rpm_ima_signatures( + rpm_path: str, ts: rpm.TransactionSet +) -> Iterator[Tuple[str, str]]: + """ + Iterates over an RPM package files and their signatures. + + Args: + rpm_path: RPM file path. + ts: RPM transaction set. + + Returns: + Iterator over RPM package files and their signatures. + """ + with open(rpm_path, 'rb') as fd: + hdr = ts.hdrFromFdno(fd) + files = hdr[rpm.RPMTAG_FILENAMES] + signatures = hdr[rpm.RPMTAG_FILESIGNATURES] + if len(files) != len(signatures): + raise NoIMASignatureError(f'there are no IMA signatures in {rpm_path}') + return zip(files, signatures) + + +@contextlib.contextmanager +def unpack_rpm_to_tmp(rpm_path: str): + rpm2cpio = plumbum.local['rpm2cpio'] + cpio = plumbum.local['cpio'] + with tempfile.TemporaryDirectory() as tmp_dir: + with plumbum.local.cwd(tmp_dir): + cmd = rpm2cpio[rpm_path] | cpio['-idmv', '--no-absolute-filenames'] + cmd() + yield tmp_dir + + +def parse_ima_signature(sig_hdr: str) -> Tuple[str, bytes]: + """ + + Args: + sig_hdr: + + Notes: + The constant values are taken from the imaevm.h file. + + See the signature_v2_hdr structure definition in the imaevm.h file + for a signature header format description. + + Returns: + Public key ID and a file signature. + """ + EVM_IMA_XATTR_DIGSIG = 3 + DIGSIG_VERSION_2 = 2 + PKEY_HASH_SHA256 = 4 + byte_sign = bytearray.fromhex(sig_hdr) + if byte_sign[0] != EVM_IMA_XATTR_DIGSIG: + raise IMASignatureFormatError(f'invalid signature type {byte_sign[0]}') + elif byte_sign[1] != DIGSIG_VERSION_2: + raise IMASignatureFormatError(f'only V2 format signatures are supported') + elif byte_sign[2] != PKEY_HASH_SHA256: + raise IMASignatureFormatError(f'only SHA256 digest algorithm is supported') + pub_key_id = bytes(byte_sign[3:7]).hex() + sig_size = struct.unpack('>H', byte_sign[7:9])[0] + signature = bytes(byte_sign[9:sig_size+9]) + return pub_key_id, signature + + +class TapProducer: + + def __init__(self): + self.i = 0 + + def counter(fn): + @functools.wraps(fn) + def wrap(self, *args, **kwargs): + self.i += 1 + return fn(self, *args, **kwargs) + return wrap + + @counter + def abort(self, description: str = None, payload: dict = None): + self._render(False, description, payload) + sys.stdout.write(f'1..{self.i} # fatal error\n') + + @counter + def failed(self, description: str = None, payload: dict = None): + self._render(False, description, payload) + + def finalize(self): + sys.stdout.write(f'1..{self.i}\n') + + @counter + def passed(self, description: str = None, payload: dict = None): + """ + Reports a passed test. + + Args: + description: Test description. + payload: Extra diagnostics payload to be serialized as YAML. + """ + self._render(True, description, payload) + + @counter + def skipped(self, description: str = None, payload: dict = None, + reason: str = None): + self._render(True, description, payload, skip=True, reason=reason) + + def _render(self, success: bool, description: str = None, + payload: dict = None, skip: bool = False, reason: str = None): + status = 'ok' if success else 'not ok' + sys.stdout.write(f'{status} {self.i}') + if description: + sys.stdout.write(f' - {description}') + if skip: + sys.stdout.write(' # SKIP') + if reason is not None: + sys.stdout.write(f' {reason}') + # TODO: add todo directive support + sys.stdout.write('\n') + if payload: + yaml_str = yaml.dump(payload, explicit_start=True, + explicit_end=True, indent=2) + yaml_str = textwrap.indent(yaml_str, ' ') + sys.stdout.write(yaml_str) + + counter = staticmethod(counter) + + +def main(): + arg_parser = init_arg_parser() + args = arg_parser.parse_args() + rpm_file = args.rpm_file + tap = TapProducer() + # + step = 'load public IMA certificate' + try: + pub_key = load_pub_key_from_der(args.ima_cert) + pub_key_id = get_pub_key_id(pub_key) + tap.passed(f'{step} with {pub_key_id} key ID') + except Exception as e: + tap.abort(step, {'message': str(e), 'file': args.ima_cert}) + sys.exit(1) + # + rpm_ts = init_rpm_ts() + step = 'read RPM IMA signatures' + try: + ima_sigs = get_rpm_ima_signatures(rpm_file, rpm_ts) + tap.passed(step) + except NoIMASignatureError as e: + tap.abort(step, {'message': str(e), 'file': rpm_file}) + sys.exit(1) + # + failed = False + with unpack_rpm_to_tmp(rpm_file) as rpm_dir: + for rel_path, sig_hdr in ima_sigs: + file_path = os.path.join(rpm_dir, rel_path) + if not os.path.isfile(file_path) or os.path.islink(file_path): + tap.skipped(f'{rel_path} is not a regular file') + continue + key_id, sig = parse_ima_signature(sig_hdr) + with open(file_path, 'rb') as fd: + try: + pub_key.verify(sig, fd.read(), ECDSA(hashes.SHA256())) + assert pub_key_id == key_id + tap.passed(f'{rel_path} is signed with {key_id}') + except cryptography.exceptions.InvalidSignature: + tap.failed(f'{rel_path} signature is not valid') + failed = True + tap.finalize() + if failed: + return 1 + + +if __name__ == '__main__': + sys.exit(main())