Revamp the unibus device callback system. New version has explicit BusCycle argument in one callback function vs the mishmash of time-evolved methods for reset, bytes, etc

This commit is contained in:
Neil Webber 2024-04-11 16:58:41 -05:00
parent 1eba151bdb
commit c78972654c
8 changed files with 355 additions and 366 deletions

View file

@ -40,6 +40,6 @@ class DC11:
DC11_NDEVS = 4 # four devices, each is 4 16-bit registers DC11_NDEVS = 4 # four devices, each is 4 16-bit registers
def __init__(self, ub, baseaddr=DC11_DEFAULT): def __init__(self, ub, baseaddr=DC11_DEFAULT):
self.addr = self.DC11_DEFAULT
self.ub = ub self.ub = ub
self.addr = ub.mmio.register( self.ub.register(None, self.DC11_DEFAULT, self.DC11_NDEVS * 4)
None, self.DC11_DEFAULT, self.DC11_NDEVS * 4)

57
kl11.py
View file

@ -30,6 +30,7 @@ import threading
import queue import queue
from pdptraps import PDPTraps from pdptraps import PDPTraps
from unibus import BusCycle
class KL11: class KL11:
@ -46,9 +47,9 @@ class KL11:
SERVERPORT = 1170 SERVERPORT = 1170
def __init__(self, ub, baseaddr=KL11_DEFAULT): def __init__(self, ub, baseaddr=KL11_DEFAULT):
self.addr = baseaddr
self.ub = ub self.ub = ub
self.addr = ub.mmio.register(self.klregs, baseaddr, 4) self.ub.register(ub.autobyte(self.klregs), baseaddr, 4)
ub.mmio.devicereset_register(self.reset)
# output characters are just queued (via tq) to the output thread # output characters are just queued (via tq) to the output thread
# input characters have to undergo a more careful 1-by-1 # input characters have to undergo a more careful 1-by-1
@ -70,18 +71,16 @@ class KL11:
self._t = threading.Thread(target=self._connectionserver, daemon=True) self._t = threading.Thread(target=self._connectionserver, daemon=True)
self._t.start() self._t.start()
def reset(self, ub): def klregs(self, addr, cycle, /, *, value=None):
"""Called for UNIBUS resets (RESET instruction).""" if cycle == BusCycle.RESET:
self.rcdone = False self.rcdone = False
self.r_ienable = False self.r_ienable = False
self.r_tenable = False self.r_tenable = False
return
def klregs(self, addr, value=None, /):
match addr - self.addr: match addr - self.addr:
case 0: # rcsr case 0: # rcsr
if value is None: if cycle == BusCycle.READ16:
# *** READING ***
value = 0 value = 0
if self.r_ienable: if self.r_ienable:
@ -90,8 +89,7 @@ class KL11:
if self.rcdone: if self.rcdone:
value |= self.RCDONE value |= self.RCDONE
else: elif cycle == BusCycle.WRITE16:
# *** WRITING ***
if value & self.RDRENA: if value & self.RDRENA:
with self.rxc: with self.rxc:
# a request to get one character, which only # a request to get one character, which only
@ -101,8 +99,7 @@ class KL11:
self.r_ienable = (value & self.IENABLE) self.r_ienable = (value & self.IENABLE)
self.rxc.notify() self.rxc.notify()
case 2 if value is None: # rbuf case 2 if cycle == BusCycle.READ16: # rbuf
# *** READING ***
with self.rxc: with self.rxc:
value = self.rdrbuf value = self.rdrbuf
self.rcdone = False self.rcdone = False
@ -110,36 +107,24 @@ class KL11:
# transmit buffer status (sometimes called tcsr) # transmit buffer status (sometimes called tcsr)
case 4: case 4:
if value is None: if cycle == BusCycle.READ16:
# *** READING ***
value = self.TXRDY # always ready to send chars value = self.TXRDY # always ready to send chars
if self.t_ienable: if self.t_ienable:
value |= self.IENABLE value |= self.IENABLE
else: elif cycle == BusCycle.WRITE16:
# *** WRITING ***
prev = self.t_ienable prev = self.t_ienable
self.t_ienable = (value & self.IENABLE) self.t_ienable = (value & self.IENABLE)
if self.t_ienable and not prev: if self.t_ienable and not prev:
self.ub.intmgr.simple_irq(pri=4, vector=0o64) self.ub.intmgr.simple_irq(pri=4, vector=0o64)
# transmit buffer # transmit buffer
case 6: case 6 if cycle == BusCycle.WRITE16: # tbuf
if value is None: value &= 0o177
# *** READING *** if (value != 0o177):
# manual says this is load-only; however automatic s = chr(value)
# byte write support (byteme/mmio) requires this self.tq.put(s)
# be readable. Probably byteme should be fixed instead if self.t_ienable:
# to catch traps from unreadables and synthesize self.ub.intmgr.simple_irq(pri=4, vector=0o64)
# a zero there (?)
value = 0
else:
# *** WRITING ***
value &= 0o177
if (value != 0o177):
s = chr(value)
self.tq.put(s)
if self.t_ienable:
self.ub.intmgr.simple_irq(pri=4, vector=0o64)
case _: case _:
raise PDPTraps.AddressError raise PDPTraps.AddressError

View file

@ -41,7 +41,7 @@ class KW11:
target=self._cloop, args=(1/HZ, ub.intmgr), daemon=True) target=self._cloop, args=(1/HZ, ub.intmgr), daemon=True)
self.interrupts_enabled = False self.interrupts_enabled = False
self.monbit = 1 # the manual says this starts as 1 self.monbit = 1 # the manual says this starts as 1
ub.mmio.register_simpleattr(self, 'LKS', self.KW11_OFFS, reset=True) ub.register_simpleattr(self, 'LKS', self.KW11_OFFS, reset=True)
self._t.start() self._t.start()
# clock loop # clock loop

View file

@ -38,11 +38,6 @@ from op4 import op4_dispatch_table
# monolithic/large class file seemed less than ideal. Python does not # monolithic/large class file seemed less than ideal. Python does not
# allow multiple files for a single class. # allow multiple files for a single class.
# #
# Some parts of the implementation, like mmu and mmio, and various devices,
# made sense as separate component classes. The opcodes, however, are
# basically additional methods in separate files. Since they are not real
# methods they get passed a "cpu" argument manually instead of "self".
#
# Was there a better way? Just give in and have one huge file?? # Was there a better way? Just give in and have one huge file??
# #
# The opcode parsing/dispatch starts with the top 4 bits of the opcode; # The opcode parsing/dispatch starts with the top 4 bits of the opcode;
@ -248,7 +243,7 @@ class PDP11:
('systemID', self.SYSTEMID_OFFS), ('systemID', self.SYSTEMID_OFFS),
('error_register', self.CPUERROR_OFFS), ('error_register', self.CPUERROR_OFFS),
('logging_hack', self.LOGGING_OFFS)): ('logging_hack', self.LOGGING_OFFS)):
self.ub.mmio.register_simpleattr(self, attrname, offs) self.ub.register_simpleattr(self, attrname, offs)
# console switches (read) and blinken lights (write) # console switches (read) and blinken lights (write)
self.swleds = 0 self.swleds = 0
@ -326,7 +321,7 @@ class PDP11:
# #
# device_instance = XYZ11(p.ub) # device_instance = XYZ11(p.ub)
# #
# The device __init__ will typically use ub.mmio.register to connect up # The device __init__ will typically use ub.register to connect up
# to its UNIBUS addresses. That's all that needs to happen. # to its UNIBUS addresses. That's all that needs to happen.
# #
# Nevertheless, on general principles, it seems like the pdp instance # Nevertheless, on general principles, it seems like the pdp instance
@ -826,9 +821,9 @@ class PDP1170(PDP11):
self.r = self.registerfiles[self.psw_regset] self.r = self.registerfiles[self.psw_regset]
# how the registers appear in IOPAGE space # how the registers appear in IOPAGE space
self.ub.mmio.register(self._ioregsets, self.ub.register(self._ioregsets,
self.IOPAGE_REGSETS_OFFS, self.IOPAGE_REGSETS_OFFS,
self.IOPAGE_REGSET_SIZE) self.IOPAGE_REGSET_SIZE)
@property @property
def r_alt(self): def r_alt(self):

292
mmio.py
View file

@ -1,291 +1 @@
# MIT License # this file has been deleted
#
# Copyright (c) 2023 Neil Webber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from pdptraps import PDPTraps
class MMIO:
"""Memory Mapped I/O handling for the 8K byte I/O page."""
#
# memory-mapped I/O is just a 4K array of function callbacks, one
# entry per each I/O WORD (even) I/O offset. Most entries, of course,
# remain set to an initial default "non-existent address" callback.
#
# See register() for setting a callback on a specific offset or range.
#
# Reads and writes to any offset within the I/O page invoke the
# callback function. Assuming f is the callback function:
#
# READS: v = f(ioaddr)
# The function f must return a value 0 .. 65535
#
# WRITES: f(ioaddr, v)
# v will be an integer 0 .. 65535
# The return value is ignored.
#
# Callbacks must be declared like this:
# def f(ioaddr, value=None, /):
# if value is None:
# ... this is the read case
# else:
# ... this is the write case
#
# Zero, of course, is a possible and common value to write, so do not make
# the rookie mistake of testing value for truthiness; test for *is* None.
#
# The ioaddr will always be relative to the base of the I/O page:
# (ioaddr & 8191) == ioaddr will always be True
#
# Callbacks may also optionally receive a byte operation indicator,
# but only if they register for that. See BYTE OPERATIONS, below.
# Most callbacks will instead rely on the framework to synthesize
# byte operations from words; see _byte_wrapper and byteme.
#
# Callback functions that need context or other data can, of course,
# be bound methods (automatically receiving self) or also can use
# functools.partial() to get additional arguments passed in.
#
#
# ODD I/O ADDRESSES, BYTE OPERATIONS:
#
# The physical UNIBUS always *reads* full 16-bit words; there is no
# capability for a byte read at the electrical bus level.
#
# Oddly (haha) the bus *can* specify byte access for writes.
# Even odder (hahaha), devices that provide full-word access at
# odd addresses exist, the best example being the CPU itself. In some
# models the registers are available at a block of UNIBUS addresses,
# and some of the 16-bit registers have ODD addresses.
# For example, UNIBUS address 777707 is the PDP-11 cpu PC, as a
# full 16-bit word, while 777706 is the PDP-11 cpu KSP.
#
# This creates potential for havoc if programmers use byte operations
# on I/O addresses. Consider a typical sequence to read consecutive
# bytes from an address in R0:
#
# MOVB (R0)+,R1 # get the low byte of the word R0 points to
# MOVB (R0)+,R2 # get the high byte of that word
#
# If executed with R0 = 177706 (the KSP register virtual address
# with a typical 16-bit mapping of the upper page to I/O space)
# then R1 will be the low byte of KSP but the next access, which will
# be seen by the UNIBUS as a word read on an odd address, will pull
# the PC value (at 777707) as a word and extract the upper byte
# from THAT (PC upper byte in R2; KSP lower byte in R1). This is
# probably an unexpected result, which is a good argument against
# using byte operations in I/O space. Nevertheless, byte operations
# in I/O space abound in real world code.
#
# Thus:
# * DEVICES THAT DIRECTLY SUPPORT BYTE-WRITE OPERATIONS:
# Specify "byte_writes=True" in the registration call:
#
# mmio.register(somefunc, someoffset, somesize, byte_writes=True)
#
# and declare the somefunc callback like this:
# def somefunc(ioaddr, value=None, /, *, opsize=2):
# ...
#
# The opsize argument will be 2 for word operations and 1 for bytes.
# NOTE: Byte READS will never be seen; only byte WRITES.
#
# * DEVICES THAT DON'T CARE ABOUT BYTE-WRITES:
# The common/standard case. Let byte_writes default to False
# (i.e., leave it out of the registration call). The callback
# will be invoked with the simpler signature: f(ioaddr, v)
#
# * DEVICES THAT SUPPLY WORDS AT ODD ADDRESSES
# The cpu being the canonical example of this... register the
# I/O callback at the corresponding even address and use the ioaddr
# determine which address (the even or the odd) was requested.
#
# Devices that respond to a block of related addresses can register
# one callback to cover more than one word of Unibus address space.
# The callback gets the ioaddr which it can further decode. Again,
# the CPU register block is a good example of this.
#
#
# The PDP-11 RESET INSTRUCTION
#
# The PDP-11 RESET instruction causes the UNIBUS to be reset.
# Devices that want to know about this should:
#
# mmio.devicereset_register(resetfunc)
#
# And then resetfunc will be invoked as:
# resetfunc(mmio.ub)
# (i.e., passed a single argument, the UNIBUS object).
#
# For dead-simple cases, the optional reset=True argument can be
# supplied to register(), which will cause it to also arrange for the
# iofunc to be called at RESET time, like this:
#
# iofunc(baseaddr, 0)
#
# which, it should be noted, is indistinguishable from a program
# merely setting that I/O address to zero. Note too that if iofunc
# was registered once for an N-word block a RESET will still only call
# it ONCE, on the baseaddr of that block.
#
# If this convenience, with its limitations, is insufficient for a
# device then it must use the devicereset registration instead.
#
def __init__(self, cpu):
self.cpu = cpu
self.mmiomap = [self.__nodev] * (self.cpu.IOPAGE_SIZE >> 1)
self.device_resets = set()
# the default entry for unoccupied I/O: cause an AddressError trap
def __nodev(self, addr, value=None, /, *, opsize=2):
self.cpu.logger.info(f"Access to non-existent I/O {oct(addr)}")
raise PDPTraps.AddressError(
cpuerr=self.cpu.CPUERR_BITS.UNIBUS_TIMEOUT)
# Devices may have simple "dummy" I/O addresses that always read zero
# and ignore writes; See "if iofunc is None" in register() method.
# NOTE: register() byteme-wraps this so opsize not needed.
def __ignoredev(self, addr, value=None, /):
self.cpu.logger.debug(f"dummy zero device @ {oct(addr)}, {value=}")
return 0
# register a call-back for an I/O address, which MUST be an offset
# within the 8K I/O page (which itself may reside at three different
# physical locations depending on configurations, thus explaining
# why this routine deals only in the offsets).
#
# Variations:
# iofunc=None -- implement a dummy: reads as zero, ignores writes
# reset=True -- also registers iofunc to be called at RESET time
# byte_writes=True -- Request byte writes be sent to the iofunc
# vs hidden/converted into word ops.
#
def register(self, iofunc, offsetaddr, nwords, *,
byte_writes=False, reset=False):
if offsetaddr >= self.cpu.IOPAGE_SIZE:
raise ValueError(f"MMIO: I/O offset too large {oct(offsetaddr)}")
# None is a shorthand for "this is a dummy always-zero addr"
if iofunc is None:
iofunc = self.__ignoredev
# register this (raw/unwrapped) iofunc for reset if so requested
if reset:
self.devicereset_register(lambda ub: iofunc(offsetaddr, 0))
idx, odd = divmod(offsetaddr, 2)
if odd != 0:
# See discussion of odd I/O addrs in block comment elsewhere
raise ValueError("cannot register odd (byte) address in IO space")
if not byte_writes:
# wrap the supplied I/O function with this code to implement
# byte write operations automatically in terms of words.
iofunc = self._byte_wrapper(iofunc)
for i in range(nwords):
self.mmiomap[idx+i] = iofunc
return offsetaddr
# wrap an I/O function with code that automatically handles byte writes.
def _byte_wrapper(self, iofunc):
def byteme(ioaddr, value=None, /, *, opsize=2):
if (value is None) or (opsize == 2):
return iofunc(ioaddr, value)
else:
# value is not None, and opsize is not 2
# In other words: a byte write to I/O space. Synthesize it.
self.cpu.logger.debug(f"Byte write to {oct(ioaddr)} {value=}")
wv = self.wordRW(ioaddr)
if ioaddr & 1:
wv = (wv & 0o377) | (value << 8)
else:
wv = (wv & 0o177400) | value
self.wordRW(ioaddr, wv)
return byteme
# Convenience method -- registers simple attributes (or properties) into
# I/O space in the obvious way: Make this attr (of obj) show at this addr
#
# If a device just needs some attributes set to zero on a RESET,
# it can specify them here with reset=True and they will be automatically
# set to zero by reset() (no need to devicereset_register).
def register_simpleattr(self, obj, attrname, addr, reset=False):
"""Create and register a handler to read/write the named attr.
obj - the object (often "self" for the caller of this method)
attrname - the attribute name to read/write
addr - the I/O address to register it to
If attrname is None, the address is registered as a dummy location
that ignores writes and will always read as zero. This is sometimes
useful for features that have to exist but are emulated as no-op.
"""
# could do this with partial, but can also do it with this nested
# func def. One way or another need this func logic anyway.
def _rwattr(_, value=None, /):
"""Read/write the named attr via the I/O callback protocol."""
if attrname is None:
value = 0
else:
if value is None:
value = getattr(obj, attrname)
else:
setattr(obj, attrname, value)
return value
# NOTE: Registers a different ("closure of") rwattr each time.
return self.register(_rwattr, addr, 1, reset=reset)
# In the real hardware, the PDP-11 RESET instruction pulls a reset line
# that all devices can see. In the emulation, devices that need to know
# about the RESET instruction must register themselves here:
def devicereset_register(self, func):
"""Register func to be called whenever a RESET happens."""
self.device_resets.add(func)
# The PDP-11 RESET instruction eventually ends up here, causing
# a bus reset to be sent to all known registered devices.
def resetdevices(self):
for f in self.device_resets:
self.cpu.logger.debug(f"RESET callback: {f}")
f(self.cpu.ub)
def wordRW(self, ioaddr, value=None, /):
"""Read (value is None) or write the given I/O address."""
if value is None:
value = self.mmiomap[ioaddr >> 1](ioaddr)
else:
self.mmiomap[ioaddr >> 1](ioaddr, value)
return value
def byteRW(self, ioaddr, value=None, /):
"""UNIBUS byte R/W - only write is legal."""
if value is None:
raise ValueError("Unibus cannot read bytes")
else:
self.mmiomap[ioaddr >> 1](ioaddr, value, opsize=1)
return None

33
mmu.py
View file

@ -21,7 +21,8 @@
# SOFTWARE. # SOFTWARE.
from functools import partial from functools import partial
from pdptraps import PDPTraps from pdptraps import PDPTraps, PDPTrap
from unibus import BusCycle
from types import SimpleNamespace from types import SimpleNamespace
from collections import namedtuple from collections import namedtuple
@ -68,8 +69,6 @@ class MemoryMgmt:
self.cpu = cpu self.cpu = cpu
self.ub = cpu.ub self.ub = cpu.ub
mmio = self.ub.mmio
# The "segment cache" dramatically speeds up address translation # The "segment cache" dramatically speeds up address translation
# for the most common MMU usage scenarios. # for the most common MMU usage scenarios.
# #
@ -126,27 +125,27 @@ class MemoryMgmt:
(0, self.DSPACE, 48)): (0, self.DSPACE, 48)):
ioaddr = base+offset ioaddr = base+offset
iofunc = partial(self.io_parpdr, parpdr, mode, space, ioaddr) iofunc = partial(self.io_parpdr, parpdr, mode, space, ioaddr)
mmio.register(iofunc, ioaddr, 8) # 8 words / 16 bytes self.ub.register(iofunc, ioaddr, 8) # 8 words / 16 bytes
# register the simple attrs MMR0 etc into I/O space: # register the simple attrs MMR0 etc into I/O space:
mmio.register_simpleattr(self, 'MMR0', self.MMR0_OFFS, reset=True) self.ub.register_simpleattr(self, 'MMR0', self.MMR0_OFFS, reset=True)
mmio.register_simpleattr(self, 'MMR1', self.MMR1_OFFS) self.ub.register_simpleattr(self, 'MMR1', self.MMR1_OFFS)
mmio.register_simpleattr(self, 'MMR2', self.MMR2_OFFS) self.ub.register_simpleattr(self, 'MMR2', self.MMR2_OFFS)
mmio.register_simpleattr(self, 'MMR3', self.MMR3_OFFS, reset=True) self.ub.register_simpleattr(self, 'MMR3', self.MMR3_OFFS, reset=True)
mmio.register_simpleattr(self, None, self.MCR_OFFS) self.ub.register_simpleattr(self, None, self.MCR_OFFS)
def io_parpdr(self, parpdr, mode, space, base, addr, value=None, /): def io_parpdr(self, parpdr, mode, space, base,
"""mmio I/O function for MMU PARs and PDRs. addr, cycle, /, *, value=None):
"""I/O function for MMU PARs and PDRs.
NOTE: parpdr/mode/space/base args provided via partial() as NOTE: parpdr/mode/space/base args provided via partial() as
supplied at registration time; see __init__. supplied at registration time; see __init__.
The mmio module calls this simply as f(addr, value)
""" """
aprnum = (addr - base) >> 1 aprnum = (addr - base) >> 1
aprfile = self.APR[(mode * 2) + space] aprfile = self.APR[(mode * 2) + space]
if value is None: if cycle == BusCycle.READ16:
return aprfile[aprnum][parpdr] return aprfile[aprnum][parpdr]
else: elif cycle == BusCycle.WRITE16:
# dump any matching cache entries in both reading/writing form. # dump any matching cache entries in both reading/writing form.
for reading in (True, False): for reading in (True, False):
# the "space" is a dilemma because it is tied up in # the "space" is a dilemma because it is tied up in
@ -504,7 +503,7 @@ class MemoryMgmt:
pa = self.v2p(vaddr, mode, space, value is None, invis=_invis) pa = self.v2p(vaddr, mode, space, value is None, invis=_invis)
if pa >= self.iopage_base: if pa >= self.iopage_base:
return self.ub.mmio.wordRW(pa & self.cpu.IOPAGE_MASK, value) return self.ub.wordRW(pa & self.cpu.IOPAGE_MASK, value)
else: else:
return self.cpu.physRW(pa, value) return self.cpu.physRW(pa, value)
@ -541,7 +540,7 @@ class MemoryMgmt:
pa &= ~1 pa &= ~1
if pa >= self.iopage_base: if pa >= self.iopage_base:
wv = self.ub.mmio.wordRW(pa & self.cpu.IOPAGE_MASK) wv = self.ub.wordRW(pa & self.cpu.IOPAGE_MASK)
else: else:
wv = self.cpu.physRW(pa) wv = self.cpu.physRW(pa)
return ((wv >> 8) if odd else wv) & 0o377 return ((wv >> 8) if odd else wv) & 0o377
@ -556,7 +555,7 @@ class MemoryMgmt:
# Memory byte writes are synthesized. # Memory byte writes are synthesized.
if pa >= self.iopage_base: if pa >= self.iopage_base:
return self.ub.mmio.byteRW(pa & self.cpu.IOPAGE_MASK, value) return self.ub.byteRW(pa & self.cpu.IOPAGE_MASK, value)
else: else:
wv = self.cpu.physRW(pa & ~1) wv = self.cpu.physRW(pa & ~1)
if odd: if odd:

View file

@ -24,13 +24,16 @@ from types import SimpleNamespace
import breakpoints as BKP import breakpoints as BKP
from machine import PDP1170 from machine import PDP1170
from unibus import BusCycle
from kl11 import KL11 from kl11 import KL11
from branches import BRANCH_CODES from branches import BRANCH_CODES
from pdptraps import PDPTraps from pdptraps import PDPTraps
import unittest import unittest
import random import random
import os import os
import io
import hashlib import hashlib
import boot
from pdpasmhelper import InstructionBlock from pdpasmhelper import InstructionBlock
@ -1696,6 +1699,35 @@ class TestMethods(unittest.TestCase):
with self.subTest(i=i, val=val): with self.subTest(i=i, val=val):
self.assertEqual(val, p.physmem[recbase + i]) self.assertEqual(val, p.physmem[recbase + i])
# test loading the LDA format (absolute tape loader)
def test_load_lda(self):
p = self.make_pdp()
ldabytes = [
0, 0, 0, # testing zero skip
1, 0, 9, 0, 0, 8, # address 0x800 (2k)
1, 2, 3, # the data
232, # checksum
0, 0, 0, 0, 0, # more zero skip testing
1, 0, 9, 0, 3, 8, # address 0x803 (2k)
4, 5, 6, # the data
220, # checksum
1, 0, 9, 0, 6, 8, # testing lack of zero skip
7, 8, 9,
208,
# the END block
1, 0, 6, 0, 1, 1, 247
]
with io.BytesIO(bytes(ldabytes)) as f:
addr = boot.load_lda_f(p, f)
self.assertEqual(addr, 257) # just "known" from data above
# this range of addresses and the related data is just "known"
# from the data bytes above
baseaddr = 0x800
for offset in range(9):
self.assertEqual(p.mmu.byteRW(baseaddr+offset), offset+1)
def test_physrw_n(self): def test_physrw_n(self):
p = self.make_pdp() p = self.make_pdp()
words = [1, 2, 3, 0, 0o40000, 65534] words = [1, 2, 3, 0, 0o40000, 65534]
@ -1723,6 +1755,79 @@ class TestMethods(unittest.TestCase):
p.run(pc=startaddr) p.run(pc=startaddr)
self.assertEqual(p.r[0], 1) self.assertEqual(p.r[0], 1)
def test_io(self):
# unibus I/O callback tests
p = self.make_pdp()
# this callback just records arguments taking advantage of closure
cbx = SimpleNamespace(value=0, arglog=[])
RESET_VALUE = 1234 # arbitrary
def callback(ioaddr, cycle, /, **kwargs):
if len(kwargs) == 0:
cbx.arglog.append((ioaddr, cycle))
elif len(kwargs) == 1:
cbx.arglog.append((ioaddr, cycle, kwargs['value']))
else:
raise ValueError("invalid kwargs")
if cycle == BusCycle.READ16:
return cbx.value
elif cycle == BusCycle.RESET:
cbx.value = RESET_VALUE
elif cycle == BusCycle.WRITE16:
cbx.value = kwargs['value']
elif cycle == BusCycle.WRITE8:
v = kwargs['value'] & 0xFF
if ioaddr & 1:
cbx.value = (cbx.value & 0x00FF) | (v << 8)
else:
cbx.value = (cbx.value & 0xFF00) | v
else:
assert False, "bad cycle"
startaddr = 0o4000
ioaddr = 0o177720 # arbitrary; in a "reserved" block fwiw
a = InstructionBlock()
a.mov(startaddr, 'sp')
a.mov(ioaddr, 'r5')
a.literal(5) # RESET instruction
a.mov('(r5)', 'r0') # expect r0 to be the RESET_VALUE
a.mov(1, '(r5)') # set new value to 1
a.mov('(r5)', 'r1') # expect r1 to be 1
a.mov(0o400, '(r5)') # setting a bit in the high byte
a.movb(2, '(r5)') # should only set the low part
a.mov('(r5)', 'r2') # expect r2 to be 0o402
a.movb(0, '1(r5)') # should only clear the high part
a.mov('(r5)', 'r3') # expect r3 to be 2
a.halt()
self.loadphysmem(p, a, startaddr)
p.ub.register(callback, ioaddr & 8191)
p.run(pc=startaddr)
# per the various comments in the test sequence above
self.assertEqual(p.r[0], RESET_VALUE)
self.assertEqual(p.r[1], 1)
self.assertEqual(p.r[2], 0o402)
self.assertEqual(p.r[3], 2)
# the sequence of arguments expected in the log - determined
# by inspection when the test code was created
offs = ioaddr & 8191
gold = (
(offs, BusCycle.RESET), # from the RESET instruction
(offs, BusCycle.READ16), # from mov (r5),r0
(offs, BusCycle.WRITE16, 1), # from mov $1,(r5)
(offs, BusCycle.READ16), # from mov (r5),r1
(offs, BusCycle.WRITE16, 0o400), # from mov $0400,(r5)
(offs, BusCycle.WRITE8, 2), # from the movb
(offs, BusCycle.READ16), # from mov (r5),r2
(offs+1, BusCycle.WRITE8, 0), # from the movb
(offs, BusCycle.READ16), # from mov (r5),r3
)
for i, t in enumerate(cbx.arglog):
self.assertEqual(t, gold[i])
def test_breakpoints1(self): def test_breakpoints1(self):
# test the steps=N breakpoint capability # test the steps=N breakpoint capability

213
unibus.py
View file

@ -20,22 +20,214 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE. # SOFTWARE.
from enum import Enum
from interrupts import InterruptManager from interrupts import InterruptManager
from mmio import MMIO from pdptraps import PDPTraps, PDPTrap
# A convenient reference for addresses:
# https://gunkies.org/wiki/UNIBUS_Device_Addresses BusCycle = Enum('BusCycle', ('READ16', 'WRITE16', 'WRITE8', 'RESET'))
class UNIBUS: class UNIBUS:
def __init__(self, cpu): def __init__(self, cpu):
self.cpu = cpu self.cpu = cpu
self.mmio = MMIO(cpu)
self.intmgr = InterruptManager() self.intmgr = InterruptManager()
self.logger = cpu.logger self.logger = cpu.logger
self.mmiomap = [self.__nodev] * (self.cpu.IOPAGE_SIZE >> 1)
def resetbus(self): def resetbus(self):
self.mmio.resetdevices() # this isn't especially efficient but it really doesn't matter.
# Construct a sequence of tuples: (a, f)
# where a is the address (really offset) in the I/O page
# and f is the callback
# but only for callbacks that are not __nodev
# and then invoke those callbacks w/BusCycle.RESET
resets = ((x << 1, f)
for x, f in enumerate(self.mmiomap) if f != self.__nodev)
for ioaddr, f in resets:
f(ioaddr, BusCycle.RESET)
# register() -- connect a callback function to an I/O page address
#
# A convenient reference for addresses:
# https://gunkies.org/wiki/UNIBUS_Device_Addresses
#
# memory-mapped I/O is just a 4K array of function callbacks, one
# entry per each I/O WORD (even) I/O offset. Most entries, of course,
# remain set to an initial default "non-existent address" callback.
#
# Callbacks must be declared like this:
# def f(ioaddr, cycle, /, *, value=None):
#
# and should contain code dispatching on cycle and (potentially) ioaddr.
# If cycle is WRITE8 (byte) or WRITE16 (word), the 'value' argument will
# be supplied. Otherwise 'value' is not supplied.
#
# If cycle is READ16 (word), the read value should be returned. Note that
# no READ8 exists as the UNIBUS is not capable of expressing a byte read.
#
# The processor RESET instruction generates calls with a RESET cycle.
#
# The ioaddr will always be relative to the base of the I/O page:
# (ioaddr & 8191) == ioaddr will always be True
#
# If a device's semantics are simple enough to allow synthesizing a
# WRITE8 as a READ16 / byte-update / WRITE16 sequence it can use
# the autobyte wrapper and omit a WRITE8 cycle handler. If the underlying
# address is write-only the autobyte wrapper substitutes a zero value
# for the READ16 part of the synthesis. If this is not correct, the
# callback must handle WRITE8 itself directly.
#
# Callback functions that need context or other data can, of course,
# be bound methods (automatically receiving self) or also can use
# functools.partial() to get additional arguments passed in.
#
#
# ODD I/O ADDRESSES, BYTE OPERATIONS:
#
# The physical UNIBUS always *reads* full 16-bit words; there is no
# capability for a byte read at the electrical bus level.
#
# Oddly (haha) the bus *can* specify byte access for writes.
# Even odder (hahaha), devices that provide full-word access at
# odd addresses exist, the best example being the CPU itself -- though
# as it turns out only very few models allow **CPU** access to cpu
# registers via the Unibus (vs allowing other devices, e.g., the console
# switches/display, to access them that way).
#
# There are a lot of subtleties to take into account accessing I/O
# addresses via bytes (and possibly-odd addresses) but so be it.
# Such byte operations in I/O space abound in real world code.
#
# Devices that respond to a block of related addresses can register
# one callback to cover more than one word of Unibus address space.
# The callback gets the ioaddr which it can further decode. Again,
# the CPU register block is a good example of this.
#
def register(self, iofunc, offsetaddr, nwords=1, /):
"""register a callback for an I/O address.
Arguments:
iofunc -- callback function. See documentation for signature.
offsetaddr -- Offset within the 8K I/O page.
nwords -- Optional. Default=1. Number of words to span.
iofunc may be None in which case a dummy is supplied that ignores
all write operations and always reads as zero.
"""
if offsetaddr >= self.cpu.IOPAGE_SIZE:
raise ValueError(f"UNIBUS: offset too large {oct(offsetaddr)}")
# None is a shorthand for "this is a dummy always-zero addr"
if iofunc is None:
iofunc = self.autobyte(self.__ignoredev)
idx, odd = divmod(offsetaddr, 2)
if odd != 0:
# See discussion of odd I/O addrs in block comment elsewhere
raise ValueError("cannot register odd (byte) address in IO space")
for i in range(nwords):
self.mmiomap[idx+i] = iofunc
# the default entry for unoccupied I/O: cause an AddressError trap
def __nodev(self, addr, cycle, /, *, value=None):
self.cpu.logger.info(f"Access to non-existent I/O {oct(addr)}")
raise PDPTraps.AddressError(cpuerr=self.cpu.CPUERR_BITS.UNIBUS_TIMEOUT)
# Devices may have simple "dummy" I/O addresses that always read zero
# and ignore writes; See "if iofunc is None" in register() method.
# NOTE: register() byteme-wraps this so opsize not needed.
def __ignoredev(self, addr, cycle, /, *, value=None):
self.cpu.logger.debug(f"dummy zero device @ {oct(addr)}, {value=}")
return 0
# wrap an I/O function with code that automatically handles byte writes.
# CAUTION: Some devices may have semantics that make this ill-advised.
# Such devices should handle WRITE8 explicitly themselves.
# This is essentially a decorator but typically is invoked explicitly
# rather than by '@' syntax.
def autobyte(self, iofunc):
def byteme(ioaddr, cycle, /, **kwargs):
if cycle != BusCycle.WRITE8:
return iofunc(ioaddr, cycle, **kwargs)
# A byte write to I/O space. Synthesize it.
# 'value' is present (by definition) in kwargs
value = kwargs['value']
self.cpu.logger.debug(f"Byte write to {oct(ioaddr)} {value=}")
try:
wv = iofunc(ioaddr & 0o177776, BusCycle.READ16)
except PDPTrap: # this can happen on write-only registers
wv = 0
if ioaddr & 1:
wv = (wv & 0o377) | (value << 8)
else:
wv = (wv & 0o177400) | value
iofunc(ioaddr & 0o177776, BusCycle.WRITE16, value=wv)
return byteme
# Convenience method -- registers simple attributes (or properties) into
# I/O space in the obvious way: Make this attr (of obj) show at this addr
#
# BusCycle.RESET handling: If a device wants RESET to zero the attribute,
# it should specify reset=True. Otherwise RESET will be ignored.
#
# BusCycle.WRITE8 handling: autobyte() is used. If those semantics are not
# suitable, the device must create its own i/o func instead of this.
def register_simpleattr(self, obj, attrname, addr, reset=False):
"""Create and register a handler to read/write the named attr.
obj - the object (often "self" for the caller of this method)
attrname - the attribute name to read/write
addr - the I/O address to register it to
If attrname is None, the address is registered as a dummy location
that ignores writes and will always read as zero. This is sometimes
useful for features that have to exist but are emulated as no-op.
"""
# could do this with partial, but can also do it with this nested
# func def. One way or another need this func logic anyway.
if attrname is None:
def _rwattr(_, cycle, /, *, value=None):
if cycle == BusCycle.READ16:
return 0
# all other operations just silently ignored
else:
def _rwattr(_, cycle, /, *, value=None):
if cycle == BusCycle.READ16:
return getattr(obj, attrname)
elif cycle == BusCycle.WRITE16:
setattr(obj, attrname, value)
elif cycle == BusCycle.RESET:
if reset:
setattr(obj, attrname, 0)
else:
assert False, f"Unknown {cycle=} in simpleattr"
# NOTE: it's a new defn/closure of _rwattr each time through
self.register(self.autobyte(_rwattr), addr)
def wordRW(self, ioaddr, value=None, /):
"""Read (value is None) or write the given I/O address."""
if value is None:
value = self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.READ16)
else:
self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.WRITE16, value=value)
return value
def byteRW(self, ioaddr, value=None, /):
"""UNIBUS byte R/W - only write is legal."""
if value is None:
raise ValueError("Unibus cannot read bytes")
else:
self.mmiomap[ioaddr >> 1](ioaddr, BusCycle.WRITE8, value=value)
return None
class UNIBUS_1170(UNIBUS): class UNIBUS_1170(UNIBUS):
@ -49,27 +241,30 @@ class UNIBUS_1170(UNIBUS):
# are just stored natively that way and broken down # are just stored natively that way and broken down
# into 16-bit components by the mmio function as needed. # into 16-bit components by the mmio function as needed.
self.ubas = [0] * (self.UBMAP_N // 2) self.ubas = [0] * (self.UBMAP_N // 2)
self.mmio.register(self.uba_mmio, self.UBMAP_OFFS, self.UBMAP_N) self.register(
self.autobyte(self.uba_mmio), self.UBMAP_OFFS, self.UBMAP_N)
def uba_mmio(self, addr, value=None, /): def uba_mmio(self, addr, cycle, /, *, value=None):
ubanum, hi22 = divmod(addr - self.UBMAP_OFFS, 4) ubanum, hi22 = divmod(addr - self.UBMAP_OFFS, 4)
uba22 = self.ubas[ubanum] uba22 = self.ubas[ubanum]
self.logger.debug(f"UBA addr={oct(addr)}, {value=}") self.logger.debug(f"UBA addr={oct(addr)}, {value=}")
self.logger.debug(f"{ubanum=}, {hi22=}") self.logger.debug(f"{ubanum=}, {hi22=}")
if value is None: if cycle == BusCycle.READ16:
if hi22 == 0: if hi22 == 0:
return (uba22 >> 16) & 0o077 return (uba22 >> 16) & 0o077
else: else:
return uba22 & 0o177777 return uba22 & 0o177777
else: elif cycle == BusCycle.WRITE16:
# the low bit is enforced to be zero # the low bit is enforced to be zero
if hi22: if hi22:
uba22 = ((value & 0o077) << 16) | (uba22 & 0o177776) uba22 = ((value & 0o077) << 16) | (uba22 & 0o177776)
else: else:
uba22 = (uba22 & 0o17600000) | (value & 0o177776) uba22 = (uba22 & 0o17600000) | (value & 0o177776)
self.ubas[ubanum] = uba22 self.ubas[ubanum] = uba22
elif cycle == BusCycle.RESET:
pass
def busRW(self, ubaddr, value=None): def busRW(self, ubaddr, value=None):
pass pass