module Cf_gadget: sig .. end
Monadic composition of complex stream processors. An experimental interface for constructing highly interactive functional systems in a single thread of control.
This module implements a marginally more general version of the Gadget system described in Chapter 30 of Magnus Carlsson's and Thomas Hallgren's joint Ph.D. thesis.
In the context of this module, a "gadget" is a monad that evaluates into
a Cf_flow
object, capable of alternately reading from a source of input
values and writing to a sink of output values. The continuation monad is
specialized over an abstract "process" monad type, and a scheduler handles
the calls and jumps between multiple simultaneous processes communicating
with one another over a very lightweight message passing abstraction called
a "wire".
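To make the continuation-passing idea above concrete, here is a minimal, self-contained sketch of a continuation monad. It assumes that Cf_cmonad.t has the usual shape (a function from a continuation to a final answer); the names cont, run, step, trace and program are hypothetical and are not part of Cf_gadget. The step primitive stands in for the point where a scheduler could intervene before resuming the process.

```ocaml
(* A minimal continuation monad. Assumption: this mirrors the general shape
   of Cf_cmonad.t; it is not the library's code. 'x is the final answer
   type and 'a is the value handed to the continuation. *)
type ('x, 'a) cont = ('a -> 'x) -> 'x

let return (a : 'a) : ('x, 'a) cont = fun k -> k a

let ( >>= ) (m : ('x, 'a) cont) (f : 'a -> ('x, 'b) cont) : ('x, 'b) cont =
  fun k -> m (fun a -> f a k)

(* Run a computation by supplying the identity as the final continuation. *)
let run (m : ('x, 'x) cont) : 'x = m (fun x -> x)

(* Hypothetical primitive that captures the continuation. A real scheduler
   could queue [k] and jump elsewhere; here it only logs and resumes. *)
let trace : string list ref = ref []

let step (label : string) : ('x, unit) cont =
  fun k -> trace := label :: !trace; k ()

(* Two steps in sequence, then a result. *)
let program () =
  step "a" >>= fun () ->
  step "b" >>= fun () ->
  return 42
```

Because every primitive receives the rest of the computation as an ordinary function, a single thread can interleave many such computations by deciding which stored continuation to call next.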
The abstract process monad is a kind of state-continuation monad for
operations over the internal Cf_flow
value. The operations it supports
are lifted into the gadget monad, and are briefly summarized as follows:
A wire is logically composed of a receiver and a transmitter, with weak mutual references between them. When either end of the wire is reclaimed by the memory allocator, the other end is automatically rendered into a null wire, i.e. receivers never get messages and transmitters put messages by discarding them.
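The weak-reference behaviour described above can be illustrated with a toy model built on the standard Weak module. This is only a sketch under simplifying assumptions: the weak link runs in one direction only (transmitter to receiver), messages are buffered in a Queue, and the names make_wire, null_tx, put and get are hypothetical rather than the Cf_gadget API.

```ocaml
(* Toy model only, not the Cf_gadget implementation. The transmitter reaches
   the receiver's mailbox through a weak pointer, so it does not keep the
   receiver alive. *)
type 'x rx = { inbox : 'x Queue.t }
type 'x tx = { target : 'x Queue.t Weak.t }

(* Create a connected receiver/transmitter pair. *)
let make_wire () =
  let inbox = Queue.create () in
  let target = Weak.create 1 in
  Weak.set target 0 (Some inbox);
  ({ inbox }, { target })

(* A transmitter with no receiver: its weak slot starts out empty. *)
let null_tx () = { target = Weak.create 1 }

(* Deliver [x] if the receiver still exists; otherwise discard it.
   Returns whether the message was delivered. *)
let put tx x =
  match Weak.get tx.target 0 with
  | Some q -> Queue.add x q; true
  | None -> false

(* Take the next pending message, if any. *)
let get rx = Queue.take_opt rx.inbox
```

Once the garbage collector reclaims the receiver's mailbox, Weak.get returns None and put silently drops the message, which is the "null wire" behaviour on the transmitting side.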
A pair of classes are provided to represent the receiver and the
transmitter on a wire. Objects of the rx
class define a get
method for
creating a "gate" that can receive a message. Objects of the tx
class
define a put
method for transmitting a message. Both objects can be
constructed with a wire object, and convenience operators are defined for
creating a new wire and constructing a pair of associated rx
and tx
objects.
Each process contains an encapsulated state, initialized to a value when the process is started. As a process receives and transmits messages, it can easily manipulate this encapsulated state. When a process has no more messages to receive or transmit, the scheduler reclaims its resources and the final state is discarded.
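As a rough illustration of the encapsulated state, the sketch below uses a plain state monad in place of the real process monad. This is a simplifying assumption: it leaves out continuations, message passing and the scheduler. The load, store and modify functions echo the operations of the same names in this module, but the types proc, run_process and counter are hypothetical.

```ocaml
(* A plain state monad standing in for a process's encapsulated state.
   Assumption: a simplification for illustration, not the library's types. *)
type ('s, 'a) proc = 's -> 'a * 's

let return (a : 'a) : ('s, 'a) proc = fun s -> (a, s)

let ( >>= ) (m : ('s, 'a) proc) (f : 'a -> ('s, 'b) proc) : ('s, 'b) proc =
  fun s -> let (a, s') = m s in f a s'

(* Read the current state. *)
let load : ('s, 's) proc = fun s -> (s, s)

(* Replace the current state. *)
let store (s : 's) : ('s, unit) proc = fun _ -> ((), s)

(* Apply a function to the current state and keep the result. *)
let modify (f : 's -> 's) : ('s, unit) proc = fun s -> ((), f s)

(* Start with an initial state, run to completion, discard the final state. *)
let run_process (m : ('s, 'a) proc) (init : 's) : 'a = fst (m init)

(* Increment the state, scale it, then report it. *)
let counter () =
  modify (fun n -> n + 1) >>= fun () ->
  modify (fun n -> n * 10) >>= fun () ->
  load
```

Here run_process plays the role of starting a process with a state initializer: the state is threaded through each operation, and the final state is dropped when the computation finishes.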
Any process may read from the external input stream or write to the external output stream. Conventionally, it is often simpler to define a reader process and a writer process to localize these effects.
Note: see Magnus Carlsson's and Thomas Hallgren's joint
Ph.D. thesis for a complete
dissertation on the nature of the system of concepts behind this module.
type ('a, 'b, 'c) work_t
type ('a, 'b, 'c) gate_t
A gate for receiving messages in a ('s, 'i, 'o) process_t using the guard function.
type ('a, 'b, 'c) wire_t
A wire carries messages of type 'x from a sender to a receiver in a ('s, 'i, 'o) work_t continuation.
type ('a, 'b, 'c, 'd) guard_t = (('a, 'b, 'c) gate_t, 'd) Cf_cmonad.t
type ('a, 'b, 'c, 'd) t = (('a, 'b, 'c) work_t, 'd) Cf_cmonad.t
val eval : ('a, 'b, 'c, unit) t -> 'a -> ('b, 'c) Cf_flow.t
Use eval y s to obtain a new flow by evaluating the gadget monad y with a state initializer of s.
val start : ('a, 'b, 'c, unit) t -> 'a -> ('d, 'b, 'c, unit) t
Use start y s to start a new process evaluating the gadget y with a state initializer s.
val guard : ('a, 'b, 'c, unit) guard_t -> ('a, 'b, 'c, unit) t
Use guard m to receive the next message guarded by m.
val wire : ('a, 'b, 'c, ('d, 'b, 'c) wire_t) t
Use wire to create a new wire object for sending messages of type 'x.
val null : ('a, 'b, 'c) wire_t
Use null to construct an rx object that produces gates that never receive any messages, and a tx object that discards every message transmitted without delivering it. This object can be useful for default arguments to some gadget functions.
val read : ('a, 'b, 'c, 'b) t
Use read to get the next input value from the external stream.
val write : 'a -> ('b, 'c, 'a, unit) t
Use write obj to put the next output value into the external stream.
val load : ('a, 'b, 'c, 'a) t
Use load to get the current state encapsulated in the process.
val store : 'a -> ('a, 'b, 'c, unit) t
Use store obj to store the state obj as the encapsulated state for the current process.
val modify : ('a -> 'a) -> ('a, 'b, 'c, unit) t
Use modify f to apply f to the current encapsulated state of the process and store the resulting new state.
class type connector = object .. end
class ['a, 'b, 'c] rx : ('a, 'b, 'c) wire_t -> object .. end
class ['a, 'b, 'c] tx : ('a, 'b, 'c) wire_t -> object .. end
type ('a, 'b, 'c) simplex_t = ('a, 'b, 'c) rx * ('a, 'b, 'c) tx
A pair of rx and tx objects of a wire.
val fsimplex : f:(('a, 'b, 'c) wire_t -> ('a, 'b, 'c) #rx * ('a, 'b, 'c) #tx) ->
  ('d, 'b, 'c, ('a, 'b, 'c) rx * ('a, 'b, 'c) tx) t
Use fsimplex ~f to construct a new wire and apply it to the constructor function f to pass along a new matching pair of rx and tx objects.
val simplex : ('a, 'b, 'c, ('d, 'b, 'c) rx * ('d, 'b, 'c) tx) t
Use simplex to construct a new matching pair of rx and tx objects.
type ('a, 'b, 'c, 'd) duplex_t = (('a, 'c, 'd) rx * ('b, 'c, 'd) tx) *
  (('b, 'c, 'd) rx * ('a, 'c, 'd) tx)
A pair of rx and tx tuples, one tuple for each end of a duplex communication. Each tuple is composed of the rx and the tx objects to use on one end of the communication.
val duplex : ('a, 'b, 'c, ('d, 'e, 'b, 'c) duplex_t) t
Use duplex to construct a new duplex communication channel, composed of two wires flowing in opposite directions.
val wrap : ('a, 'b, 'c) #rx -> ('d, 'b, 'c) #tx -> ('a, 'd) Cf_flow.t -> ('e, 'b, 'c, unit) t
Use wrap rx tx w to start a new process that wraps the flow w, so that it reads output from the flow (copying it to the tx object) and writes input to the flow (copying it from the rx object).