12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799 |
- #!/usr/bin/env python3
- #
- # memtest.py
- #
- # Base utility/driver classes for the various control software variants
- #
- # Copyright (C) 2020-2021 Sylvain Munaut <tnt@246tNt.com>
- # SPDX-License-Identifier: MIT
- #
- import binascii
- import random
- import serial
- import sys
- # ----------------------------------------------------------------------------
- # Serial commands
- # ----------------------------------------------------------------------------
class WishboneInterface(object):
    """Wishbone bus bridge driven over a serial port.

    Every command sent to the target is a 40-bit big-endian word: the
    command code from ``COMMANDS`` is placed at bit 36 and up, the lower
    bits carry the payload of that command.

    The instance can be used as a context manager, in which case the
    serial port is closed on exit.
    """

    COMMANDS = {
        'SYNC'       : 0,
        'REG_ACCESS' : 1,
        'DATA_SET'   : 2,
        'DATA_GET'   : 3,
        'AUX_CSR'    : 4,
    }

    # Reply expected from the target after a sync byte was sent
    SYNC_REPLY = b'\xca\xfe\xba\xbe'

    def __init__(self, port):
        """Open serial port ``port`` and synchronise with the target.

        Raises:
            RuntimeError: if the target never answers the sync request.
                The port is closed again before raising so that it is
                not leaked.
        """
        self.ser = ser = serial.Serial()
        ser.port = port
        ser.baudrate = 2000000
        ser.stopbits = 2
        ser.timeout = 0.1
        ser.open()

        if not self.sync():
            ser.close()
            raise RuntimeError("Unable to sync")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    def _command(self, name, payload=0):
        """Pack command ``name`` and ``payload`` into its 5 byte wire form."""
        return ((self.COMMANDS[name] << 36) | payload).to_bytes(5, 'big')

    def sync(self):
        """Try up to 10 times to get the sync reply from the target.

        Each attempt sends a single zero byte and reads 4 bytes back.

        Returns:
            True as soon as the expected reply is received, False if all
            attempts failed.
        """
        for _ in range(10):
            self.ser.write(b'\x00')
            # A short (timed out) read can never equal the 4 byte reply
            if self.ser.read(4) == self.SYNC_REPLY:
                return True
        return False

    def write(self, addr, data):
        """Write ``data`` to bus address ``addr``.

        The data word is loaded first (DATA_SET), then the register
        access is issued (REG_ACCESS); both commands go out in a single
        serial write.
        """
        self.ser.write(
            self._command('DATA_SET', data) +
            self._command('REG_ACCESS', addr)
        )

    def read(self, addr):
        """Read bus address ``addr`` and return the value as an integer.

        The register access is issued with bit 20 set (used here to mark
        the access as a read), followed by DATA_GET to fetch the result.

        Raises:
            RuntimeError: if fewer than 4 bytes come back before the
                serial timeout expires.
        """
        self.ser.write(
            self._command('REG_ACCESS', (1 << 20) | addr) +
            self._command('DATA_GET')
        )

        reply = self.ser.read(4)
        if len(reply) != 4:
            raise RuntimeError('Comm error')

        return int.from_bytes(reply, 'big')

    def aux_csr(self, value):
        """Send ``value`` as the payload of an AUX_CSR command."""
        self.ser.write(self._command('AUX_CSR', value))
- # ----------------------------------------------------------------------------
- # QSPI controller
- # ----------------------------------------------------------------------------
class QSPIController(object):
    """Driver for a QSPI controller core taken under external control.

    All accesses go through ``intf`` (an object providing
    ``write(addr, val)`` and ``read(addr)``), relative to ``base``.
    Registers are addressed either by name (see ``CORE_REGS``) or by a
    raw integer offset; the transfer commands used below are raw offsets.
    """

    CORE_REGS = {
        'csr': 0,
        'rf': 3,
    }

    def __init__(self, intf, base, cs=0):
        """Store the access parameters and release external control.

        Args:
            intf: bus interface used for register accesses.
            base: base address of the core on that interface.
            cs: chip select index placed in the CSR when a transaction
                is started.
        """
        self.intf = intf
        self.base = base
        self.cs = cs
        self._end()

    def _write(self, reg, val):
        # Unknown names fall through unchanged so integer offsets work too
        self.intf.write(self.base + self.CORE_REGS.get(reg, reg), val)

    def _read(self, reg):
        return self.intf.read(self.base + self.CORE_REGS.get(reg, reg))

    def _begin(self):
        """Request external control for the configured chip select."""
        self._write('csr', 0x00000004 | (self.cs << 4))
        self._write('csr', 0x00000002 | (self.cs << 4))

    def _end(self):
        """Release external control."""
        self._write('csr', 0x00000004)

    def spi_xfer(self, tx_data, dummy_len=0, rx_len=0):
        """Run one SPI transaction and return the received payload.

        The transaction is ``len(tx_data) + dummy_len + rx_len`` bytes
        long and is issued 32 bits at a time; a final partial word uses
        a shorter command.

        Args:
            tx_data: bytes to shift out first.
            dummy_len: number of bytes clocked after ``tx_data`` whose
                content is ignored.
            rx_len: number of bytes to return, taken from the end of
                the transaction.

        Returns:
            ``bytes`` of length ``rx_len`` (empty when ``rx_len`` is 0).
        """
        # Start transaction
        self._begin()

        # Total length
        total = len(tx_data) + dummy_len + rx_len

        # Zero-pad the TX stream up to a whole number of words
        tx_buf = tx_data + bytes(((total + 3) & ~3) - len(tx_data))
        rx_buf = bytearray()

        # Run
        for offset in range(0, total, 4):
            remaining = total - offset

            # Word and command
            word = int.from_bytes(tx_buf[offset:offset+4], 'big')
            if remaining >= 4:
                cmd, shift = 0x13, 0
            else:
                cmd, shift = 0x10 + remaining - 1, 8 * (4 - remaining)

            # Issue
            self._write(cmd, word)
            resp = self._read('rf')

            # A partial word is left-aligned so that the bytes actually
            # transferred always come first in the 4 bytes appended here
            rx_buf += ((resp << shift) & 0xffffffff).to_bytes(4, 'big')

        # End transaction
        self._end()

        # Only the first `total` bytes of rx_buf are real data; anything
        # after that is padding of the last word. Slice on the real
        # length so an unaligned transfer or rx_len == 0 is handled.
        return bytes(rx_buf[total - rx_len:total])

    def _qpi_tx(self, data, command=False):
        """Send ``data`` in chunks of up to 4 bytes.

        ``command`` selects the 0x1c command base instead of 0x18; the
        low bits of the command carry the chunk length minus one.
        """
        base_cmd = 0x1c if command else 0x18

        for offset in range(0, len(data), 4):
            # Grab chunk
            chunk = data[offset:offset+4]
            cmd = base_cmd | (len(chunk) - 1)

            # Left-align a short chunk in the 32 bit word
            chunk = chunk + bytes(-len(chunk) & 3)

            # Transmit
            self._write(cmd, int.from_bytes(chunk, 'big'))

    def _qpi_rx(self, l):
        """Receive ``l`` bytes, up to 4 per access, and return them."""
        data = bytearray()

        for remaining in range(l, 0, -4):
            # Issue read
            wl = min(remaining, 4)
            self._write(0x14 | (wl - 1), 0)
            word = self._read('rf')

            # Accumulate only the low `wl` bytes of the response
            data += (word & (0xffffffff >> (8 * (4 - wl)))).to_bytes(wl, 'big')

        return bytes(data)

    def qpi_xfer(self, cmd=b'', payload=b'', dummy_len=0, rx_len=0):
        """Run one QPI transaction.

        Phases are executed in order and skipped when empty: command,
        payload, dummy bytes (read and discarded), received payload.

        Returns:
            The ``rx_len`` received bytes, or None when ``rx_len`` is 0.
        """
        # Start transaction
        self._begin()

        # TX command
        if cmd:
            self._qpi_tx(cmd, True)

        # TX payload
        if payload:
            self._qpi_tx(payload, False)

        # Dummy
        if dummy_len:
            self._qpi_rx(dummy_len)

        # RX payload
        rv = self._qpi_rx(rx_len) if rx_len else None

        # End transaction
        self._end()

        return rv
- # ----------------------------------------------------------------------------
- # HyperRAM controller
- # ----------------------------------------------------------------------------
class HyperRAMController(object):
    """Driver for a HyperRAM controller core, including read training.

    All accesses go through ``intf`` (an object providing
    ``write(addr, val)`` and ``read(addr)``), relative to ``base``.
    Up to four chips can be handled, selected by the ``csm`` bit mask.

    Typical use is ``init()`` (configure the chips and train the capture
    parameters) followed by ``set_runtime(True)``.
    """

    CORE_REGS = {
        'csr': 0,
        'cmd': 1,
        'wq0': 2,
        'wq1': 3,
    }

    CSR_RUN      = (1 << 0)
    CSR_RESET    = (1 << 1)
    CSR_IDLE_CFG = (1 << 2)
    CSR_IDLE_RUN = (1 << 3)

    def CSR_CMD_LAT(self, x):
        """CSR field, bits [11:8]: ``x - 1`` truncated to 4 bits."""
        return ((x - 1) & 15) << 8

    def CSR_CAP_LAT(self, x):
        """CSR field, bits [15:12]: ``x - 1`` truncated to 4 bits."""
        return ((x - 1) & 15) << 12

    def CSR_PHY_DELAY(self, x):
        """CSR field, bits [19:16]: ``x`` truncated to 4 bits."""
        return (x & 15) << 16

    def CSR_PHY_PHASE(self, x):
        """CSR field, bits [21:20]: ``x`` truncated to 2 bits."""
        return (x & 3) << 20

    def CSR_PHY_EDGE(self, x):
        """CSR field, bit 22: ``x`` truncated to 1 bit."""
        return (x & 1) << 22

    def CMD_LEN(self, x):
        """Command field, bits [11:8]: ``x - 1`` truncated to 4 bits."""
        return ((x - 1) & 15) << 8

    def CMD_LAT(self, x):
        """Command field, bits [7:4]: ``x - 1`` truncated to 4 bits."""
        return ((x - 1) & 15) << 4

    def CMD_CS(self, x):
        """Command field, bits [3:2]: chip select truncated to 2 bits."""
        return (x & 3) << 2

    CMD_REG   = (1 << 1)
    CMD_MEM   = (0 << 1)
    CMD_READ  = (1 << 0)
    CMD_WRITE = (0 << 0)

    # Selected so:
    #  - each byte is unique
    #  - ORing all bytes in a single word == 255
    #  - ANDing all bytes in a single word == 0
    CAL_WORDS = [ 0x600dbabe, 0xb16b00b5 ]

    # Register addresses
    HYPERRAM_REGS = {
        'id0': 0,
        'id1': 1,
        'cr0': 0 | (1 << 11),
        'cr1': 1 | (1 << 11),
    }

    def __init__(self, intf, base, latency=3, csm=0xf, burst_len=128):
        """Store the configuration; no hardware access is done here.

        Args:
            intf: bus interface used for register accesses.
            base: base address of the core on that interface.
            latency: latency setting programmed in the chips (must be a
                key of the latency table in ``_cr0``).
            csm: bit mask of the chip selects to configure and train.
            burst_len: burst length programmed in the chips (must be a
                key of the burst length table in ``_cr0``).
        """
        self.intf = intf
        self.base = base
        self.latency = latency
        self.csm = csm
        self.burst_len = burst_len

        # We're always in 2x latency mode, also the location where
        # latency start and the 1 cycle added by the core because it works
        # 32 bit at a time means we can remove 2 cycles of the latency
        self._cmd_latency = (2 * latency - 2) // 2

    def _write(self, reg, val):
        self.intf.write(self.base + self.CORE_REGS[reg], val)

    def _read(self, reg):
        return self.intf.read(self.base + self.CORE_REGS[reg])

    def _cr0(self, dpd=False, drive_strength=None, latency=6, fixed_latency=True, hybrid_burst=True, burst_len=32):
        """Build the value for chip register ``cr0``.

        Raises:
            KeyError: if ``drive_strength``, ``latency`` or ``burst_len``
                is not one of the values listed in the tables below.
        """
        DRIVE = {
            None: 0,
            115: 1,
            67: 2,
            46: 3,
            34: 4,
            27: 5,
            22: 6,
            19: 7,
        }

        LATENCY = {
            3: 14,
            4: 15,
            5: 0,
            6: 1,
        }

        BURST_LEN = {
            128: 0,
            64: 1,
            32: 3,
            16: 2,
        }

        return (
            ((dpd ^ 1) << 15) |             # bit is the inverse of `dpd`
            (DRIVE[drive_strength] << 12) |
            (0xf << 8) |
            (LATENCY[latency] << 4) |
            (fixed_latency << 3) |
            (hybrid_burst << 2) |
            (BURST_LEN[burst_len] << 0)
        )

    def _cr1(self, dri=None):
        """Build the value for chip register ``cr1`` (None selects "1x")."""
        DRI = {
            None: 2,
            "1x": 2,
            "1.5x": 3,
            "2x": 0,
            "4x": 1,
        }
        return DRI[dri]

    def _ca(self, addr, rwn=0, reg=0, linear=0):
        """Build the 48 bit command/address word.

        ``rwn`` goes to bit 47, ``reg`` to bit 46, ``linear`` (forced to
        1 for register accesses) to bit 45. The address is split: its
        low 3 bits stay at bit 0, the rest is placed at bit 16 and up.
        """
        return (
            (rwn << 47) |
            (reg << 46) |
            ((linear | reg) << 45) |
            ((addr >> 3) << 16) |
            ((addr & 7) << 0)
        )

    def _wait_idle(self):
        """Poll the CSR (at most 10 reads) until CSR_IDLE_CFG is set.

        Raises:
            RuntimeError: if the bit is still clear after the last read.
        """
        # Wait until it's in IDLE Config mode
        if not any(self._read('csr') & self.CSR_IDLE_CFG for _ in range(10)):
            raise RuntimeError('HyperRAM controller timeout')

    def _read_response(self):
        """Read three (wq0, wq1) pairs; ``wq1`` is read first each time."""
        rv = []
        for _ in range(3):
            w1 = self._read('wq1')
            w0 = self._read('wq0')
            rv.append( (w0, w1) )
        return rv

    # NOTE(review): in the methods below the value written to 'wq1'
    # before a 'wq0' push is whatever the core expects as attributes for
    # that word; the meaning of 0x30 / 0x20 / 0x00 is not visible in this
    # file, the sequences are kept exactly as they were.

    def _reg_write(self, cs, reg, val):
        """Write ``val`` to chip register ``reg`` of chip ``cs``."""
        ca = self._ca(self.HYPERRAM_REGS[reg], rwn=0, reg=1)

        self._write('wq1', 0x30)
        self._write('wq0', ca >> 16)
        # Low 16 bits of CA share a word with the register value
        self._write('wq0', ((ca & 0xffff) << 16) | val)
        self._write('wq0', 0)

        self._write('cmd',
            self.CMD_CS(cs) |
            self.CMD_REG |
            self.CMD_WRITE
        )

        self._wait_idle()

    def _reg_read(self, cs, reg):
        """Read chip register ``reg`` of chip ``cs``.

        Returns:
            The upper 16 bits of the last word read back.
        """
        ca = self._ca(self.HYPERRAM_REGS[reg], rwn=1, reg=1)

        self._write('wq1', 0x30)
        self._write('wq0', ca >> 16)
        self._write('wq1', 0x20)
        self._write('wq0', (ca & 0xffff) << 16)
        self._write('wq1', 0x00)
        self._write('wq0', 0)

        self._write('cmd',
            self.CMD_LAT(self._cmd_latency) |
            self.CMD_CS(cs) |
            self.CMD_REG |
            self.CMD_READ
        )

        self._wait_idle()

        return self._read_response()[-1][0] >> 16

    def _mem_write(self, cs, addr, val, count=1, mask=0x0):
        """Write the word ``val`` ``count`` times starting at ``addr``.

        ``mask`` is OR-ed into the 'wq1' value used for the data word.
        """
        ca = self._ca(addr, rwn=0, reg=0)

        self._write('wq1', 0x30)
        self._write('wq0', ca >> 16)
        self._write('wq1', 0x20)
        self._write('wq0', (ca & 0xffff) << 16)
        self._write('wq1', 0x30 | mask)
        self._write('wq0', val)

        self._write('cmd',
            self.CMD_LEN(count) |
            self.CMD_LAT(self._cmd_latency) |
            self.CMD_CS(cs) |
            self.CMD_MEM |
            self.CMD_WRITE
        )

        self._wait_idle()

    def _mem_read(self, cs, addr, count=3):
        """Read ``count`` (1 to 3) words starting at ``addr``.

        Returns:
            List of the last ``count`` ``(wq0, wq1)`` pairs read back.

        Raises:
            ValueError: if ``count`` is outside 1..3. A count of 0 would
                otherwise be encoded as the maximum length and return
                all three pairs.
        """
        if count > 3:
            raise ValueError('Unable to read more than 3 words at a time')
        if count < 1:
            raise ValueError('Unable to read less than 1 word at a time')

        ca = self._ca(addr, rwn=1, reg=0)

        self._write('wq1', 0x30)
        self._write('wq0', ca >> 16)
        self._write('wq1', 0x20)
        self._write('wq0', (ca & 0xffff) << 16)
        self._write('wq1', 0x00)
        self._write('wq0', 0)

        self._write('cmd',
            self.CMD_LEN(count) |
            self.CMD_LAT(self._cmd_latency) |
            self.CMD_CS(cs) |
            self.CMD_MEM |
            self.CMD_READ
        )

        self._wait_idle()

        return self._read_response()[-count:]

    def _train_check_edge_delay(self, cs, edge, delay):
        """Find capture latency and phase for one (edge, delay) setting.

        The calibration pattern must already be in memory at address 0.

        Returns:
            ``(cap_latency, phase)`` if the pattern reads back correctly
            with the derived settings, None otherwise.
        """
        # Configure for base capture latency and phase
        self._write('csr',
            self.CSR_PHY_EDGE(edge) |
            self.CSR_PHY_PHASE(0) |
            self.CSR_PHY_DELAY(delay) |
            self.CSR_CMD_LAT(self._cmd_latency) |
            self.CSR_CAP_LAT(3)
        )

        # Find the capture latency and phase
        data = self._mem_read(cs, 0, count=3)

        for w,a in data:
            print(f"{bin(a)} {w:08x}")

        # First word with any of the low 4 attribute bits set ...
        i = next((n for n in range(3) if data[n][1] & 0xf), None)
        if i is None:
            return None

        # ... and position of the highest of those bits (always found,
        # since at least one of the 4 bits is set)
        j = next(n for n in range(4) if data[i][1] & (8 >> n))

        cap_latency = 3 + i + (j > 0)
        phase = (4 - j) % 4

        # Re-configure core
        self._write('csr',
            self.CSR_PHY_EDGE(edge) |
            self.CSR_PHY_PHASE(phase) |
            self.CSR_PHY_DELAY(delay) |
            self.CSR_CMD_LAT(self._cmd_latency) |
            self.CSR_CAP_LAT(cap_latency)
        )

        # Confirm data
        data = self._mem_read(cs, 0, count=3)

        ref = [
            (self.CAL_WORDS[0], 0x3a),
            (self.CAL_WORDS[1], 0x3a),
            (self.CAL_WORDS[0], 0x3a),
        ]

        if data != ref:
            return None

        return (cap_latency, phase)

    def _train_consolidate(self, train):
        """Reduce per-chip results to one result per delay.

        Args:
            train: ``{delay: {cs: result}}`` where result is a
                ``(cap_latency, phase)`` tuple or None.

        Returns:
            ``{delay: result}`` where result is the common tuple if all
            chips still enabled in ``self.csm`` agree on it, else None.
        """
        # Checks combination valid for all chips
        rv = {}

        for delay, results in train.items():
            r = [v for k,v in results.items() if self.csm & (1 << k)]

            # No enabled chip at all for this delay is treated as invalid
            if not r or any((x is None) or (x != r[0]) for x in r):
                print("[.] delay=%2d -> Invalid" % delay)
                rv[delay] = None
            else:
                print("[.] delay=%2d -> cap_latency=%d, phase=%d" % (delay, *r[0]))
                rv[delay] = r[0]

        return rv

    def _train_group(self, train):
        """Group consecutive delays (in sorted order) sharing a result.

        Returns:
            List of ``(result, delays, has_first, has_last)`` tuples, one
            per run of identical valid results. ``has_first`` /
            ``has_last`` tell whether the run contains the lowest /
            highest delay of ``train``. Delays with a None result are
            not part of any group.
        """
        groups = []

        c_v = None
        c_d = []
        c_first = False
        c_last = False

        last_idx = len(train) - 1

        for idx, (delay, result) in enumerate(sorted(train.items())):
            # First / Last checks
            is_first = idx == 0
            is_last = idx == last_idx

            # Continue ?
            if result and (c_v == result):
                c_d.append(delay)
                c_first |= is_first
                c_last |= is_last
                continue

            # Or not ... flush current
            if c_v is not None:
                groups.append( (c_v, c_d, c_first, c_last) )

            # New item
            c_v = result
            c_d = [ delay ]
            c_first = is_first
            c_last = is_last

        if c_v is not None:
            groups.append( (c_v, c_d, c_first, c_last) )

        return groups

    def _train_pick_params(self, best):
        """Pick the delay to use from group ``best``.

        Args:
            best: one tuple as returned by ``_train_group``.

        Returns:
            ``(delay, cap_latency, phase)``.
        """
        result, delays, has_first, has_last = best

        # Pick delay
        if has_first and has_last:
            d = (delays[0] + delays[-1]) // 2
        elif has_first:
            d = min(delays)
        elif has_last:
            d = max(delays)
        else:
            d = int(round(sum(delays) / len(delays)))

        # If the group is only a single value 'wide', print warning it might be marginal
        if len(delays) == 1:
            print("[w] Training results might be marginal. Consider switching capture clock phase by 90 deg")

        # Return delay and params
        return d, result[0], result[1]

    def _debug_probe(self, cs=0, rounds=5):
        """Debug aid: dump raw read-back of the calibration words.

        Never called by ``init()``; meant to be run by hand while
        bringing up hardware. Expects ``self.cr0`` to be set.
        """
        for _ in range(rounds):
            print(hex(self.cr0))

            self._reg_write(cs, 'cr0', self.cr0)

            self._mem_write(cs, 0, self.CAL_WORDS[0], count=3)
            self._mem_write(cs, 2, self.CAL_WORDS[1], count=1)

            self._write('csr',
                self.CSR_PHY_EDGE(1) |
                self.CSR_PHY_PHASE(0) |
                self.CSR_PHY_DELAY(0) |
                self.CSR_CMD_LAT(self._cmd_latency) |
                self.CSR_CAP_LAT(3)
            )

            print(f"{self._read('csr'):08x}")

            for w,a in self._mem_read(cs, 0, count=3):
                print(f"{bin(a)} {w:08x}")

    def _train_chip(self, cs, edge, delays, train):
        """Configure chip ``cs`` and record its result for every delay.

        Results are stored in ``train[delay][cs]``.

        Returns:
            True if at least one delay gave a valid result.
        """
        # CR write
        self._reg_write(cs, 'cr0', self.cr0)
        self._reg_write(cs, 'cr1', self.cr1)

        # Write the calibration words
        self._mem_write(cs, 0, self.CAL_WORDS[0], count=3)
        self._mem_write(cs, 2, self.CAL_WORDS[1], count=1)

        # Scan delays
        any_valid = False

        for delay in delays:
            d = self._train_check_edge_delay(cs, edge, delay)
            print("[.]  delay=%2d -> %s" % (delay, "Failed" if (d is None) else ("cap_latency=%d, phase=%d" % d)))
            train.setdefault(delay, {})[cs] = d
            any_valid |= d is not None

        return any_valid

    def init(self, delays=(0, 5, 10, 15)):
        """Reset the core, configure the chips and run the training.

        Chips for which no delay works are assumed missing and removed
        from ``self.csm``. On success the selected parameters are written
        to the CSR and kept in ``self._csr`` for ``set_runtime``.

        Args:
            delays: PHY delay values to try during training.

        Returns:
            True on success, False if no chip is left enabled or no
            setting works for all enabled chips.
        """
        # Reset HyperRAM and controller
        self._write('csr', self.CSR_RESET)
        self._wait_idle()

        self._write('csr', 0)
        self._wait_idle()

        # Chip config
        self.cr0 = self._cr0(latency=self.latency, burst_len=self.burst_len)
        self.cr1 = self._cr1()

        # Execute configuration and training on all chips
        edge = 1
        train = {}

        for cs in range(4):
            if not self.csm & (1 << cs):
                continue

            # Debug
            print("[+] Training CS=%d" % cs)

            # If nothing valid found, assume chip is missing
            if not self._train_chip(cs, edge, delays, train):
                print("[w] No working delay found, assuming chip is missing: disabling it !")
                self.csm &= ~(1 << cs)

        # Are any chips still enabled ?
        if not self.csm:
            print("[!] All chips disabled, somethins is wrong ...")
            return False

        # Find the best combination
        print("[+] Compiling training results")

        # Check what works for all chips
        train = self._train_consolidate(train)

        if not any(train.values()):
            print("[!] Unable to find single valid combination for all chips :(")
            return False

        # Group them
        groups = self._train_group(train)

        # Pick best group: widest one, with a bonus for each end of the
        # scanned range it touches. On a tie the earliest group wins.
        best = max(groups, key=lambda x: len(x[1]) + 2 * (x[2] + x[3]))

        # Select delay
        self._delay, self._cap_latency, self._phase = self._train_pick_params(best)

        # Load final configuration
        print("[+] Core configured for cmd_latency=%d, capture_latency=%d, phase=%d, delay=%d" % (
            self._cmd_latency, self._cap_latency, self._phase, self._delay
        ))

        self._csr = (
            self.CSR_PHY_EDGE(edge) |
            self.CSR_PHY_PHASE(self._phase) |
            self.CSR_PHY_DELAY(self._delay) |
            self.CSR_CMD_LAT(self._cmd_latency) |
            self.CSR_CAP_LAT(self._cap_latency)
        )

        self._write('csr', self._csr)

        # Success
        return True

    def set_runtime(self, runtime):
        """Set or clear CSR_RUN on top of the trained configuration.

        Relies on ``self._csr``, which only exists after a successful
        ``init()``.
        """
        self._write('csr', self._csr | (self.CSR_RUN if runtime else 0))
- # ----------------------------------------------------------------------------
- # Memory tester
- # ----------------------------------------------------------------------------
class MemoryTester(object):
    """Driver for a memory tester core.

    All accesses go through ``intf`` (an object providing
    ``write(addr, val)`` and ``read(addr)``), relative to ``base``.
    The core has a word buffer mapped at offset 0x100 and transfers
    blocks between that buffer and the memory under test.
    """

    CORE_REGS = {
        'cmd': 0,
        'addr': 1,
    }

    CMD_DUAL      = 1 << 18
    CMD_CHECK_RST = 1 << 17
    CMD_READ      = 1 << 16
    CMD_WRITE     = 0 << 16

    def CMD_BUF_ADDR(self, addr):
        """Command field: buffer address, placed at bit 8 and up."""
        return addr << 8

    def CMD_LEN(self, l):
        """Command field: transfer length minus one, at bit 0 and up."""
        return (l - 1) << 0

    def __init__(self, intf, base):
        self.intf = intf
        self.base = base

    def _write(self, reg, val):
        self.intf.write(self.base + self.CORE_REGS[reg], val)

    def _read(self, reg):
        return self.intf.read(self.base + self.CORE_REGS[reg])

    def ram_write(self, addr, val):
        """Write ``val`` to word ``addr`` of the core buffer."""
        self.intf.write(self.base + 0x100 + addr, val)

    def ram_read(self, addr):
        """Return word ``addr`` of the core buffer."""
        return self.intf.read(self.base + 0x100 + addr)

    def cmd_write(self, ram_addr, buf_addr, xfer_len):
        """Issue a write of ``xfer_len`` words from buffer to memory."""
        self._write('addr', ram_addr)
        self._write('cmd',
            self.CMD_WRITE |
            self.CMD_BUF_ADDR(buf_addr) |
            self.CMD_LEN(xfer_len)
        )

    def cmd_read(self, ram_addr, buf_addr, xfer_len, check_reset=False, dual=False):
        """Issue a read of ``xfer_len`` words from memory.

        ``check_reset`` and ``dual`` set CMD_CHECK_RST and CMD_DUAL in
        the command word.
        """
        self._write('addr', ram_addr)
        self._write('cmd',
            (self.CMD_DUAL if dual else 0) |
            (self.CMD_CHECK_RST if check_reset else 0) |
            self.CMD_READ |
            self.CMD_BUF_ADDR(buf_addr) |
            self.CMD_LEN(xfer_len)
        )

    def load_data(self, addr, data):
        """Copy ``data`` (bytes) to memory starting at word ``addr``.

        Data is sent in chunks of 32 words; the last chunk is padded
        with zero bytes and still written as a full 32 word block.
        """
        for base in range(0, len(data), 128):
            # Upload chunk to RAM (128 bytes = max burst len)
            chunk = bytes(data[base:base+128]).ljust(128, b'\x00')

            for j in range(0, 128, 4):
                self.ram_write(j // 4, int.from_bytes(chunk[j:j+4], 'big'))

            # Issue command to write chunk to RAM
            self.cmd_write(addr + (base // 4), 0, 32)

    def run(self, base, size):
        """Fill ``size`` words at ``base`` with random data and verify.

        The result of the comparison is reset on the first block of the
        run and of every 4096 word page, and read back on the last block
        of every page and of the run, so ranges that do not start or end
        on a page boundary are verified as well.

        Args:
            base: first word address, multiple of 32.
            size: number of words, multiple of 32.

        Returns:
            True if every check passed, False otherwise.

        Raises:
            ValueError: if ``base`` or ``size`` is not a multiple of 32.
        """
        # Check alignment
        if (base & 31) or (size & 31):
            raise ValueError('Base Address and Size argument for memory testing must be aligned on 32-words')

        # Load random block of data
        ref_data = [random.getrandbits(32) for _ in range(256)]

        for i, word in enumerate(ref_data):
            self.ram_write(i, word)

        # Fill memory
        for addr in range(base, base+size, 32):
            print(" . Writing block @ %08x\r" % (addr,), end='')
            self.cmd_write(addr, addr & 0xff, 32)

        # Validate all blocks
        all_good = True
        final_addr = base + size - 32

        for addr in range(base, base+size, 32):
            blk_first = ((addr & 0xfff) == 0x000) or (addr == base)
            blk_last  = ((addr & 0xfff) == 0xfe0) or (addr == final_addr)

            print(" . Reading block @ %08x\r" % (addr,), end='')
            self.cmd_read(addr, addr & 0xff, 32, check_reset=blk_first)

            if blk_last:
                # Bit 1 of 'cmd' is read as the "all matched" flag
                if not (self._read('cmd') & 2):
                    print(" ! Failed at block %08x" % (addr,))
                    all_good = False

        print(" \r", end='')

        return all_good
- # ----------------------------------------------------------------------------
- # HDMI Output
- # ----------------------------------------------------------------------------
class HDMIOutput(object):
    """Driver for an HDMI output core.

    All accesses go through ``intf`` (an object providing
    ``write(addr, val)`` and ``read(addr)``), relative to ``base``.
    """

    # Register offsets, as used by enable() / disable(). The names are
    # local to this driver.
    CORE_REGS = {
        'csr': 0,
        'fb_addr': 1,
    }

    # Length of one line as used for the burst configuration
    LINE_LEN = 1920 // 8

    def __init__(self, intf, base):
        self.intf = intf
        self.base = base

    def _write(self, reg, val):
        self.intf.write(self.base + self.CORE_REGS[reg], val)

    def _read(self, reg):
        return self.intf.read(self.base + self.CORE_REGS[reg])

    def pal_write(self, addr, val):
        """Write ``val`` to entry ``addr`` of the area at offset 64."""
        self.intf.write(self.base + (1<<6) + addr, val)

    def enable(self, fb_addr, burst_len):
        """Set the frame buffer address and enable the output.

        A line is fetched as ``bn_cnt`` bursts of ``burst_len`` followed
        by one last burst covering the remainder.

        Args:
            fb_addr: frame buffer address.
            burst_len: length of a normal burst, at least 1.

        Raises:
            ValueError: if ``burst_len`` is less than 1.
        """
        if burst_len < 1:
            raise ValueError('burst_len must be at least 1')

        # Frame Buffer address
        self._write('fb_addr', fb_addr)

        # Burst Config
        bn_cnt = (self.LINE_LEN - 1) // burst_len
        bn_len = burst_len - 1
        bl_len = self.LINE_LEN - (burst_len * bn_cnt) - 1
        bl_inc = bl_len

        self._write('csr',
            (1 << 31) |
            (bn_cnt << 24) |
            (bn_len << 16) |
            (bl_len <<  8) |
            (bl_inc <<  0)
        )

    def disable(self):
        """Disable the output by clearing the control register."""
        self._write('csr', 0)
|