Examples of using cocotb for functional verification of VHDL designs with GHDL.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
3.9 KiB

  1. import logging
  2. import cocotb
  3. from cocotb.utils import get_sim_time
  4. from cocotb.triggers import FallingEdge, RisingEdge, Timer
  5. class Vai:
  6. """VAI base class"""
  7. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  8. self._version = "0.0.1"
  9. self.log = logging.getLogger(f"cocotb.{valid._path}")
  10. self._data = data
  11. self._valid = valid
  12. self._accept = accept
  13. self._clock = clock
  14. self._clkedge = RisingEdge(self._clock)
  15. class VaiDriver(Vai):
  16. """Valid-Accept Driver"""
  17. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  18. super().__init__(clock, data, valid, accept, *args, **kwargs)
  19. self.log.info("Valid-accept driver")
  20. self.log.info(" cocotbext-vai version %s", self._version)
  21. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  22. # Hack to drive lists of signals
  23. if isinstance(self._data, list):
  24. for entry in self._data:
  25. entry.setimmediatevalue(0)
  26. else:
  27. self._data.setimmediatevalue(0)
  28. self._valid.setimmediatevalue(0)
  29. async def send(self, data, sync=True):
  30. if sync:
  31. await self._clkedge
  32. self._valid.value = 1
  33. if isinstance(self._data, list):
  34. _info = ', '.join(map(lambda x: str(hex(x)), data))
  35. for i in range(len(self._data)):
  36. self._data[i].value = data[i]
  37. else:
  38. self._data.value = data
  39. _info = hex(data)
  40. self.log.info(f"Send data: {_info}")
  41. while True:
  42. if self._accept.value:
  43. break
  44. await self._clkedge
  45. await self._clkedge
  46. self._valid.value = 0
  47. class VaiReceiver(Vai):
  48. """Valid-Accept Receiver"""
  49. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  50. super().__init__(clock, data, valid, accept, *args, **kwargs)
  51. self.log.info("Valid-accept receiver")
  52. self.log.info(" cocotbext-vai version %s", self._version)
  53. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  54. # Drive input defaults (setimmediatevalue to avoid x asserts)
  55. self._accept.setimmediatevalue(0)
  56. async def receive(self, sync=True):
  57. if sync:
  58. await self._clkedge
  59. while True:
  60. if self._valid.value:
  61. break
  62. await self._clkedge
  63. await self._clkedge
  64. self._accept.value = 1
  65. _rec = self._data.value
  66. self.log.info(f"Receive data: {hex(_rec)}")
  67. await self._clkedge
  68. self._accept.value = 0
  69. return _rec
  70. class VaiMonitor(Vai):
  71. """Valid-Accept Receiver"""
  72. def __init__(self, clock, data, valid, accept, queue=None, *args, **kwargs):
  73. super().__init__(clock, data, valid, accept, *args, **kwargs)
  74. self.log.info("Valid-accept monitor")
  75. self.log.info(" cocotbext-vai version %s", self._version)
  76. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  77. self._active = None
  78. self._queue = queue
  79. self._transactions = {}
  80. self._restart()
  81. def _restart(self):
  82. self.log.debug("SramMonitor._restart()")
  83. if self._active is not None:
  84. self._active.kill()
  85. # Schedule VAI read to run concurrently
  86. self._active = cocotb.start_soon(self._read())
  87. async def _read(self, cb=None):
  88. while True:
  89. await self._clkedge
  90. if self._valid.value and self._accept.value:
  91. if self._queue:
  92. await self._queue.put(self._data)
  93. #self._transactions[str(get_sim_time('ns'))] = {
  94. # "data" : self._data.value}
  95. @property
  96. def transactions(self, index=None):
  97. if index:
  98. key = list(self._transactions.keys())[index]
  99. return {key: self._transactions[key]}
  100. else:
  101. return self._transactions