| Interface Summary | |
| StreamInterface | Interface for sending and/or receiving a stream of records, each indexed by a small standard header. |
| Class Summary | |
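| BasicPlayer | BasicPlayer demonstrates how to build a simple implementation of a StreamInterface for portably processing a stream of records. |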
| BasicRecorder | BasicRecorder demonstrates how to build a simple implementation of a StreamInterface for portably emitting a stream of records. |
| StreamField | |
| Exception Summary | |
| streamException | Simple exception class that can be thrown by stream setup methods. |
This package supports efficient multi-point network monitoring infrastructure for collection of streaming data from many Monitors.
| Bytes | Java Type | Interpretation |
| 0..3 | int | Service code (record data type) |
| 4..7 | int | Source code (writer identification) |
| 8..15 | double | Timestamp (seconds since epoch) |
| 16..19 | int | Record length (bytes to follow) |
| 20..end | bytes | User-defined data |
The service and source codes are obtained from user-defined strings using the getRecordTypeCode(String) and getRecordSourceCode(String) methods, and resolved back to strings using the getRecordTypeString(int) and getRecordSourceString(int) methods, all specified in interface StreamInterface.
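The header layout in the table above can be sketched with java.nio.ByteBuffer. This is only an illustration: the class name RecordHeader is hypothetical, and big-endian byte order (the ByteBuffer default) is an assumption, since the table does not state the byte order used on the wire.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not part of the package): packs and unpacks the 20-byte header. */
final class RecordHeader {
    static final int HEADER_BYTES = 20;

    final int typeCode;      // bytes 0..3: service code (record data type)
    final int sourceCode;    // bytes 4..7: source code (writer identification)
    final double timestamp;  // bytes 8..15: seconds since epoch
    final int length;        // bytes 16..19: number of payload bytes to follow

    RecordHeader(int typeCode, int sourceCode, double timestamp, int length) {
        this.typeCode = typeCode;
        this.sourceCode = sourceCode;
        this.timestamp = timestamp;
        this.length = length;
    }

    /** Serializes the header followed by the payload. Assumes big-endian order. */
    static byte[] encode(int typeCode, int sourceCode, double timestamp, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        buf.putInt(typeCode);
        buf.putInt(sourceCode);
        buf.putDouble(timestamp);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Reads the four header fields from the start of a record. */
    static RecordHeader decode(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        int type = buf.getInt();
        int source = buf.getInt();
        double time = buf.getDouble();
        int len = buf.getInt();
        return new RecordHeader(type, source, time, len);
    }

    public static void main(String[] args) {
        byte[] rec = encode(7, 3, 12.5, new byte[] {1, 2, 3});
        RecordHeader h = decode(rec);
        System.out.println(rec.length + " bytes, type=" + h.typeCode
                + ", source=" + h.sourceCode + ", payload=" + h.length);
    }
}
```

A reader of the stream would consume 20 bytes, take the record length from bytes 16..19, and then read exactly that many bytes of user-defined data.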
The Streams package contains four classes for very basic record-oriented streaming data export from SSFNet simulations. These classes implement the simple streaming data protocol, described above.
streamException is a simple exception class that can be thrown by stream setup methods.
BasicRecorder demonstrates how to build a simple implementation of a StreamInterface for portably emitting a stream of records.
BasicPlayer demonstrates how to build a simple implementation of a StreamInterface for portably processing a stream of records.
StreamInterface is the interface for sending and/or receiving a stream of records, each indexed by a small standard header. This header identifies the type of each record, the writer of the record, the time at which the record was generated, and the number of bytes to follow in a user-defined format. The type and writer are given as integer codes, which correspond to arbitrary-length strings sent in-stream to construct a pair of queryable dynamic data dictionaries. See the description of the small streaming data protocol above for more details. The interface specifies the following operations:
Connect the stream to a data sink or source at the given URL, throwing a streamException if there are any problems:
    public void connectWrite(String url) throws streamException;
    public void connectRead(String url) throws streamException;
Return true if this stream has been successfully connected to a data source or sink, and not disconnected:
    public boolean isConnected();
Signal that no more records are to be received (if reading) or sent (if writing):
    public void disconnect();
Process a single incoming record in a data stream connected for reading:
    public int receive(int type_code, int source_code, double timestamp, byte[] bytes, int offset, int length);
Emit a single record on a data stream connected for writing, returning zero if the record is successfully emitted, or a nonzero value if there is an error or if a filter has suppressed the record from being written. The short form (without payload) may be used to test whether a record will be emitted or suppressed, to save the overhead of actually preparing it for transmission:
    public int send(int type_code, int source_code, double timestamp); // short form
    public int send(int type_code, int source_code, double timestamp, byte[] bytes, int offset, int length); // long form
Map a user-defined record type string to an integer code, or vice-versa:
    public String getRecordTypeString(int code);
    public int getRecordTypeCode(String name);
Map a user-defined sender ID string to an integer code, or vice-versa:
    public String getRecordSourceString(int code);
    public int getRecordSourceCode(String name);
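The two string/code mappings behave like a pair of small dictionaries. The sketch below shows one way such a dictionary could work in memory. The class name CodeDictionary is hypothetical, and assigning codes sequentially from zero is an assumption; the package does not document how its implementations choose codes, and the in-stream transmission of the strings is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory dictionary mapping names to integer codes and back. */
final class CodeDictionary {
    private final Map<String, Integer> codes = new HashMap<>();
    private final List<String> names = new ArrayList<>();

    /** Returns the existing code for a name, or assigns the next unused code. */
    int getCode(String name) {
        Integer code = codes.get(name);
        if (code == null) {
            code = names.size();   // assumption: codes are handed out in order from 0
            codes.put(name, code);
            names.add(name);
        }
        return code;
    }

    /** Returns the name registered under a code, or null if the code is unknown. */
    String getString(int code) {
        return (code >= 0 && code < names.size()) ? names.get(code) : null;
    }

    public static void main(String[] args) {
        CodeDictionary types = new CodeDictionary();
        int tid = types.getCode("my record type");
        System.out.println(tid + " <-> " + types.getString(tid));
    }
}
```

One such dictionary would back getRecordTypeCode/getRecordTypeString and a second, independent one would back getRecordSourceCode/getRecordSourceString.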
To write a stream of records, the user typically constructs a BasicRecorder, connects it to a data sink, and calls send() repeatedly to emit records before calling disconnect(). Note that the sender first calls the short form of send(), with no payload, to test stream status before committing to the overhead of preparing the payload bytes. There is no sense preparing bytes that the stream would drop or suppress.
StreamInterface myRecorder = new BasicRecorder("this names my stream");
myRecorder.connectWrite("file:/tmp/stream.dat");
int tid = myRecorder.getRecordTypeCode("my record type");
int sid = myRecorder.getRecordSourceCode("my writer id");
double now = .. ; // get timestamp from simulator or clock
if (0 == myRecorder.send(tid,sid,now)) { // test for suppression
    byte[] mybuffer = .. ; // prepare the bytes to be emitted
    myRecorder.send(tid,sid,now, mybuffer,0,mybuffer.length);
}
// .. do more sends until finished ..
myRecorder.disconnect();
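The return-code contract that makes the short-form test useful can be illustrated with a stand-in class. FilteringSender is hypothetical and is not a StreamInterface implementation. It assumes that the short form only tests and never writes anything, and that suppression is decided per record type; the package may filter on other criteria.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a recorder, showing only the send() return codes. */
final class FilteringSender {
    private final Set<Integer> suppressedTypes = new HashSet<>();
    private int emitted = 0;

    /** Marks a record type as suppressed (assumed per-type filtering). */
    void suppress(int typeCode) {
        suppressedTypes.add(typeCode);
    }

    /** Short form: returns 0 if such a record would be emitted, nonzero if suppressed. */
    int send(int typeCode, int sourceCode, double timestamp) {
        return suppressedTypes.contains(typeCode) ? 1 : 0;
    }

    /** Long form: applies the same check, then counts the record as emitted. */
    int send(int typeCode, int sourceCode, double timestamp,
             byte[] bytes, int offset, int length) {
        int status = send(typeCode, sourceCode, timestamp);
        if (status == 0) {
            emitted++;   // a real recorder would write the header and payload here
        }
        return status;
    }

    int emittedCount() {
        return emitted;
    }

    public static void main(String[] args) {
        FilteringSender s = new FilteringSender();
        s.suppress(2);
        System.out.println("type 1 status: " + s.send(1, 0, 0.0));
        System.out.println("type 2 status: " + s.send(2, 0, 0.0));
    }
}
```

A caller that checks the short form first never builds a payload for a suppressed type, which is the saving the text describes.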
To read the records later, the user typically constructs a BasicPlayer and connects it to a data source; the BasicPlayer calls back receive() each time a record arrives:
/** For this example only: use anonymous inner class BasicPlayer
 * to demonstrate specialized record processing. We override
 * the default behavior for one type of record, and defer to the
 * base class default for all other types of records.
 */
StreamInterface myPlayer = new BasicPlayer("this names my stream") {
    // returns int, matching the receive() signature declared in StreamInterface
    public int receive(int tid, int sid, double time,
                       byte[] buf, int offset, int length) {
        if (tid == getRecordTypeCode("my record type")) {
            // .. process this record content appropriately
            return 0; // assumption: zero reports success, mirroring send()
        }
        else return super.receive(tid,sid,time,buf,offset,length);
    }
};
myPlayer.connectRead("file:/tmp/stream.dat"); // calls back receive()..
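Because receive() is handed a buffer together with an offset and a length, the payload should be decoded from that window rather than from the start of the array. The sketch below shows one way to do so. The payload format (an int counter followed by a UTF-8 label) and the class name SamplePayload are invented for illustration; the package leaves the payload format entirely to the user.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical user-defined payload: an int counter followed by a UTF-8 label. */
final class SamplePayload {
    /** Builds the payload bytes that a sender would pass to the long form of send(). */
    static byte[] encode(int counter, String label) {
        byte[] text = label.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + text.length).putInt(counter).put(text).array();
    }

    /** Decodes from buf[offset .. offset+length), the window passed to receive(). */
    static String decode(byte[] buf, int offset, int length) {
        ByteBuffer in = ByteBuffer.wrap(buf, offset, length);
        int counter = in.getInt();
        byte[] text = new byte[in.remaining()];
        in.get(text);
        return counter + ":" + new String(text, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode(42, "up");
        // Place the payload after 20 unused bytes to mimic a buffer that also holds a header.
        byte[] framed = new byte[20 + payload.length];
        System.arraycopy(payload, 0, framed, 20, payload.length);
        System.out.println(decode(framed, 20, payload.length));
    }
}
```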
Finally, one new SSFNet class makes it easier to manage record streams
in a multitimeline context. Configure an instance of the
ProbeSession protocol under the standard name "probe" in
each host or router where probing is to be enabled:
ProtocolGraph [
  # .. traditional protocols here
  ProtocolSession [
    name probe use SSF.OS.ProbeSession
    file "/tmp/mystream.dat"
    stream "My Stream"
  ]
]
Then, from any protocol or protocol-related code, access the "probe"
protocol and call getRecorder to get a handle on an
implementation of StreamInterface suitable for sending
records:
// declared as ProbeSession so that getRecorder() and getHostCode() are visible
ProbeSession theProbe =
    (ProbeSession)inGraph().SessionForName("probe");
StreamInterface theStream = theProbe.getRecorder(); // preconnected
int myHostCode = theProbe.getHostCode(); // uses the NHI address
int myDatatypeCode = theStream.getRecordTypeCode("my record type");
theStream.send(myDatatypeCode,myHostCode,now()/Net.frequency(),
               myBytes, 0, myBytes.length);
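The timestamp passed to send() above is the simulation time divided by the clock frequency, which yields seconds as the record header expects. If both values are integral (an assumption; this page does not state their types), a plain division would discard fractional seconds, so a conversion helper along these lines may be worth having. The class name SimClock and its method are hypothetical.

```java
/** Hypothetical helper: converts simulator ticks to the header's timestamp in seconds. */
final class SimClock {
    /** The cast forces floating-point division so fractional seconds are kept. */
    static double ticksToSeconds(long ticks, long ticksPerSecond) {
        return (double) ticks / ticksPerSecond;
    }

    public static void main(String[] args) {
        System.out.println(ticksToSeconds(1500L, 1000L));
    }
}
```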
The streams accessed in this way are managed by the system-wide collection
of ProbeSessions; they are automatically disconnected at the end of
simulation time. The stream IDs and file names are not used verbatim;
each ProbeSession appends a dot (".") and the integer ID of its local
timeline to both. In the following example, in a four-timeline model, four streams
are actually created behind the scenes:
ProtocolSession [
  name probe use SSF.OS.ProbeSession
  file "/tmp/mystream.dat"
  stream "My Stream"
]
#
# Streams created: "My Stream.0" "My Stream.1" "My Stream.2" "My Stream.3"
#
# Files created: /tmp/mystream.dat.[0-3]
#
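The naming rule shown in the comments above (base name, a dot, then the timeline ID) can be sketched as follows. TimelineNames is a hypothetical helper that only mirrors the rule as described here; it assumes timeline IDs run from 0 to n-1, as in the four-timeline example, and is not the ProbeSession code itself.

```java
/** Hypothetical helper mirroring the per-timeline naming rule described above. */
final class TimelineNames {
    /** Returns base + "." + id for each timeline id in 0 .. timelines-1. */
    static String[] withTimelineSuffix(String base, int timelines) {
        String[] out = new String[timelines];
        for (int id = 0; id < timelines; id++) {
            out[id] = base + "." + id;
        }
        return out;
    }

    public static void main(String[] args) {
        for (String name : withTimelineSuffix("/tmp/mystream.dat", 4)) {
            System.out.println(name);
        }
    }
}
```

A reader that wants to replay the whole simulation would therefore open one file per timeline rather than the single path given in the configuration.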