From 0fcdc7fbed6cf42b417bb6ed1bca0595888e457c Mon Sep 17 00:00:00 2001 From: tmeissner Date: Thu, 10 Feb 2022 19:19:55 +0100 Subject: [PATCH] Incorporate latest additions & fixes --- tests/Sram.py | 4 +- tests/Vai.py | 53 ++++++++++++++++++-- tests/tb_aes.py | 113 ++++++++++++++++++++++++++++++++++++------- tests/tb_uartrx.py | 6 +-- tests/tb_uarttx.py | 4 +- tests/tb_wishbone.py | 42 ++++++++++------ 6 files changed, 177 insertions(+), 45 deletions(-) diff --git a/tests/Sram.py b/tests/Sram.py index b658c46..9c90dad 100644 --- a/tests/Sram.py +++ b/tests/Sram.py @@ -48,7 +48,7 @@ class SramRead(Sram): if self._ren.value == 1: _data = self._mem[str(self._adr.value)] self._din.value = _data - self.log.info("Read data: %s from adr: %s", hex(_data), hex(self._adr.value)) + self.log.info(f"Read data: {hex(_data)} from adr: {hex(self._adr.value)}") class SramWrite(Sram): @@ -76,7 +76,7 @@ class SramWrite(Sram): await self._clkedge if self._wen.value == 1: self._mem[str(self._adr.value)] = self._dout.value - self.log.info("Wrote data: %s to adr: %s", hex(self._dout.value), hex(self._adr.value)) + self.log.info(f"Wrote data: {hex(self._dout.value)} to adr: {hex(self._adr.value)}") class SramMonitor(Sram): diff --git a/tests/Vai.py b/tests/Vai.py index d972391..8a89d07 100644 --- a/tests/Vai.py +++ b/tests/Vai.py @@ -1,4 +1,6 @@ import logging +import cocotb +from cocotb.utils import get_sim_time from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly @@ -41,13 +43,17 @@ class VaiDriver(Vai): await self._clkedge self._valid.value = 1 - # Hack to drive lists of signals + if isinstance(self._data, list): + _info = ', '.join(map(lambda x: str(hex(x)), data)) for i in range(len(self._data)): self._data[i].value = data[i] + else: - self.log.info("Sending data: %s", hex(data)) self._data.value = data + _info = hex(data) + + self.log.info(f"Send data: {_info}") while True: await ReadOnly() @@ -86,9 +92,50 @@ class VaiReceiver(Vai): await self._clkedge self._accept.value = 1 _rec = self._data.value - self.log.info("Received data: %s", hex(_rec)) + self.log.info(f"Receive data: {hex(_rec)}") await self._clkedge self._accept.value = 0 return _rec + + +class VaiMonitor(Vai): + """Valid-Accept Receiver""" + + def __init__(self, clock, data, valid, accept, queue=None, *args, **kwargs): + super().__init__(clock, data, valid, accept, *args, **kwargs) + + self.log.info("Valid-accept monitor") + self.log.info(" cocotbext-vai version %s", self._version) + self.log.info(" Copyright (c) 2022 Torsten Meissner") + + self._active = None + self._queue = queue + self._transactions = {} + self._restart() + + def _restart(self): + self.log.debug("SramMonitor._restart()") + if self._active is not None: + self._active.kill() + # Schedule VAI read to run concurrently + self._active = cocotb.start_soon(self._read()) + + async def _read(self, cb=None): + while True: + await self._clkedge + if self._valid.value and self._accept.value: + if self._queue: + await self._queue.put(self._data) + #self._transactions[str(get_sim_time('ns'))] = { + # "data" : self._data.value} + + + @property + def transactions(self, index=None): + if index: + key = list(self._transactions.keys())[index] + return {key: self._transactions[key]} + else: + return self._transactions diff --git a/tests/tb_aes.py b/tests/tb_aes.py index 8f1d3ab..8376f7c 100644 --- a/tests/tb_aes.py +++ b/tests/tb_aes.py @@ -2,12 +2,13 @@ import logging import random import cocotb import pprint -from Vai import VaiDriver, VaiReceiver +from Vai import VaiDriver, VaiReceiver, VaiMonitor from cocotb.clock import Clock +from cocotb.queue import Queue from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly from Crypto.Cipher import AES from Crypto.Util.number import long_to_bytes, getRandomNBitInteger -import binascii +import vsc # Reset coroutine @@ -17,7 +18,57 @@ async def reset_dut(reset_n, duration_ns): reset_n.value = 1 -@cocotb.test() +# Stimuli model class +@vsc.randobj +class constraints(): + def __init__(self): + self.key = vsc.rand_bit_t(128) + self.data = vsc.rand_bit_t(128) + + @vsc.constraint + def c(self): + self.data >= 0 and self.data <= 2**128-1 + vsc.dist(self.key, [ + vsc.weight(0, 25), + vsc.weight((1,2**128-2), 50), + vsc.weight((2**128-1), 25)]) + + +# Stimuli covergroup +@vsc.covergroup +class covergroup(): + def __init__(self): + self.with_sample( + mode = vsc.bit_t(1), + key = vsc.bit_t(128) + ) + + self.enc = vsc.coverpoint(self.mode, bins=dict( + enc = vsc.bin(0))) + + self.dec = vsc.coverpoint(self.mode, bins=dict( + dec = vsc.bin(1))) + + self.key0 = vsc.coverpoint(self.key, bins=dict( + key0 = vsc.bin(0))) + + self.keyF = vsc.coverpoint(self.key, bins=dict( + keyF = vsc.bin(2**128-1))) + + self.encXkey0 = vsc.cross([self.enc, self.key0]) + self.encXkeyF = vsc.cross([self.enc, self.keyF]) + + self.decXkey0 = vsc.cross([self.dec, self.key0]) + self.decXkeyF = vsc.cross([self.dec, self.keyF]) + + +async def cg_sample(cg, queue): + while True: + _data = await queue.get() + cg.sample(_data[0].value, _data[1].value) + + +@cocotb.test(skip=False) async def test_aes_enc(dut): """ Test AES encryption """ @@ -26,10 +77,19 @@ async def test_aes_enc(dut): # Connect reset reset = dut.reset_i - # Instantiate VAI driver & receiver _input = [dut.mode_i, dut.key_i, dut.data_i] + _output = dut.data_o + # DUT input side vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o) + vai_in_queue = cocotb.queue.Queue() + vai_in_monitor = VaiMonitor(dut.clk_i, _input, dut.valid_i, dut.accept_o, vai_in_queue) + # DUT output side vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i) + vai_out_monitor = VaiMonitor(dut.clk_i, _output, dut.valid_o, dut.accept_i) + + cr = constraints() + cg = covergroup() + cocotb.start_soon(cg_sample(cg, vai_in_queue)) # Drive input defaults (setimmediatevalue to avoid x asserts) dut.mode_i.setimmediatevalue(0) @@ -48,20 +108,24 @@ async def test_aes_enc(dut): # Test 10 AES calculations for i in range(10): + # Get now random stimuli + cr.randomize() + _key = cr.key + _data = cr.data await clkedge - _key = getRandomNBitInteger(128) - _data = getRandomNBitInteger(128) # Drive AES inputs await vai_driver.send([0, _key, _data]) # Calc reference data - _aes = AES.new(long_to_bytes(_key), AES.MODE_ECB) - _ref = _aes.encrypt(long_to_bytes(_data)) + _aes = AES.new(_key.to_bytes(16, 'big'), AES.MODE_ECB) + _ref = _aes.encrypt(_data.to_bytes(16, 'big')) # Get DUT output data _rec = await vai_receiver.receive() - assert _rec.buff == _ref, f"Encrypt error, got {_rec.buff}, expected {_ref}" + # Equivalence check + assert _rec.buff == _ref, \ + f"Encrypt error, got 0x{_rec.buff.hex()}, expected 0x{_ref.hex()}" -@cocotb.test() +@cocotb.test(skip=False) async def test_aes_dec(dut): """ Test AES decryption """ @@ -70,10 +134,19 @@ async def test_aes_dec(dut): # Connect reset reset = dut.reset_i - # Instantiate VAI driver & receiver _input = [dut.mode_i, dut.key_i, dut.data_i] + _output = dut.data_o + # DUT input side vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o) + vai_in_queue = cocotb.queue.Queue() + vai_in_monitor = VaiMonitor(dut.clk_i, _input, dut.valid_i, dut.accept_o, vai_in_queue) + # DUT output side vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i) + vai_out_monitor = VaiMonitor(dut.clk_i, _output, dut.valid_o, dut.accept_i) + + cr = constraints() + cg = covergroup() + cocotb.start_soon(cg_sample(cg, vai_in_queue)) # Drive input defaults (setimmediatevalue to avoid x asserts) dut.mode_i.setimmediatevalue(0) @@ -86,20 +159,26 @@ async def test_aes_dec(dut): cocotb.start_soon(clock.start()) # Start the clock # Execution will block until reset_dut has completed - dut._log.info("Hold reset") await reset_dut(reset, 100) dut._log.info("Released reset") # Test 10 AES calculations for i in range(10): + # Get now random stimuli + cr.randomize() + _key = cr.key + _data = cr.data await clkedge - _key = getRandomNBitInteger(128) - _data = getRandomNBitInteger(128) # Drive AES inputs await vai_driver.send([1, _key, _data]) # Calc reference data - _aes = AES.new(long_to_bytes(_key), AES.MODE_ECB) - _ref = _aes.decrypt(long_to_bytes(_data)) + _aes = AES.new(_key.to_bytes(16, 'big'), AES.MODE_ECB) + _ref = _aes.decrypt(_data.to_bytes(16, 'big')) # Get DUT output data _rec = await vai_receiver.receive() - assert _rec.buff == _ref, f"Decrypt error, got {_rec.buff}, expected {_ref}" + # Equivalence check + assert _rec.buff == _ref, \ + f"Decrypt error, got 0x{_rec.buff.hex()}, expected 0x{_ref.hex()}" + + with open('results/tb_aes_fcover.txt', 'w', encoding='utf-8') as f: + f.write(vsc.get_coverage_report()) diff --git a/tests/tb_uartrx.py b/tests/tb_uartrx.py index e624d8b..6f83b0c 100644 --- a/tests/tb_uartrx.py +++ b/tests/tb_uartrx.py @@ -1,7 +1,6 @@ import logging import random import cocotb -import wavedrom from Uart import UartDriver, UartReceiver from Vai import VaiDriver, VaiReceiver from cocotb.clock import Clock @@ -33,17 +32,16 @@ async def test_uartrx(dut): dut.rx_i.setimmediatevalue(1) dut.accept_i.setimmediatevalue(0) - clock = Clock(dut.clk_i, 10, units="ns") # Create a 1 us period clock + clock = Clock(dut.clk_i, 10, units="ns") # Create a 10 ns period clock cocotb.start_soon(clock.start()) # Start the clock # Execution will block until reset_dut has completed - dut._log.info("Hold reset") await reset_dut(reset_n, 100) dut._log.info("Released reset") # Test 10 UART transmissions for i in range(10): - await clkedge + await Timer(100, units="ns") val = random.randint(0, 255) await uart_driver.send(val) rec = await vai_receiver.receive(); diff --git a/tests/tb_uarttx.py b/tests/tb_uarttx.py index 1fdcdc7..b176ee6 100644 --- a/tests/tb_uarttx.py +++ b/tests/tb_uarttx.py @@ -1,7 +1,6 @@ import logging import random import cocotb -import wavedrom from Uart import UartDriver, UartReceiver from Vai import VaiDriver, VaiReceiver from cocotb.clock import Clock @@ -37,13 +36,12 @@ async def test_uarttx(dut): cocotb.start_soon(clock.start()) # Start the clock # Execution will block until reset_dut has completed - dut._log.info("Hold reset") await reset_dut(reset_n, 100) dut._log.info("Released reset") # Test 10 UART transmissions for i in range(10): - await clkedge + await Timer(100, units="ns") val = random.randint(0, 255) await vai_driver.send(val) rec = await uart_receiver.receive(); diff --git a/tests/tb_wishbone.py b/tests/tb_wishbone.py index af28640..09fa7e0 100644 --- a/tests/tb_wishbone.py +++ b/tests/tb_wishbone.py @@ -1,14 +1,13 @@ -# test_uart.py - import logging import random import cocotb -import pprint +import wavedrom from collections import defaultdict from Sram import SramRead, SramWrite, SramMonitor from cocotb.clock import Clock from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly from cocotbext.wishbone.driver import WishboneMaster, WBOp +from cocotb.wavedrom import Wavedrom, trace # Reset coroutine @@ -17,8 +16,9 @@ async def reset_dut(reset_n, duration_ns): await Timer(duration_ns, units="ns") reset_n.value = 0 -def bv_to_hexstr(data): - return str(hex(data.integer)) +def wave2svg(wave, file): + svg = wavedrom.render(wave) + svg.saveas(file) @cocotb.test() @@ -63,20 +63,30 @@ async def test_wishbone(dut): cocotb.start_soon(clock.start()) # Start the clock # Execution will block until reset_dut has completed - dut._log.info("Hold reset") await reset_dut(reset, 100) dut._log.info("Released reset") - # Test 10 Wishbone transmissions - for i in range(10): - await clkedge - adr = random.randint(0, 255) - data = random.randint(0, 2**16-1) - await wbmaster.send_cycle([WBOp(adr=adr, dat=data)]) - rec = await wbmaster.send_cycle([WBOp(adr=adr)]) + # Trace transmissions using wavedrom + with trace(dut.wbcyc_i, dut.wbstb_i, dut.wbwe_i, dut.wback_o, + dut.wbadr_i, dut.wbdat_i, dut.wbdat_o, clk=dut.wbclk_i) as waves: + + # Test 10 Wishbone transmissions + for i in range(10): + await clkedge + adr = random.randint(0, 255) + data = random.randint(0, 2**16-1) + await wbmaster.send_cycle([WBOp(adr=adr, dat=data)]) + rec = await wbmaster.send_cycle([WBOp(adr=adr)]) + + # Print out waveforms as json & svg + _wave = waves.dumpj() + with open('results/tb_wishbone_wave.json', 'w', encoding='utf-8') as f: + f.write(_wave) + wave2svg(_wave, 'results/tb_wishbone_wave.svg') + # Example to print transactions collected by SRAM monitor - with open('results/sram_transactions.log', 'w', encoding='utf-8') as f: + with open('results/tb_wishbone_sram_transactions.log', 'w', encoding='utf-8') as f: f.write((f"{'Time':7}{'Type':7}{'Adr':6}{'Data'}\n")) - for k, v in sram_monitor.transactions.items(): - f.write((f"{k:7}{v['type']:7}{bv_to_hexstr(v['adr']):6}{bv_to_hexstr(v['data'])} \n")) \ No newline at end of file + for key, value in sram_monitor.transactions.items(): + f.write((f"{key:7}{value['type']:7}{hex(value['adr']):6}{hex(value['data'])}\n"))