@ -0,0 +1,4 @@ | |||
#!/bin/bash | |||
python3 -m pip install -r requirements.txt | |||
python3 -m pip install cocotbext-wishbone/ |
@ -0,0 +1,11 @@ | |||
#!/bin/bash | |||
if [ ! -d cocotbext-wishbone/.git ]; then | |||
git clone https://github.com/wallento/cocotbext-wishbone.git | |||
cd cocotbext-wishbone && patch < ../setup.py.patch | |||
cd .. | |||
fi | |||
if [ ! -d libvhdl/.git ]; then | |||
git clone https://github.com/tmeissner/libvhdl.git | |||
fi |
@ -0,0 +1,17 @@ | |||
# basic bus model | |||
cocotb-bus | |||
# bus functional models | |||
cocotbext-axi | |||
cocotbext-pcie | |||
cocotbext-eth | |||
cocotbext-uart | |||
cocotbext-spi | |||
# verification libraries | |||
cocotb-coverage | |||
pyvsc | |||
pyuvm | |||
# utilities | |||
wavedrom |
@ -0,0 +1,23 @@ | |||
--- setup.py 2022-02-07 11:35:49.632921471 +0100 | |||
+++ setup.py.new 2022-02-07 11:40:11.084930778 +0100 | |||
@@ -5,19 +5,13 @@ | |||
setuptools.setup( | |||
name="cocotbext-wishbone", | |||
- use_scm_version={ | |||
- "relative_to": __file__, | |||
- "write_to": "cocotbext/wishbone/version.py", | |||
- }, | |||
+ version="0.2.1", | |||
author="Staf Verhaegen, Mathias Kreider", | |||
author_email="staf@stafverhaegen.be, m.kreider@gsi.de", | |||
description="Cocotb Wishbone modules", | |||
long_description=long_description, | |||
packages=["cocotbext.wishbone"], | |||
install_requires=['cocotb>=1.6.0', 'cocotb_bus'], | |||
- setup_requires=[ | |||
- 'setuptools_scm', | |||
- ], | |||
classifiers=[ | |||
"Programming Language :: Python :: 3", | |||
"License :: OSI Approved :: BSD License", |
@ -0,0 +1,48 @@ | |||
# Default test | |||
DUT ?= uarttx | |||
# Test related variables | |||
ifeq (${DUT}, uarttx) | |||
MODULE := tb_uart | |||
TOPLEVEL := ${DUT} | |||
else ifeq (${DUT}, uartrx) | |||
MODULE := tb_uart | |||
TOPLEVEL := ${DUT} | |||
else ifeq (${DUT}, wishbone) | |||
MODULE := tb_wishbone | |||
TOPLEVEL := wishboneslavee | |||
SIM_ARGS := -gSimulation=true | |||
else | |||
$(error ${DUT} not available) | |||
endif | |||
# Simulator (GHDL) & RTL related | |||
SIM := ghdl | |||
TOPLEVEL_LANG := vhdl | |||
VHDL_SOURCES_libvhdl := ../libvhdl/common/UtilsP.vhd | |||
VHDL_SOURCES := ../libvhdl/syn/*.vhd | |||
SIM_BUILD := work | |||
COMPILE_ARGS := --std=08 | |||
SIM_ARGS += \ | |||
--wave=results/${TOPLEVEL}.ghw \ | |||
--psl-report=results/${TOPLEVEL}_psl.json \ | |||
--vpi-trace=results/${TOPLEVEL}_vpi.log | |||
# Cocotb related | |||
TESTCASE := test_${DUT} | |||
COCOTB_LOG_LEVEL := DEBUG | |||
CUSTOM_COMPILE_DEPS := results | |||
COCOTB_RESULTS_FILE := results/${TOPLEVEL}.xml | |||
include $(shell cocotb-config --makefiles)/Makefile.sim | |||
results: | |||
mkdir -p results | |||
.PHONY: clean | |||
clean:: | |||
rm -rf *.o __pycache__ uarttx uartrx results |
@ -0,0 +1,130 @@ | |||
import logging | |||
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly | |||
class Uart: | |||
"""UART base class""" | |||
def __init__(self, txrx, clock, div, bits, parity, *args, **kwargs): | |||
self._version = "0.0.1" | |||
self.log = logging.getLogger(f"cocotb.{txrx._path}") | |||
self._txrx = txrx | |||
self._clock = clock | |||
self._div = div | |||
self._bits = bits | |||
self._par = parity | |||
self._clkedge = RisingEdge(self._clock) | |||
async def _wait_cycle(self): | |||
for x in range(self._div): | |||
await self._clkedge | |||
@staticmethod | |||
def odd_parity(data): | |||
parity = True | |||
while data: | |||
parity = not parity | |||
data = data & (data - 1) | |||
return int(parity) | |||
class UartReceiver(Uart): | |||
def __init__(self, txrx, clock, div, bits, parity, *args, **kwargs): | |||
super().__init__(txrx, clock, div, bits, parity, *args, **kwargs) | |||
self.log.info("UART receiver") | |||
self.log.info(" cocotbext-uart version %s", self._version) | |||
self.log.info(" Copyright (c) 2022 Torsten Meissner") | |||
async def receive(self): | |||
"""Receive and return one UART frame""" | |||
# Wait for frame start | |||
await FallingEdge(self._txrx) | |||
# Consume start bit | |||
await self._get_start_bit() | |||
# Receive data bits | |||
self._rec = 0 | |||
for x in range(self._bits): | |||
await self._wait_cycle() | |||
await ReadOnly() | |||
self._rec |= bool(self._txrx.value.integer) << x | |||
if self._par: | |||
# Consume parity bit | |||
await self._get_parity_bit() | |||
# Consume stop bit | |||
await self._get_stop_bit() | |||
self.log.info("Received data: %s", hex(self._rec)) | |||
return self._rec | |||
async def _get_start_bit(self): | |||
"""Consume and check start bit""" | |||
for x in range(int(self._div/2)): | |||
await self._clkedge | |||
await ReadOnly() | |||
if self._txrx.value == 1: | |||
self.log.warning("Start bit set") | |||
async def _get_stop_bit(self): | |||
"""Consume and check stop bit""" | |||
await self._wait_cycle() | |||
await ReadOnly() | |||
if self._txrx.value == 0: | |||
self.log.warning("Stop bit not set") | |||
async def _get_parity_bit(self): | |||
"""Consume and check parity bit""" | |||
await self._wait_cycle() | |||
await ReadOnly() | |||
if self.odd_parity(self._rec) != self._txrx.value: | |||
self.log.warning("Parity wrong") | |||
class UartDriver(Uart): | |||
def __init__(self, txrx, clock, div, bits, parity, *args, **kwargs): | |||
super().__init__(txrx, clock, div, bits, parity, *args, **kwargs) | |||
self.log.info("UART sender") | |||
self.log.info(" cocotbext-uart version %s", self._version) | |||
self.log.info(" Copyright (c) 2022 Torsten Meissner") | |||
# Drive input defaults (setimmediatevalue to avoid x asserts) | |||
self._txrx.setimmediatevalue(1) | |||
async def send(self, data): | |||
"""Send one UART frame""" | |||
self._data = data; | |||
self.log.info("Sending data: %s", hex(self._data)) | |||
# Send start bit | |||
await self._send_bit(0) | |||
# Send data bits | |||
for x in range(self._bits): | |||
self._txrx.value = (self._data >> x) & 1 | |||
await self._wait_cycle() | |||
if self._par: | |||
# Send parity bit | |||
await self._send_bit(self.odd_parity(self._data)) | |||
# Consume stop bit | |||
await self._send_bit(1) | |||
async def _send_bit(self, data): | |||
self._txrx.value = data | |||
await self._wait_cycle() | |||
@ -0,0 +1,85 @@ | |||
import logging | |||
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly | |||
class Vai: | |||
"""VAI base class""" | |||
def __init__(self, clock, data, valid, accept, *args, **kwargs): | |||
self._version = "0.0.1" | |||
self.log = logging.getLogger(f"cocotb.{data._path}") | |||
self._data = data | |||
self._valid = valid | |||
self._accept = accept | |||
self._clock = clock | |||
self._clkedge = RisingEdge(self._clock) | |||
class VaiDriver(Vai): | |||
"""Valid-Accept Driver""" | |||
def __init__(self, clock, data, valid, accept, *args, **kwargs): | |||
super().__init__(clock, data, valid, accept, *args, **kwargs) | |||
self.log.info("Valid-accept driver") | |||
self.log.info(" cocotbext-vai version %s", self._version) | |||
self.log.info(" Copyright (c) 2022 Torsten Meissner") | |||
# Drive input defaults (setimmediatevalue to avoid x asserts) | |||
self._data.setimmediatevalue(0) | |||
self._valid.setimmediatevalue(0) | |||
async def send(self, data, sync=True): | |||
if sync: | |||
await self._clkedge | |||
self.log.info("Sending data: %s", hex(data)) | |||
self._valid.value = 1 | |||
self._data.value = data | |||
while True: | |||
await ReadOnly() | |||
if self._accept.value: | |||
break | |||
await self._clkedge | |||
await self._clkedge | |||
self._valid.value = 0 | |||
class VaiReceiver(Vai): | |||
"""Valid-Accept Receiver""" | |||
def __init__(self, clock, data, valid, accept, *args, **kwargs): | |||
super().__init__(clock, data, valid, accept, *args, **kwargs) | |||
self.log.info("Valid-accept receiver") | |||
self.log.info(" cocotbext-vai version %s", self._version) | |||
self.log.info(" Copyright (c) 2022 Torsten Meissner") | |||
# Drive input defaults (setimmediatevalue to avoid x asserts) | |||
self._accept.setimmediatevalue(0) | |||
async def receive(self, sync=True): | |||
if sync: | |||
await self._clkedge | |||
while True: | |||
await ReadOnly() | |||
if self._valid.value: | |||
break | |||
await self._clkedge | |||
await self._clkedge | |||
self._accept.value = 1 | |||
_rec = self._data.value | |||
self.log.info("Received data: %s", hex(_rec)) | |||
await self._clkedge | |||
self._accept.value = 0 | |||
return _rec |
@ -0,0 +1,94 @@ | |||
# test_uart.py | |||
import logging | |||
import random | |||
import cocotb | |||
import wavedrom | |||
from Uart import UartDriver, UartReceiver | |||
from Vai import VaiDriver, VaiReceiver | |||
from cocotb.clock import Clock | |||
from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly | |||
from cocotb.wavedrom import Wavedrom, trace | |||
# Reset coroutine | |||
async def reset_dut(reset_n, duration_ns): | |||
reset_n.value = 0 | |||
await Timer(duration_ns, units="ns") | |||
reset_n.value = 1 | |||
def wave2svg(wave, file): | |||
svg = wavedrom.render(wave) | |||
svg.saveas(file) | |||
@cocotb.test() | |||
async def test_uarttx(dut): | |||
""" First simple test """ | |||
clkedge = RisingEdge(dut.clk_i) | |||
# Connect reset | |||
reset_n = dut.reset_n_i | |||
# Instantiate VAI driver | |||
vai_driver = VaiDriver(dut.clk_i, dut.data_i, dut.valid_i, dut.accept_o) | |||
# Instantiate UART receiver | |||
uart_receiver = UartReceiver(dut.tx_o, dut.clk_i, 10, 8, True); | |||
# Drive input defaults (setimmediatevalue to avoid x asserts) | |||
dut.data_i.setimmediatevalue(0) | |||
dut.valid_i.setimmediatevalue(0) | |||
clock = Clock(dut.clk_i, 10, units="ns") # Create a 10 ns period clock | |||
cocotb.start_soon(clock.start()) # Start the clock | |||
# Execution will block until reset_dut has completed | |||
dut._log.info("Hold reset") | |||
await reset_dut(reset_n, 100) | |||
dut._log.info("Released reset") | |||
# Test 10 UART transmissions | |||
for i in range(10): | |||
await clkedge | |||
val = random.randint(0, 255) | |||
await vai_driver.send(val) | |||
rec = await uart_receiver.receive(); | |||
assert rec == val, "UART sent data was incorrect on the {}th cycle".format(i) | |||
@cocotb.test() | |||
async def test_uartrx(dut): | |||
""" First simple test """ | |||
clkedge = RisingEdge(dut.clk_i) | |||
# Connect reset | |||
reset_n = dut.reset_n_i | |||
# Instantiate UART driver | |||
uart_driver = UartDriver(dut.rx_i, dut.clk_i, 10, 8, True); | |||
# Instantiate VAI receiver | |||
vai_receiver = VaiReceiver(dut.clk_i, dut.data_o, dut.valid_o, dut.accept_i) | |||
# Drive input defaults (setimmediatevalue to avoid x asserts) | |||
dut.rx_i.setimmediatevalue(1) | |||
dut.accept_i.setimmediatevalue(0) | |||
clock = Clock(dut.clk_i, 10, units="ns") # Create a 1 us period clock | |||
cocotb.start_soon(clock.start()) # Start the clock | |||
# Execution will block until reset_dut has completed | |||
dut._log.info("Hold reset") | |||
await reset_dut(reset_n, 100) | |||
dut._log.info("Released reset") | |||
# Test 10 UART transmissions | |||
for i in range(10): | |||
await clkedge | |||
val = random.randint(0, 255) | |||
await uart_driver.send(val) | |||
rec = await vai_receiver.receive(); | |||
assert rec == val, "UART received data was incorrect on the {}th cycle".format(i) |