Browse Source

Add AES test which uses pyuvm

master
T. Meissner 10 months ago
parent
commit
f7d6238731
4 changed files with 447 additions and 0 deletions
  1. +46
    -0
      pyuvm_tests/Coverage.py
  2. +51
    -0
      pyuvm_tests/Makefile
  3. +118
    -0
      pyuvm_tests/VaiBfm.py
  4. +232
    -0
      pyuvm_tests/tb_aes.py

+ 46
- 0
pyuvm_tests/Coverage.py View File

@ -0,0 +1,46 @@
import vsc
# Random stimuli model class
@vsc.randobj
class constraints():
def __init__(self):
self.key = vsc.rand_bit_t(128)
self.data = vsc.rand_bit_t(128)
@vsc.constraint
def c(self):
self.data >= 0 and self.data <= 2**128-1
vsc.dist(self.key, [
vsc.weight(0, 15),
vsc.weight((1,2**128-2), 70),
vsc.weight((2**128-1), 15)])
# Stimuli covergroup
@vsc.covergroup
class covergroup():
def __init__(self, name="bla"):
self.options.name = name
self.with_sample(
mode = vsc.bit_t(1),
key = vsc.bit_t(128)
)
self.enc = vsc.coverpoint(self.mode, bins=dict(
enc = vsc.bin(0)))
self.dec = vsc.coverpoint(self.mode, bins=dict(
dec = vsc.bin(1)))
self.key0 = vsc.coverpoint(self.key, bins=dict(
key0 = vsc.bin(0)))
self.keyF = vsc.coverpoint(self.key, bins=dict(
keyF = vsc.bin(2**128-1)))
self.encXkey0 = vsc.cross([self.enc, self.key0])
self.encXkeyF = vsc.cross([self.enc, self.keyF])
self.decXkey0 = vsc.cross([self.dec, self.key0])
self.decXkeyF = vsc.cross([self.dec, self.keyF])

+ 51
- 0
pyuvm_tests/Makefile View File

@ -0,0 +1,51 @@
# Default test
DUT ?= aes
# Path to ext deps
EXT := ../ext
ifeq (${DUT}, wishbone)
TOPLEVEL := wishboneslavee
SIM_ARGS := -gSimulation=true \
-gAddressWidth=8 \
-gDataWidth=16
else
TOPLEVEL := ${DUT}
endif
# Cocotb related
MODULE := tb_${DUT}
COCOTB_LOG_LEVEL := DEBUG
CUSTOM_COMPILE_DEPS := results
COCOTB_RESULTS_FILE := results/${MODULE}.xml
# Simulator & RTL related
SIM ?= ghdl
TOPLEVEL_LANG := vhdl
VHDL_SOURCES_libvhdl := ${EXT}/libvhdl/common/UtilsP.vhd
VHDL_SOURCES := ${EXT}/libvhdl/syn/* \
${EXT}/cryptocores/aes/rtl/vhdl/*.vhd
SIM_BUILD := build
ifeq (${SIM}, ghdl)
COMPILE_ARGS := --std=08
SIM_ARGS += \
--wave=results/${MODULE}.ghw \
--psl-report=results/${MODULE}_psl.json \
--vpi-trace=results/${MODULE}_vpi.log
else
EXTRA_ARGS := --std=08
VHDL_LIB_ORDER := libvhdl
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
results:
mkdir -p results
.PHONY: clean
clean::
rm -rf *.o __pycache__ uarttx uartrx wishboneslavee aes results $(SIM_BUILD)

+ 118
- 0
pyuvm_tests/VaiBfm.py View File

@ -0,0 +1,118 @@
import cocotb
from cocotb.triggers import FallingEdge, RisingEdge, Timer
from cocotb.queue import QueueEmpty, Queue
from cocotb.clock import Clock
import logging
import enum
import pyuvm
# Logger setup
logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# AES mode enum
@enum.unique
class Mode(enum.IntEnum):
Encrypt = 0
Decrypt = 1
# VAI BFM with queues for
class VaiBfm(metaclass=pyuvm.Singleton):
"""Valid-Accept Bfm"""
def __init__(self):
self.log = logging.getLogger()
self.log.info("Valid-accept BFM")
self.log.info(" Copyright (c) 2022 Torsten Meissner")
self.dut = cocotb.top
self.driver_queue = Queue(maxsize=1)
self.in_monitor_queue = Queue(maxsize=0)
self.out_monitor_queue = Queue(maxsize=0)
self.clock = Clock(self.dut.clk_i, 10, units="ns") # Create a 10 ns period clock
cocotb.start_soon(self.clock.start())
# Reset coroutine
async def reset(self):
self.dut.reset_i.value = 0
self.dut.valid_i.value = 0
self.dut.mode_i.value = 0
self.dut.key_i.value = 0
self.dut.data_i.value = 0
self.dut.accept_i.value = 0
await Timer(100, units="ns")
self.dut.reset_i.value = 1
# VAI input driver
async def __driver(self):
self.dut.valid_i.value = 0
self.dut.key_i.value = 0
self.dut.data_i.value = 0
while True:
await RisingEdge(self.dut.clk_i)
if not self.dut.valid_i.value:
try:
(mode, key, data) = self.driver_queue.get_nowait()
self.dut.mode_i.value = mode
self.dut.key_i.value = key
self.dut.data_i.value = data
self.dut.valid_i.value = 1
except QueueEmpty:
continue
else:
if self.dut.accept_o.value:
self.dut.valid_i.value = 0
# VAI output receiver
# We ignore data out, we use the output monitor instead
async def __receiver(self):
self.dut.accept_i.value = 0
while True:
await RisingEdge(self.dut.clk_i)
if self.dut.valid_o.value and not self.dut.accept_i.value:
self.dut.accept_i.value = 1
else:
self.dut.accept_i.value = 0
# VAI input monitor
async def __in_monitor(self):
while True:
await RisingEdge(self.dut.clk_i)
if self.dut.valid_i.value and self.dut.accept_o.value:
in_tuple = (self.dut.mode_i.value,
self.dut.key_i.value,
self.dut.data_i.value)
self.in_monitor_queue.put_nowait(in_tuple)
# VAI output monitor
async def __out_monitor(self):
while True:
await RisingEdge(self.dut.clk_i)
if self.dut.valid_o.value and self.dut.accept_i.value:
out_data = self.dut.data_o.value
self.out_monitor_queue.put_nowait(out_data)
# Launching the coroutines using start_soon
def start_tasks(self):
cocotb.start_soon(self.__driver())
cocotb.start_soon(self.__receiver())
cocotb.start_soon(self.__in_monitor())
cocotb.start_soon(self.__out_monitor())
# The get_input() coroutine returns the next VAI input
async def get_input(self):
data = await self.in_monitor_queue.get()
return data
# The get_output() coroutine returns the next VAI output
async def get_output(self):
data = await self.out_monitor_queue.get()
return data
# send_op puts the VAI input operation into the driver queue
async def send_op(self, mode, key, data):
await self.driver_queue.put((mode, key, data))

+ 232
- 0
pyuvm_tests/tb_aes.py View File

@ -0,0 +1,232 @@
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Timer, Combine
from pyuvm import *
import cocotb
import pyuvm
import vsc
from vsc import get_coverage_report
from VaiBfm import VaiBfm, Mode
from Coverage import constraints, covergroup
from Crypto.Cipher import AES
@pyuvm.test()
class AesTest(uvm_test):
def build_phase(self):
self.env = AesEnv("env", self)
def end_of_elaboration_phase(self):
self.test_all = TestAllSeq.create("test_all")
async def run_phase(self):
self.raise_objection()
await self.test_all.start()
self.drop_objection()
@pyuvm.test()
class ParallelTest(AesTest):
def end_of_elaboration_phase(self):
uvm_factory().set_type_override_by_type(TestAllSeq, TestAllParallelSeq)
return super().end_of_elaboration_phase()
# Virtual sequence that starts other sequences
class TestAllSeq(uvm_sequence):
async def body(self):
# get the sequencer handle
seqr = ConfigDB().get(None, "", "SEQR")
enc_rand_seq = EncRandSeq("enc_random")
dec_rand_seq = DecRandSeq("dec_random")
await enc_rand_seq.start(seqr)
await dec_rand_seq.start(seqr)
# Running encryption and decryption sequences in parallel
class TestAllParallelSeq(uvm_sequence):
async def body(self):
seqr = ConfigDB().get(None, "", "SEQR")
enc_rand_seq = EncRandSeq("enc_random")
dec_rand_seq = DecRandSeq("dec_random")
enc_rand_task = cocotb.start_soon(enc_rand_seq.start(seqr))
dec_rand_task = cocotb.start_soon(dec_rand_seq.start(seqr))
await Combine(enc_rand_task, dec_rand_task)
# Sequence item which holds the stimuli for one operation
class AesSeqItem(uvm_sequence_item):
def __init__(self, name, mode, key, data):
super().__init__(name)
self.mode = mode
self.key = key
self.data = data
def __eq__(self, other):
same = self.mode == other.mode and self.key == other.key and self.data == other.data
return same
def __str__(self):
return f"{self.get_name()} : Mode: 0b{self.mode:01x} \
Key: 0x{self.key:016x} Data: 0x{self.data:016x}"
# Abstract basis sequence class
# set_operands() has to be implemented by class that inherits from this class
class BaseSeq(uvm_sequence):
async def body(self):
self.cr = constraints()
for _ in range(20):
aes_tr = AesSeqItem("aes_tr", 0, 0, 0)
await self.start_item(aes_tr)
self.set_operands(aes_tr)
await self.finish_item(aes_tr)
def set_operands(self, tr):
pass
# Sequence for encryption tests with random stimuli
class EncRandSeq(BaseSeq):
def set_operands(self, tr):
self.cr.randomize()
tr.mode = 0
tr.key = self.cr.key
tr.data = self.cr.data
# Sequence for decryption tests with random stimuli
class DecRandSeq(BaseSeq):
def set_operands(self, tr):
self.cr.randomize()
tr.mode = 1
tr.key = self.cr.key
tr.data = self.cr.data
class Driver(uvm_driver):
def build_phase(self):
self.ap = uvm_analysis_port("ap", self)
def start_of_simulation_phase(self):
self.bfm = VaiBfm()
async def launch_tb(self):
await self.bfm.reset()
self.bfm.start_tasks()
async def run_phase(self):
await self.launch_tb()
while True:
op = await self.seq_item_port.get_next_item()
await self.bfm.send_op(op.mode, op.key, op.data)
result = await self.bfm.get_output()
self.ap.write(result)
self.seq_item_port.item_done()
class Scoreboard(uvm_component):
def build_phase(self):
self.input_fifo = uvm_tlm_analysis_fifo("input_fifo", self)
self.output_fifo = uvm_tlm_analysis_fifo("output_fifo", self)
self.input_get_port = uvm_get_port("input_get_port", self)
self.output_get_port = uvm_get_port("output_get_port", self)
self.input_export = self.input_fifo.analysis_export
self.output_export = self.output_fifo.analysis_export
self.passed = True
def connect_phase(self):
self.input_get_port.connect(self.input_fifo.get_export)
self.output_get_port.connect(self.output_fifo.get_export)
def check_phase(self):
while self.output_get_port.can_get():
_, result = self.output_get_port.try_get()
op_success, op = self.input_get_port.try_get()
if not op_success:
self.logger.critical(f"result {result} had no input operation")
else:
(mode, key, data) = op
aes = AES.new(key.buff, AES.MODE_ECB)
if not mode:
reference = aes.encrypt(data.buff)
else:
reference = aes.decrypt(data.buff)
if result.buff == reference:
self.logger.info(f"PASSED: {Mode(mode).name} {data.hex()} with key "
f"{key.hex()} = {result.hex()}")
else:
self.logger.error(f"FAILED: {Mode(mode).name} {data.hex()} with key "
f"{key.hex()} = 0x{result.hex()}, "
f"expected {reference.hex()}")
self.passed = False
def report_phase(self):
assert self.passed, "Test failed"
class Monitor(uvm_component):
def __init__(self, name, parent, method_name):
super().__init__(name, parent)
self.bfm = VaiBfm()
self.get_method = getattr(self.bfm, method_name)
def build_phase(self):
self.ap = uvm_analysis_port("ap", self)
async def run_phase(self):
while True:
datum = await self.get_method()
self.logger.debug(f"MONITORED {datum}")
self.ap.write(datum)
# Coverage collector and checker
class Coverage(uvm_subscriber):
def start_of_simulation_phase(self):
self.cg = covergroup()
try:
self.disable_errors = ConfigDB().get(
self, "", "DISABLE_COVERAGE_ERRORS")
except UVMConfigItemNotFound:
self.disable_errors = False
def write(self, data):
(mode, key, _) = data
self.cg.sample(mode, key)
def report_phase(self):
if not self.disable_errors:
if self.cg.get_coverage() != 100.0:
self.logger.warning(
f"Functional coverage incomplete.")
else:
self.logger.info("Covered all operations")
with open('results/tb_aes_fcover.txt', 'a', encoding='utf-8') as f:
f.write(get_coverage_report(details=True))
vsc.write_coverage_db('results/tb_aes_fcover.xml')
# AES test bench environment
# Creates instances of components and connects them
class AesEnv(uvm_env):
def build_phase(self):
self.seqr = uvm_sequencer("seqr", self)
ConfigDB().set(None, "*", "SEQR", self.seqr)
self.driver = Driver.create("driver", self)
self.input_mon = Monitor("input_mon", self, "get_input")
self.coverage = Coverage("coverage", self)
self.scoreboard = Scoreboard("scoreboard", self)
def connect_phase(self):
self.driver.seq_item_port.connect(self.seqr.seq_item_export)
self.input_mon.ap.connect(self.scoreboard.input_export)
self.input_mon.ap.connect(self.coverage.analysis_export)
self.driver.ap.connect(self.scoreboard.output_export)

Loading…
Cancel
Save