|
|||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object | +--seda.sandStorm.core.SimpleSink | +--ostore.dispatch.Classifier
The event classification engine for an Oceanstore replica.
Work modules register a queue satisfying the SinkIF
interface, which will receive all QueueElementIF
objects that
satisfy the specified Filter
. A single SinkIF
may subscribe with several Filter
s, or with several
priorities for a single Filter
. Each SinkIF
will receive exactly one copy of each QueueElementIF
which
satisfies any of its Filter
s, at the highest of its priorities.
By default, all Filter
s are subscribed with
PRIORITY_NORMAL
. The Classifier
will throw an
exception if it cannot properly enqueue an event on the corresponding
SinkIF
. Modules may register with lower priorities, such
as PRIORITY_LOW
. Any exceptions thrown by queues registered
at this lower priority will be ignored.
The Classifier
stores a trie of filter requirements for
each type of event. A node in the trie represents the set of
QueueElementIF
s that pass all requirements on the path to
it from the trie root. Each node stores the SinkIF
s
requesting all QueueElementIF
s from that set. The tries
are stored in an unsynchronized HashMap
, indexed by its
event type.
On dispatch, the Classifier
uses the event type to search
the HashMap
for the proper trie. It then passes down the
trie, checking field requirements and collecting the
SinkIF
s for nodes which it passes. Finally, a single copy
of the QueueElementIF
is placed into each of these
SinkIF
s.
The HashMap
of tries is unsynchronized, to maximize
parallelism. When it must be modified, to add a new event type, a new
HashMap
is constructed and atomically replaces the old
one. All calls to dispatch
can proceed without blocking.
Filter
Nested Class Summary | |
---|---|
protected class |
Classifier.DefaultTimerCB
The default implementation of TimerCB , which uses
seda.sandStorm.core.ssTimer s to store and trigger events. |
class |
Classifier.SlowLink
Records information used to delay events sent from one named stage to another. |
static interface |
Classifier.TimerCB
A Simple callback interface for timers. |
Field Summary | |
---|---|
protected NodeId |
_node_id
The NodeId that is associated with this classifier. |
protected static Object |
_synch_o
The synch_o object is used to synchronize
the instantiation of an ssTimer object. |
protected static ssTimer |
_timer
The ssTimer which stores and triggers events. |
static Integer |
PRIORITY_HIGH
|
static Integer |
PRIORITY_LOW
|
static Integer |
PRIORITY_NORMAL
|
protected Class |
root_class
The Class object for the root of the type hierarchy. |
protected Classifier.TimerCB |
timer_cb
A timer used to delay event delivery |
protected Vector |
tsinks
Vector of TSink s |
protected HashMap |
type_nodes
A map of all the type nodes, indexed by Class objects. |
Constructor Summary | |
---|---|
Classifier(NodeId node_id)
Create a new Classifier object. |
Method Summary | |
---|---|
Boolean |
addTSinkType(TSink ts,
Class event_type)
Called by TSink.enqueue(), adds ts to tsinks. |
void |
cancel(Object token)
Cancels an event previously scheduled by dispatch_later .
|
Object |
dispatch_later(QueueElementIF event,
long millis)
Enqueues the event at a later time into each SinkIF
subscribed with a Filter that the given
QueueElementIF satisfies.
|
int |
dispatch_lossy(QueueElementIF event)
Enqueues the event into each SinkIF subscribed with a
Filter the given QueueElementIF satisfies.
|
int |
dispatch(QueueElementIF event)
Enqueues the event into each SinkIF subscribed with a
Filter that the given QueueElementIF
satisfies.
|
int |
dispatch(QueueElementIF event,
Integer error_mask)
Enqueues the event into each SinkIF subscribed with a
Filter that the given QueueElementIF
satisfies. |
void |
enqueue(QueueElementIF element)
Dispatches an event, just like dispatch .
|
static Classifier |
getClassifier(NodeId node_id)
Returns the Classifier instance corresponding to a
NodeId . |
static Set |
getNodeIds()
Returns a Set containing the NodeId s for
each Classifier in this jvm. |
TSink |
getTSink(String stagename)
Gets the TSink for the stage given as an argument.
|
NodeId |
node_id()
The NodeId that is associated with this classifier. |
static void |
removeClassifier(NodeId node_id)
Deletes the Classifier instance corresponding to a
NodeId . |
void |
set_timer_cb(Classifier.TimerCB callback)
Allows the timer callback to be changed; this is used by the simulator to install a timer using simulated time rather than operating system timers based on real time. |
boolean |
subscribe(Filter filter,
SinkIF sink)
Subscribe a normal priority SinkIF to an event stream
defined by a Filter . |
boolean |
subscribe(Filter filter,
SinkIF sink,
Integer priority)
Subscribes a SinkIF to an event stream defined by a
Filter .
|
Boolean |
subscribedTo(Class event_type)
Returns Boolean.TRUE if type has been subscribed to. |
boolean |
unsubscribe(Filter filter)
Unsubscribe a Filter . |
boolean |
unsubscribe(SinkIF sink)
Unsubscribe a SinkIF , removing its Filter s
from the trie. |
Methods inherited from class seda.sandStorm.core.SimpleSink |
---|
enqueue_abort, enqueue_commit, enqueue_lossy, enqueue_many, enqueue_prepare, getEnqueuePredicate, profileSize, setEnqueuePredicate, size |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
public static final Integer PRIORITY_HIGH
public static final Integer PRIORITY_NORMAL
public static final Integer PRIORITY_LOW
protected static ssTimer _timer
ssTimer
which stores and triggers events.
protected static Object _synch_o
synch_o
object is used to synchronize
the instantiation of an ssTimer object.
protected HashMap type_nodes
Class
objects.
protected Class root_class
Class
object for the root of the type hierarchy.
protected Classifier.TimerCB timer_cb
protected NodeId _node_id
NodeId
that is associated with this classifier.
protected Vector tsinks
Vector
of TSink
s
Constructor Detail |
public Classifier(NodeId node_id)
Method Detail |
public static Classifier getClassifier(NodeId node_id)
Classifier
instance corresponding to a
NodeId
.
node_id
- The NodeId
which parameterizes a
virtual SandStorm instance.
Classifier
instance
corresponding to node_id
.public static void removeClassifier(NodeId node_id)
Classifier
instance corresponding to a
NodeId
. Should be called only when a node is going down
and going to come up again with a new node id. Use very carefully!
node_id
- The NodeId
which parameterizes a
virtual SandStorm instance.public static Set getNodeIds()
Set
containing the NodeId
s for
each Classifier
in this jvm. The Set
is
backed by the internal map; do not attempt to change it.
WARNING: Used for shutdown control only. Do not call this method.
public NodeId node_id()
NodeId
that is associated with this classifier.
public boolean subscribe(Filter filter, SinkIF sink, Integer priority)
SinkIF
to an event stream defined by a
Filter
.
Each event class corresponds to a trie of field restrictions. This method registers the sink at the appropriate leaf of each trie corresponding to the required event type or any of its subclasses.
Currently each trie is modified atomically, but the whole set of
tries is not. The Classifier
may dispatch events of
some subclasses before this Filter
has been registered
everywhere. All tries will be ready when this method returns. If
this level of consistency is insufficient, use
SinkIF.setEnqueuePredicate(seda.sandStorm.api.EnqueuePredicateIF)
to disable the
sink completely until after subscription is complete.
If the HashMap
of tries requires modification (e.g. for
a new type of event), a new HashMap
is constructed and
moved into place atomically.
The type hierarchy must be a tree. We disallow subscriptions which require a type with multiple independent parents in the hierarchy.
filter
- defines the set of QueueElementIF
s which
the caller requests.sink
- the SinkIF
into which we place satisfying
QueueElementIF
s.priority
- a number between 0 (lowest priority) and 10
(highest). Only the PRIORITY_*
constants are meaningful.
public boolean subscribe(Filter filter, SinkIF sink)
SinkIF
to an event stream
defined by a Filter
.
filter
- defines the set of QueueElementIF
s which
the caller requests.sink
- the SinkIF
into which we place satisfying
QueueElementIF
s.
public boolean unsubscribe(Filter filter)
Filter
. This method will not remove a
SinkIF
from the trie completely if other
Filter
s for that SinkIF
remain.
filter
- The Filter
to be removed.
public boolean unsubscribe(SinkIF sink)
SinkIF
, removing its Filter
s
from the trie.
sink
- The SinkIF
to be removed.
public int dispatch(QueueElementIF event) throws SinkFullException
SinkIF
subscribed with a
Filter
that the given QueueElementIF
satisfies.
Uses the default error mask (PRIORITY_NORMAL
).
event
- The QueueElementIF
to dispatch.
SinkIF
s into which the event was
successfully enqueued.
SinkFullException
- indicates that at least one
receiving sink of high priority was temporarily full.public int dispatch_lossy(QueueElementIF event)
SinkIF
subscribed with a
Filter
the given QueueElementIF
satisfies.
This method is lossy in that it silently fails to enqueue the event
into any SinkIF
which returns an Exception
.
This dispatch variant is meant for "low priority" events which can be
safely dropped.
event
- The QueueElementIF
to dispatch.
SinkIF
s into which the event was
successfully enqueued.public int dispatch(QueueElementIF event, Integer error_mask) throws SinkFullException
SinkIF
subscribed with a
Filter
that the given QueueElementIF
satisfies. All SinkIF
s registered at or above the
specified priority are enqueued atomically. If any throws a
SinkFullException
, that exception is thrown and no
SinkIF
receives the the event. SinkIF
s
which were registered below the specified priority are processed
second; Any SinkFullException
s they throw are ignored.
event
- The QueueElementIF
to dispatch.error_mask
- The lowest priority for SinkIF
s
whose errors we want to see.
SinkIF
s into which the event was
successfully enqueued.
SinkFullException
- indicates that at least one
receiving sink of high priority was temporarily full.public Object dispatch_later(QueueElementIF event, long millis)
SinkIF
subscribed with a Filter
that the given
QueueElementIF
satisfies.
An internal timer holds the event for the specified time, and then
dispatch
es it. The event is classified at dispatch
time, not when this method is called. Therefore the set of sinks
which receive the event may change in the meantime.
NOTE: please cancel
any unneeded events. Each pending
event increases the resources consumed by the internal timer.
event
- The QueueElementIF
to dispatch.millis
- The number of milliseconds from now to dispatch the
event.
cancel
in order to cancel
the event before it is dispatched.cancel(java.lang.Object)
public void cancel(Object token)
dispatch_later
.
If the event has already been dispatched, this method has no effect.
token
- The Object
returned by
dispatch_later
.
IllegalArgumentException
- if token
is not a
token returned by the internal timer.dispatch_later(seda.sandStorm.api.QueueElementIF, long)
public TSink getTSink(String stagename) throws NoSuchStageException
TSink
for the stage given as an argument.
When the requesting stage has the TSink
for the
given stage, it will be able to pass events directly instead of
invoking this Classifier
, which should be more
efficient.
stagename
- String name of the stage whose TSink is requested
NoSuchStageException
- if there is no stage named stagename.public Boolean subscribedTo(Class event_type)
event_type
- The type of event
public Boolean addTSinkType(TSink ts, Class event_type)
ts
- The TSink calling this methodevent_type
- The type which ts is enqueueingpublic void enqueue(QueueElementIF element) throws SinkFullException
dispatch
.
DO NOT USE THIS METHOD .
This method exists only to satisfy the SinkIF
interface.
We implement that interface (by extending the
SimpleSink
class) to allow signals and network messages to
travel through the Classifier.
enqueue
in interface SinkIF
enqueue
in class SimpleSink
element
- The event to dispatch.
SinkFullException
dispatch(seda.sandStorm.api.QueueElementIF)
public void set_timer_cb(Classifier.TimerCB callback)
|
|||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | ||||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |