Examples of using cocotb for functional verification of VHDL designs with GHDL.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

244 lines
7.7 KiB

  1. from cocotb.triggers import Combine
  2. from pyuvm import (
  3. uvm_test,
  4. uvm_sequence,
  5. uvm_sequence_item,
  6. uvm_sequencer,
  7. uvm_driver,
  8. uvm_component,
  9. uvm_subscriber,
  10. uvm_env,
  11. uvm_factory,
  12. uvm_analysis_port,
  13. uvm_tlm_analysis_fifo,
  14. uvm_get_port,
  15. ConfigDB,
  16. UVMConfigItemNotFound,
  17. )
  18. from vsc import get_coverage_report
  19. from VaiBfm import VaiBfm, Mode
  20. from Coverage import constraints, covergroup
  21. from Crypto.Cipher import AES
  22. import cocotb
  23. import pyuvm
  24. import vsc
  25. @pyuvm.test()
  26. class AesTest(uvm_test):
  27. def build_phase(self):
  28. self.env = AesEnv("env", self)
  29. def end_of_elaboration_phase(self):
  30. self.test_all = TestAllSeq.create("test_all")
  31. async def run_phase(self):
  32. self.raise_objection()
  33. await self.test_all.start()
  34. self.drop_objection()
  35. @pyuvm.test()
  36. class ParallelTest(AesTest):
  37. def end_of_elaboration_phase(self):
  38. uvm_factory().set_type_override_by_type(TestAllSeq, TestAllParallelSeq)
  39. return super().end_of_elaboration_phase()
  40. # Virtual sequence that starts other sequences
  41. class TestAllSeq(uvm_sequence):
  42. async def body(self):
  43. # get the sequencer handle
  44. seqr = ConfigDB().get(None, "", "SEQR")
  45. enc_rand_seq = EncRandSeq("enc_random")
  46. dec_rand_seq = DecRandSeq("dec_random")
  47. await enc_rand_seq.start(seqr)
  48. await dec_rand_seq.start(seqr)
  49. # Running encryption and decryption sequences in parallel
  50. class TestAllParallelSeq(uvm_sequence):
  51. async def body(self):
  52. seqr = ConfigDB().get(None, "", "SEQR")
  53. enc_rand_seq = EncRandSeq("enc_random")
  54. dec_rand_seq = DecRandSeq("dec_random")
  55. enc_rand_task = cocotb.start_soon(enc_rand_seq.start(seqr))
  56. dec_rand_task = cocotb.start_soon(dec_rand_seq.start(seqr))
  57. await Combine(enc_rand_task, dec_rand_task)
  58. # Sequence item which holds the stimuli for one operation
  59. class AesSeqItem(uvm_sequence_item):
  60. def __init__(self, name, mode, key, data):
  61. super().__init__(name)
  62. self.mode = mode
  63. self.key = key
  64. self.data = data
  65. def __eq__(self, other):
  66. same = (
  67. self.mode == other.mode
  68. and self.key == other.key
  69. and self.data == other.data
  70. )
  71. return same
  72. def __str__(self):
  73. return f"{self.get_name()} : Mode: 0b{self.mode:01x} \
  74. Key: 0x{self.key:016x} Data: 0x{self.data:016x}"
  75. # Abstract basis sequence class
  76. # set_operands() has to be implemented by class that inherits from this class
  77. class BaseSeq(uvm_sequence):
  78. async def body(self):
  79. self.cr = constraints()
  80. for _ in range(20):
  81. aes_tr = AesSeqItem("aes_tr", 0, 0, 0)
  82. await self.start_item(aes_tr)
  83. self.set_operands(aes_tr)
  84. await self.finish_item(aes_tr)
  85. def set_operands(self, tr):
  86. pass
  87. # Sequence for encryption tests with random stimuli
  88. class EncRandSeq(BaseSeq):
  89. def set_operands(self, tr):
  90. self.cr.randomize()
  91. tr.mode = 0
  92. tr.key = self.cr.key
  93. tr.data = self.cr.data
  94. # Sequence for decryption tests with random stimuli
  95. class DecRandSeq(BaseSeq):
  96. def set_operands(self, tr):
  97. self.cr.randomize()
  98. tr.mode = 1
  99. tr.key = self.cr.key
  100. tr.data = self.cr.data
  101. class Driver(uvm_driver):
  102. def build_phase(self):
  103. self.ap = uvm_analysis_port("ap", self)
  104. def start_of_simulation_phase(self):
  105. self.bfm = VaiBfm()
  106. async def launch_tb(self):
  107. await self.bfm.reset()
  108. self.bfm.start_tasks()
  109. async def run_phase(self):
  110. await self.launch_tb()
  111. while True:
  112. op = await self.seq_item_port.get_next_item()
  113. await self.bfm.send_op(op.mode, op.key, op.data)
  114. result = await self.bfm.get_output()
  115. self.ap.write(result)
  116. self.seq_item_port.item_done()
  117. class Scoreboard(uvm_component):
  118. def build_phase(self):
  119. self.input_fifo = uvm_tlm_analysis_fifo("input_fifo", self)
  120. self.output_fifo = uvm_tlm_analysis_fifo("output_fifo", self)
  121. self.input_get_port = uvm_get_port("input_get_port", self)
  122. self.output_get_port = uvm_get_port("output_get_port", self)
  123. self.input_export = self.input_fifo.analysis_export
  124. self.output_export = self.output_fifo.analysis_export
  125. self.passed = True
  126. def connect_phase(self):
  127. self.input_get_port.connect(self.input_fifo.get_export)
  128. self.output_get_port.connect(self.output_fifo.get_export)
  129. def check_phase(self):
  130. while self.output_get_port.can_get():
  131. _, result = self.output_get_port.try_get()
  132. op_success, op = self.input_get_port.try_get()
  133. if not op_success:
  134. self.logger.critical(f"result {result} had no input operation")
  135. else:
  136. (mode, key, data) = op
  137. aes = AES.new(key.buff, AES.MODE_ECB)
  138. if not mode:
  139. reference = aes.encrypt(data.buff)
  140. else:
  141. reference = aes.decrypt(data.buff)
  142. if result.buff == reference:
  143. self.logger.info(
  144. f"PASSED: {Mode(mode).name} 0x{data.integer:032x} with key "
  145. f"0x{key.integer:032x} = 0x{result.integer:032x}"
  146. )
  147. else:
  148. self.logger.error(
  149. f"FAILED: {Mode(mode).name} 0x{data.integer:032x} with key "
  150. f"0x{key.integer:032x} = 0x{result.integer:032x}, "
  151. f"expected 0x{int.from_bytes(reference, 'big'):032x}"
  152. )
  153. self.passed = False
  154. def report_phase(self):
  155. assert self.passed, "Test failed"
  156. class Monitor(uvm_component):
  157. def __init__(self, name, parent, method_name):
  158. super().__init__(name, parent)
  159. self.bfm = VaiBfm()
  160. self.get_method = getattr(self.bfm, method_name)
  161. def build_phase(self):
  162. self.ap = uvm_analysis_port("ap", self)
  163. async def run_phase(self):
  164. while True:
  165. datum = await self.get_method()
  166. self.logger.debug(f"MONITORED {datum}")
  167. self.ap.write(datum)
  168. # Coverage collector and checker
  169. class Coverage(uvm_subscriber):
  170. def start_of_simulation_phase(self):
  171. self.cg = covergroup()
  172. try:
  173. self.disable_errors = ConfigDB().get(self, "", "DISABLE_COVERAGE_ERRORS")
  174. except UVMConfigItemNotFound:
  175. self.disable_errors = False
  176. def write(self, data):
  177. (mode, key, _) = data
  178. self.cg.sample(mode, key)
  179. def report_phase(self):
  180. if not self.disable_errors:
  181. if self.cg.get_coverage() != 100.0:
  182. self.logger.warning("Functional coverage incomplete.")
  183. else:
  184. self.logger.info("Covered all operations")
  185. with open("results/tb_aes_fcover.txt", "a", encoding="utf-8") as f:
  186. f.write(get_coverage_report(details=True))
  187. vsc.write_coverage_db("results/tb_aes_fcover.xml")
  188. # AES test bench environment
  189. # Creates instances of components and connects them
  190. class AesEnv(uvm_env):
  191. def build_phase(self):
  192. self.seqr = uvm_sequencer("seqr", self)
  193. ConfigDB().set(None, "*", "SEQR", self.seqr)
  194. self.driver = Driver.create("driver", self)
  195. self.input_mon = Monitor("input_mon", self, "get_input")
  196. self.coverage = Coverage("coverage", self)
  197. self.scoreboard = Scoreboard("scoreboard", self)
  198. def connect_phase(self):
  199. self.driver.seq_item_port.connect(self.seqr.seq_item_export)
  200. self.input_mon.ap.connect(self.scoreboard.input_export)
  201. self.input_mon.ap.connect(self.coverage.analysis_export)
  202. self.driver.ap.connect(self.scoreboard.output_export)