replay¶
Replay network data.
The network replay module provides methods and objects designed to replay logged network data.
The main object responsible for replaying network data is the Replay
object. The Replay object depends on the remaining objects:
Replay of network data works by launching a process to read data from log files
and insert them into a multiprocess queue. This is done using the
BufferData object which reads data from the log files using the
ReadDirectory object. The data is formatted as a dictionary using the
following fields:
{'elapsed_time: <float>,
'topic': <string>,
'payload': <dict or :class:`.Message`>}
- where:
- topic is the topic that was associated with the message.
- message: is the network message, delivered as a MCL
Messageobject.
A process is launched to read the data (stored as dictionaries) from the
multiprocess queue and broadcast them as if they were occurring in real-time.
This is done using the ScheduleBroadcasts object. The broadcasts are
scheduled such that they occur in time order and follow the timing specified by
elapsed_time. A summary of the NetworkReplay object is shown
below:
Log files Broadcasts
| ^
___________|_________________________________|___________
|___________|_________________________________|___________|
| | | |
| | Replay() | |
| ________V____________ _______________|________ |
| |_____________________| |________________________| |
| | | | | |
| | | | | |
| | BufferData() | | ScheduleBroadcasts() | |
| | | | | |
| |_____________________| |________________________| |
| | ^ |
| | | |
| | ----------------------------- | |
| -->| multiprocessing.Queue() |--- |
| ----------------------------- |
|_________________________________________________________|
Classes
-
class
BufferData(reader, length=5000)[source]¶ Asynchronously buffer historic data to a queue.
The
BufferDataasynchronously reads historic data and inserts each message into a multiprocessing.Queue:ReaderObject() BufferData.queue.get() | ^ ___________|_______________________________|__________ |___________|_______________________________|__________| | | | | | | BufferData() | | | ________V____________ | | | |_____________________| | | | | | | | | | Load data using | | | | | slow calls to | | | | | ReaderObject.read() | | | | |_____________________| | | | | | | | | | | | | --------------------------- | | | -->| multiprocessing.Queue() |--- | | --------------------------- | |______________________________________________________|The purpose of this object is to continually cache a small amount of data from a slow resource (e.g. hard-disk), to memory for faster access. It is designed to continually load data into a multiprocessing.Queue, such that the queue is always full.
Parameters: - reader (obj) – Data reader object.
- length (int) – Sets the upper-bound limit on the number of items that can be placed in the queue.
-
queue¶ -
Queue used to buffer data loaded from the log files.
-
length¶ int
Sets the upperbound limit on the number of items that can be placed in the queue.
Raises: TypeError– If any of the inputs are the wrong type.-
is_alive()[source]¶ Return whether data is being buffered.
Returns: Returns Trueif the object is buffering data. ReturnsFalseif the object is NOT buffering data.Return type: bool
-
is_data_pending()[source]¶ Return whether data is available for buffering.
Returns: Returns Trueif more data is available. If all data has been read and buffered,Falseis returned.Return type: bool
-
is_ready()[source]¶ Return whether the queue is full or all data has been read.
This property returns
Truewhen all data has been read from the source or if the queue is full - whichever condition is reached first. OtherwiseFalseis returned. Once set toTrue, the flag will not reset until thereset()method is called.This property can be used to delay processes that read from the buffer until the buffer is full.
Returns: Returns Truewhen all data has been read from the source or the queue is full otherwiseFalseis returned.Return type: bool
-
reset()[source]¶ Reset the log file read locations and flush the buffer.
This method is used to reset the buffering process and re-start buffering data from the beginning of the log files.
-
start()[source]¶ Start buffering logged data on a new process.
The
start()method starts buffering data from log files to a multiprocessing.Queue. Data is retrieved using aReadDirectoryobject. The format of data inserted into the queue is documented inReadDirectory.read().Note
This method will start buffering data to the end of the queue. If the process has been stopped (
stop()) and restarted, it will recommence from where it left off. To start buffering data from the beginning of the log files, the queue must be cleared by callingreset().Returns: Returns Trueif started buffering logged data. If the logged data is already being buffered, the request is ignored and the method returnsFalse.Return type: bool
-
stop()[source]¶ Stop buffering logged data.
The
stop()method stops data from being buffered to the queue. It can be used to pause buffering. The read location in the log files and items in the queue are not reset when this method is called.To reset the buffer and start buffering from the beginning, call the
reset()method.Returns: Returns Trueif stopped buffering logged data. If the logged data was not being buffered, the request is ignored and the method returnsFalse.Return type: bool
-
class
Replay(reader, speed=1.0)[source]¶ Re-broadcast historic data.
The
NetworkReplayobject replays MCL messages in real-time. To replay data from log files:# Initialise object and commence replay from files. from mcl.logging.network_dump_replay import NetworkReplay replay = NetworkReplay(source='./dataset/') replay.start() # Pause replay. replay.pause() # Stop replay (resets object to read data from beginning). replay.stop()
Parameters: - () (reader) – data reader
- speed (float) – Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
-
speed¶ float
Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
Raises: IOError– If the log directory does not exist.TypeError– If the any of the inputs are an incorrect type.
-
is_alive()[source]¶ Return whether the object is replaying data.
Returns: Returns Trueif the object is replaying data. ReturnsFalseif the object is NOT replaying data.Return type: bool
-
is_data_pending()[source]¶ Return whether data is available for buffering before replay.
Note: Replay can still occur once all data can be read and buffered.
Returns: Returns Trueif more data is available. If all data has been read and buffered,Falseis returned.Return type: bool
-
pause()[source]¶ Pause replay of data.
Returns: Returns Trueif replay was paused. If replay is inactive or already paused, the request is ignored and the method returnsFalse.Return type: bool
-
class
ScheduleBroadcasts(queue, speed=1.0)[source]¶ Re-broadcast messages in a queue on a new process.
The
ScheduleBroadcastsobject reads messages from a multiprocessing queue and rebroadcasts the data in simulated real-time (on a new process). If the hooks options is specified, the messages will not be re-broadcast over the network. Instead they will be replayed through the callbacks specified in the hooks option (on a new thread).Parameters: - queue (multiprocessing.Queue) – Queue used to load buffer messages.
- speed (float) – Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
-
queue¶ multiprocessing.Queue
Queue used to load buffer messages.
-
speed¶ float
Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
Raises: TypeError– If the any of the inputs are an incorrect type.-
is_alive()[source]¶ Return whether the messages are being broadcast from the queue.
Returns: Returns Trueif the object is broadcasting data. ReturnsFalseif the object is NOT broadcasting data.Return type: bool