# Copyright lowRISC contributors. # Licensed under the Apache License, Version 2.0, see LICENSE for details. # SPDX-License-Identifier: Apache-2.0 r""" Class describing simulation configuration object """ import os import shutil import subprocess import sys from collections import OrderedDict import logging as log from tabulate import tabulate from Deploy import CompileSim, CovAnalyze, CovMerge, CovReport, RunTest, Deploy from FlowCfg import FlowCfg from Modes import BuildModes, Modes, Regressions, RunModes, Tests from testplanner import testplan_utils from utils import VERBOSE, find_and_substitute_wildcards def pick_dump_format(fmts): '''Choose a supported wave dumping format fmts is a list of formats that the chosen tool supports. Return the first that we think is possible (e.g. not fsdb if Verdi is not installed). ''' assert fmts fmt = fmts[0] if fmt == 'fsdb' and not shutil.which('verdi'): return pick_dump_format(fmts[1:]) return fmt def resolve_dump_format(tool, dump): '''Decide on the correct dumping format This is called after reading the config file. tool is the chosen tool, which will always have been resolved by this point. waves is a boolean which determines whether waves should be dumped at all (from the --waves argument). dump is the dumping format chosen on the command line or None. ''' assert tool is not None SUPPORTED_DUMP_FMTS = { 'vcs': ['fsdb', 'vpd'], 'xcelium': ['fsdb', 'shm', 'vpd'] } # Look up which dumping formats the tool supports fmts = SUPPORTED_DUMP_FMTS.get(tool) if dump is not None: # If the user has specified their preferred dumping format, use it. As # a sanity check, error out if the chosen tool doesn't support the # format, but only if we know about the tool. If not, we'll just assume # they know what they're doing. if fmts is not None and dump not in fmts: log.error('Chosen tool ({}) does not support wave ' 'dumping format {!r}.' .format(tool, dump)) sys.exit(1) return dump # If the user hasn't specified a dumping format, but has asked for waves, # we need to decide on a format for them. If fmts is None, we don't know # about this tool. Maybe it's a new simulator, in which case, default to # VPD and hope for the best. if not fmts: return 'vpd' return pick_dump_format(fmts) class SimCfg(FlowCfg): """Simulation configuration object A simulation configuration class holds key information required for building a DV regression framework. """ def __init__(self, flow_cfg_file, proj_root, args): super().__init__(flow_cfg_file, proj_root, args) # Options set from command line self.tool = args.tool self.build_opts = [] self.build_opts.extend(args.build_opts) self.en_build_modes = args.build_modes.copy() self.run_opts = [] self.run_opts.extend(args.run_opts) self.en_run_modes = [] self.en_run_modes.extend(args.run_modes) self.build_unique = args.build_unique self.build_only = args.build_only self.run_only = args.run_only self.reseed_ovrd = args.reseed self.reseed_multiplier = args.reseed_multiplier self.waves = args.waves self.max_waves = args.max_waves self.cov = args.cov self.cov_merge_previous = args.cov_merge_previous self.profile = args.profile or '(cfg uses profile without --profile)' self.xprop_off = args.xprop_off self.no_rerun = args.no_rerun self.verbosity = "{" + args.verbosity + "}" self.verbose = args.verbose self.dry_run = args.dry_run self.map_full_testplan = args.map_full_testplan # Disable cov if --build-only is passed. if self.build_only: self.cov = False # Set default sim modes for unpacking if self.waves is True: self.en_build_modes.append("waves") if self.cov is True: self.en_build_modes.append("cov") if args.profile is not None: self.en_build_modes.append("profile") if self.xprop_off is not True: self.en_build_modes.append("xprop") # Options built from cfg_file files self.project = "" self.flow = "" self.flow_makefile = "" self.build_dir = "" self.run_dir = "" self.sw_build_dir = "" self.pass_patterns = [] self.fail_patterns = [] self.name = "" self.dut = "" self.tb = "" self.testplan = "" self.fusesoc_core = "" self.ral_spec = "" self.build_modes = [] self.run_modes = [] self.regressions = [] # Options from tools - for building and running tests self.build_cmd = "" self.flist_gen_cmd = "" self.flist_gen_opts = [] self.flist_file = "" self.run_cmd = "" # Generated data structures self.links = {} self.build_list = [] self.run_list = [] self.cov_merge_deploy = None self.cov_report_deploy = None self.results_summary = OrderedDict() # If is_master_cfg is set, then each cfg will have its own cov_deploy. # Maintain an array of those in cov_deploys. self.cov_deploys = [] # Parse the cfg_file file tree self.parse_flow_cfg(flow_cfg_file) self._post_parse_flow_cfg() # Choose a dump format now. Note that this has to happen after parsing # the configuration format because our choice might depend on the # chosen tool. self.dump_fmt = (resolve_dump_format(self.tool, args.dump) if self.waves else 'none') # If build_unique is set, then add current timestamp to uniquify it if self.build_unique: self.build_dir += "_" + self.timestamp # Process overrides before substituting the wildcards. self._process_overrides() # Make substitutions, while ignoring the following wildcards # TODO: Find a way to set these in sim cfg instead ignored_wildcards = [ "build_mode", "index", "test", "seed", "uvm_test", "uvm_test_seq", "cov_db_dirs", "sw_test", "sw_test_is_prebuilt", "sw_build_device" ] self.__dict__ = find_and_substitute_wildcards(self.__dict__, self.__dict__, ignored_wildcards, self.is_master_cfg) # Set the title for simulation results. self.results_title = self.name.upper() + " Simulation Results" # Stuff below only pertains to individual cfg (not master cfg) # or individual selected cfgs (if select_cfgs is configured via command line) # TODO: find a better way to support select_cfgs if not self.is_master_cfg and (not self.select_cfgs or self.name in self.select_cfgs): # If self.tool is None at this point, there was no --tool argument on # the command line, and there is no default tool set in the config # file. That's ok if this is a master config (where the # sub-configurations can choose tools themselves), but not otherwise. if self.tool is None: log.error('Config file does not specify a default tool, ' 'and there was no --tool argument on the command line.') sys.exit(1) # Print info: log.info("[scratch_dir]: [%s]: [%s]", self.name, self.scratch_path) # Set directories with links for ease of debug / triage. self.links = { "D": self.scratch_path + "/" + "dispatched", "P": self.scratch_path + "/" + "passed", "F": self.scratch_path + "/" + "failed", "K": self.scratch_path + "/" + "killed" } # Use the default build mode for tests that do not specify it if not hasattr(self, "build_mode"): self.build_mode = 'default' self._process_exports() # Create objects from raw dicts - build_modes, sim_modes, run_modes, # tests and regressions, only if not a master cfg obj self._create_objects() # Post init checks self.__post_init__() def __post_init__(self): # Run some post init checks super().__post_init__() def kill(self): '''kill running processes and jobs gracefully ''' super().kill() for item in self.cov_deploys: item.kill() # Purge the output directories. This operates on self. def _purge(self): if self.scratch_path: try: log.info("Purging scratch path %s", self.scratch_path) os.system("/bin/rm -rf " + self.scratch_path) except IOError: log.error('Failed to purge scratch directory %s', self.scratch_path) def _create_objects(self): # Create build and run modes objects self.build_modes = Modes.create_modes(BuildModes, self.build_modes) self.run_modes = Modes.create_modes(RunModes, self.run_modes) # Walk through build modes enabled on the CLI and append the opts for en_build_mode in self.en_build_modes: build_mode_obj = Modes.find_mode(en_build_mode, self.build_modes) if build_mode_obj is not None: self.build_opts.extend(build_mode_obj.build_opts) self.run_opts.extend(build_mode_obj.run_opts) else: log.error( "Mode \"%s\" enabled on the the command line is not defined", en_build_mode) sys.exit(1) # Walk through run modes enabled on the CLI and append the opts for en_run_mode in self.en_run_modes: run_mode_obj = Modes.find_mode(en_run_mode, self.run_modes) if run_mode_obj is not None: self.run_opts.extend(run_mode_obj.run_opts) else: log.error( "Mode \"%s\" enabled on the the command line is not defined", en_run_mode) sys.exit(1) # Create tests from given list of items self.tests = Tests.create_tests(self.tests, self) # Regressions # Parse testplan if provided. if self.testplan != "": self.testplan = testplan_utils.parse_testplan(self.testplan) # Extract tests in each milestone and add them as regression target. self.regressions.extend(self.testplan.get_milestone_regressions()) # Create regressions self.regressions = Regressions.create_regressions(self.regressions, self, self.tests) def _print_list(self): for list_item in self.list_items: log.info("---- List of %s in %s ----", list_item, self.name) if hasattr(self, list_item): items = getattr(self, list_item) for item in items: log.info(item) else: log.error("Item %s does not exist!", list_item) def _create_build_and_run_list(self): # Walk through the list of items to run and create the build and run # objects. # Allow multiple regressions to run as long as the do not enable # sim_modes or run_modes def get_overlapping_tests(tests, run_list_names): overlapping_tests = [] for test in tests: if test.name in run_list_names: overlapping_tests.append(test) return overlapping_tests def prune_items(items, marked_items): pruned_items = [] for item in items: if item not in marked_items: pruned_items.append(item) return pruned_items # Check if there are items to run if self.items == []: log.error( "No items provided for running this simulation / regression") sys.exit(1) items_list = self.items run_list_names = [] marked_items = [] # Process regressions first for regression in self.regressions: if regression.name in items_list: overlapping_tests = get_overlapping_tests( regression.tests, run_list_names) if overlapping_tests != []: log.error( "Regression \"%s\" added for run contains tests that overlap with " "other regressions added. This can result in conflicting " "build / run_opts to be set causing unexpected results.", regression.name) sys.exit(1) self.run_list.extend(regression.tests) # Merge regression's build and run opts with its tests and their # build_modes regression.merge_regression_opts() run_list_names.extend(regression.test_names) marked_items.append(regression.name) items_list = prune_items(items_list, marked_items) # Process individual tests for test in self.tests: if test.name in items_list: overlapping_tests = get_overlapping_tests([test], run_list_names) if overlapping_tests == []: self.run_list.append(test) run_list_names.append(test.name) marked_items.append(test.name) items_list = prune_items(items_list, marked_items) # Merge the global build and run opts Tests.merge_global_opts(self.run_list, self.build_opts, self.run_opts) # Check if all items have been processed if items_list != []: log.error( "The items %s added for run were not found in \n%s!\n " "Use the --list switch to see a list of available " "tests / regressions.", items_list, self.flow_cfg_file) # Process reseed override and create the build_list build_list_names = [] for test in self.run_list: # Override reseed if available. if self.reseed_ovrd is not None: test.reseed = self.reseed_ovrd # Apply reseed multiplier if set on the command line. test.reseed *= self.reseed_multiplier # Create the unique set of builds needed. if test.build_mode.name not in build_list_names: self.build_list.append(test.build_mode) build_list_names.append(test.build_mode.name) def _create_dirs(self): '''Create initial set of directories ''' # Invoking system calls has a performance penalty. # Construct a single command line chained with '&&' to invoke # the system call only once, rather than multiple times. create_link_dirs_cmd = "" for link in self.links.keys(): create_link_dirs_cmd += "/bin/rm -rf " + self.links[link] + " && " create_link_dirs_cmd += "mkdir -p " + self.links[link] + " && " create_link_dirs_cmd += " true" try: os.system(create_link_dirs_cmd) except IOError: log.error("Error running when running the cmd \"%s\"", create_link_dirs_cmd) sys.exit(1) def _expand_run_list(self, build_map): '''Generate a list of tests to be run For each test in tests, we add it test.reseed times. The ordering is interleaved so that we run through all of the tests as soon as possible. If there are multiple tests and they have different reseed values, they are "fully interleaved" at the start (so if there are tests A, B with reseed values of 5 and 2, respectively, then the list will be ABABAAA). build_map is a dictionary from build name to a CompileSim object. Each test is added to the CompileSim item that it depends on (signifying that the test should be built once the build on which it depends is done). cfg is a SimCfg object, passed to the RunTest constructor. ''' tagged = [] for test in self.run_list: for idx in range(test.reseed): tagged.append((idx, test, RunTest(idx, test, self))) # Stably sort the tagged list by the 1st coordinate tagged.sort(key=lambda x: x[0]) # Now iterate over it again, adding tests to build_map (in the # interleaved order) and collecting up the RunTest objects. runs = [] for _, test, run in tagged: build_map[test.build_mode].sub.append(run) runs.append(run) return runs def _create_deploy_objects(self): '''Create deploy objects from the build and run lists. ''' # Create the build and run list first self._create_build_and_run_list() builds = [] build_map = {} for build in self.build_list: item = CompileSim(build, self) builds.append(item) build_map[build] = item self.builds = builds self.runs = ([] if self.build_only else self._expand_run_list(build_map)) if self.run_only is True: self.deploy = self.runs else: self.deploy = builds # Create cov_merge and cov_report objects if self.cov: self.cov_merge_deploy = CovMerge(self) self.cov_report_deploy = CovReport(self) # Generate reports only if merge was successful; add it as a dependency # of merge. self.cov_merge_deploy.sub.append(self.cov_report_deploy) # Create initial set of directories before kicking off the regression. self._create_dirs() def create_deploy_objects(self): '''Public facing API for _create_deploy_objects(). ''' super().create_deploy_objects() # Also, create cov_deploys if self.cov: for item in self.cfgs: if item.cov: self.cov_deploys.append(item.cov_merge_deploy) # deploy additional commands as needed. We do this separated for coverage # since that needs to happen at the end. def deploy_objects(self): '''This is a public facing API, so we use "self.cfgs" instead of self. ''' # Invoke the base class method to run the regression. super().deploy_objects() # If coverage is enabled, then deploy the coverage tasks. if self.cov: Deploy.deploy(self.cov_deploys) def _cov_analyze(self): '''Use the last regression coverage data to open up the GUI tool to analyze the coverage. ''' cov_analyze_deploy = CovAnalyze(self) self.deploy = [cov_analyze_deploy] def cov_analyze(self): '''Public facing API for analyzing coverage. ''' for item in self.cfgs: item._cov_analyze() def _gen_results(self): ''' The function is called after the regression has completed. It collates the status of all run targets and generates a dict. It parses the testplan and maps the generated result to the testplan entries to generate a final table (list). It also prints the full list of failures for debug / triage. If cov is enabled, then the summary coverage report is also generated. The final result is in markdown format. ''' # TODO: add support for html def retrieve_result(name, results): for item in results: if name == item["name"]: return item return None def gen_results_sub(items, results, fail_msgs): ''' Generate the results table from the test runs (builds are ignored). The table has 3 columns - name, passing and total as a list of dicts. This is populated for all tests. The number of passing and total is in reference to the number of iterations or reseeds for that test. This list of dicts is directly consumed by the Testplan::results_table method for testplan mapping / annotation. ''' for item in items: if item.status == "F": fail_msgs += item.fail_msg # Generate results table for runs. if item.target == "run": result = retrieve_result(item.name, results) if result is None: result = {"name": item.name, "passing": 0, "total": 0} results.append(result) if item.status == "P": result["passing"] += 1 result["total"] += 1 (results, fail_msgs) = gen_results_sub(item.sub, results, fail_msgs) return (results, fail_msgs) regr_results = [] fail_msgs = "" deployed_items = self.deploy if self.cov: deployed_items.append(self.cov_merge_deploy) (regr_results, fail_msgs) = gen_results_sub(deployed_items, regr_results, fail_msgs) # Add title if there are indeed failures if fail_msgs != "": fail_msgs = "\n## List of Failures\n" + fail_msgs self.errors_seen = True # Generate results table for runs. results_str = "## " + self.results_title + "\n" results_str += "### " + self.timestamp_long + "\n" # Add path to testplan. if hasattr(self, "testplan_doc_path"): testplan = "https://" + self.doc_server + '/' + self.testplan_doc_path else: testplan = "https://" + self.doc_server + '/' + self.rel_path testplan = testplan.replace("/dv", "/doc/dv_plan/#testplan") results_str += "### [Testplan](" + testplan + ")\n" results_str += "### Simulator: " + self.tool.upper() + "\n\n" if regr_results == []: results_str += "No results to display.\n" else: # TODO: check if testplan is not null? # Map regr results to the testplan entries. results_str += self.testplan.results_table( regr_results=regr_results, map_full_testplan=self.map_full_testplan) results_str += "\n" self.results_summary = self.testplan.results_summary # Append coverage results of coverage was enabled. if self.cov: if self.cov_report_deploy.status == "P": results_str += "\n## Coverage Results\n" # Link the dashboard page using "cov_report_page" value. if hasattr(self, "cov_report_page"): results_str += "\n### [Coverage Dashboard]" results_str += "({})\n\n".format(self.cov_report_page) results_str += self.cov_report_deploy.cov_results self.results_summary[ "Coverage"] = self.cov_report_deploy.cov_total else: self.results_summary["Coverage"] = "--" # append link of detail result to block name self.results_summary["Name"] = self._get_results_page_link( self.results_summary["Name"]) # Append failures for triage self.results_md = results_str + fail_msgs results_str += fail_msgs # Write results to the scratch area results_file = self.scratch_path + "/results_" + self.timestamp + ".md" f = open(results_file, 'w') f.write(self.results_md) f.close() # Return only the tables log.info("[results page]: [%s] [%s]", self.name, results_file) return results_str def gen_results_summary(self): # sim summary result has 5 columns from each SimCfg.results_summary header = ["Name", "Passing", "Total", "Pass Rate"] if self.cov: header.append('Coverage') table = [header] colalign = ("center", ) * len(header) for item in self.cfgs: row = [] for title in item.results_summary: row.append(item.results_summary[title]) if row == []: continue table.append(row) self.results_summary_md = "## " + self.results_title + " (Summary)\n" self.results_summary_md += "### " + self.timestamp_long + "\n" self.results_summary_md += tabulate(table, headers="firstrow", tablefmt="pipe", colalign=colalign) print(self.results_summary_md) return self.results_summary_md def _publish_results(self): '''Publish coverage results to the opentitan web server.''' super()._publish_results() if self.cov: results_server_dir_url = self.results_server_dir.replace( self.results_server_prefix, self.results_server_url_prefix) log.info("Publishing coverage results to %s", results_server_dir_url) cmd = (self.results_server_cmd + " -m cp -R " + self.cov_report_deploy.cov_report_dir + " " + self.results_server_dir) try: cmd_output = subprocess.run(args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) log.log(VERBOSE, cmd_output.stdout.decode("utf-8")) except Exception as e: log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))