Browse Source

Incorporate latest additions & fixes

master
T. Meissner 3 years ago
parent
commit
0fcdc7fbed
6 changed files with 177 additions and 45 deletions
  1. +2
    -2
      tests/Sram.py
  2. +50
    -3
      tests/Vai.py
  3. +96
    -17
      tests/tb_aes.py
  4. +2
    -4
      tests/tb_uartrx.py
  5. +1
    -3
      tests/tb_uarttx.py
  6. +26
    -16
      tests/tb_wishbone.py

+ 2
- 2
tests/Sram.py View File

@ -48,7 +48,7 @@ class SramRead(Sram):
if self._ren.value == 1: if self._ren.value == 1:
_data = self._mem[str(self._adr.value)] _data = self._mem[str(self._adr.value)]
self._din.value = _data self._din.value = _data
self.log.info("Read data: %s from adr: %s", hex(_data), hex(self._adr.value))
self.log.info(f"Read data: {hex(_data)} from adr: {hex(self._adr.value)}")
class SramWrite(Sram): class SramWrite(Sram):
@ -76,7 +76,7 @@ class SramWrite(Sram):
await self._clkedge await self._clkedge
if self._wen.value == 1: if self._wen.value == 1:
self._mem[str(self._adr.value)] = self._dout.value self._mem[str(self._adr.value)] = self._dout.value
self.log.info("Wrote data: %s to adr: %s", hex(self._dout.value), hex(self._adr.value))
self.log.info(f"Wrote data: {hex(self._dout.value)} to adr: {hex(self._adr.value)}")
class SramMonitor(Sram): class SramMonitor(Sram):


+ 50
- 3
tests/Vai.py View File

@ -1,4 +1,6 @@
import logging import logging
import cocotb
from cocotb.utils import get_sim_time
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly
@ -41,13 +43,17 @@ class VaiDriver(Vai):
await self._clkedge await self._clkedge
self._valid.value = 1 self._valid.value = 1
# Hack to drive lists of signals
if isinstance(self._data, list): if isinstance(self._data, list):
_info = ', '.join(map(lambda x: str(hex(x)), data))
for i in range(len(self._data)): for i in range(len(self._data)):
self._data[i].value = data[i] self._data[i].value = data[i]
else: else:
self.log.info("Sending data: %s", hex(data))
self._data.value = data self._data.value = data
_info = hex(data)
self.log.info(f"Send data: {_info}")
while True: while True:
await ReadOnly() await ReadOnly()
@ -86,9 +92,50 @@ class VaiReceiver(Vai):
await self._clkedge await self._clkedge
self._accept.value = 1 self._accept.value = 1
_rec = self._data.value _rec = self._data.value
self.log.info("Received data: %s", hex(_rec))
self.log.info(f"Receive data: {hex(_rec)}")
await self._clkedge await self._clkedge
self._accept.value = 0 self._accept.value = 0
return _rec return _rec
class VaiMonitor(Vai):
"""Valid-Accept Receiver"""
def __init__(self, clock, data, valid, accept, queue=None, *args, **kwargs):
super().__init__(clock, data, valid, accept, *args, **kwargs)
self.log.info("Valid-accept monitor")
self.log.info(" cocotbext-vai version %s", self._version)
self.log.info(" Copyright (c) 2022 Torsten Meissner")
self._active = None
self._queue = queue
self._transactions = {}
self._restart()
def _restart(self):
self.log.debug("SramMonitor._restart()")
if self._active is not None:
self._active.kill()
# Schedule VAI read to run concurrently
self._active = cocotb.start_soon(self._read())
async def _read(self, cb=None):
while True:
await self._clkedge
if self._valid.value and self._accept.value:
if self._queue:
await self._queue.put(self._data)
#self._transactions[str(get_sim_time('ns'))] = {
# "data" : self._data.value}
@property
def transactions(self, index=None):
if index:
key = list(self._transactions.keys())[index]
return {key: self._transactions[key]}
else:
return self._transactions

+ 96
- 17
tests/tb_aes.py View File

@ -2,12 +2,13 @@ import logging
import random import random
import cocotb import cocotb
import pprint import pprint
from Vai import VaiDriver, VaiReceiver
from Vai import VaiDriver, VaiReceiver, VaiMonitor
from cocotb.clock import Clock from cocotb.clock import Clock
from cocotb.queue import Queue
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util.number import long_to_bytes, getRandomNBitInteger from Crypto.Util.number import long_to_bytes, getRandomNBitInteger
import binascii
import vsc
# Reset coroutine # Reset coroutine
@ -17,7 +18,57 @@ async def reset_dut(reset_n, duration_ns):
reset_n.value = 1 reset_n.value = 1
@cocotb.test()
# Stimuli model class
@vsc.randobj
class constraints():
def __init__(self):
self.key = vsc.rand_bit_t(128)
self.data = vsc.rand_bit_t(128)
@vsc.constraint
def c(self):
self.data >= 0 and self.data <= 2**128-1
vsc.dist(self.key, [
vsc.weight(0, 25),
vsc.weight((1,2**128-2), 50),
vsc.weight((2**128-1), 25)])
# Stimuli covergroup
@vsc.covergroup
class covergroup():
def __init__(self):
self.with_sample(
mode = vsc.bit_t(1),
key = vsc.bit_t(128)
)
self.enc = vsc.coverpoint(self.mode, bins=dict(
enc = vsc.bin(0)))
self.dec = vsc.coverpoint(self.mode, bins=dict(
dec = vsc.bin(1)))
self.key0 = vsc.coverpoint(self.key, bins=dict(
key0 = vsc.bin(0)))
self.keyF = vsc.coverpoint(self.key, bins=dict(
keyF = vsc.bin(2**128-1)))
self.encXkey0 = vsc.cross([self.enc, self.key0])
self.encXkeyF = vsc.cross([self.enc, self.keyF])
self.decXkey0 = vsc.cross([self.dec, self.key0])
self.decXkeyF = vsc.cross([self.dec, self.keyF])
async def cg_sample(cg, queue):
while True:
_data = await queue.get()
cg.sample(_data[0].value, _data[1].value)
@cocotb.test(skip=False)
async def test_aes_enc(dut): async def test_aes_enc(dut):
""" Test AES encryption """ """ Test AES encryption """
@ -26,10 +77,19 @@ async def test_aes_enc(dut):
# Connect reset # Connect reset
reset = dut.reset_i reset = dut.reset_i
# Instantiate VAI driver & receiver
_input = [dut.mode_i, dut.key_i, dut.data_i] _input = [dut.mode_i, dut.key_i, dut.data_i]
_output = dut.data_o
# DUT input side
vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o) vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o)
vai_in_queue = cocotb.queue.Queue()
vai_in_monitor = VaiMonitor(dut.clk_i, _input, dut.valid_i, dut.accept_o, vai_in_queue)
# DUT output side
vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i) vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i)
vai_out_monitor = VaiMonitor(dut.clk_i, _output, dut.valid_o, dut.accept_i)
cr = constraints()
cg = covergroup()
cocotb.start_soon(cg_sample(cg, vai_in_queue))
# Drive input defaults (setimmediatevalue to avoid x asserts) # Drive input defaults (setimmediatevalue to avoid x asserts)
dut.mode_i.setimmediatevalue(0) dut.mode_i.setimmediatevalue(0)
@ -48,20 +108,24 @@ async def test_aes_enc(dut):
# Test 10 AES calculations # Test 10 AES calculations
for i in range(10): for i in range(10):
# Get now random stimuli
cr.randomize()
_key = cr.key
_data = cr.data
await clkedge await clkedge
_key = getRandomNBitInteger(128)
_data = getRandomNBitInteger(128)
# Drive AES inputs # Drive AES inputs
await vai_driver.send([0, _key, _data]) await vai_driver.send([0, _key, _data])
# Calc reference data # Calc reference data
_aes = AES.new(long_to_bytes(_key), AES.MODE_ECB)
_ref = _aes.encrypt(long_to_bytes(_data))
_aes = AES.new(_key.to_bytes(16, 'big'), AES.MODE_ECB)
_ref = _aes.encrypt(_data.to_bytes(16, 'big'))
# Get DUT output data # Get DUT output data
_rec = await vai_receiver.receive() _rec = await vai_receiver.receive()
assert _rec.buff == _ref, f"Encrypt error, got {_rec.buff}, expected {_ref}"
# Equivalence check
assert _rec.buff == _ref, \
f"Encrypt error, got 0x{_rec.buff.hex()}, expected 0x{_ref.hex()}"
@cocotb.test()
@cocotb.test(skip=False)
async def test_aes_dec(dut): async def test_aes_dec(dut):
""" Test AES decryption """ """ Test AES decryption """
@ -70,10 +134,19 @@ async def test_aes_dec(dut):
# Connect reset # Connect reset
reset = dut.reset_i reset = dut.reset_i
# Instantiate VAI driver & receiver
_input = [dut.mode_i, dut.key_i, dut.data_i] _input = [dut.mode_i, dut.key_i, dut.data_i]
_output = dut.data_o
# DUT input side
vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o) vai_driver = VaiDriver(dut.clk_i, _input, dut.valid_i, dut.accept_o)
vai_in_queue = cocotb.queue.Queue()
vai_in_monitor = VaiMonitor(dut.clk_i, _input, dut.valid_i, dut.accept_o, vai_in_queue)
# DUT output side
vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i) vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i)
vai_out_monitor = VaiMonitor(dut.clk_i, _output, dut.valid_o, dut.accept_i)
cr = constraints()
cg = covergroup()
cocotb.start_soon(cg_sample(cg, vai_in_queue))
# Drive input defaults (setimmediatevalue to avoid x asserts) # Drive input defaults (setimmediatevalue to avoid x asserts)
dut.mode_i.setimmediatevalue(0) dut.mode_i.setimmediatevalue(0)
@ -86,20 +159,26 @@ async def test_aes_dec(dut):
cocotb.start_soon(clock.start()) # Start the clock cocotb.start_soon(clock.start()) # Start the clock
# Execution will block until reset_dut has completed # Execution will block until reset_dut has completed
dut._log.info("Hold reset")
await reset_dut(reset, 100) await reset_dut(reset, 100)
dut._log.info("Released reset") dut._log.info("Released reset")
# Test 10 AES calculations # Test 10 AES calculations
for i in range(10): for i in range(10):
# Get now random stimuli
cr.randomize()
_key = cr.key
_data = cr.data
await clkedge await clkedge
_key = getRandomNBitInteger(128)
_data = getRandomNBitInteger(128)
# Drive AES inputs # Drive AES inputs
await vai_driver.send([1, _key, _data]) await vai_driver.send([1, _key, _data])
# Calc reference data # Calc reference data
_aes = AES.new(long_to_bytes(_key), AES.MODE_ECB)
_ref = _aes.decrypt(long_to_bytes(_data))
_aes = AES.new(_key.to_bytes(16, 'big'), AES.MODE_ECB)
_ref = _aes.decrypt(_data.to_bytes(16, 'big'))
# Get DUT output data # Get DUT output data
_rec = await vai_receiver.receive() _rec = await vai_receiver.receive()
assert _rec.buff == _ref, f"Decrypt error, got {_rec.buff}, expected {_ref}"
# Equivalence check
assert _rec.buff == _ref, \
f"Decrypt error, got 0x{_rec.buff.hex()}, expected 0x{_ref.hex()}"
with open('results/tb_aes_fcover.txt', 'w', encoding='utf-8') as f:
f.write(vsc.get_coverage_report())

+ 2
- 4
tests/tb_uartrx.py View File

@ -1,7 +1,6 @@
import logging import logging
import random import random
import cocotb import cocotb
import wavedrom
from Uart import UartDriver, UartReceiver from Uart import UartDriver, UartReceiver
from Vai import VaiDriver, VaiReceiver from Vai import VaiDriver, VaiReceiver
from cocotb.clock import Clock from cocotb.clock import Clock
@ -33,17 +32,16 @@ async def test_uartrx(dut):
dut.rx_i.setimmediatevalue(1) dut.rx_i.setimmediatevalue(1)
dut.accept_i.setimmediatevalue(0) dut.accept_i.setimmediatevalue(0)
clock = Clock(dut.clk_i, 10, units="ns") # Create a 1 us period clock
clock = Clock(dut.clk_i, 10, units="ns") # Create a 10 ns period clock
cocotb.start_soon(clock.start()) # Start the clock cocotb.start_soon(clock.start()) # Start the clock
# Execution will block until reset_dut has completed # Execution will block until reset_dut has completed
dut._log.info("Hold reset")
await reset_dut(reset_n, 100) await reset_dut(reset_n, 100)
dut._log.info("Released reset") dut._log.info("Released reset")
# Test 10 UART transmissions # Test 10 UART transmissions
for i in range(10): for i in range(10):
await clkedge
await Timer(100, units="ns")
val = random.randint(0, 255) val = random.randint(0, 255)
await uart_driver.send(val) await uart_driver.send(val)
rec = await vai_receiver.receive(); rec = await vai_receiver.receive();


+ 1
- 3
tests/tb_uarttx.py View File

@ -1,7 +1,6 @@
import logging import logging
import random import random
import cocotb import cocotb
import wavedrom
from Uart import UartDriver, UartReceiver from Uart import UartDriver, UartReceiver
from Vai import VaiDriver, VaiReceiver from Vai import VaiDriver, VaiReceiver
from cocotb.clock import Clock from cocotb.clock import Clock
@ -37,13 +36,12 @@ async def test_uarttx(dut):
cocotb.start_soon(clock.start()) # Start the clock cocotb.start_soon(clock.start()) # Start the clock
# Execution will block until reset_dut has completed # Execution will block until reset_dut has completed
dut._log.info("Hold reset")
await reset_dut(reset_n, 100) await reset_dut(reset_n, 100)
dut._log.info("Released reset") dut._log.info("Released reset")
# Test 10 UART transmissions # Test 10 UART transmissions
for i in range(10): for i in range(10):
await clkedge
await Timer(100, units="ns")
val = random.randint(0, 255) val = random.randint(0, 255)
await vai_driver.send(val) await vai_driver.send(val)
rec = await uart_receiver.receive(); rec = await uart_receiver.receive();


+ 26
- 16
tests/tb_wishbone.py View File

@ -1,14 +1,13 @@
# test_uart.py
import logging import logging
import random import random
import cocotb import cocotb
import pprint
import wavedrom
from collections import defaultdict from collections import defaultdict
from Sram import SramRead, SramWrite, SramMonitor from Sram import SramRead, SramWrite, SramMonitor
from cocotb.clock import Clock from cocotb.clock import Clock
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly
from cocotbext.wishbone.driver import WishboneMaster, WBOp from cocotbext.wishbone.driver import WishboneMaster, WBOp
from cocotb.wavedrom import Wavedrom, trace
# Reset coroutine # Reset coroutine
@ -17,8 +16,9 @@ async def reset_dut(reset_n, duration_ns):
await Timer(duration_ns, units="ns") await Timer(duration_ns, units="ns")
reset_n.value = 0 reset_n.value = 0
def bv_to_hexstr(data):
return str(hex(data.integer))
def wave2svg(wave, file):
svg = wavedrom.render(wave)
svg.saveas(file)
@cocotb.test() @cocotb.test()
@ -63,20 +63,30 @@ async def test_wishbone(dut):
cocotb.start_soon(clock.start()) # Start the clock cocotb.start_soon(clock.start()) # Start the clock
# Execution will block until reset_dut has completed # Execution will block until reset_dut has completed
dut._log.info("Hold reset")
await reset_dut(reset, 100) await reset_dut(reset, 100)
dut._log.info("Released reset") dut._log.info("Released reset")
# Test 10 Wishbone transmissions
for i in range(10):
await clkedge
adr = random.randint(0, 255)
data = random.randint(0, 2**16-1)
await wbmaster.send_cycle([WBOp(adr=adr, dat=data)])
rec = await wbmaster.send_cycle([WBOp(adr=adr)])
# Trace transmissions using wavedrom
with trace(dut.wbcyc_i, dut.wbstb_i, dut.wbwe_i, dut.wback_o,
dut.wbadr_i, dut.wbdat_i, dut.wbdat_o, clk=dut.wbclk_i) as waves:
# Test 10 Wishbone transmissions
for i in range(10):
await clkedge
adr = random.randint(0, 255)
data = random.randint(0, 2**16-1)
await wbmaster.send_cycle([WBOp(adr=adr, dat=data)])
rec = await wbmaster.send_cycle([WBOp(adr=adr)])
# Print out waveforms as json & svg
_wave = waves.dumpj()
with open('results/tb_wishbone_wave.json', 'w', encoding='utf-8') as f:
f.write(_wave)
wave2svg(_wave, 'results/tb_wishbone_wave.svg')
# Example to print transactions collected by SRAM monitor # Example to print transactions collected by SRAM monitor
with open('results/sram_transactions.log', 'w', encoding='utf-8') as f:
with open('results/tb_wishbone_sram_transactions.log', 'w', encoding='utf-8') as f:
f.write((f"{'Time':7}{'Type':7}{'Adr':6}{'Data'}\n")) f.write((f"{'Time':7}{'Type':7}{'Adr':6}{'Data'}\n"))
for k, v in sram_monitor.transactions.items():
f.write((f"{k:7}{v['type']:7}{bv_to_hexstr(v['adr']):6}{bv_to_hexstr(v['data'])} \n"))
for key, value in sram_monitor.transactions.items():
f.write((f"{key:7}{value['type']:7}{hex(value['adr']):6}{hex(value['data'])}\n"))

Loading…
Cancel
Save