Adds proof-of-concept rpm-ima-inspector implementation

master
Eugene Zamriy 1 year ago
parent 948bab6f9f
commit 50795e4885
Signed by: ezamriy
GPG Key ID: 7EBF95C7DCFA496C

@ -1,6 +1,6 @@
MIT License
Copyright (c) <year> <copyright holders>
Copyright (c) 2023 Eugene Zamriy, msvsphere-os.ru.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

@ -0,0 +1,269 @@
#!/usr/bin/env python3
import argparse
import functools
import sys
import textwrap
import yaml
import contextlib
import os
import struct
import tempfile
from typing import Iterator, Tuple
import plumbum
import cryptography.exceptions
import cryptography.x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey
import rpm
class NoIMASignatureError(Exception):
pass
class IMASignatureFormatError(Exception):
pass
def init_arg_parser() -> argparse.ArgumentParser:
"""
Initializes a command line arguments parser.
Returns:
Command line arguments parser.
"""
parser = argparse.ArgumentParser(
prog='rpm-ima-inspector',
description='Verifies RPM package IMA signatures'
)
parser.add_argument('-c', '--ima-cert', required=True,
help='public IMA certificate DER file path')
parser.add_argument('rpm_file', metavar='RPM_FILE', help='RPM file path')
return parser
def init_rpm_ts() -> rpm.TransactionSet:
"""
Initializes an RPM transaction.
Returns:
RPM transaction.
"""
ts = rpm.TransactionSet()
ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
return ts
def load_pub_key_from_der(cert_path: str) -> EllipticCurvePublicKey:
"""
Loads a public key from a DER-formatted certificate file.
Args:
cert_path: Certificate file path.
Returns:
Public key.
"""
with open(cert_path, 'rb') as fd:
cert = cryptography.x509.load_der_x509_certificate(fd.read())
return cert.public_key()
def get_pub_key_id(pub_key: EllipticCurvePublicKey) -> str:
"""
Extracts a key ID from a public key.
Args:
pub_key: Public key.
Returns:
Public key ID.
"""
key_id = cryptography.x509.SubjectKeyIdentifier.from_public_key(pub_key)
return key_id.digest[-4:].hex()
def read_rpm_header(rpm_path: str, ts: rpm.TransactionSet) -> rpm.hdr:
with open(rpm_path, 'r') as fd:
return ts.hdrFromFdno(fd)
def get_rpm_ima_signatures(
rpm_path: str, ts: rpm.TransactionSet
) -> Iterator[Tuple[str, str]]:
"""
Iterates over an RPM package files and their signatures.
Args:
rpm_path: RPM file path.
ts: RPM transaction set.
Returns:
Iterator over RPM package files and their signatures.
"""
with open(rpm_path, 'rb') as fd:
hdr = ts.hdrFromFdno(fd)
files = hdr[rpm.RPMTAG_FILENAMES]
signatures = hdr[rpm.RPMTAG_FILESIGNATURES]
if len(files) != len(signatures):
raise NoIMASignatureError(f'there are no IMA signatures in {rpm_path}')
return zip(files, signatures)
@contextlib.contextmanager
def unpack_rpm_to_tmp(rpm_path: str):
rpm2cpio = plumbum.local['rpm2cpio']
cpio = plumbum.local['cpio']
with tempfile.TemporaryDirectory() as tmp_dir:
with plumbum.local.cwd(tmp_dir):
cmd = rpm2cpio[rpm_path] | cpio['-idmv', '--no-absolute-filenames']
cmd()
yield tmp_dir
def parse_ima_signature(sig_hdr: str) -> Tuple[str, bytes]:
"""
Args:
sig_hdr:
Notes:
The constant values are taken from the imaevm.h file.
See the signature_v2_hdr structure definition in the imaevm.h file
for a signature header format description.
Returns:
Public key ID and a file signature.
"""
EVM_IMA_XATTR_DIGSIG = 3
DIGSIG_VERSION_2 = 2
PKEY_HASH_SHA256 = 4
byte_sign = bytearray.fromhex(sig_hdr)
if byte_sign[0] != EVM_IMA_XATTR_DIGSIG:
raise IMASignatureFormatError(f'invalid signature type {byte_sign[0]}')
elif byte_sign[1] != DIGSIG_VERSION_2:
raise IMASignatureFormatError(f'only V2 format signatures are supported')
elif byte_sign[2] != PKEY_HASH_SHA256:
raise IMASignatureFormatError(f'only SHA256 digest algorithm is supported')
pub_key_id = bytes(byte_sign[3:7]).hex()
sig_size = struct.unpack('>H', byte_sign[7:9])[0]
signature = bytes(byte_sign[9:sig_size+9])
return pub_key_id, signature
class TapProducer:
def __init__(self):
self.i = 0
def counter(fn):
@functools.wraps(fn)
def wrap(self, *args, **kwargs):
self.i += 1
return fn(self, *args, **kwargs)
return wrap
@counter
def abort(self, description: str = None, payload: dict = None):
self._render(False, description, payload)
sys.stdout.write(f'1..{self.i} # fatal error\n')
@counter
def failed(self, description: str = None, payload: dict = None):
self._render(False, description, payload)
def finalize(self):
sys.stdout.write(f'1..{self.i}\n')
@counter
def passed(self, description: str = None, payload: dict = None):
"""
Reports a passed test.
Args:
description: Test description.
payload: Extra diagnostics payload to be serialized as YAML.
"""
self._render(True, description, payload)
@counter
def skipped(self, description: str = None, payload: dict = None,
reason: str = None):
self._render(True, description, payload, skip=True, reason=reason)
def _render(self, success: bool, description: str = None,
payload: dict = None, skip: bool = False, reason: str = None):
status = 'ok' if success else 'not ok'
sys.stdout.write(f'{status} {self.i}')
if description:
sys.stdout.write(f' - {description}')
if skip:
sys.stdout.write(' # SKIP')
if reason is not None:
sys.stdout.write(f' {reason}')
# TODO: add todo directive support
sys.stdout.write('\n')
if payload:
yaml_str = yaml.dump(payload, explicit_start=True,
explicit_end=True, indent=2)
yaml_str = textwrap.indent(yaml_str, ' ')
sys.stdout.write(yaml_str)
counter = staticmethod(counter)
def main():
arg_parser = init_arg_parser()
args = arg_parser.parse_args()
rpm_file = args.rpm_file
tap = TapProducer()
#
step = 'load public IMA certificate'
try:
pub_key = load_pub_key_from_der(args.ima_cert)
pub_key_id = get_pub_key_id(pub_key)
tap.passed(f'{step} with {pub_key_id} key ID')
except Exception as e:
tap.abort(step, {'message': str(e), 'file': args.ima_cert})
sys.exit(1)
#
rpm_ts = init_rpm_ts()
step = 'read RPM IMA signatures'
try:
ima_sigs = get_rpm_ima_signatures(rpm_file, rpm_ts)
tap.passed(step)
except NoIMASignatureError as e:
tap.abort(step, {'message': str(e), 'file': rpm_file})
sys.exit(1)
#
failed = False
with unpack_rpm_to_tmp(rpm_file) as rpm_dir:
for rel_path, sig_hdr in ima_sigs:
file_path = os.path.join(rpm_dir, rel_path)
if not os.path.isfile(file_path) or os.path.islink(file_path):
tap.skipped(f'{rel_path} is not a regular file')
continue
key_id, sig = parse_ima_signature(sig_hdr)
with open(file_path, 'rb') as fd:
try:
pub_key.verify(sig, fd.read(), ECDSA(hashes.SHA256()))
assert pub_key_id == key_id
tap.passed(f'{rel_path} is signed with {key_id}')
except cryptography.exceptions.InvalidSignature:
tap.failed(f'{rel_path} signature is not valid')
failed = True
tap.finalize()
if failed:
return 1
if __name__ == '__main__':
sys.exit(main())
Loading…
Cancel
Save