Source code for ufotest.util

"""
A module containing various utility functions for usage in other modules.
"""
import string
import click
import random
import re
import subprocess
import shutil
import datetime
import json
import importlib.util
from typing import Optional, Tuple, Dict, List
from abc import ABC, abstractmethod

from jinja2 import Template, Environment
from jinja2 import FileSystemLoader, ChoiceLoader

from ufotest.config import *

# GLOBAL VARIABLES
VERSION_PATH = os.path.join(PATH, 'VERSION')


#TEMPLATE_LOADER = FileSystemLoader(TEMPLATE_PATH)
#TEMPLATE_ENVIRONMENT = Environment(loader=TEMPLATE_LOADER)
#TEMPLATE_ENVIRONMENT.globals['config'] = Config()


SCRIPTS = {}

# == FUNCTIONS

# -- OUTPUT RELATED


[docs]def cerror(message: str) -> None: """Outputs the *message* as an error to the console. :param message: The message to be printed """ click.secho(f'[!] {message}', fg='red')
[docs]def cresult(message: str) -> None: """Outputs the *message* as progress to the console :param message: The message to be printed """ click.secho(f'(+) {message}', fg='green')
[docs]def ctitle(message: str) -> None: """Outputs the *message* as a title to the console :param message: The message to be printed """ click.secho(f'\n| | {message.upper()} | |', bold=True)
[docs]def csubtitle(message: str) -> None: """Outputs the *message* as a subtitle to the console :param message: The message to be printed """ click.secho(f'\n==| {message} |==', bold=True)
[docs]def cparams(parameters: Dict[str, str]) -> None: """Outputs a dictionary of parameters as key value pairs to the console :param parameters: A dict of string keys and string values. """ for key, value in parameters.items(): click.secho(f'--| {key}: {value}') click.secho('')
[docs]def cprint(message: str) -> None: """Outputs the *message* to the console :param message: The message to be printed """ click.secho(f'... {message}')
# -- OS RELATED
[docs]def get_folder_size(folder_path: str) -> int: """ Returns the size of an entire recursive directory tree as an integer in bytes. https://www.thepythoncode.com/article/get-directory-size-in-bytes-using-python :param str folder_path: The absolute path to the folder which is to be measured :return: The integer amount of bytes which all contents of the folder collectively take up """ total = 0 try: for entry in os.scandir(folder_path): if entry.is_file(): total += entry.stat().st_size elif entry.is_dir(): total += get_folder_size(entry.path) return total except NotADirectoryError: return os.path.getsize(folder_path) except PermissionError: return 0
[docs]def format_byte_size(value: int, unit: str = 'B', factor: int = 1024, include_unit=True) -> str: """ Given an integer *value*, this function formats the number which is assumed to be a size in bytes into a different byte related unit such as kilobytes, megabytes etc. :param int value: The actual integer amount of bytes to be converted into another unit :param int factor: The factor which seperates two denominations of units. By default this is 1024 which is correct when working with bytes. Should not be changed. :param str unit: The string identifier of the unit to be converted to. May only be one of the following strings: B, KB, MB, GB, TB. Default is B (Bytes) :param bool include_unit: A boolean flag which determines whether or not the unit string should be included in the final returned string value. :returns: The string representation of the formatted value, now (potentially) in the new unit. The numeric value within the string will have two decimals accuracy """ unit = unit.upper() unit_factors = { 'B': factor ** 0, 'KB': factor ** 1, 'MB': factor ** 2, 'GB': factor ** 3, 'TB': factor ** 4 } # It is important to raise an appropriate error message. In this case if the unit string identifier is not valid. if unit not in unit_factors.keys(): raise ValueError(f'You have passed {unit} to specific the unit to format a byte value. This is not a valid ' f'unit identifier! Please use one of the following: {", ".join(unit_factors.keys())}') value_string = f'{value / unit_factors[unit]:.2f}' # Optional flag of whether or not to include the unit in the final string output if include_unit: value_string += f' {unit}' return value_string
# -- OTHERS
[docs]def get_repository_name(repository_url: str) -> str: """Returns the name of a git repository if the *repository_url* for it is given. :param repository_url: the string url of the remote repository :return: the string name of the repo, which does NOT include the username it is just the name of the top level folder of that repo. """ repository_name = re.findall(r'/([^/]*)\.git', repository_url)[0] return repository_name
[docs]def markdown_to_html(input_path: str, output_path: str, header_lines: List[str] = []): """Converts the markdown file at *input_path* to html file at *output_path* The functionality of this function is provided by the command line tool 'md-to-html' provided by the python package of the same name. This function also offers the additional possibility to add custom string lines to the head section of the created html file to potentially insert additional styles. :param input_path: The path to the original markdown file :param output_path: The path to be created for the html file :param header_lines: A list of strings to be included as separate lines into the head of the html """ # The command md-to-html should be pretty self explanatory, it takes a markdown file as the input and produces a # corresponding html file from it. We can assume that this command generally works because it is provided by a # python module with the same name which is listed as one of the dependencies of this project. So when the ufotest # package is installed, this will be installed as well automatically. command = 'md-to-html --input {} --output {}'.format( input_path, output_path ) execute_command(command, False) # This plain html file which was created looks pretty plain, so additionally there should be the possibility to # add stylesheets to the header of the document. This will be realized by being able to generally insert any kind # of string to the header section of the html file. Luckily this will be a rather simple string operation since # the previous conversion creates a html file which is very well structured by new lines. with open(output_path, mode='r+') as read_file: lines = read_file.readlines() for line in header_lines: lines.insert(lines.index(' </head>'), line) with open(output_path, mode='w+') as write_file: write_file.write('\n'.join(lines))
[docs]def get_version() -> str: """ Returns the version string for the ufotest project. The version scheme of ufotest loosely follows the technique of `Semantic Versioning <https://semver.org/>`_. Where a minor version change may introduce backward incompatible changes, due to the project still being in active development with many features being subject to change. The return value of this function is subject to the "get_version" filter hook, which is able to modify the version string *after* it has been loaded from the file and sanitized. *EXAMPLE* .. code-block:: python version = get_version() # "1.2.1" :returns: The version string without any additional characters or whitespaces. """ with open(VERSION_PATH) as version_file: version = version_file.read() version = version.replace(' ', '').replace('\n', '') # Here we actually need to check if the plugin management system is actually initialized (this is what the boolean # return of is_prepared indicates) because the version function needs to be functional even when the ufotest # installation folder and thus the config file does not yet exist. if CONFIG.is_prepared(): version = CONFIG.pm.apply_filter('get_version', value=version) return version
[docs]def dynamic_import(module_name: str, file_path: str): spec = importlib.util.spec_from_file_location(module_name, file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module
[docs]def get_command_output(command: str, cwd: Optional[str] = None): """ Executes the given "command" and returns the output of the command as string The additional "cwd" option can be used to pass a string path description, which is supposed to be used as the current working directory for the command execution. :param command: The command which is to be executed :type command: str :param cwd: The path string of the current working directory to be assumed for the command execution :type cwd: Optional[str] """ completed_process = subprocess.run(command, cwd=cwd, shell=True, stdout=subprocess.PIPE) byte_output = completed_process.stdout return byte_output.decode('utf-8')
[docs]def execute_command(command: str, verbose: bool, cwd: Optional[str] = None, foreground=True): """ Executes the given system "command" The "verbose" flag controls whether or not the output of the command is written to stdout or not. With the "cwd" string a path can be passed, which is supposed to be used as the current working directory from which the command is to be executed. :deprecated: """ output = None if verbose else subprocess.DEVNULL completed_process = subprocess.run( command, cwd=cwd, shell=True, stdout=output, stderr=output, close_fds=not foreground ) return completed_process.returncode
[docs]def run_command(command: str, cwd: Optional[str] = None) -> Tuple[int, str]: """Runs a terminal command and returns it's exit code and console output :param command: The command string to execute :param cwd: Optionally a string path, which is to be used as the current working directory for the execution of the command. :return: A tuple, where the first value ist the int exit code of the command execution and the second value is the string of all the output the command produced on the stdout pipe. """ completed_process = subprocess.run( command, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) output = completed_process.stdout.decode() error = completed_process.stderr.decode() exit_code = completed_process.returncode if CONFIG.verbose(): click.secho(f'[#] {command}', fg='cyan') if output: click.secho(f' > {output}', fg='cyan') if error: click.secho(f' ! {error}', fg='red') return exit_code, output
[docs]def setup_environment(): """ Sets up all the environmental variables defined in the config file """ for key, value in CONFIG['environment'].items(): # Is this enough? os.environ[key] = value
[docs]def create_folder(folder_path: str): if not os.path.exists(folder_path): os.mkdir(folder_path) os.chmod(folder_path, 0o777)
[docs]def init_install(verbose=False) -> str: """Initializes the installation folder for the ufotest app. :param verbose: whether or not to print additional info to the console. :return: The string absolute path to the top level installation folder which has been created. """ # First we check if the installation folder already exists. # "get_path" returns the path to where the config file is supposed to be installed to. folder_path = get_path() if os.path.exists(folder_path): raise IsADirectoryError('An ufotest installation already exists!') else: # In case it does not exist we will create it os.mkdir(folder_path) os.chmod(folder_path, 0o777) if verbose: click.secho(' Created new folder: {}'.format(folder_path)) # -- CREATE CONFIG FILE # Then we need to copy the default template for the config file into this folder, if it is not already there config_path = get_config_path() if not os.path.exists(config_path): shutil.copyfile(CONFIG_TEMPLATE_PATH, config_path) if verbose: click.secho(' Created default config: {}'.format(config_path)) # -- BUILD QUEUE FILE queue_path = os.path.join(folder_path, 'build.queue') with open(queue_path, mode='w') as file: file.write('[]') if verbose: click.secho(' Created empty build queue: {}'.format(queue_path)) # -- TESTS FOLDER # Also we need to create the tests folder inside of this folder test_folder_path = os.path.join(folder_path, 'tests') os.mkdir(test_folder_path) os.chmod(folder_path, 0o777) if verbose: click.secho(' Created the "tests" folder for ufotest') # -- ARCHIVE FOLDER # Additionally we need to create a folder for the archive of the test runs archive_folder_path = os.path.join(folder_path, 'archive') os.mkdir(archive_folder_path) os.chmod(folder_path, 0o777) if verbose: click.secho(' Created the "archive" folder for ufotest') # -- BUILDS FOLDER # This folder will contain the build reports which are created when a new version of the hardware configuration # is built from the source repository builds_folder_path = get_builds_path() os.mkdir(builds_folder_path) if verbose: click.secho(' Created the "builds" folder for ufotest') # -- PLUGINS FOLDER # This folder will contain the individual plugins folders for ufotest plugins_path = os.path.join(folder_path, 'plugins') shutil.copytree(DEFAULT_PLUGINS_PATH, plugins_path) if verbose: click.secho(' Created the "plugins" folder for ufotest') # -- STATIC FOLDER # This folder contains all the static assets which are required for the internal CI web server. This folder already # exists within the ufotest package, it simply needs to be copied into the installation folder... static_path = os.path.join(folder_path, 'static') shutil.copytree(STATIC_PATH, static_path) if verbose: click.secho(' Copied the "static" folder into ufotest installation') return folder_path
[docs]def update_install(folder_path: str) -> None: # ~ replacing the static folder static_path = os.path.join(folder_path, 'static') shutil.rmtree(static_path) shutil.copytree(STATIC_PATH, static_path) if CONFIG.verbose(): cprint('Updated ufotest static folder to new version')
[docs]def check_install(): folder_path = get_path() config_path = get_config_path() valid_install = True if not os.path.exists(folder_path): click.secho('ufotest is missing an installation folder!', bold=True, fg='red') valid_install = False if not os.path.exists(config_path): click.secho('ufotest is missing the configuration file at "{}"!'.format(config_path), bold=True, fg='red') valid_install = False if not valid_install: click.secho('A new ufotest installation can be created by typing "ufotest init" into the console...') return valid_install
[docs]def check_path(path: str, is_dir: bool = False): exists = os.path.exists(path) correct_type = os.path.isdir(path) if is_dir else os.path.isfile(path) return exists and correct_type
[docs]def check_vivado(): vivado_path = CONFIG['install']['vivado_path'] vivado_settings_path = os.path.join(vivado_path, 'settings64.sh') vivado_folder_exists = check_path(vivado_path, is_dir=True) vivado_settings_exists = check_path(vivado_settings_path, is_dir=False) return vivado_folder_exists and vivado_settings_exists
[docs]def run_script(script_name: str, prefix: str = '') -> Tuple[int, str]: """Runs the script which is identified by *script_name*. :param script_name: The string name, which identifies the script within the ufotest application. :param prefix: A string which is prepended to the command string for the execution of the script. This mainly exists to possibly supply the "sudo" prefix to the execution of a script, if it needs it. Default is empty str. :raises FileNotFoundError: If the given script name does not refer to a valid script, which is registered with the ufotest application. :returns: A tuple, where the first element is the integer exit code of the script command and the second the str of the output, which the command generated on stdout. """ if script_name not in SCRIPTS.keys(): raise FileNotFoundError(f'The script "{script_name}" is not known to the application') script_data = SCRIPTS[script_name] script_folder = os.path.dirname(script_data['path']) script_command = prefix + script_data['path'] # This perhaps needs explanation: The script command in itself would work fine. Since it contains the absolute path # to an executable file, that is fine. But we set the current working dir to the script folder anyways since some # of the scripts call other scripts and the way they call them, they expect them to be in the cwd... exit_code, stdout = run_command(script_command, cwd=script_folder) if CONFIG.verbose(): if exit_code: cerror(f'The script "{script_name}" terminated with exit code 1') cerror(stdout) else: cerror(f'The script "{script_name}" terminated successfully') return exit_code, stdout
[docs]def execute_script(name: str, prefix: str = '', verbose: bool = False): if name not in SCRIPTS.keys(): click.secho('There is no script with the name "{}" registered!'.format(name), fg='red') return script = SCRIPTS[name] script_folder = os.path.dirname(script['path']) script_command = prefix + script['path'] exit_code = execute_command(script_command, verbose=verbose, cwd=script_folder) if not exit_code: click.secho('Script "{}" executed successfully!'.format(name), fg='green') else: click.secho('Script "{}" encountered an error!'.format(name), fg='red') return exit_code
[docs]def get_template(name: str) -> Template: """ Returns the jinja2 template with the given file name *name* within the static templates folder of this project :param name: The file name of the template to be loaded :returns: The template object for the specified template """ return CONFIG.template_environment.get_template(name)
[docs]def random_string(length: int, additional_letters: str = ' '): letters = string.ascii_letters + additional_letters return ''.join(random.choice(letters) for i in range(length))
[docs]def clean_pci_read_output(output: str): """ Cleans the output from the "pci_read" function. This essentially only means, that all the newlines and the leading whitespaces are being removed. :param output: The output message of the "pci_read" function :type output: str :return: """ # Removing the newlines cleaned = output.replace('\n', '') # Removing the whitespaces in the front cleaned = cleaned.lstrip(' ') return cleaned
[docs]def get_test_reports() -> List[dict]: """ Returns a list of all test reports, which are represented as dicts. The test reports are represented as dicts and not as actual TestReport instances because they are loaded from memory where they are represented as JSON files. Loading these JSON files will yield dicts. The test reports will be sorted by the time they where created, where the most recent one will be the first element of the list. :return: A list of dicts """ reports = [] archive_path = CONFIG.get_archive_path() for root, folders, files in os.walk(archive_path): # Each test run gets it's own sub folder in the archive folder for folder in folders: folder_path = os.path.join(root, folder) # Within each of these test folders, the json version of the test report is saved in a file which is always # called "report.json". Just like the according html file is always called "report.html" for example. report_json_path = os.path.join(folder_path, 'report.json') # We really need to check for the existence here because of the following case: A test run has been started # but is not yet complete. In this case the test folder already exists, but the report does not. In this # case attempting to open the file would cause an exception! if os.path.exists(report_json_path): with open(report_json_path, mode='r') as report_json_file: report = json.loads(report_json_file.read()) reports.append(report) break # At this point, the reports within the list are generally still out of order because they were added in # whatever order the filesystem iterated them. This means we need to sort them still. In this function this is done # by using the start time sorted_reports = sorted( reports, key=lambda r: datetime.datetime.fromisoformat(r['start_iso']), reverse=True ) # This filter hook offers the possibility to add additional, externally loaded test reports to this list or # change the order of the list etc... return CONFIG.pm.apply_filter( 'get_test_reports', sorted_reports )
[docs]def get_build_reports() -> List[dict]: """ Returns a list of all build reports, which are represented as dicts. The build reports are represented as dicts and not as actual BuildReport instances because they are loaded from memory where they are represented as JSON files. Loading these JSON files will yield dicts. The test reports will be sorted by the time they where created, where the most recent one will be the first element of the list. :return: A list of dicts """ reports = [] builds_path = CONFIG.get_builds_path() for root, folders, files in os.walk(builds_path): # Each test run gets it's own sub folder in the archive folder for folder in folders: folder_path = os.path.join(root, folder) # Within each of these test folders, the json version of the test report is saved in a file which is always # called "report.json". Just like the according html file is always called "report.html" for example. report_json_path = os.path.join(folder_path, 'report.json') # We really need to check for the existence here because of the following case: A test run has been started # but is not yet complete. In this case the test folder already exists, but the report does not. In this # case attempting to open the file would cause an exception! if os.path.exists(report_json_path): with open(report_json_path, mode='r') as report_json_file: report = json.loads(report_json_file.read()) reports.append(report) break # At this point, the reports within the list are generally still out of order because they were added in # whatever order the filesystem iterated them. This means we need to sort them still. In this function this is done # by using the start time sorted_reports = sorted( reports, key=lambda r: datetime.datetime.fromisoformat(r['start_iso']), reverse=True ) # This filter hook offers the possibility to add additional, externally loaded test reports to this list or # change the order of the list etc... return CONFIG.pm.apply_filter( 'get_build_reports', sorted_reports )
[docs]def force_aspect(ax, aspect: float = 1): """ Sets the aspect ratio of the given matplotlib Axes object to the given float. :param ax: A matplotlib Axes object which describes a plot :param float aspect: The aspect ratio which to force on the graph """ x_min, x_max = ax.get_xlim() y_min, y_max = ax.get_ylim() base = abs(x_max - x_min) / abs(y_max - y_min) ax.set_aspect(aspect * base)
# CLASSES # =======
[docs]class AbstractRichOutput(ABC): # TO BE IMPLEMENTED # -----------------
[docs] @abstractmethod def to_markdown(self) -> str: raise NotImplementedError()
[docs] @abstractmethod def to_latex(self) -> str: raise NotImplementedError()
[docs] @abstractmethod def to_string(self) -> str: raise NotImplementedError()
[docs] @abstractmethod def to_html(self) -> str: raise NotImplementedError()
[docs] @abstractmethod def to_dict(self) -> dict: raise NotImplementedError()
[docs]class HTMLTemplateMixin(object): """ This mixin can be used to provide a default implementation of the "to_html" method for the AbstractRichOutput interface. This implementation uses a jinja template string to render the html version of the object instance. **USAGE** Any class which inherits from this mixin has to define a static class member called "HTML_TEMPLATE" which is supposed to be the string representation of the jinja template to be rendered for the HTML representation. Upon calling the "to_html" method, this template will be passed a context where "self" references to the actual object instance, which allows for all the instance attributes to be used in the template. Within the template string all jinja syntax can be used. In fact, within the template the ufotest jinja environment is loaded, which means that there is even access to the global context variable "config". .. code-block:: python class Custom(HTMLTemplateMixin): HTML_TEMPLATE = "<p>{{ self.text }}</p>" def __init__(self): HTMLTemplateMixin.__init__(self) self.text = "Hello World" c = Custom() c.to_html() # <p>Hello World</p> **RECOMPILING** Aside from this main functionality, the more important feature is that this mixin also enables the inheriting class to be recompilable from the JSON / dict representation. For that, we first define this use case: Assuming we have saved an objects attributes as a json file and now load it again such that all the attribute values are part of a dict. Now we want to render the HTML template using this dict representation. For this case, the mixin adds it's own implementation of "to_dict" which is used to convert it into json in the first place. This method saves the template string as a json field. The static method "html_from_dict" can now be used on such a dict to render the html template from it, without needing an actual object instance. .. code-block:: python class Custom(HTMLTemplateMixin): HTML_TEMPLATE = "<p>{{ self.text }}</p>" def __init__(self): HTMLTemplateMixin.__init__(self) self.text = "Hello World" def to_dict(self): # It is important that the mixins implementation of to_dict is considered here! return { **HTMLTemplateMixin.to_dict(self), # It is also important that all the dict fields have the same name as the instance attributes 'text': self.text } c = Custom() c_dict = c.to_dict() c_dict['text'] = 'Bye World' HTMLTemplateMixin.html_from_dict(c_dict) # <p>Bye World</p> """ TEMPLATE_FIELD_NAME = '_html_template' """ :cvar TEMPLATE_FIELD_NAME: This is a string which contains the key name under which the html template is being saved within the dict representation of the object """
[docs] def get_html_template_string(self) -> str: """ Returns the jinja html template string of the object instance. **DESIGN CHOICE** Why does this method exists? On first glance it is useless, because it only returns a static value. At any position where this method is used, one could just use the actual value instead. This is correct, but this method exists for possible future changes. It wraps access to the template string, which means that the actual access to this template string can easily be changed. In the future it would be possible to replace the class constant with a object attribute for the template string rather easily if that is desired. More conveniently, having this method allows a subclass to potentially overwrite it with very custom behavior! :returns str: The template string """ template_string = self.HTML_TEMPLATE return template_string
[docs] def to_html(self) -> str: """ Returns the HTML string representation of the object which is based on the jinja html template. :returns str: The rendered html string """ CONFIG.template_environment.filters['html_from_dict'] = self.html_from_dict template_string = self.get_html_template_string() template = CONFIG.template_environment.from_string(template_string) return template.render({'this': self})
[docs] def to_dict(self) -> dict: """ Returns the dict representation which contains the important fields required for the HTMLTemplateMixin functionality. :returns dict: """ template_string = self.get_html_template_string() return {self.TEMPLATE_FIELD_NAME: template_string}
[docs] @classmethod def html_from_dict(cls, data: dict) -> str: """ Given a "data" dict, which was created by the "to_dict" method of a class which implements this mixin, this method will use the template string which is saved in this dict and the other fields which represent the original instance attributes to render and return the appropriate html representation. :raises KeyError: If the given "data" dict does not actually originate from a class which implements this mixin which is indicated by the fact that it wont contain the necessary field for the template string. :returns str: The html representation according to the template string and the values contained in the dict. """ if cls.TEMPLATE_FIELD_NAME not in data: raise KeyError(( f'The subject dictionary does not contain the necessary key "{cls.TEMPLATE_FIELD_NAME}", which is ' f'required to compile it as a html string! Make sure that the dictionary was created by invoking the ' f'"to_dict" method of a class inheriting from {cls.__class__.__name__}!' )) template_string = data[cls.TEMPLATE_FIELD_NAME] CONFIG.template_environment.filters['html_from_dict'] = cls.html_from_dict template = CONFIG.template_environment.from_string(template_string) return template.render({'this': data})