diff --git a/dc11.py b/dc11.py index 37f2a82..679ac5c 100644 --- a/dc11.py +++ b/dc11.py @@ -40,6 +40,6 @@ class DC11: DC11_NDEVS = 4 # four devices, each is 4 16-bit registers def __init__(self, ub, baseaddr=DC11_DEFAULT): + self.addr = self.DC11_DEFAULT self.ub = ub - self.addr = ub.mmio.register( - None, self.DC11_DEFAULT, self.DC11_NDEVS * 4) + self.ub.register(None, self.DC11_DEFAULT, self.DC11_NDEVS * 4) diff --git a/kl11.py b/kl11.py index 8309681..8fcaeaa 100644 --- a/kl11.py +++ b/kl11.py @@ -30,6 +30,7 @@ import threading import queue from pdptraps import PDPTraps +from unibus import BusCycle class KL11: @@ -46,9 +47,9 @@ class KL11: SERVERPORT = 1170 def __init__(self, ub, baseaddr=KL11_DEFAULT): + self.addr = baseaddr self.ub = ub - self.addr = ub.mmio.register(self.klregs, baseaddr, 4) - ub.mmio.devicereset_register(self.reset) + self.ub.register(ub.autobyte(self.klregs), baseaddr, 4) # output characters are just queued (via tq) to the output thread # input characters have to undergo a more careful 1-by-1 @@ -70,18 +71,16 @@ class KL11: self._t = threading.Thread(target=self._connectionserver, daemon=True) self._t.start() - def reset(self, ub): - """Called for UNIBUS resets (RESET instruction).""" - self.rcdone = False - self.r_ienable = False - self.r_tenable = False + def klregs(self, addr, cycle, /, *, value=None): + if cycle == BusCycle.RESET: + self.rcdone = False + self.r_ienable = False + self.r_tenable = False + return - def klregs(self, addr, value=None, /): match addr - self.addr: case 0: # rcsr - if value is None: - # *** READING *** - + if cycle == BusCycle.READ16: value = 0 if self.r_ienable: @@ -90,8 +89,7 @@ class KL11: if self.rcdone: value |= self.RCDONE - else: - # *** WRITING *** + elif cycle == BusCycle.WRITE16: if value & self.RDRENA: with self.rxc: # a request to get one character, which only @@ -101,8 +99,7 @@ class KL11: self.r_ienable = (value & self.IENABLE) self.rxc.notify() - case 2 if value is None: # rbuf - # *** READING *** + case 2 if cycle == BusCycle.READ16: # rbuf with self.rxc: value = self.rdrbuf self.rcdone = False @@ -110,36 +107,24 @@ class KL11: # transmit buffer status (sometimes called tcsr) case 4: - if value is None: - # *** READING *** + if cycle == BusCycle.READ16: value = self.TXRDY # always ready to send chars if self.t_ienable: value |= self.IENABLE - else: - # *** WRITING *** + elif cycle == BusCycle.WRITE16: prev = self.t_ienable self.t_ienable = (value & self.IENABLE) if self.t_ienable and not prev: self.ub.intmgr.simple_irq(pri=4, vector=0o64) # transmit buffer - case 6: - if value is None: - # *** READING *** - # manual says this is load-only; however automatic - # byte write support (byteme/mmio) requires this - # be readable. Probably byteme should be fixed instead - # to catch traps from unreadables and synthesize - # a zero there (?) - value = 0 - else: - # *** WRITING *** - value &= 0o177 - if (value != 0o177): - s = chr(value) - self.tq.put(s) - if self.t_ienable: - self.ub.intmgr.simple_irq(pri=4, vector=0o64) + case 6 if cycle == BusCycle.WRITE16: # tbuf + value &= 0o177 + if (value != 0o177): + s = chr(value) + self.tq.put(s) + if self.t_ienable: + self.ub.intmgr.simple_irq(pri=4, vector=0o64) case _: raise PDPTraps.AddressError diff --git a/kw11.py b/kw11.py index c331caa..c279c00 100644 --- a/kw11.py +++ b/kw11.py @@ -41,7 +41,7 @@ class KW11: target=self._cloop, args=(1/HZ, ub.intmgr), daemon=True) self.interrupts_enabled = False self.monbit = 1 # the manual says this starts as 1 - ub.mmio.register_simpleattr(self, 'LKS', self.KW11_OFFS, reset=True) + ub.register_simpleattr(self, 'LKS', self.KW11_OFFS, reset=True) self._t.start() # clock loop diff --git a/machine.py b/machine.py index b791018..90594bb 100644 --- a/machine.py +++ b/machine.py @@ -38,11 +38,6 @@ from op4 import op4_dispatch_table # monolithic/large class file seemed less than ideal. Python does not # allow multiple files for a single class. # -# Some parts of the implementation, like mmu and mmio, and various devices, -# made sense as separate component classes. The opcodes, however, are -# basically additional methods in separate files. Since they are not real -# methods they get passed a "cpu" argument manually instead of "self". -# # Was there a better way? Just give in and have one huge file?? # # The opcode parsing/dispatch starts with the top 4 bits of the opcode; @@ -248,7 +243,7 @@ class PDP11: ('systemID', self.SYSTEMID_OFFS), ('error_register', self.CPUERROR_OFFS), ('logging_hack', self.LOGGING_OFFS)): - self.ub.mmio.register_simpleattr(self, attrname, offs) + self.ub.register_simpleattr(self, attrname, offs) # console switches (read) and blinken lights (write) self.swleds = 0 @@ -326,7 +321,7 @@ class PDP11: # # device_instance = XYZ11(p.ub) # - # The device __init__ will typically use ub.mmio.register to connect up + # The device __init__ will typically use ub.register to connect up # to its UNIBUS addresses. That's all that needs to happen. # # Nevertheless, on general principles, it seems like the pdp instance @@ -826,9 +821,9 @@ class PDP1170(PDP11): self.r = self.registerfiles[self.psw_regset] # how the registers appear in IOPAGE space - self.ub.mmio.register(self._ioregsets, - self.IOPAGE_REGSETS_OFFS, - self.IOPAGE_REGSET_SIZE) + self.ub.register(self._ioregsets, + self.IOPAGE_REGSETS_OFFS, + self.IOPAGE_REGSET_SIZE) @property def r_alt(self): diff --git a/mmio.py b/mmio.py index eebb675..da8eb32 100644 --- a/mmio.py +++ b/mmio.py @@ -1,291 +1 @@ -# MIT License -# -# Copyright (c) 2023 Neil Webber -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -from pdptraps import PDPTraps - - -class MMIO: - """Memory Mapped I/O handling for the 8K byte I/O page.""" - - # - # memory-mapped I/O is just a 4K array of function callbacks, one - # entry per each I/O WORD (even) I/O offset. Most entries, of course, - # remain set to an initial default "non-existent address" callback. - # - # See register() for setting a callback on a specific offset or range. - # - # Reads and writes to any offset within the I/O page invoke the - # callback function. Assuming f is the callback function: - # - # READS: v = f(ioaddr) - # The function f must return a value 0 .. 65535 - # - # WRITES: f(ioaddr, v) - # v will be an integer 0 .. 65535 - # The return value is ignored. - # - # Callbacks must be declared like this: - # def f(ioaddr, value=None, /): - # if value is None: - # ... this is the read case - # else: - # ... this is the write case - # - # Zero, of course, is a possible and common value to write, so do not make - # the rookie mistake of testing value for truthiness; test for *is* None. - # - # The ioaddr will always be relative to the base of the I/O page: - # (ioaddr & 8191) == ioaddr will always be True - # - # Callbacks may also optionally receive a byte operation indicator, - # but only if they register for that. See BYTE OPERATIONS, below. - # Most callbacks will instead rely on the framework to synthesize - # byte operations from words; see _byte_wrapper and byteme. - # - # Callback functions that need context or other data can, of course, - # be bound methods (automatically receiving self) or also can use - # functools.partial() to get additional arguments passed in. - # - # - # ODD I/O ADDRESSES, BYTE OPERATIONS: - # - # The physical UNIBUS always *reads* full 16-bit words; there is no - # capability for a byte read at the electrical bus level. - # - # Oddly (haha) the bus *can* specify byte access for writes. - # Even odder (hahaha), devices that provide full-word access at - # odd addresses exist, the best example being the CPU itself. In some - # models the registers are available at a block of UNIBUS addresses, - # and some of the 16-bit registers have ODD addresses. - # For example, UNIBUS address 777707 is the PDP-11 cpu PC, as a - # full 16-bit word, while 777706 is the PDP-11 cpu KSP. - # - # This creates potential for havoc if programmers use byte operations - # on I/O addresses. Consider a typical sequence to read consecutive - # bytes from an address in R0: - # - # MOVB (R0)+,R1 # get the low byte of the word R0 points to - # MOVB (R0)+,R2 # get the high byte of that word - # - # If executed with R0 = 177706 (the KSP register virtual address - # with a typical 16-bit mapping of the upper page to I/O space) - # then R1 will be the low byte of KSP but the next access, which will - # be seen by the UNIBUS as a word read on an odd address, will pull - # the PC value (at 777707) as a word and extract the upper byte - # from THAT (PC upper byte in R2; KSP lower byte in R1). This is - # probably an unexpected result, which is a good argument against - # using byte operations in I/O space. Nevertheless, byte operations - # in I/O space abound in real world code. - # - # Thus: - # * DEVICES THAT DIRECTLY SUPPORT BYTE-WRITE OPERATIONS: - # Specify "byte_writes=True" in the registration call: - # - # mmio.register(somefunc, someoffset, somesize, byte_writes=True) - # - # and declare the somefunc callback like this: - # def somefunc(ioaddr, value=None, /, *, opsize=2): - # ... - # - # The opsize argument will be 2 for word operations and 1 for bytes. - # NOTE: Byte READS will never be seen; only byte WRITES. - # - # * DEVICES THAT DON'T CARE ABOUT BYTE-WRITES: - # The common/standard case. Let byte_writes default to False - # (i.e., leave it out of the registration call). The callback - # will be invoked with the simpler signature: f(ioaddr, v) - # - # * DEVICES THAT SUPPLY WORDS AT ODD ADDRESSES - # The cpu being the canonical example of this... register the - # I/O callback at the corresponding even address and use the ioaddr - # determine which address (the even or the odd) was requested. - # - # Devices that respond to a block of related addresses can register - # one callback to cover more than one word of Unibus address space. - # The callback gets the ioaddr which it can further decode. Again, - # the CPU register block is a good example of this. - # - # - # The PDP-11 RESET INSTRUCTION - # - # The PDP-11 RESET instruction causes the UNIBUS to be reset. - # Devices that want to know about this should: - # - # mmio.devicereset_register(resetfunc) - # - # And then resetfunc will be invoked as: - # resetfunc(mmio.ub) - # (i.e., passed a single argument, the UNIBUS object). - # - # For dead-simple cases, the optional reset=True argument can be - # supplied to register(), which will cause it to also arrange for the - # iofunc to be called at RESET time, like this: - # - # iofunc(baseaddr, 0) - # - # which, it should be noted, is indistinguishable from a program - # merely setting that I/O address to zero. Note too that if iofunc - # was registered once for an N-word block a RESET will still only call - # it ONCE, on the baseaddr of that block. - # - # If this convenience, with its limitations, is insufficient for a - # device then it must use the devicereset registration instead. - # - - def __init__(self, cpu): - self.cpu = cpu - self.mmiomap = [self.__nodev] * (self.cpu.IOPAGE_SIZE >> 1) - self.device_resets = set() - - # the default entry for unoccupied I/O: cause an AddressError trap - def __nodev(self, addr, value=None, /, *, opsize=2): - self.cpu.logger.info(f"Access to non-existent I/O {oct(addr)}") - raise PDPTraps.AddressError( - cpuerr=self.cpu.CPUERR_BITS.UNIBUS_TIMEOUT) - - # Devices may have simple "dummy" I/O addresses that always read zero - # and ignore writes; See "if iofunc is None" in register() method. - # NOTE: register() byteme-wraps this so opsize not needed. - def __ignoredev(self, addr, value=None, /): - self.cpu.logger.debug(f"dummy zero device @ {oct(addr)}, {value=}") - return 0 - - # register a call-back for an I/O address, which MUST be an offset - # within the 8K I/O page (which itself may reside at three different - # physical locations depending on configurations, thus explaining - # why this routine deals only in the offsets). - # - # Variations: - # iofunc=None -- implement a dummy: reads as zero, ignores writes - # reset=True -- also registers iofunc to be called at RESET time - # byte_writes=True -- Request byte writes be sent to the iofunc - # vs hidden/converted into word ops. - # - def register(self, iofunc, offsetaddr, nwords, *, - byte_writes=False, reset=False): - - if offsetaddr >= self.cpu.IOPAGE_SIZE: - raise ValueError(f"MMIO: I/O offset too large {oct(offsetaddr)}") - - # None is a shorthand for "this is a dummy always-zero addr" - if iofunc is None: - iofunc = self.__ignoredev - - # register this (raw/unwrapped) iofunc for reset if so requested - if reset: - self.devicereset_register(lambda ub: iofunc(offsetaddr, 0)) - - idx, odd = divmod(offsetaddr, 2) - if odd != 0: - # See discussion of odd I/O addrs in block comment elsewhere - raise ValueError("cannot register odd (byte) address in IO space") - - if not byte_writes: - # wrap the supplied I/O function with this code to implement - # byte write operations automatically in terms of words. - iofunc = self._byte_wrapper(iofunc) - - for i in range(nwords): - self.mmiomap[idx+i] = iofunc - return offsetaddr - - # wrap an I/O function with code that automatically handles byte writes. - def _byte_wrapper(self, iofunc): - def byteme(ioaddr, value=None, /, *, opsize=2): - if (value is None) or (opsize == 2): - return iofunc(ioaddr, value) - else: - # value is not None, and opsize is not 2 - # In other words: a byte write to I/O space. Synthesize it. - self.cpu.logger.debug(f"Byte write to {oct(ioaddr)} {value=}") - wv = self.wordRW(ioaddr) - if ioaddr & 1: - wv = (wv & 0o377) | (value << 8) - else: - wv = (wv & 0o177400) | value - self.wordRW(ioaddr, wv) - return byteme - - # Convenience method -- registers simple attributes (or properties) into - # I/O space in the obvious way: Make this attr (of obj) show at this addr - # - # If a device just needs some attributes set to zero on a RESET, - # it can specify them here with reset=True and they will be automatically - # set to zero by reset() (no need to devicereset_register). - def register_simpleattr(self, obj, attrname, addr, reset=False): - """Create and register a handler to read/write the named attr. - - obj - the object (often "self" for the caller of this method) - attrname - the attribute name to read/write - addr - the I/O address to register it to - - If attrname is None, the address is registered as a dummy location - that ignores writes and will always read as zero. This is sometimes - useful for features that have to exist but are emulated as no-op. - """ - - # could do this with partial, but can also do it with this nested - # func def. One way or another need this func logic anyway. - - def _rwattr(_, value=None, /): - """Read/write the named attr via the I/O callback protocol.""" - if attrname is None: - value = 0 - else: - if value is None: - value = getattr(obj, attrname) - else: - setattr(obj, attrname, value) - return value - - # NOTE: Registers a different ("closure of") rwattr each time. - return self.register(_rwattr, addr, 1, reset=reset) - - # In the real hardware, the PDP-11 RESET instruction pulls a reset line - # that all devices can see. In the emulation, devices that need to know - # about the RESET instruction must register themselves here: - def devicereset_register(self, func): - """Register func to be called whenever a RESET happens.""" - self.device_resets.add(func) - - # The PDP-11 RESET instruction eventually ends up here, causing - # a bus reset to be sent to all known registered devices. - def resetdevices(self): - for f in self.device_resets: - self.cpu.logger.debug(f"RESET callback: {f}") - f(self.cpu.ub) - - def wordRW(self, ioaddr, value=None, /): - """Read (value is None) or write the given I/O address.""" - if value is None: - value = self.mmiomap[ioaddr >> 1](ioaddr) - else: - self.mmiomap[ioaddr >> 1](ioaddr, value) - return value - - def byteRW(self, ioaddr, value=None, /): - """UNIBUS byte R/W - only write is legal.""" - if value is None: - raise ValueError("Unibus cannot read bytes") - else: - self.mmiomap[ioaddr >> 1](ioaddr, value, opsize=1) - return None +# this file has been deleted diff --git a/mmu.py b/mmu.py index 2cf3aa8..4f17a78 100644 --- a/mmu.py +++ b/mmu.py @@ -21,7 +21,8 @@ # SOFTWARE. from functools import partial -from pdptraps import PDPTraps +from pdptraps import PDPTraps, PDPTrap +from unibus import BusCycle from types import SimpleNamespace from collections import namedtuple @@ -68,8 +69,6 @@ class MemoryMgmt: self.cpu = cpu self.ub = cpu.ub - mmio = self.ub.mmio - # The "segment cache" dramatically speeds up address translation # for the most common MMU usage scenarios. # @@ -126,27 +125,27 @@ class MemoryMgmt: (0, self.DSPACE, 48)): ioaddr = base+offset iofunc = partial(self.io_parpdr, parpdr, mode, space, ioaddr) - mmio.register(iofunc, ioaddr, 8) # 8 words / 16 bytes + self.ub.register(iofunc, ioaddr, 8) # 8 words / 16 bytes # register the simple attrs MMR0 etc into I/O space: - mmio.register_simpleattr(self, 'MMR0', self.MMR0_OFFS, reset=True) - mmio.register_simpleattr(self, 'MMR1', self.MMR1_OFFS) - mmio.register_simpleattr(self, 'MMR2', self.MMR2_OFFS) - mmio.register_simpleattr(self, 'MMR3', self.MMR3_OFFS, reset=True) - mmio.register_simpleattr(self, None, self.MCR_OFFS) + self.ub.register_simpleattr(self, 'MMR0', self.MMR0_OFFS, reset=True) + self.ub.register_simpleattr(self, 'MMR1', self.MMR1_OFFS) + self.ub.register_simpleattr(self, 'MMR2', self.MMR2_OFFS) + self.ub.register_simpleattr(self, 'MMR3', self.MMR3_OFFS, reset=True) + self.ub.register_simpleattr(self, None, self.MCR_OFFS) - def io_parpdr(self, parpdr, mode, space, base, addr, value=None, /): - """mmio I/O function for MMU PARs and PDRs. + def io_parpdr(self, parpdr, mode, space, base, + addr, cycle, /, *, value=None): + """I/O function for MMU PARs and PDRs. NOTE: parpdr/mode/space/base args provided via partial() as supplied at registration time; see __init__. - The mmio module calls this simply as f(addr, value) """ aprnum = (addr - base) >> 1 aprfile = self.APR[(mode * 2) + space] - if value is None: + if cycle == BusCycle.READ16: return aprfile[aprnum][parpdr] - else: + elif cycle == BusCycle.WRITE16: # dump any matching cache entries in both reading/writing form. for reading in (True, False): # the "space" is a dilemma because it is tied up in @@ -504,7 +503,7 @@ class MemoryMgmt: pa = self.v2p(vaddr, mode, space, value is None, invis=_invis) if pa >= self.iopage_base: - return self.ub.mmio.wordRW(pa & self.cpu.IOPAGE_MASK, value) + return self.ub.wordRW(pa & self.cpu.IOPAGE_MASK, value) else: return self.cpu.physRW(pa, value) @@ -541,7 +540,7 @@ class MemoryMgmt: pa &= ~1 if pa >= self.iopage_base: - wv = self.ub.mmio.wordRW(pa & self.cpu.IOPAGE_MASK) + wv = self.ub.wordRW(pa & self.cpu.IOPAGE_MASK) else: wv = self.cpu.physRW(pa) return ((wv >> 8) if odd else wv) & 0o377 @@ -556,7 +555,7 @@ class MemoryMgmt: # Memory byte writes are synthesized. if pa >= self.iopage_base: - return self.ub.mmio.byteRW(pa & self.cpu.IOPAGE_MASK, value) + return self.ub.byteRW(pa & self.cpu.IOPAGE_MASK, value) else: wv = self.cpu.physRW(pa & ~1) if odd: diff --git a/pdptests.py b/pdptests.py index 81434fe..bd53831 100644 --- a/pdptests.py +++ b/pdptests.py @@ -24,13 +24,16 @@ from types import SimpleNamespace import breakpoints as BKP from machine import PDP1170 +from unibus import BusCycle from kl11 import KL11 from branches import BRANCH_CODES from pdptraps import PDPTraps import unittest import random import os +import io import hashlib +import boot from pdpasmhelper import InstructionBlock @@ -1696,6 +1699,35 @@ class TestMethods(unittest.TestCase): with self.subTest(i=i, val=val): self.assertEqual(val, p.physmem[recbase + i]) + # test loading the LDA format (absolute tape loader) + def test_load_lda(self): + p = self.make_pdp() + ldabytes = [ + 0, 0, 0, # testing zero skip + 1, 0, 9, 0, 0, 8, # address 0x800 (2k) + 1, 2, 3, # the data + 232, # checksum + + 0, 0, 0, 0, 0, # more zero skip testing + 1, 0, 9, 0, 3, 8, # address 0x803 (2k) + 4, 5, 6, # the data + 220, # checksum + 1, 0, 9, 0, 6, 8, # testing lack of zero skip + 7, 8, 9, + 208, + + # the END block + 1, 0, 6, 0, 1, 1, 247 + ] + with io.BytesIO(bytes(ldabytes)) as f: + addr = boot.load_lda_f(p, f) + self.assertEqual(addr, 257) # just "known" from data above + # this range of addresses and the related data is just "known" + # from the data bytes above + baseaddr = 0x800 + for offset in range(9): + self.assertEqual(p.mmu.byteRW(baseaddr+offset), offset+1) + def test_physrw_n(self): p = self.make_pdp() words = [1, 2, 3, 0, 0o40000, 65534] @@ -1723,6 +1755,79 @@ class TestMethods(unittest.TestCase): p.run(pc=startaddr) self.assertEqual(p.r[0], 1) + def test_io(self): + # unibus I/O callback tests + + p = self.make_pdp() + # this callback just records arguments taking advantage of closure + cbx = SimpleNamespace(value=0, arglog=[]) + + RESET_VALUE = 1234 # arbitrary + + def callback(ioaddr, cycle, /, **kwargs): + if len(kwargs) == 0: + cbx.arglog.append((ioaddr, cycle)) + elif len(kwargs) == 1: + cbx.arglog.append((ioaddr, cycle, kwargs['value'])) + else: + raise ValueError("invalid kwargs") + if cycle == BusCycle.READ16: + return cbx.value + elif cycle == BusCycle.RESET: + cbx.value = RESET_VALUE + elif cycle == BusCycle.WRITE16: + cbx.value = kwargs['value'] + elif cycle == BusCycle.WRITE8: + v = kwargs['value'] & 0xFF + if ioaddr & 1: + cbx.value = (cbx.value & 0x00FF) | (v << 8) + else: + cbx.value = (cbx.value & 0xFF00) | v + else: + assert False, "bad cycle" + + startaddr = 0o4000 + ioaddr = 0o177720 # arbitrary; in a "reserved" block fwiw + a = InstructionBlock() + a.mov(startaddr, 'sp') + a.mov(ioaddr, 'r5') + a.literal(5) # RESET instruction + a.mov('(r5)', 'r0') # expect r0 to be the RESET_VALUE + a.mov(1, '(r5)') # set new value to 1 + a.mov('(r5)', 'r1') # expect r1 to be 1 + a.mov(0o400, '(r5)') # setting a bit in the high byte + a.movb(2, '(r5)') # should only set the low part + a.mov('(r5)', 'r2') # expect r2 to be 0o402 + a.movb(0, '1(r5)') # should only clear the high part + a.mov('(r5)', 'r3') # expect r3 to be 2 + a.halt() + self.loadphysmem(p, a, startaddr) + + p.ub.register(callback, ioaddr & 8191) + p.run(pc=startaddr) + # per the various comments in the test sequence above + self.assertEqual(p.r[0], RESET_VALUE) + self.assertEqual(p.r[1], 1) + self.assertEqual(p.r[2], 0o402) + self.assertEqual(p.r[3], 2) + + # the sequence of arguments expected in the log - determined + # by inspection when the test code was created + offs = ioaddr & 8191 + gold = ( + (offs, BusCycle.RESET), # from the RESET instruction + (offs, BusCycle.READ16), # from mov (r5),r0 + (offs, BusCycle.WRITE16, 1), # from mov $1,(r5) + (offs, BusCycle.READ16), # from mov (r5),r1 + (offs, BusCycle.WRITE16, 0o400), # from mov $0400,(r5) + (offs, BusCycle.WRITE8, 2), # from the movb + (offs, BusCycle.READ16), # from mov (r5),r2 + (offs+1, BusCycle.WRITE8, 0), # from the movb + (offs, BusCycle.READ16), # from mov (r5),r3 + ) + for i, t in enumerate(cbx.arglog): + self.assertEqual(t, gold[i]) + def test_breakpoints1(self): # test the steps=N breakpoint capability diff --git a/unibus.py b/unibus.py index 0ad96bd..825ae7f 100644 --- a/unibus.py +++ b/unibus.py @@ -20,22 +20,214 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from enum import Enum from interrupts import InterruptManager -from mmio import MMIO +from pdptraps import PDPTraps, PDPTrap -# A convenient reference for addresses: -# https://gunkies.org/wiki/UNIBUS_Device_Addresses + +BusCycle = Enum('BusCycle', ('READ16', 'WRITE16', 'WRITE8', 'RESET')) class UNIBUS: + def __init__(self, cpu): self.cpu = cpu - self.mmio = MMIO(cpu) self.intmgr = InterruptManager() self.logger = cpu.logger + self.mmiomap = [self.__nodev] * (self.cpu.IOPAGE_SIZE >> 1) def resetbus(self): - self.mmio.resetdevices() + # this isn't especially efficient but it really doesn't matter. + # Construct a sequence of tuples: (a, f) + # where a is the address (really offset) in the I/O page + # and f is the callback + # but only for callbacks that are not __nodev + # and then invoke those callbacks w/BusCycle.RESET + resets = ((x << 1, f) + for x, f in enumerate(self.mmiomap) if f != self.__nodev) + for ioaddr, f in resets: + f(ioaddr, BusCycle.RESET) + + # register() -- connect a callback function to an I/O page address + # + # A convenient reference for addresses: + # https://gunkies.org/wiki/UNIBUS_Device_Addresses + # + # memory-mapped I/O is just a 4K array of function callbacks, one + # entry per each I/O WORD (even) I/O offset. Most entries, of course, + # remain set to an initial default "non-existent address" callback. + # + # Callbacks must be declared like this: + # def f(ioaddr, cycle, /, *, value=None): + # + # and should contain code dispatching on cycle and (potentially) ioaddr. + # If cycle is WRITE8 (byte) or WRITE16 (word), the 'value' argument will + # be supplied. Otherwise 'value' is not supplied. + # + # If cycle is READ16 (word), the read value should be returned. Note that + # no READ8 exists as the UNIBUS is not capable of expressing a byte read. + # + # The processor RESET instruction generates calls with a RESET cycle. + # + # The ioaddr will always be relative to the base of the I/O page: + # (ioaddr & 8191) == ioaddr will always be True + # + # If a device's semantics are simple enough to allow synthesizing a + # WRITE8 as a READ16 / byte-update / WRITE16 sequence it can use + # the autobyte wrapper and omit a WRITE8 cycle handler. If the underlying + # address is write-only the autobyte wrapper substitutes a zero value + # for the READ16 part of the synthesis. If this is not correct, the + # callback must handle WRITE8 itself directly. + # + # Callback functions that need context or other data can, of course, + # be bound methods (automatically receiving self) or also can use + # functools.partial() to get additional arguments passed in. + # + # + # ODD I/O ADDRESSES, BYTE OPERATIONS: + # + # The physical UNIBUS always *reads* full 16-bit words; there is no + # capability for a byte read at the electrical bus level. + # + # Oddly (haha) the bus *can* specify byte access for writes. + # Even odder (hahaha), devices that provide full-word access at + # odd addresses exist, the best example being the CPU itself -- though + # as it turns out only very few models allow **CPU** access to cpu + # registers via the Unibus (vs allowing other devices, e.g., the console + # switches/display, to access them that way). + # + # There are a lot of subtleties to take into account accessing I/O + # addresses via bytes (and possibly-odd addresses) but so be it. + # Such byte operations in I/O space abound in real world code. + # + # Devices that respond to a block of related addresses can register + # one callback to cover more than one word of Unibus address space. + # The callback gets the ioaddr which it can further decode. Again, + # the CPU register block is a good example of this. + # + + def register(self, iofunc, offsetaddr, nwords=1, /): + """register a callback for an I/O address. + + Arguments: + iofunc -- callback function. See documentation for signature. + offsetaddr -- Offset within the 8K I/O page. + nwords -- Optional. Default=1. Number of words to span. + + iofunc may be None in which case a dummy is supplied that ignores + all write operations and always reads as zero. + """ + + if offsetaddr >= self.cpu.IOPAGE_SIZE: + raise ValueError(f"UNIBUS: offset too large {oct(offsetaddr)}") + + # None is a shorthand for "this is a dummy always-zero addr" + if iofunc is None: + iofunc = self.autobyte(self.__ignoredev) + + idx, odd = divmod(offsetaddr, 2) + if odd != 0: + # See discussion of odd I/O addrs in block comment elsewhere + raise ValueError("cannot register odd (byte) address in IO space") + + for i in range(nwords): + self.mmiomap[idx+i] = iofunc + + # the default entry for unoccupied I/O: cause an AddressError trap + def __nodev(self, addr, cycle, /, *, value=None): + self.cpu.logger.info(f"Access to non-existent I/O {oct(addr)}") + raise PDPTraps.AddressError(cpuerr=self.cpu.CPUERR_BITS.UNIBUS_TIMEOUT) + + # Devices may have simple "dummy" I/O addresses that always read zero + # and ignore writes; See "if iofunc is None" in register() method. + # NOTE: register() byteme-wraps this so opsize not needed. + def __ignoredev(self, addr, cycle, /, *, value=None): + self.cpu.logger.debug(f"dummy zero device @ {oct(addr)}, {value=}") + return 0 + + # wrap an I/O function with code that automatically handles byte writes. + # CAUTION: Some devices may have semantics that make this ill-advised. + # Such devices should handle WRITE8 explicitly themselves. + # This is essentially a decorator but typically is invoked explicitly + # rather than by '@' syntax. + def autobyte(self, iofunc): + def byteme(ioaddr, cycle, /, **kwargs): + if cycle != BusCycle.WRITE8: + return iofunc(ioaddr, cycle, **kwargs) + + # A byte write to I/O space. Synthesize it. + # 'value' is present (by definition) in kwargs + value = kwargs['value'] + self.cpu.logger.debug(f"Byte write to {oct(ioaddr)} {value=}") + try: + wv = iofunc(ioaddr & 0o177776, BusCycle.READ16) + except PDPTrap: # this can happen on write-only registers + wv = 0 + if ioaddr & 1: + wv = (wv & 0o377) | (value << 8) + else: + wv = (wv & 0o177400) | value + iofunc(ioaddr & 0o177776, BusCycle.WRITE16, value=wv) + return byteme + + # Convenience method -- registers simple attributes (or properties) into + # I/O space in the obvious way: Make this attr (of obj) show at this addr + # + # BusCycle.RESET handling: If a device wants RESET to zero the attribute, + # it should specify reset=True. Otherwise RESET will be ignored. + # + # BusCycle.WRITE8 handling: autobyte() is used. If those semantics are not + # suitable, the device must create its own i/o func instead of this. + def register_simpleattr(self, obj, attrname, addr, reset=False): + """Create and register a handler to read/write the named attr. + + obj - the object (often "self" for the caller of this method) + attrname - the attribute name to read/write + addr - the I/O address to register it to + + If attrname is None, the address is registered as a dummy location + that ignores writes and will always read as zero. This is sometimes + useful for features that have to exist but are emulated as no-op. + """ + + # could do this with partial, but can also do it with this nested + # func def. One way or another need this func logic anyway. + + if attrname is None: + def _rwattr(_, cycle, /, *, value=None): + if cycle == BusCycle.READ16: + return 0 + # all other operations just silently ignored + else: + def _rwattr(_, cycle, /, *, value=None): + if cycle == BusCycle.READ16: + return getattr(obj, attrname) + elif cycle == BusCycle.WRITE16: + setattr(obj, attrname, value) + elif cycle == BusCycle.RESET: + if reset: + setattr(obj, attrname, 0) + else: + assert False, f"Unknown {cycle=} in simpleattr" + + # NOTE: it's a new defn/closure of _rwattr each time through + self.register(self.autobyte(_rwattr), addr) + + def wordRW(self, ioaddr, value=None, /): + """Read (value is None) or write the given I/O address.""" + if value is None: + value = self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.READ16) + else: + self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.WRITE16, value=value) + return value + + def byteRW(self, ioaddr, value=None, /): + """UNIBUS byte R/W - only write is legal.""" + if value is None: + raise ValueError("Unibus cannot read bytes") + else: + self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.WRITE8, value=value) + return None class UNIBUS_1170(UNIBUS): @@ -49,27 +241,30 @@ class UNIBUS_1170(UNIBUS): # are just stored natively that way and broken down # into 16-bit components by the mmio function as needed. self.ubas = [0] * (self.UBMAP_N // 2) - self.mmio.register(self.uba_mmio, self.UBMAP_OFFS, self.UBMAP_N) + self.register( + self.autobyte(self.uba_mmio), self.UBMAP_OFFS, self.UBMAP_N) - def uba_mmio(self, addr, value=None, /): + def uba_mmio(self, addr, cycle, /, *, value=None): ubanum, hi22 = divmod(addr - self.UBMAP_OFFS, 4) uba22 = self.ubas[ubanum] self.logger.debug(f"UBA addr={oct(addr)}, {value=}") self.logger.debug(f"{ubanum=}, {hi22=}") - if value is None: + if cycle == BusCycle.READ16: if hi22 == 0: return (uba22 >> 16) & 0o077 else: return uba22 & 0o177777 - else: + elif cycle == BusCycle.WRITE16: # the low bit is enforced to be zero if hi22: uba22 = ((value & 0o077) << 16) | (uba22 & 0o177776) else: uba22 = (uba22 & 0o17600000) | (value & 0o177776) self.ubas[ubanum] = uba22 + elif cycle == BusCycle.RESET: + pass def busRW(self, ubaddr, value=None): pass