Examples of using cocotb for functional verification of VHDL designs with GHDL.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
4.0 KiB

  1. import logging
  2. import cocotb
  3. from cocotb.utils import get_sim_time
  4. from cocotb.triggers import FallingEdge, RisingEdge, Timer, ReadOnly
  5. class Vai:
  6. """VAI base class"""
  7. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  8. self._version = "0.0.1"
  9. self.log = logging.getLogger(f"cocotb.{valid._path}")
  10. self._data = data
  11. self._valid = valid
  12. self._accept = accept
  13. self._clock = clock
  14. self._clkedge = RisingEdge(self._clock)
  15. class VaiDriver(Vai):
  16. """Valid-Accept Driver"""
  17. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  18. super().__init__(clock, data, valid, accept, *args, **kwargs)
  19. self.log.info("Valid-accept driver")
  20. self.log.info(" cocotbext-vai version %s", self._version)
  21. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  22. # Hack to drive lists of signals
  23. if isinstance(self._data, list):
  24. for entry in self._data:
  25. entry.setimmediatevalue(0)
  26. else:
  27. self._data.setimmediatevalue(0)
  28. self._valid.setimmediatevalue(0)
  29. async def send(self, data, sync=True):
  30. if sync:
  31. await self._clkedge
  32. self._valid.value = 1
  33. if isinstance(self._data, list):
  34. _info = ', '.join(map(lambda x: str(hex(x)), data))
  35. for i in range(len(self._data)):
  36. self._data[i].value = data[i]
  37. else:
  38. self._data.value = data
  39. _info = hex(data)
  40. self.log.info(f"Send data: {_info}")
  41. while True:
  42. await ReadOnly()
  43. if self._accept.value:
  44. break
  45. await self._clkedge
  46. await self._clkedge
  47. self._valid.value = 0
  48. class VaiReceiver(Vai):
  49. """Valid-Accept Receiver"""
  50. def __init__(self, clock, data, valid, accept, *args, **kwargs):
  51. super().__init__(clock, data, valid, accept, *args, **kwargs)
  52. self.log.info("Valid-accept receiver")
  53. self.log.info(" cocotbext-vai version %s", self._version)
  54. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  55. # Drive input defaults (setimmediatevalue to avoid x asserts)
  56. self._accept.setimmediatevalue(0)
  57. async def receive(self, sync=True):
  58. if sync:
  59. await self._clkedge
  60. while True:
  61. await ReadOnly()
  62. if self._valid.value:
  63. break
  64. await self._clkedge
  65. await self._clkedge
  66. self._accept.value = 1
  67. _rec = self._data.value
  68. self.log.info(f"Receive data: {hex(_rec)}")
  69. await self._clkedge
  70. self._accept.value = 0
  71. return _rec
  72. class VaiMonitor(Vai):
  73. """Valid-Accept Receiver"""
  74. def __init__(self, clock, data, valid, accept, queue=None, *args, **kwargs):
  75. super().__init__(clock, data, valid, accept, *args, **kwargs)
  76. self.log.info("Valid-accept monitor")
  77. self.log.info(" cocotbext-vai version %s", self._version)
  78. self.log.info(" Copyright (c) 2022 Torsten Meissner")
  79. self._active = None
  80. self._queue = queue
  81. self._transactions = {}
  82. self._restart()
  83. def _restart(self):
  84. self.log.debug("SramMonitor._restart()")
  85. if self._active is not None:
  86. self._active.kill()
  87. # Schedule VAI read to run concurrently
  88. self._active = cocotb.start_soon(self._read())
  89. async def _read(self, cb=None):
  90. while True:
  91. await self._clkedge
  92. if self._valid.value and self._accept.value:
  93. if self._queue:
  94. await self._queue.put(self._data)
  95. #self._transactions[str(get_sim_time('ns'))] = {
  96. # "data" : self._data.value}
  97. @property
  98. def transactions(self, index=None):
  99. if index:
  100. key = list(self._transactions.keys())[index]
  101. return {key: self._transactions[key]}
  102. else:
  103. return self._transactions