From f7d6238731dab62c7510f0f61b0807e6f8308ba7 Mon Sep 17 00:00:00 2001 From: tmeissner Date: Sat, 13 Jan 2024 10:33:26 +0100 Subject: [PATCH] Add AES test which uses pyuvm --- pyuvm_tests/Coverage.py | 46 ++++++++ pyuvm_tests/Makefile | 51 +++++++++ pyuvm_tests/VaiBfm.py | 118 ++++++++++++++++++++ pyuvm_tests/tb_aes.py | 232 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 447 insertions(+) create mode 100644 pyuvm_tests/Coverage.py create mode 100644 pyuvm_tests/Makefile create mode 100644 pyuvm_tests/VaiBfm.py create mode 100644 pyuvm_tests/tb_aes.py diff --git a/pyuvm_tests/Coverage.py b/pyuvm_tests/Coverage.py new file mode 100644 index 0000000..b7aac64 --- /dev/null +++ b/pyuvm_tests/Coverage.py @@ -0,0 +1,46 @@ +import vsc + + +# Random stimuli model class +@vsc.randobj +class constraints(): + def __init__(self): + self.key = vsc.rand_bit_t(128) + self.data = vsc.rand_bit_t(128) + + @vsc.constraint + def c(self): + self.data >= 0 and self.data <= 2**128-1 + vsc.dist(self.key, [ + vsc.weight(0, 15), + vsc.weight((1,2**128-2), 70), + vsc.weight((2**128-1), 15)]) + + +# Stimuli covergroup +@vsc.covergroup +class covergroup(): + def __init__(self, name="bla"): + self.options.name = name + self.with_sample( + mode = vsc.bit_t(1), + key = vsc.bit_t(128) + ) + + self.enc = vsc.coverpoint(self.mode, bins=dict( + enc = vsc.bin(0))) + + self.dec = vsc.coverpoint(self.mode, bins=dict( + dec = vsc.bin(1))) + + self.key0 = vsc.coverpoint(self.key, bins=dict( + key0 = vsc.bin(0))) + + self.keyF = vsc.coverpoint(self.key, bins=dict( + keyF = vsc.bin(2**128-1))) + + self.encXkey0 = vsc.cross([self.enc, self.key0]) + self.encXkeyF = vsc.cross([self.enc, self.keyF]) + + self.decXkey0 = vsc.cross([self.dec, self.key0]) + self.decXkeyF = vsc.cross([self.dec, self.keyF]) diff --git a/pyuvm_tests/Makefile b/pyuvm_tests/Makefile new file mode 100644 index 0000000..2e0f1e4 --- /dev/null +++ b/pyuvm_tests/Makefile @@ -0,0 +1,51 @@ +# Default test +DUT ?= aes + +# Path to ext deps +EXT := ../ext + +ifeq (${DUT}, wishbone) + TOPLEVEL := wishboneslavee + SIM_ARGS := -gSimulation=true \ + -gAddressWidth=8 \ + -gDataWidth=16 +else + TOPLEVEL := ${DUT} +endif + +# Cocotb related +MODULE := tb_${DUT} +COCOTB_LOG_LEVEL := DEBUG +CUSTOM_COMPILE_DEPS := results +COCOTB_RESULTS_FILE := results/${MODULE}.xml + +# Simulator & RTL related +SIM ?= ghdl +TOPLEVEL_LANG := vhdl +VHDL_SOURCES_libvhdl := ${EXT}/libvhdl/common/UtilsP.vhd +VHDL_SOURCES := ${EXT}/libvhdl/syn/* \ + ${EXT}/cryptocores/aes/rtl/vhdl/*.vhd +SIM_BUILD := build + +ifeq (${SIM}, ghdl) + COMPILE_ARGS := --std=08 + SIM_ARGS += \ + --wave=results/${MODULE}.ghw \ + --psl-report=results/${MODULE}_psl.json \ + --vpi-trace=results/${MODULE}_vpi.log +else + EXTRA_ARGS := --std=08 + VHDL_LIB_ORDER := libvhdl +endif + + +include $(shell cocotb-config --makefiles)/Makefile.sim + + +results: + mkdir -p results + + +.PHONY: clean +clean:: + rm -rf *.o __pycache__ uarttx uartrx wishboneslavee aes results $(SIM_BUILD) diff --git a/pyuvm_tests/VaiBfm.py b/pyuvm_tests/VaiBfm.py new file mode 100644 index 0000000..729bd54 --- /dev/null +++ b/pyuvm_tests/VaiBfm.py @@ -0,0 +1,118 @@ +import cocotb +from cocotb.triggers import FallingEdge, RisingEdge, Timer +from cocotb.queue import QueueEmpty, Queue +from cocotb.clock import Clock +import logging +import enum +import pyuvm + + +# Logger setup +logging.basicConfig(level=logging.NOTSET) +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + + +# AES mode enum +@enum.unique +class Mode(enum.IntEnum): + Encrypt = 0 + Decrypt = 1 + + +# VAI BFM with queues for +class VaiBfm(metaclass=pyuvm.Singleton): + """Valid-Accept Bfm""" + + def __init__(self): + self.log = logging.getLogger() + self.log.info("Valid-accept BFM") + self.log.info(" Copyright (c) 2022 Torsten Meissner") + self.dut = cocotb.top + self.driver_queue = Queue(maxsize=1) + self.in_monitor_queue = Queue(maxsize=0) + self.out_monitor_queue = Queue(maxsize=0) + self.clock = Clock(self.dut.clk_i, 10, units="ns") # Create a 10 ns period clock + cocotb.start_soon(self.clock.start()) + + # Reset coroutine + async def reset(self): + self.dut.reset_i.value = 0 + self.dut.valid_i.value = 0 + self.dut.mode_i.value = 0 + self.dut.key_i.value = 0 + self.dut.data_i.value = 0 + self.dut.accept_i.value = 0 + await Timer(100, units="ns") + self.dut.reset_i.value = 1 + + # VAI input driver + async def __driver(self): + self.dut.valid_i.value = 0 + self.dut.key_i.value = 0 + self.dut.data_i.value = 0 + while True: + await RisingEdge(self.dut.clk_i) + if not self.dut.valid_i.value: + try: + (mode, key, data) = self.driver_queue.get_nowait() + self.dut.mode_i.value = mode + self.dut.key_i.value = key + self.dut.data_i.value = data + self.dut.valid_i.value = 1 + except QueueEmpty: + continue + else: + if self.dut.accept_o.value: + self.dut.valid_i.value = 0 + + # VAI output receiver + # We ignore data out, we use the output monitor instead + async def __receiver(self): + self.dut.accept_i.value = 0 + while True: + await RisingEdge(self.dut.clk_i) + if self.dut.valid_o.value and not self.dut.accept_i.value: + self.dut.accept_i.value = 1 + else: + self.dut.accept_i.value = 0 + + # VAI input monitor + async def __in_monitor(self): + while True: + await RisingEdge(self.dut.clk_i) + if self.dut.valid_i.value and self.dut.accept_o.value: + in_tuple = (self.dut.mode_i.value, + self.dut.key_i.value, + self.dut.data_i.value) + self.in_monitor_queue.put_nowait(in_tuple) + + # VAI output monitor + async def __out_monitor(self): + while True: + await RisingEdge(self.dut.clk_i) + if self.dut.valid_o.value and self.dut.accept_i.value: + out_data = self.dut.data_o.value + self.out_monitor_queue.put_nowait(out_data) + + + # Launching the coroutines using start_soon + def start_tasks(self): + cocotb.start_soon(self.__driver()) + cocotb.start_soon(self.__receiver()) + cocotb.start_soon(self.__in_monitor()) + cocotb.start_soon(self.__out_monitor()) + + # The get_input() coroutine returns the next VAI input + async def get_input(self): + data = await self.in_monitor_queue.get() + return data + + # The get_output() coroutine returns the next VAI output + async def get_output(self): + data = await self.out_monitor_queue.get() + return data + + # send_op puts the VAI input operation into the driver queue + async def send_op(self, mode, key, data): + await self.driver_queue.put((mode, key, data)) \ No newline at end of file diff --git a/pyuvm_tests/tb_aes.py b/pyuvm_tests/tb_aes.py new file mode 100644 index 0000000..35a0afb --- /dev/null +++ b/pyuvm_tests/tb_aes.py @@ -0,0 +1,232 @@ +from cocotb.queue import Queue +from cocotb.triggers import RisingEdge, Timer, Combine +from pyuvm import * +import cocotb +import pyuvm +import vsc +from vsc import get_coverage_report +from VaiBfm import VaiBfm, Mode +from Coverage import constraints, covergroup +from Crypto.Cipher import AES + + + +@pyuvm.test() +class AesTest(uvm_test): + def build_phase(self): + self.env = AesEnv("env", self) + + def end_of_elaboration_phase(self): + self.test_all = TestAllSeq.create("test_all") + + async def run_phase(self): + self.raise_objection() + await self.test_all.start() + self.drop_objection() + + +@pyuvm.test() +class ParallelTest(AesTest): + def end_of_elaboration_phase(self): + uvm_factory().set_type_override_by_type(TestAllSeq, TestAllParallelSeq) + return super().end_of_elaboration_phase() + + +# Virtual sequence that starts other sequences +class TestAllSeq(uvm_sequence): + + async def body(self): + # get the sequencer handle + seqr = ConfigDB().get(None, "", "SEQR") + enc_rand_seq = EncRandSeq("enc_random") + dec_rand_seq = DecRandSeq("dec_random") + await enc_rand_seq.start(seqr) + await dec_rand_seq.start(seqr) + + +# Running encryption and decryption sequences in parallel +class TestAllParallelSeq(uvm_sequence): + + async def body(self): + seqr = ConfigDB().get(None, "", "SEQR") + enc_rand_seq = EncRandSeq("enc_random") + dec_rand_seq = DecRandSeq("dec_random") + enc_rand_task = cocotb.start_soon(enc_rand_seq.start(seqr)) + dec_rand_task = cocotb.start_soon(dec_rand_seq.start(seqr)) + await Combine(enc_rand_task, dec_rand_task) + + +# Sequence item which holds the stimuli for one operation +class AesSeqItem(uvm_sequence_item): + + def __init__(self, name, mode, key, data): + super().__init__(name) + self.mode = mode + self.key = key + self.data = data + + def __eq__(self, other): + same = self.mode == other.mode and self.key == other.key and self.data == other.data + return same + + def __str__(self): + return f"{self.get_name()} : Mode: 0b{self.mode:01x} \ + Key: 0x{self.key:016x} Data: 0x{self.data:016x}" + + +# Abstract basis sequence class +# set_operands() has to be implemented by class that inherits from this class +class BaseSeq(uvm_sequence): + + async def body(self): + self.cr = constraints() + for _ in range(20): + aes_tr = AesSeqItem("aes_tr", 0, 0, 0) + await self.start_item(aes_tr) + self.set_operands(aes_tr) + await self.finish_item(aes_tr) + + def set_operands(self, tr): + pass + + +# Sequence for encryption tests with random stimuli +class EncRandSeq(BaseSeq): + def set_operands(self, tr): + self.cr.randomize() + tr.mode = 0 + tr.key = self.cr.key + tr.data = self.cr.data + + +# Sequence for decryption tests with random stimuli +class DecRandSeq(BaseSeq): + def set_operands(self, tr): + self.cr.randomize() + tr.mode = 1 + tr.key = self.cr.key + tr.data = self.cr.data + + +class Driver(uvm_driver): + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + def start_of_simulation_phase(self): + self.bfm = VaiBfm() + + async def launch_tb(self): + await self.bfm.reset() + self.bfm.start_tasks() + + async def run_phase(self): + await self.launch_tb() + while True: + op = await self.seq_item_port.get_next_item() + await self.bfm.send_op(op.mode, op.key, op.data) + result = await self.bfm.get_output() + self.ap.write(result) + self.seq_item_port.item_done() + + +class Scoreboard(uvm_component): + + def build_phase(self): + self.input_fifo = uvm_tlm_analysis_fifo("input_fifo", self) + self.output_fifo = uvm_tlm_analysis_fifo("output_fifo", self) + self.input_get_port = uvm_get_port("input_get_port", self) + self.output_get_port = uvm_get_port("output_get_port", self) + self.input_export = self.input_fifo.analysis_export + self.output_export = self.output_fifo.analysis_export + self.passed = True + + def connect_phase(self): + self.input_get_port.connect(self.input_fifo.get_export) + self.output_get_port.connect(self.output_fifo.get_export) + + def check_phase(self): + while self.output_get_port.can_get(): + _, result = self.output_get_port.try_get() + op_success, op = self.input_get_port.try_get() + if not op_success: + self.logger.critical(f"result {result} had no input operation") + else: + (mode, key, data) = op + aes = AES.new(key.buff, AES.MODE_ECB) + if not mode: + reference = aes.encrypt(data.buff) + else: + reference = aes.decrypt(data.buff) + if result.buff == reference: + self.logger.info(f"PASSED: {Mode(mode).name} {data.hex()} with key " + f"{key.hex()} = {result.hex()}") + else: + self.logger.error(f"FAILED: {Mode(mode).name} {data.hex()} with key " + f"{key.hex()} = 0x{result.hex()}, " + f"expected {reference.hex()}") + self.passed = False + + def report_phase(self): + assert self.passed, "Test failed" + + +class Monitor(uvm_component): + def __init__(self, name, parent, method_name): + super().__init__(name, parent) + self.bfm = VaiBfm() + self.get_method = getattr(self.bfm, method_name) + + def build_phase(self): + self.ap = uvm_analysis_port("ap", self) + + async def run_phase(self): + while True: + datum = await self.get_method() + self.logger.debug(f"MONITORED {datum}") + self.ap.write(datum) + + +# Coverage collector and checker +class Coverage(uvm_subscriber): + + def start_of_simulation_phase(self): + self.cg = covergroup() + try: + self.disable_errors = ConfigDB().get( + self, "", "DISABLE_COVERAGE_ERRORS") + except UVMConfigItemNotFound: + self.disable_errors = False + + def write(self, data): + (mode, key, _) = data + self.cg.sample(mode, key) + + def report_phase(self): + if not self.disable_errors: + if self.cg.get_coverage() != 100.0: + self.logger.warning( + f"Functional coverage incomplete.") + else: + self.logger.info("Covered all operations") + with open('results/tb_aes_fcover.txt', 'a', encoding='utf-8') as f: + f.write(get_coverage_report(details=True)) + vsc.write_coverage_db('results/tb_aes_fcover.xml') + + +# AES test bench environment +# Creates instances of components and connects them +class AesEnv(uvm_env): + + def build_phase(self): + self.seqr = uvm_sequencer("seqr", self) + ConfigDB().set(None, "*", "SEQR", self.seqr) + self.driver = Driver.create("driver", self) + self.input_mon = Monitor("input_mon", self, "get_input") + self.coverage = Coverage("coverage", self) + self.scoreboard = Scoreboard("scoreboard", self) + + def connect_phase(self): + self.driver.seq_item_port.connect(self.seqr.seq_item_export) + self.input_mon.ap.connect(self.scoreboard.input_export) + self.input_mon.ap.connect(self.coverage.analysis_export) + self.driver.ap.connect(self.scoreboard.output_export)