From a13aa83d1b5bb4b6ce4396aef3457b48695b7c41 Mon Sep 17 00:00:00 2001 From: Miroslav Rezanina Date: Thu, 6 May 2021 12:53:31 +0200 Subject: Add vmbus_testing tool build files Add the vmbus_testing tool to redhat build dirs (cherry-pick from rhel 8.4.0 commit d8ca5e0) Signed-off-by: Mohammed Gamal Signed-off-by: Miroslav Rezanina --- .distro/hyperv-daemons.spec.template | 2 + vmbus_testing | 376 +++++++++++++++++++++++++++ 2 files changed, 378 insertions(+) create mode 100755 vmbus_testing diff --git a/vmbus_testing b/vmbus_testing new file mode 100755 index 0000000..e721290 --- /dev/null +++ b/vmbus_testing @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-2.0 +# +# Program to allow users to fuzz test Hyper-V drivers +# by interfacing with Hyper-V debugfs attributes. +# Current test methods available: +# 1. delay testing +# +# Current file/directory structure of hyper-V debugfs: +# /sys/kernel/debug/hyperv/UUID +# /sys/kernel/debug/hyperv/UUID/ +# /sys/kernel/debug/hyperv/UUID/ +# +# author: Branden Bonaby + +import os +import cmd +import argparse +import glob +from argparse import RawDescriptionHelpFormatter +from argparse import RawTextHelpFormatter +from enum import Enum + +# Do not change unless, you change the debugfs attributes +# in /drivers/hv/debugfs.c. All fuzz testing +# attributes will start with "fuzz_test". + +# debugfs path for hyperv must exist before proceeding +debugfs_hyperv_path = "/sys/kernel/debug/hyperv" +if not os.path.isdir(debugfs_hyperv_path): + print("{} doesn't exist/check permissions".format(debugfs_hyperv_path)) + exit(-1) + +class dev_state(Enum): + off = 0 + on = 1 + +# File names, that correspond to the files created in +# /drivers/hv/debugfs.c +class f_names(Enum): + state_f = "fuzz_test_state" + buff_f = "fuzz_test_buffer_interrupt_delay" + mess_f = "fuzz_test_message_delay" + +# Both single_actions and all_actions are used +# for error checking and to allow for some subparser +# names to be abbreviated. Do not abbreviate the +# test method names, as it will become less intuitive +# as to what the user can do. If you do decide to +# abbreviate the test method name, make sure the main +# function reflects this change. + +all_actions = [ + "disable_all", + "D", + "enable_all", + "view_all", + "V" +] + +single_actions = [ + "disable_single", + "d", + "enable_single", + "view_single", + "v" +] + +def main(): + + file_map = recursive_file_lookup(debugfs_hyperv_path, dict()) + args = parse_args() + if (not args.action): + print ("Error, no options selected...exiting") + exit(-1) + arg_set = { k for (k,v) in vars(args).items() if v and k != "action" } + arg_set.add(args.action) + path = args.path if "path" in arg_set else None + if (path and path[-1] == "/"): + path = path[:-1] + validate_args_path(path, arg_set, file_map) + if (path and "enable_single" in arg_set): + state_path = locate_state(path, file_map) + set_test_state(state_path, dev_state.on.value, args.quiet) + + # Use subparsers as the key for different actions + if ("delay" in arg_set): + validate_delay_values(args.delay_time) + if (args.enable_all): + set_delay_all_devices(file_map, args.delay_time, + args.quiet) + else: + set_delay_values(path, file_map, args.delay_time, + args.quiet) + elif ("disable_all" in arg_set or "D" in arg_set): + disable_all_testing(file_map) + elif ("disable_single" in arg_set or "d" in arg_set): + disable_testing_single_device(path, file_map) + elif ("view_all" in arg_set or "V" in arg_set): + get_all_devices_test_status(file_map) + elif ("view_single" in arg_set or "v" in arg_set): + get_device_test_values(path, file_map) + +# Get the state location +def locate_state(device, file_map): + return file_map[device][f_names.state_f.value] + +# Validate delay values to make sure they are acceptable to +# enable delays on a device +def validate_delay_values(delay): + + if (delay[0] == -1 and delay[1] == -1): + print("\nError, At least 1 value must be greater than 0") + exit(-1) + for i in delay: + if (i < -1 or i == 0 or i > 1000): + print("\nError, Values must be equal to -1 " + "or be > 0 and <= 1000") + exit(-1) + +# Validate argument path +def validate_args_path(path, arg_set, file_map): + + if (not path and any(element in arg_set for element in single_actions)): + print("Error, path (-p) REQUIRED for the specified option. " + "Use (-h) to check usage.") + exit(-1) + elif (path and any(item in arg_set for item in all_actions)): + print("Error, path (-p) NOT REQUIRED for the specified option. " + "Use (-h) to check usage." ) + exit(-1) + elif (path not in file_map and any(item in arg_set + for item in single_actions)): + print("Error, path '{}' not a valid vmbus device".format(path)) + exit(-1) + +# display Testing status of single device +def get_device_test_values(path, file_map): + + for name in file_map[path]: + file_location = file_map[path][name] + print( name + " = " + str(read_test_files(file_location))) + +# Create a map of the vmbus devices and their associated files +# [key=device, value = [key = filename, value = file path]] +def recursive_file_lookup(path, file_map): + + for f_path in glob.iglob(path + '**/*'): + if (os.path.isfile(f_path)): + if (f_path.rsplit("/",2)[0] == debugfs_hyperv_path): + directory = f_path.rsplit("/",1)[0] + else: + directory = f_path.rsplit("/",2)[0] + f_name = f_path.split("/")[-1] + if (file_map.get(directory)): + file_map[directory].update({f_name:f_path}) + else: + file_map[directory] = {f_name:f_path} + elif (os.path.isdir(f_path)): + recursive_file_lookup(f_path,file_map) + return file_map + +# display Testing state of devices +def get_all_devices_test_status(file_map): + + for device in file_map: + if (get_test_state(locate_state(device, file_map)) is 1): + print("Testing = ON for: {}" + .format(device.split("/")[5])) + else: + print("Testing = OFF for: {}" + .format(device.split("/")[5])) + +# read the vmbus device files, path must be absolute path before calling +def read_test_files(path): + try: + with open(path,"r") as f: + file_value = f.readline().strip() + return int(file_value) + + except IOError as e: + errno, strerror = e.args + print("I/O error({0}): {1} on file {2}" + .format(errno, strerror, path)) + exit(-1) + except ValueError: + print ("Element to int conversion error in: \n{}".format(path)) + exit(-1) + +# writing to vmbus device files, path must be absolute path before calling +def write_test_files(path, value): + + try: + with open(path,"w") as f: + f.write("{}".format(value)) + except IOError as e: + errno, strerror = e.args + print("I/O error({0}): {1} on file {2}" + .format(errno, strerror, path)) + exit(-1) + +# set testing state of device +def set_test_state(state_path, state_value, quiet): + + write_test_files(state_path, state_value) + if (get_test_state(state_path) is 1): + if (not quiet): + print("Testing = ON for device: {}" + .format(state_path.split("/")[5])) + else: + if (not quiet): + print("Testing = OFF for device: {}" + .format(state_path.split("/")[5])) + +# get testing state of device +def get_test_state(state_path): + #state == 1 - test = ON + #state == 0 - test = OFF + return read_test_files(state_path) + +# write 1 - 1000 microseconds, into a single device using the +# fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay +# debugfs attributes +def set_delay_values(device, file_map, delay_length, quiet): + + try: + interrupt = file_map[device][f_names.buff_f.value] + message = file_map[device][f_names.mess_f.value] + + # delay[0]- buffer interrupt delay, delay[1]- message delay + if (delay_length[0] >= 0 and delay_length[0] <= 1000): + write_test_files(interrupt, delay_length[0]) + if (delay_length[1] >= 0 and delay_length[1] <= 1000): + write_test_files(message, delay_length[1]) + if (not quiet): + print("Buffer delay testing = {} for: {}" + .format(read_test_files(interrupt), + interrupt.split("/")[5])) + print("Message delay testing = {} for: {}" + .format(read_test_files(message), + message.split("/")[5])) + except IOError as e: + errno, strerror = e.args + print("I/O error({0}): {1} on files {2}{3}" + .format(errno, strerror, interrupt, message)) + exit(-1) + +# enabling delay testing on all devices +def set_delay_all_devices(file_map, delay, quiet): + + for device in (file_map): + set_test_state(locate_state(device, file_map), + dev_state.on.value, + quiet) + set_delay_values(device, file_map, delay, quiet) + +# disable all testing on a SINGLE device. +def disable_testing_single_device(device, file_map): + + for name in file_map[device]: + file_location = file_map[device][name] + write_test_files(file_location, dev_state.off.value) + print("ALL testing now OFF for {}".format(device.split("/")[-1])) + +# disable all testing on ALL devices +def disable_all_testing(file_map): + + for device in file_map: + disable_testing_single_device(device, file_map) + +def parse_args(): + parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n" + "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n" + "%(prog)s [view_all | V] [-h]\n" + "%(prog)s [disable_all | D] [-h]\n" + "%(prog)s [disable_single | d] [-h|-p]\n" + "%(prog)s [view_single | v] [-h|-p]\n" + "%(prog)s --version\n", + description = "\nUse lsvmbus to get vmbus device type " + "information.\n" "\nThe debugfs root path is " + "/sys/kernel/debug/hyperv", + formatter_class = RawDescriptionHelpFormatter) + subparsers = parser.add_subparsers(dest = "action") + parser.add_argument("--version", action = "version", + version = '%(prog)s 0.1.0') + parser.add_argument("-q","--quiet", action = "store_true", + help = "silence none important test messages." + " This will only work when enabling testing" + " on a device.") + # Use the path parser to hold the --path attribute so it can + # be shared between subparsers. Also do the same for the state + # parser, as all testing methods will use --enable_all and + # enable_single. + path_parser = argparse.ArgumentParser(add_help=False) + path_parser.add_argument("-p","--path", metavar = "", + help = "Debugfs path to a vmbus device. The path " + "must be the absolute path to the device.") + state_parser = argparse.ArgumentParser(add_help=False) + state_group = state_parser.add_mutually_exclusive_group(required = True) + state_group.add_argument("-E", "--enable_all", action = "store_const", + const = "enable_all", + help = "Enable the specified test type " + "on ALL vmbus devices.") + state_group.add_argument("-e", "--enable_single", + action = "store_const", + const = "enable_single", + help = "Enable the specified test type on a " + "SINGLE vmbus device.") + parser_delay = subparsers.add_parser("delay", + parents = [state_parser, path_parser], + help = "Delay the ring buffer interrupt or the " + "ring buffer message reads in microseconds.", + prog = "vmbus_testing", + usage = "%(prog)s [-h]\n" + "%(prog)s -E -t [value] [value]\n" + "%(prog)s -e -t [value] [value] -p", + description = "Delay the ring buffer interrupt for " + "vmbus devices, or delay the ring buffer message " + "reads for vmbus devices (both in microseconds). This " + "is only on the host to guest channel.") + parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2, + type = check_range, default =[0,0], required = (True), + help = "Set [buffer] & [message] delay time. " + "Value constraints: -1 == value " + "or 0 < value <= 1000.\n" + "Use -1 to keep the previous value for that delay " + "type, or a value > 0 <= 1000 to change the delay " + "time.") + parser_dis_all = subparsers.add_parser("disable_all", + aliases = ['D'], prog = "vmbus_testing", + usage = "%(prog)s [disable_all | D] -h\n" + "%(prog)s [disable_all | D]\n", + help = "Disable ALL testing on ALL vmbus devices.", + description = "Disable ALL testing on ALL vmbus " + "devices.") + parser_dis_single = subparsers.add_parser("disable_single", + aliases = ['d'], + parents = [path_parser], prog = "vmbus_testing", + usage = "%(prog)s [disable_single | d] -h\n" + "%(prog)s [disable_single | d] -p\n", + help = "Disable ALL testing on a SINGLE vmbus device.", + description = "Disable ALL testing on a SINGLE vmbus " + "device.") + parser_view_all = subparsers.add_parser("view_all", aliases = ['V'], + help = "View the test state for ALL vmbus devices.", + prog = "vmbus_testing", + usage = "%(prog)s [view_all | V] -h\n" + "%(prog)s [view_all | V]\n", + description = "This shows the test state for ALL the " + "vmbus devices.") + parser_view_single = subparsers.add_parser("view_single", + aliases = ['v'],parents = [path_parser], + help = "View the test values for a SINGLE vmbus " + "device.", + description = "This shows the test values for a SINGLE " + "vmbus device.", prog = "vmbus_testing", + usage = "%(prog)s [view_single | v] -h\n" + "%(prog)s [view_single | v] -p") + + return parser.parse_args() + +# value checking for range checking input in parser +def check_range(arg1): + + try: + val = int(arg1) + except ValueError as err: + raise argparse.ArgumentTypeError(str(err)) + if val < -1 or val > 1000: + message = ("\n\nvalue must be -1 or 0 < value <= 1000. " + "Value program received: {}\n").format(val) + raise argparse.ArgumentTypeError(message) + return val + +if __name__ == "__main__": + main() -- 2.27.0