ostore.dispatch
Class Classifier

java.lang.Object
  |
  +--seda.sandStorm.core.SimpleSink
        |
        +--ostore.dispatch.Classifier
All Implemented Interfaces:
ProfilableIF, SinkIF

public class Classifier
extends SimpleSink

The event classification engine for an OceanStore replica.

Work modules register a queue satisfying the SinkIF interface, which will receive all QueueElementIF objects that satisfy the specified Filter. A single SinkIF may subscribe with several Filters, or with several priorities for a single Filter. Each SinkIF will receive exactly one copy of each QueueElementIF which satisfies any of its Filters, at the highest of its priorities.

By default, all Filters are subscribed with PRIORITY_NORMAL. The Classifier will throw an exception if it cannot properly enqueue an event on the corresponding SinkIF. Modules may register with lower priorities, such as PRIORITY_LOW. Any exceptions thrown by queues registered at this lower priority will be ignored.
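The "one copy, at the highest priority" rule can be illustrated with a minimal sketch. Everything in it is a stand-in invented for illustration: Subscription, the String sink names, the Predicate used in place of a Filter, and the numeric priority values (the real PRIORITY_* values are not given here).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in types; not the real SinkIF, Filter, or Classifier.
class PrioritySketch {
    // Assumed values, chosen only so that LOW < NORMAL < HIGH.
    static final int PRIORITY_LOW = 0;
    static final int PRIORITY_NORMAL = 5;
    static final int PRIORITY_HIGH = 10;

    /** One subscription: a filter, the receiving sink's name, and a priority. */
    static final class Subscription {
        final Predicate<Object> filter;
        final String sink;
        final int priority;

        Subscription(Predicate<Object> filter, String sink, int priority) {
            this.filter = filter;
            this.sink = sink;
            this.priority = priority;
        }
    }

    /** Maps each matching sink to the highest priority among its matching subscriptions. */
    static Map<String, Integer> resolve(List<Subscription> subs, Object event) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Subscription s : subs) {
            if (s.filter.test(event)) {
                // A sink appears once, however many of its filters match.
                result.merge(s.sink, s.priority, Integer::max);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Subscription> subs = new ArrayList<>();
        subs.add(new Subscription(e -> e instanceof String, "router", PRIORITY_LOW));
        subs.add(new Subscription(e -> e instanceof CharSequence, "router", PRIORITY_NORMAL));
        subs.add(new Subscription(e -> e instanceof Integer, "logger", PRIORITY_NORMAL));
        System.out.println(resolve(subs, "hello")); // prints {router=5}
    }
}
```

Here "router" matches through two filters but is listed once, at the higher of its two priorities.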

The Classifier stores a trie of filter requirements for each type of event. A node in the trie represents the set of QueueElementIFs that pass all requirements on the path to it from the trie root. Each node stores the SinkIFs requesting all QueueElementIFs from that set. The tries are stored in an unsynchronized HashMap, indexed by event type.

On dispatch, the Classifier uses the event type to search the HashMap for the proper trie. It then passes down the trie, checking field requirements and collecting the SinkIFs for nodes which it passes. Finally, a single copy of the QueueElementIF is placed into each of these SinkIFs.
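The lookup-then-descend procedure described above might look roughly like the following sketch. It is not the Classifier's implementation: events are modeled as a Class plus a Map of field values, requirements are simple equality checks encoded as "field=value" strings, sinks are plain names, and subclass handling is left out. All of these are assumptions made to keep the example small.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of a per-type requirement trie; names are illustrative only.
class TrieSketch {
    /** A trie node: sinks wanting every event that reaches it, plus child edges. */
    static final class Node {
        final Set<String> sinks = new LinkedHashSet<>();
        // Edge key is "field=value"; an event follows the edge only if it has that value.
        final Map<String, Node> children = new HashMap<>();
    }

    // One trie per event type.
    private final Map<Class<?>, Node> tries = new HashMap<>();

    /** Registers sink for events of the given type that satisfy all requirements. */
    void subscribe(Class<?> type, Map<String, Object> requirements, String sink) {
        Node node = tries.computeIfAbsent(type, t -> new Node());
        // Sort by field name so that equal requirement sets share one path.
        for (Map.Entry<String, Object> r : new TreeMap<>(requirements).entrySet()) {
            String edge = r.getKey() + "=" + r.getValue();
            node = node.children.computeIfAbsent(edge, k -> new Node());
        }
        node.sinks.add(sink);
    }

    /** Returns every sink whose requirements the event satisfies, each at most once. */
    Set<String> classify(Class<?> type, Map<String, Object> fields) {
        Set<String> out = new LinkedHashSet<>();
        Node root = tries.get(type);
        if (root != null) {
            collect(root, fields, out);
        }
        return out;
    }

    // Collects the sinks of this node, then descends along every edge the event satisfies.
    private static void collect(Node node, Map<String, Object> fields, Set<String> out) {
        out.addAll(node.sinks);
        for (Map.Entry<String, Object> f : fields.entrySet()) {
            Node child = node.children.get(f.getKey() + "=" + f.getValue());
            if (child != null) {
                collect(child, fields, out);
            }
        }
    }

    public static void main(String[] args) {
        TrieSketch trie = new TrieSketch();
        Map<String, Object> onlyN1 = new HashMap<>();
        onlyN1.put("dest", "n1");
        trie.subscribe(String.class, onlyN1, "router");
        trie.subscribe(String.class, new HashMap<>(), "logger"); // no requirements

        Map<String, Object> event = new HashMap<>();
        event.put("dest", "n1");
        event.put("ttl", 3);
        System.out.println(trie.classify(String.class, event)); // prints [logger, router]
    }
}
```

Collecting into a Set is what gives each sink a single copy even when it is reachable through several nodes.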

The HashMap of tries is unsynchronized, to maximize parallelism. When it must be modified, to add a new event type, a new HashMap is constructed and atomically replaces the old one. All calls to dispatch can proceed without blocking.
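One common way to get this replace-on-write behavior in Java is sketched below. The volatile field and the synchronized writer are assumptions; the description above only says that a new HashMap is built and swapped in atomically. A String stands in for each trie.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical copy-on-write map; not the Classifier's actual fields or methods.
class CopyOnWriteMapSketch {
    // The volatile write publishes a fully built copy, so a reader sees either
    // the old map or the new one, never a map that is being modified.
    private volatile Map<Class<?>, String> tries = new HashMap<>();

    /** Read path: one volatile read, then a plain HashMap lookup; never blocks. */
    String lookup(Class<?> eventType) {
        return tries.get(eventType);
    }

    /** Returns the map currently in place; callers must treat it as read-only. */
    Map<Class<?>, String> snapshot() {
        return tries;
    }

    /** Write path: copy the current map, modify the copy, then swap the reference. */
    synchronized void addType(Class<?> eventType, String trie) {
        Map<Class<?>, String> copy = new HashMap<>(tries);
        copy.put(eventType, trie);
        tries = copy;
    }
}
```

The cost of this design is a full copy of the map on each new event type, which is acceptable when new types are rare compared with dispatches.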

Version:
$Id: Classifier.java,v 1.50 2004/04/30 22:09:36 hweather Exp $
Author:
Dennis Geels
See Also:
Filter

Nested Class Summary
protected  class Classifier.DefaultTimerCB
          The default implementation of TimerCB, which uses seda.sandStorm.core.ssTimers to store and trigger events.
 class Classifier.SlowLink
          Records information used to delay events sent from one named stage to another.
static interface Classifier.TimerCB
          A simple callback interface for timers.
 
Field Summary
protected  NodeId _node_id
          The NodeId that is associated with this classifier.
protected static Object _synch_o
          The _synch_o object is used to synchronize the instantiation of an ssTimer object.
protected static ssTimer _timer
          The ssTimer which stores and triggers events.
static Integer PRIORITY_HIGH
           
static Integer PRIORITY_LOW
           
static Integer PRIORITY_NORMAL
           
protected  Class root_class
          The Class object for the root of the type hierarchy.
protected  Classifier.TimerCB timer_cb
          A timer used to delay event delivery.
protected  Vector tsinks
          Vector of TSinks
protected  HashMap type_nodes
          A map of all the type nodes, indexed by Class objects.
 
Constructor Summary
Classifier(NodeId node_id)
          Create a new Classifier object.
 
Method Summary
 Boolean addTSinkType(TSink ts, Class event_type)
          Called by TSink.enqueue(), adds ts to tsinks.
 void cancel(Object token)
          Cancels an event previously scheduled by dispatch_later.
 Object dispatch_later(QueueElementIF event, long millis)
          Enqueues the event at a later time into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies.
 int dispatch_lossy(QueueElementIF event)
          Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies.
 int dispatch(QueueElementIF event)
          Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies.
 int dispatch(QueueElementIF event, Integer error_mask)
          Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies.
 void enqueue(QueueElementIF element)
          Dispatches an event, just like dispatch.
static Classifier getClassifier(NodeId node_id)
          Returns the Classifier instance corresponding to a NodeId.
static Set getNodeIds()
          Returns a Set containing the NodeIds for each Classifier in this JVM.
 TSink getTSink(String stagename)
          Gets the TSink for the stage given as an argument.
 NodeId node_id()
          The NodeId that is associated with this classifier.
static void removeClassifier(NodeId node_id)
          Deletes the Classifier instance corresponding to a NodeId.
 void set_timer_cb(Classifier.TimerCB callback)
          Allows the timer callback to be changed; this is used by the simulator to install a timer using simulated time rather than operating system timers based on real time.
 boolean subscribe(Filter filter, SinkIF sink)
          Subscribes a normal-priority SinkIF to an event stream defined by a Filter.
 boolean subscribe(Filter filter, SinkIF sink, Integer priority)
          Subscribes a SinkIF to an event stream defined by a Filter.
 Boolean subscribedTo(Class event_type)
          Returns Boolean.TRUE if event_type has been subscribed to.
 boolean unsubscribe(Filter filter)
          Unsubscribe a Filter.
 boolean unsubscribe(SinkIF sink)
          Unsubscribe a SinkIF, removing its Filters from the trie.
 
Methods inherited from class seda.sandStorm.core.SimpleSink
enqueue_abort, enqueue_commit, enqueue_lossy, enqueue_many, enqueue_prepare, getEnqueuePredicate, profileSize, setEnqueuePredicate, size
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

PRIORITY_HIGH

public static final Integer PRIORITY_HIGH

PRIORITY_NORMAL

public static final Integer PRIORITY_NORMAL

PRIORITY_LOW

public static final Integer PRIORITY_LOW

_timer

protected static ssTimer _timer
The ssTimer which stores and triggers events.


_synch_o

protected static Object _synch_o
The _synch_o object is used to synchronize the instantiation of an ssTimer object.


type_nodes

protected HashMap type_nodes
A map of all the type nodes, indexed by Class objects.


root_class

protected Class root_class
The Class object for the root of the type hierarchy.


timer_cb

protected Classifier.TimerCB timer_cb
A timer used to delay event delivery.


_node_id

protected NodeId _node_id
The NodeId that is associated with this classifier.


tsinks

protected Vector tsinks
Vector of TSinks

Constructor Detail

Classifier

public Classifier(NodeId node_id)
Create a new Classifier object.

Method Detail

getClassifier

public static Classifier getClassifier(NodeId node_id)
Returns the Classifier instance corresponding to a NodeId.

Parameters:
node_id - The NodeId which parameterizes a virtual SandStorm instance.
Returns:
the (possibly new) Classifier instance corresponding to node_id.

removeClassifier

public static void removeClassifier(NodeId node_id)
Deletes the Classifier instance corresponding to a NodeId. Should be called only when a node is going down and going to come up again with a new node id. Use very carefully!

Parameters:
node_id - The NodeId which parameterizes a virtual SandStorm instance.

getNodeIds

public static Set getNodeIds()
Returns a Set containing the NodeIds for each Classifier in this JVM. The Set is backed by the internal map; do not attempt to change it.

WARNING: Used for shutdown control only. Do not call this method.


node_id

public NodeId node_id()
The NodeId that is associated with this classifier.


subscribe

public boolean subscribe(Filter filter,
                         SinkIF sink,
                         Integer priority)
Subscribes a SinkIF to an event stream defined by a Filter.

Each event class corresponds to a trie of field restrictions. This method registers the sink at the appropriate leaf of each trie corresponding to the required event type or any of its subclasses.

Currently each trie is modified atomically, but the whole set of tries is not. The Classifier may dispatch events of some subclasses before this Filter has been registered everywhere. All tries will be ready when this method returns. If this level of consistency is insufficient, use SinkIF.setEnqueuePredicate(seda.sandStorm.api.EnqueuePredicateIF) to disable the sink completely until after subscription is complete.

If the HashMap of tries requires modification (e.g. for a new type of event), a new HashMap is constructed and moved into place atomically.

The type hierarchy must be a tree. We disallow subscriptions which require a type with multiple independent parents in the hierarchy.

Parameters:
filter - defines the set of QueueElementIFs which the caller requests.
sink - the SinkIF into which we place satisfying QueueElementIFs.
priority - a number between 0 (lowest priority) and 10 (highest). Only the PRIORITY_* constants are meaningful.
Returns:
true iff subscription succeeded.
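The suggestion to disable a sink until subscription completes could be applied along the following lines. GatedSink is a hypothetical stand-in, and a java.util.function.Predicate replaces the real EnqueuePredicateIF; only the idea of rejecting everything during the subscription window is taken from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; GatedSink is not the real SinkIF.
class GateSketch {
    /** A sink that consults a predicate before accepting each event. */
    static final class GatedSink {
        private volatile Predicate<Object> predicate = e -> true;
        final List<Object> received = new ArrayList<>();

        void setEnqueuePredicate(Predicate<Object> p) {
            predicate = p;
        }

        /** Returns false when the predicate rejects the event. */
        boolean enqueue(Object event) {
            if (!predicate.test(event)) {
                return false; // rejected while the gate is closed
            }
            received.add(event);
            return true;
        }
    }

    /** Closes the gate, runs the subscription step, then reopens the gate. */
    static void subscribeGated(GatedSink sink, Runnable subscribeEverywhere) {
        sink.setEnqueuePredicate(e -> false);
        try {
            subscribeEverywhere.run();
        } finally {
            // Reopen even if subscription fails, so the sink is not left disabled.
            sink.setEnqueuePredicate(e -> true);
        }
    }
}
```

Events that arrive while the gate is closed are rejected rather than buffered, so this approach trades a short period of lost or refused events for a consistent view across all tries.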

subscribe

public boolean subscribe(Filter filter,
                         SinkIF sink)
Subscribes a normal-priority SinkIF to an event stream defined by a Filter.

Parameters:
filter - defines the set of QueueElementIFs which the caller requests.
sink - the SinkIF into which we place satisfying QueueElementIFs.
Returns:
true iff subscription succeeded.

unsubscribe

public boolean unsubscribe(Filter filter)
Unsubscribe a Filter. This method will not remove a SinkIF from the trie completely if other Filters for that SinkIF remain.

Parameters:
filter - The Filter to be removed.
Returns:
true iff unsubscription succeeded.

unsubscribe

public boolean unsubscribe(SinkIF sink)
Unsubscribe a SinkIF, removing its Filters from the trie.

Parameters:
sink - The SinkIF to be removed.
Returns:
true iff unsubscription succeeded.

dispatch

public int dispatch(QueueElementIF event)
             throws SinkFullException
Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies. Uses the default error mask (PRIORITY_NORMAL).

Parameters:
event - The QueueElementIF to dispatch.
Returns:
the number of SinkIFs into which the event was successfully enqueued.
Throws:
SinkFullException - indicates that at least one receiving sink registered at PRIORITY_NORMAL or above was temporarily full.

dispatch_lossy

public int dispatch_lossy(QueueElementIF event)
Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies. This method is lossy in that it silently skips any SinkIF whose enqueue throws an exception. This dispatch variant is meant for "low priority" events which can be safely dropped.

Parameters:
event - The QueueElementIF to dispatch.
Returns:
the number of SinkIFs into which the event was successfully enqueued.

dispatch

public int dispatch(QueueElementIF event,
                    Integer error_mask)
             throws SinkFullException
Enqueues the event into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies. All SinkIFs registered at or above the specified priority are enqueued atomically. If any throws a SinkFullException, that exception is thrown and no SinkIF receives the event. SinkIFs which were registered below the specified priority are processed second; any SinkFullExceptions they throw are ignored.

Parameters:
event - The QueueElementIF to dispatch.
error_mask - The lowest priority for SinkIFs whose errors we want to see.
Returns:
the number of SinkIFs into which the event was successfully enqueued.
Throws:
SinkFullException - indicates that at least one receiving sink registered at or above error_mask was temporarily full.
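The two-phase behavior of the error mask can be sketched as follows. This is a single-threaded illustration with invented types: BoundedSink and FullException stand in for SinkIF and SinkFullException, and the all-or-nothing phase is done by checking capacity first, which is an assumption about mechanism rather than a description of the Classifier's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of error-mask dispatch; not the real Classifier.dispatch.
class ErrorMaskSketch {
    /** Stand-in for SinkFullException. */
    static class FullException extends Exception {
        FullException(String message) {
            super(message);
        }
    }

    /** A bounded queue registered at some priority. */
    static final class BoundedSink {
        final String name;
        final int priority;
        final int capacity;
        final Deque<Object> queue = new ArrayDeque<>();

        BoundedSink(String name, int priority, int capacity) {
            this.name = name;
            this.priority = priority;
            this.capacity = capacity;
        }

        boolean hasRoom() {
            return queue.size() < capacity;
        }

        void enqueue(Object event) throws FullException {
            if (!hasRoom()) {
                throw new FullException(name + " is full");
            }
            queue.add(event);
        }
    }

    /** Returns the number of sinks that received the event. */
    static int dispatch(List<BoundedSink> matching, Object event, int errorMask)
            throws FullException {
        List<BoundedSink> strict = new ArrayList<>();
        List<BoundedSink> lenient = new ArrayList<>();
        for (BoundedSink s : matching) {
            if (s.priority >= errorMask) {
                strict.add(s);
            } else {
                lenient.add(s);
            }
        }
        // Phase 1: all or nothing. Check every strict sink before touching any.
        for (BoundedSink s : strict) {
            if (!s.hasRoom()) {
                throw new FullException(s.name + " is full");
            }
        }
        int delivered = 0;
        for (BoundedSink s : strict) {
            s.enqueue(event);
            delivered++;
        }
        // Phase 2: best effort. Failures below the mask are ignored.
        for (BoundedSink s : lenient) {
            try {
                s.enqueue(event);
                delivered++;
            } catch (FullException ignored) {
                // Dropped on purpose: this sink is below the error mask.
            }
        }
        return delivered;
    }
}
```

With a mask of PRIORITY_NORMAL, a full low-priority sink lowers the returned count but does not fail the call, while a full normal-priority sink fails the call before any sink receives the event.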

dispatch_later

public Object dispatch_later(QueueElementIF event,
                             long millis)
Enqueues the event at a later time into each SinkIF subscribed with a Filter that the given QueueElementIF satisfies. An internal timer holds the event for the specified time, and then dispatches it. The event is classified at dispatch time, not when this method is called. Therefore the set of sinks which receive the event may change in the meantime. NOTE: please cancel any unneeded events. Each pending event increases the resources consumed by the internal timer.

Parameters:
event - The QueueElementIF to dispatch.
millis - The number of milliseconds from now to dispatch the event.
Returns:
a token to pass to cancel in order to cancel the event before it is dispatched.
See Also:
cancel(java.lang.Object)

cancel

public void cancel(Object token)
Cancels an event previously scheduled by dispatch_later. If the event has already been dispatched, this method has no effect.

Parameters:
token - The Object returned by dispatch_later.
Throws:
IllegalArgumentException - if token is not a token returned by the internal timer.
See Also:
dispatch_later(seda.sandStorm.api.QueueElementIF, long)
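The token-based contract of dispatch_later and cancel can be sketched with a small timer driven by a simulated clock, which keeps the example deterministic. The class, its method names, and the use of a Runnable in place of "classify and dispatch this event" are all hypothetical; the real token type and timer are not described here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of delayed dispatch with cancellation tokens.
class DelayedDispatchSketch {
    /** Opaque token handed back to the caller. */
    private static final class Pending {
        final long dueAt;
        final Runnable action;

        Pending(long dueAt, Runnable action) {
            this.dueAt = dueAt;
            this.action = action;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    private long now = 0; // simulated milliseconds

    /** Schedules action to run millis from now and returns a cancellation token. */
    Object dispatchLater(Runnable action, long millis) {
        Pending p = new Pending(now + millis, action);
        pending.add(p);
        return p;
    }

    /** Cancels a pending action; has no effect if it has already run. */
    void cancel(Object token) {
        if (!(token instanceof Pending)) {
            throw new IllegalArgumentException("not a token from this timer");
        }
        pending.remove(token); // releases the resources held for the event
    }

    /** Advances simulated time and runs everything that has come due. */
    void advance(long millis) {
        now += millis;
        List<Pending> due = new ArrayList<>();
        for (Iterator<Pending> it = pending.iterator(); it.hasNext();) {
            Pending p = it.next();
            if (p.dueAt <= now) {
                it.remove();
                due.add(p);
            }
        }
        due.sort(Comparator.comparingLong(p -> p.dueAt));
        // Run after the scan so that an action may schedule further events.
        for (Pending p : due) {
            p.action.run();
        }
    }
}
```

Because the action runs only when the timer fires, any subscription change made between scheduling and firing affects which sinks receive the event, matching the behavior described for dispatch_later.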

getTSink

public TSink getTSink(String stagename)
               throws NoSuchStageException
Gets the TSink for the stage given as an argument. When the requesting stage has the TSink for the given stage, it will be able to pass events directly instead of invoking this Classifier, which should be more efficient.

Parameters:
stagename - String name of the stage whose TSink is requested
Returns:
a TSink into which events can be enqueued.
Throws:
NoSuchStageException - if there is no stage named stagename.

subscribedTo

public Boolean subscribedTo(Class event_type)
Returns Boolean.TRUE if event_type has been subscribed to.

Parameters:
event_type - The type of event
Returns:
Boolean.TRUE if event_type has been subscribed to.

addTSinkType

public Boolean addTSinkType(TSink ts,
                            Class event_type)
Called by TSink.enqueue(), adds ts to tsinks.

Parameters:
ts - The TSink calling this method
event_type - The type which ts is enqueueing

enqueue

public void enqueue(QueueElementIF element)
             throws SinkFullException
Dispatches an event, just like dispatch.

DO NOT USE THIS METHOD. This method exists only to satisfy the SinkIF interface. We implement that interface (by extending the SimpleSink class) to allow signals and network messages to travel through the Classifier.

Specified by:
enqueue in interface SinkIF
Specified by:
enqueue in class SimpleSink
Parameters:
element - The event to dispatch.
Throws:
SinkFullException
See Also:
dispatch(seda.sandStorm.api.QueueElementIF)

set_timer_cb

public void set_timer_cb(Classifier.TimerCB callback)
Allows the timer callback to be changed; this is used by the simulator to install a timer using simulated time rather than operating system timers based on real time.