TMXR: Properly apply output rate limit timing to all lines on a mux

Compute proper scheduling delays for all mux polling activities.
This commit is contained in:
Mark Pizzolato 2018-02-13 01:38:33 -08:00
parent f33ca2a1d9
commit 611679a5cd
2 changed files with 142 additions and 92 deletions

View file

@ -339,6 +339,8 @@
#include "sim_tmxr.h"
#include "scp.h"
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
#include <ctype.h>
#include <math.h>
@ -490,7 +492,6 @@ if (lp->txpb) {
lp->txpb = NULL;
}
memset (lp->rbr, 0, lp->rxbsz); /* clear break status array */
return;
}
@ -561,7 +562,6 @@ if (unwritten == 0) /* buffer now empty? */
lp->xmte = 1; /* reenable transmission if paused */
lp->txcnt -= (int32)strlen (msgbuf); /* adjust statistics */
return;
}
@ -580,7 +580,6 @@ static void tmxr_report_disconnection (TMLN *lp)
if (lp->notelnet)
return;
tmxr_linemsgf (lp, "\r\nDisconnected from the %s simulator\r\n\n", sim_name);/* report disconnection */
return;
}
static int32 loop_write_ex (TMLN *lp, char *buf, int32 length, t_bool prefix_datagram)
@ -702,16 +701,14 @@ static int32 tmxr_write (TMLN *lp, int32 length)
int32 written;
int32 i = lp->txbpr;
if ((lp->txbps) && (sim_gtime () < lp->txnexttime) && (sim_is_running))
return 0;
if (lp->loopback)
return loop_write (lp, &(lp->txb[i]), length);
if (lp->serport) { /* serial port connection? */
if ((sim_gtime () < lp->txnexttime) && (sim_is_running))
return 0;
written = sim_write_serial (lp->serport, &(lp->txb[i]), length);
if ((written > 0) && (sim_is_running))
lp->txnexttime = floor (sim_gtime () + (written * lp->txdelta * sim_timer_inst_per_sec ()));
return written;
}
else { /* Telnet connection */
written = sim_write_sock (lp->sock, &(lp->txb[i]), length);
@ -721,9 +718,10 @@ else { /* Telnet connection */
return written; /* ignore errors on datagram sockets */
else
return -1; /* return error indication */
else
return written;
}
if ((written > 0) && (lp->txbps) && (sim_is_running))
lp->txnexttime = floor (sim_gtime () + ((written * lp->txdelta * sim_timer_inst_per_sec ()) / TMXR_RX_BPS_UNIT_SCALE));
return written;
}
@ -743,7 +741,6 @@ for ( ; p < lp->rxbpi; p++) { /* work from "p" through
lp->rbr[p] = 0; /* clear potential break from vacated slot */
lp->rxbpi = lp->rxbpi - 1; /* drop buffer insert index */
return;
}
@ -992,15 +989,11 @@ if (mp->last_poll_time == 0) { /* first poll initializa
mp->poll_interval = TMXR_DEFAULT_CONNECT_POLL_INTERVAL;
if (!(uptr->dynflags & TMUF_NOASYNCH)) { /* if asynch not disabled */
uptr->dynflags |= UNIT_TM_POLL; /* tag as polling unit */
sim_cancel (uptr);
}
for (i=0; i < mp->lines; i++) {
uptr = mp->ldsc[i].uptr ? mp->ldsc[i].uptr : mp->uptr;
if (!(mp->uptr->dynflags & TMUF_NOASYNCH)) { /* if asynch not disabled */
uptr->dynflags |= UNIT_TM_POLL; /* tag as polling unit */
sim_cancel (uptr);
sim_cancel (mp->ldsc[i].uptr);
}
}
}
@ -1683,9 +1676,11 @@ if ((lp->conn && lp->rcve) && /* conn & enb & */
} /* end if conn */
if (lp->rxbpi == lp->rxbpr) /* empty? zero ptrs */
lp->rxbpi = lp->rxbpr = 0;
if (lp->rxbps) {
if (val)
if (val) { /* Got something? */
if (lp->rxbps)
lp->rxnexttime = floor (sim_gtime () + ((lp->rxdelta * sim_timer_inst_per_sec ()) / lp->rxbpsfactor));
else
lp->rxnexttime = floor (sim_gtime () + ((lp->mp->uptr->wait * sim_timer_inst_per_sec ()) / TMXR_RX_BPS_UNIT_SCALE));
}
tmxr_debug_return(lp, val);
return val;
@ -2000,7 +1995,6 @@ for (i = 0; i < mp->lines; i++) { /* loop thru lines */
if (lp->rxbpi == lp->rxbpr) /* if buf empty, */
lp->rxbpi = lp->rxbpr = 0; /* reset pointers */
} /* end for */
return;
}
@ -2055,8 +2049,10 @@ if ((lp->txbfd && !lp->notelnet) || (TXBUF_AVAIL(lp) > 1)) {/* room for char (+
if ((TN_IAC == (u_char) chr) && (!lp->notelnet)) /* char == IAC in telnet session? */
TXBUF_CHAR (lp, TN_IAC); /* stuff extra IAC char */
TXBUF_CHAR (lp, chr); /* buffer char & adv pointer */
if ((!lp->txbfd) && (TXBUF_AVAIL (lp) <= TMXR_GUARD))/* near full? */
lp->xmte = 0; /* disable line */
if (((!lp->txbfd) &&
(TXBUF_AVAIL (lp) <= TMXR_GUARD)) || /* near full? */
(lp->txbps)) /* or we're rate limiting output */
lp->xmte = 0; /* disable line transmit until space available or character time has passed */
if (lp->txlog) { /* log if available */
extern TMLN *sim_oline; /* Make sure to avoid recursion */
TMLN *save_oline = sim_oline; /* when logging to a socket */
@ -2066,10 +2062,11 @@ if ((lp->txbfd && !lp->notelnet) || (TXBUF_AVAIL(lp) > 1)) {/* room for char (+
sim_oline = save_oline; /* resture output socket */
}
sim_exp_check (&lp->expect, chr); /* process expect rules as needed */
if ((sim_interval > 0) && /* not called within sim_process_event? */
(lp->txbps) && (lp->txdelta > 1000)) { /* and rate limiting output slower than 1000 cps */
if (!sim_is_running) { /* attach message or other non simulation time message? */
tmxr_send_buffered_data (lp); /* put data on wire */
sim_os_ms_sleep((lp->txdelta - 1000) / 1000); /* wait an approximate character delay */
sim_os_ms_sleep(((lp->txbps) && (lp->txdelta > 1000)) ? /* rate limiting output slower than 1000 cps */
(lp->txdelta - 1000) / 1000 :
10); /* wait an approximate character delay */
}
return SCPE_OK; /* char sent */
}
@ -2159,10 +2156,12 @@ for (i = 0; i < mp->lines; i++) { /* loop thru lines */
tmxr_rqln (lp))
_sim_activate (ruptr, 0);
#endif
if ((lp->xmte == 0) &&
((lp->txbps == 0) ||
(lp->txnexttime >= sim_gtime ())))
lp->xmte = 1; /* enable line transmit */
}
} /* end for */
return;
}
@ -2900,10 +2899,11 @@ return r;
Implementation note:
Only devices which poll on a unit different from the unit provided
- This routine must be called before the MUX is attached.
- Only devices which poll on a unit different from the unit provided
at MUX attach time need call this function. Calling this API is
necessary for asynchronous multiplexer support and unnecessary
otherwise.
necessary for asynchronous multiplexer support and if speed limited
behaviors are desired.
*/
@ -2915,7 +2915,8 @@ mp->ldsc[line].uptr = uptr_poll;
return SCPE_OK;
}
/* Declare which unit polls for output
/* Declare which unit performs output transmission in its unit service
routine for a particular line.
Inputs:
*mp = the mux
@ -2925,12 +2926,15 @@ return SCPE_OK;
Outputs:
none
Implementation note:
Implementation notes:
Only devices which poll on a unit different from the unit provided
- This routine must be called before the MUX is attached.
- Only devices which poll on a unit different from the unit provided
at MUX attach time need call this function ABD different from the
unit which polls for input. Calling this API is necessary for
asynchronous multiplexer support and unnecessary otherwise.
asynchronous multiplexer support and if speed limited behaviors are
desired.
*/
@ -3689,13 +3693,21 @@ if (!async || (uptr->flags & TMUF_NOASYNCH)) /* if asynch disabled */
#else
uptr->dynflags |= TMUF_NOASYNCH; /* tag as no asynch */
#endif
uptr->dynflags |= UNIT_TM_POLL; /* tag as polling unit */
if (mp->dptr) {
for (i=0; i<mp->lines; i++) {
mp->ldsc[i].expect.dptr = mp->dptr;
mp->ldsc[i].expect.dbit = TMXR_DBG_EXP;
mp->ldsc[i].send.dptr = mp->dptr;
mp->ldsc[i].send.dbit = TMXR_DBG_SEND;
if (mp->ldsc[i].uptr == NULL)
mp->ldsc[i].uptr = mp->uptr;
mp->ldsc[i].uptr->tmxr = (void *)mp;
mp->ldsc[i].uptr->dynflags |= UNIT_TM_POLL; /* tag as polling unit */
if (mp->ldsc[i].o_uptr == NULL)
mp->ldsc[i].o_uptr = mp->ldsc[i].uptr;
mp->ldsc[i].o_uptr->tmxr = (void *)mp;
mp->ldsc[i].o_uptr->dynflags |= UNIT_TM_POLL; /* tag as polling unit */
}
}
tmxr_add_to_open_list (mp);
@ -3770,6 +3782,8 @@ else {
fprintf (st, " - %stelnet", lp->notelnet ? "no" : "");
if (lp->uptr && (lp->uptr != lp->mp->uptr))
fprintf (st, " - Unit: %s", sim_uname (lp->uptr));
if (lp->o_uptr && (lp->o_uptr != lp->mp->uptr) && (lp->o_uptr != lp->uptr))
fprintf (st, " - Output Unit: %s", sim_uname (lp->o_uptr));
if (mp->modem_control != lp->modem_control)
fprintf(st, ", ModemControl=%s", lp->modem_control ? "enabled" : "disabled");
if (lp->loopback)
@ -3885,38 +3899,94 @@ uptr->filename = NULL;
uptr->tmxr = NULL;
mp->last_poll_time = 0;
for (i=0; i < mp->lines; i++) {
UNIT *uptr = mp->ldsc[i].uptr ? mp->ldsc[i].uptr : mp->uptr;
UNIT *o_uptr = mp->ldsc[i].o_uptr ? mp->ldsc[i].o_uptr : mp->uptr;
uptr->dynflags &= ~UNIT_TM_POLL; /* no polling */
o_uptr->dynflags &= ~UNIT_TM_POLL; /* no polling */
mp->ldsc[i].uptr->tmxr = NULL;
mp->ldsc[i].uptr->dynflags &= ~UNIT_TM_POLL; /* no polling */
mp->ldsc[i].o_uptr->tmxr = NULL;
mp->ldsc[i].o_uptr->dynflags &= ~UNIT_TM_POLL; /* no polling */
}
uptr->flags &= ~(UNIT_ATT); /* not attached */
uptr->dynflags &= ~(UNIT_TM_POLL|TMUF_NOASYNCH); /* no polling, not asynch disabled */
return SCPE_OK;
}
static int32 _tmxr_activate_delay (UNIT *uptr, int32 interval)
{
TMXR *mp = (TMXR *)uptr->tmxr;
int32 i, sooner = interval, due;
double sim_gtime_now = sim_gtime ();
for (i=0; i<mp->lines; i++) {
TMLN *lp = &mp->ldsc[i];
if ((uptr == lp->uptr) && /* read polling unit? */
(lp->rxbps) && /* while rate limiting? */
(tmxr_rqln_bare (lp, FALSE))) { /* with pending input data */
if (lp->rxnexttime > sim_gtime_now)
due = (int32)(lp->rxnexttime - sim_gtime_now);
else
due = sim_processing_event ? 1 : 0; /* avoid potential infinite loop if called from service routine */
sooner = MIN(sooner, due);
}
if ((uptr == lp->o_uptr) && /* output completion unit? */
(lp->txbps) && /* while rate limiting */
(tmxr_tqln(lp))) { /* with queued output data */
if (lp->txnexttime > sim_gtime_now)
due = (int32)(lp->txnexttime - sim_gtime_now);
else
due = sim_processing_event ? 1 : 0; /* avoid potential infinite loop if called from service routine */
sooner = MIN(sooner, due);
}
}
return sooner;
}
t_stat tmxr_activate (UNIT *uptr, int32 interval)
{
int32 sooner;
if (uptr->dynflags & UNIT_TMR_UNIT)
return sim_timer_activate (uptr, interval);
#if defined(SIM_ASYNCH_MUX)
if ((!(uptr->dynflags & UNIT_TM_POLL)) ||
(!sim_asynch_enabled)) {
return _sim_activate (uptr, interval);
return sim_timer_activate (uptr, interval); /* Handle the timer case */
if (!(uptr->dynflags & UNIT_TM_POLL))
return _sim_activate (uptr, interval); /* Handle the non mux case */
sooner = _tmxr_activate_delay (uptr, interval);
if (sooner != interval) {
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "scheduling %s after %d instructions rather than %d instructions\n", sim_uname (uptr), sooner, interval);
return _sim_activate (uptr, sooner); /* Handle the busy case */
}
#if defined(SIM_ASYNCH_MUX)
if (!sim_asynch_enabled)
return _sim_activate (uptr, interval);
return SCPE_OK;
#else
return _sim_activate (uptr, interval);
#endif
}
t_stat tmxr_activate_abs (UNIT *uptr, int32 interval)
{
AIO_VALIDATE; /* Can't call asynchronously */
sim_cancel (uptr);
return tmxr_activate (uptr, interval);
}
t_stat tmxr_activate_after (UNIT *uptr, uint32 usecs_walltime)
{
int32 sooner;
if (uptr->dynflags & UNIT_TMR_UNIT)
return _sim_activate_after (uptr, (double)usecs_walltime); /* Handle the timer case */
if (!(uptr->dynflags & UNIT_TM_POLL))
return _sim_activate_after (uptr, (double)usecs_walltime); /* Handle the non mux case */
sooner = _tmxr_activate_delay (uptr, 0x7FFFFFFF);
if (sooner != 0x7FFFFFFF) {
if (sooner < 0) {
sooner = _tmxr_activate_delay (uptr, 0x7FFFFFFF);
}
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "scheduling %s after %d instructions rather than %u usecs\n", sim_uname (uptr), sooner, usecs_walltime);
return _sim_activate (uptr, sooner); /* Handle the busy case directly */
}
#if defined(SIM_ASYNCH_MUX)
if ((!(uptr->dynflags & UNIT_TM_POLL)) ||
(!sim_asynch_enabled)) {
if (!sim_asynch_enabled)
return _sim_activate_after (uptr, (double)usecs_walltime);
}
return SCPE_OK;
@ -3927,17 +3997,9 @@ return _sim_activate_after (uptr, (double)usecs_walltime);
t_stat tmxr_activate_after_abs (UNIT *uptr, uint32 usecs_walltime)
{
#if defined(SIM_ASYNCH_MUX)
if ((!(uptr->dynflags & UNIT_TM_POLL)) ||
(!sim_asynch_enabled)) {
return _sim_activate_after_abs (uptr, (double)usecs_walltime);
sim_cancel (uptr);
return tmxr_activate_after (uptr, usecs_walltime);
}
return SCPE_OK;
#else
return _sim_activate_after_abs (uptr, (double)usecs_walltime);
#endif
}
t_stat tmxr_clock_coschedule (UNIT *uptr, int32 interval)
{
@ -3953,45 +4015,27 @@ sim_cancel (uptr);
return tmxr_clock_coschedule (uptr, interval);
}
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
t_stat tmxr_clock_coschedule_tmr (UNIT *uptr, int32 tmr, int32 ticks)
{
TMXR *mp = (TMXR *)uptr->tmxr;
int32 interval = ticks * sim_rtcn_tick_size (tmr);
int32 sooner;
if (uptr->dynflags & UNIT_TMR_UNIT)
return sim_clock_coschedule_tmr (uptr, tmr, ticks); /* Handle the timer case */
if (!(uptr->dynflags & UNIT_TM_POLL))
return sim_clock_coschedule_tmr (uptr, tmr, ticks); /* Handle the non mux case */
sooner = _tmxr_activate_delay (uptr, interval);
if (sooner != interval) {
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "scheduling %s after %d instructions rather than %d ticks (%d instructions)\n", sim_uname (uptr), sooner, ticks, interval);
return _sim_activate (uptr, sooner); /* Handle the busy case directly */
}
#if defined(SIM_ASYNCH_MUX)
if ((!(uptr->dynflags & UNIT_TM_POLL)) ||
(!sim_asynch_enabled)) {
if (!sim_asynch_enabled) {
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "coscheduling %s after interval %d ticks\n", sim_uname (uptr), ticks);
return sim_clock_coschedule (uptr, tmr, ticks);
}
return SCPE_OK;
#else
if (mp) {
int32 i, soon = interval;
double sim_gtime_now = sim_gtime ();
for (i = 0; i < mp->lines; i++) {
TMLN *lp = &mp->ldsc[i];
if (tmxr_rqln_bare (lp, FALSE)) {
int32 due;
if (lp->rxbps)
if (lp->rxnexttime > sim_gtime_now)
due = (int32)(lp->rxnexttime - sim_gtime_now);
else
due = sim_processing_event ? 1 : 0; /* avoid potential infinite loop if called from service routine */
else
due = (int32)((uptr->wait * sim_timer_inst_per_sec ())/TMXR_RX_BPS_UNIT_SCALE);
soon = MIN(soon, due);
}
}
if (soon != interval) {
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "scheduling %s after %d instructions\n", sim_uname (uptr), soon);
return _sim_activate (uptr, soon);
}
}
sim_debug (TIMER_DBG_MUX, &sim_timer_dev, "coscheduling %s after interval %d ticks\n", sim_uname (uptr), ticks);
return sim_clock_coschedule_tmr (uptr, tmr, ticks);
#endif
@ -4224,7 +4268,6 @@ void tmxr_msg (SOCKET sock, const char *msg)
{
if ((sock) && (sock != INVALID_SOCKET))
sim_write_sock (sock, msg, (int32)strlen (msg));
return;
}
@ -4238,7 +4281,6 @@ while (*msg) {
sim_os_ms_sleep (10);
++msg;
}
return;
}
@ -4299,7 +4341,6 @@ for (i = 0; i < len; ++i) {
}
if (buf != stackbuf)
free (buf);
return;
}
@ -4368,7 +4409,6 @@ if (lp->expect.buf)
sim_exp_showall (st, &lp->expect);
if (lp->txlog)
fprintf (st, " Logging to %s\n", lp->txlogname);
return;
}
@ -4398,6 +4438,15 @@ else {
fprintf (st, " packet data queued/packets sent = %d/%d",
tmxr_tpqln (lp), lp->txpcnt);
fprintf (st, "\n");
if ((lp->rxbps) || (lp->txbps)) {
if ((lp->rxbps == lp->txbps))
fprintf (st, " speed = %u", lp->rxbps);
else
fprintf (st, " speed = %u/%u", lp->rxbps, lp->txbps);
if (lp->rxbpsfactor / TMXR_RX_BPS_UNIT_SCALE > 1.0)
fprintf (st, "*%.0f", lp->rxbpsfactor / TMXR_RX_BPS_UNIT_SCALE);
fprintf (st, " bps\n");
}
}
if (lp->txbfd)
fprintf (st, " output buffer size = %d\n", lp->txbsz);
@ -4408,7 +4457,6 @@ if (lp->txdrp)
fprintf (st, " dropped = %d\n", lp->txdrp);
if (lp->txstall)
fprintf (st, " stalled = %d\n", lp->txstall);
return;
}

View file

@ -279,6 +279,7 @@ t_stat tmxr_show_cstat (FILE *st, UNIT *uptr, int32 val, CONST void *desc);
t_stat tmxr_show_lines (FILE *st, UNIT *uptr, int32 val, CONST void *desc);
t_stat tmxr_show_open_devices (FILE* st, DEVICE *dptr, UNIT* uptr, int32 val, CONST char* desc);
t_stat tmxr_activate (UNIT *uptr, int32 interval);
t_stat tmxr_activate_abs (UNIT *uptr, int32 interval);
t_stat tmxr_activate_after (UNIT *uptr, uint32 usecs_walltime);
t_stat tmxr_activate_after_abs (UNIT *uptr, uint32 usecs_walltime);
t_stat tmxr_clock_coschedule (UNIT *uptr, int32 interval);
@ -314,6 +315,7 @@ void _tmxr_debug (uint32 dbits, TMLN *lp, const char *msg, char *buf, int bufsiz
#endif
#if (!defined(NOT_MUX_USING_CODE))
#define sim_activate tmxr_activate
#define sim_activate_abs tmxr_activate_abs
#define sim_activate_after tmxr_activate_after
#define sim_activate_after_abs tmxr_activate_after_abs
#define sim_clock_coschedule tmxr_clock_coschedule