python-pdp1170/interrupts.py
2023-09-18 21:15:20 -05:00

255 lines
10 KiB
Python

# MIT License
#
# Copyright (c) 2023 Neil Webber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import namedtuple
import threading
from pdptraps import PDPTrap
# an interrupt is, at the cpu implementation level, just a flavor of trap.
class InterruptTrap(PDPTrap):
def __init__(self, pri, vector):
super().__init__()
self.pri = pri
self.vector = vector
# contains the details for a pending interrupt (see discussion)
PendingInterrupt = namedtuple(
'PendingInterrupt', ('pri', 'vector', 'callback'))
# Interrupts are priority sorted by pri (duh), but (less obviously)
# two interrupts with the same pri are further priority sorted by
# vector, with lower vector being higher priority. This calculation
# encapsulates that into a single integer value, with the knowledge
# that no vectors can ever be anywhere near as high as 16384.
def _qpri(pdi):
return (pdi.pri * 16384) + (16384 - pdi.vector)
# To cause an interrupt, a device creates a PendingInterrupt containing:
# pri -- priority
# vector -- the interrupt vector
# callback -- see discussion
#
# and then calls pend_interrupt() to get things going.
#
# The interrupt does not, of course, occur right away; it pends until
# the processor is willing to accept it. The processor only accepts
# interrupts at instruction boundaries, and even then only if the current
# processor priority level is below the interrupt pri.
#
# When those conditions occur, the interrupt is accepted by the processor
# (via a get_pending() method called from the processor). At that time the
# interrupt request is "granted" and the callback function, if any is
# provided, is invoked (from the cpu thread).
#
# The callback is invoked with no arguments and the return value is ignored.
# Use partial() or other python techniques if the callback function requires
# arguments for more context information (most will not).
#
# The purpose of this callback protocol is that some devices have internal
# operations they want to perform when the interrupt is acknowledged, not
# just when it is first made pending. Callbacks allow for that to happen.
# CAUTION: The callback obviously executes in a separate thread and
# will be asynchronous to any device-internal threads.
#
# In the simplest/common cases where none of this is needed, the
# method simple_irq() bundles all this minutia up for the caller.
class InterruptManager:
def __init__(self):
self.pri_pending = 0
self.requests = []
self.condition = threading.Condition()
def simple_irq(self, pri, vector):
"""Pend an interrupt at the given pri/vector."""
self.pend_interrupt(PendingInterrupt(pri, vector, callback=None))
def pend_interrupt(self, irq):
"""Pend a request for interrupt 'irq'."""
with self.condition:
# special case to accelerate zero-to-one common transition
if not self.requests:
self.requests = [irq]
self.pri_pending = irq.pri
else:
# multiple identical requests are not pended
# (it works this way in the hardware too of course --
# if a device has asserted the interrupt request line
# but that request hasn't been acknowledged/cleared by
# by the bus signal protocol yet, you can't assert the
# same interrupt line again ... it's already asserted)
if irq not in self.requests:
self.requests = sorted(self.requests + [irq], key=_qpri)
self.pri_pending = self.requests[-1].pri
self.condition.notify_all()
# called by the processor, to get one pending interrupt (if any).
# An InterruptTrap with the highest priority is returned, IF it is
# above the given processor priority. Else None.
def get_pending(self, processor_pri):
"""Returns an InterruptTrap, or None."""
with self.condition:
try:
if self.pri_pending > processor_pri:
irq = self.requests.pop()
else:
return None
except IndexError:
return None
else:
if self.requests:
self.pri_pending = self.requests[-1].pri
else:
self.pri_pending = 0
if irq.callback:
irq.callback()
return InterruptTrap(irq.pri, irq.vector)
def waitstate(self, processor_pri):
"""Sit idle until any interrupt happens."""
with self.condition:
if self.pri_pending > processor_pri:
return
self.condition.wait_for(lambda: self.pri_pending)
if __name__ == "__main__":
import unittest
from functools import partial
class TestMethods(unittest.TestCase):
def test__init__(self):
IM = InterruptManager()
# initial state starts with no pending interrupts
self.assertEqual(IM.pri_pending, 0)
# verify get_pending still "works" (returns None)
self.assertEqual(IM.get_pending(0), None)
def test_queue1(self):
IM = InterruptManager()
test_pri = 4 # arbitrary
test_vec = 17 # arbitrary
IM.simple_irq(test_pri, test_vec)
self.assertEqual(IM.pri_pending, test_pri)
iinfo = IM.get_pending(0)
self.assertEqual(IM.pri_pending, 0)
self.assertEqual(iinfo.pri, test_pri)
self.assertEqual(iinfo.vector, test_vec)
# support function for test cases, do a bunch of actions on an IM
def _actions(self, IM, prog):
cpupri = 0
for action in prog:
match action[0], action[1]:
case 'RQ', t:
IM.simple_irq(*t)
case 'PRI', cpupri:
pass
case 'GET', xt:
t = IM.get_pending(cpupri)
if t is None:
self.assertEqual(t, xt)
else:
xpri, xvec = xt
# If the vector position is a tuple then that
# means to accept anything in that tuple
try:
_ = (t.vector in xvec)
except TypeError:
pass
else:
xvec = t.vector # i.e., it's ok
self.assertEqual(t.pri, xpri)
self.assertEqual(t.vector, xvec)
case 'CHK', pri:
self.assertEqual(IM.pri_pending, pri)
case _:
raise ValueError("bad action", action)
def test_mixedops(self):
testprogs = (
# (ACTION, ACTION-INFO)
(('RQ', (4, 44)), # request IRQ 4
('RQ', (5, 55)), # request IRQ 5
('GET', (5, 55)), # get one, check that it is 5
('CHK', 4), # check that pri_pending is 4
('RQ', (3, 33)), # request IRQ 3
('CHK', 4), # check that pri_pending is 4
('RQ', (6, 66)), # request IRQ 6
('CHK', 6), # check that pri_pending is 6
('GET', (6, 66)), # get one, check that it is 6
('CHK', 4), # check that pri_pending is 6
('GET', (4, 44)), # get one, check that it is 4
('CHK', 3), # check that pri_pending is 3
('GET', (3, 33)), # get one, check that it is 3
('CHK', 0), # check that pri_pending is 0
('GET', None), # check that getting from empty works
),
# check priority filtering
(('RQ', (4, 44)), # request IRQ 4
('RQ', (5, 55)), # request IRQ 5
('PRI', 7), # spl7
('GET', None), # shouldn't see anything
('PRI', 5), # spl5
('GET', None), # still shouldn't see anything
('RQ', (6, 66)), # request IRQ 6
('RQ', (7, 77)), # request IRQ 7
('RQ', (6, 666)), # request IRQ 6
('RQ', (7, 777)), # request IRQ 7
('PRI', 6), # spl6
('GET', (7, (77, 777))), # should get one of these
('GET', (7, (77, 777))), # should get the other
('GET', None), # no more
('PRI', 0), # spl0
('GET', (6, (66, 666))), # should get one of these
('GET', (6, (66, 666))), # should get one of these
('GET', (5, 55)),
('GET', (4, 44)),
('GET', None)),
)
for tp in testprogs:
IM = InterruptManager()
self._actions(IM, tp)
def test_vectorcallback(self):
def foo(d):
d['foo'] = 1234
foodict = {}
pfoo = partial(foo, foodict)
IM = InterruptManager()
IM.pend_interrupt(PendingInterrupt(4, 888, pfoo))
iinfo = IM.get_pending(0)
self.assertEqual(foodict['foo'], 1234)
unittest.main()