replay

Replay network data.

The network replay module provides methods and objects designed to replay logged network data.

The main object responsible for replaying network data is the Replay object. The Replay object depends on the remaining objects, BufferData and ScheduleBroadcasts.

Replay of network data works by launching a process that reads data from log files and inserts it into a multiprocess queue. This is done using the BufferData object, which reads data from the log files using the ReadDirectory object. Each item is formatted as a dictionary with the following fields:

{'elapsed_time': <float>,
 'topic': <string>,
 'payload': <dict or Message>}
where:
  • elapsed_time is the timing offset used to schedule the message during replay.
  • topic is the topic that was associated with the message.
  • payload is the network message, delivered as an MCL Message object.
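
As an illustration of this record layout, the sketch below builds one such dictionary by hand. The helper name make_record and the example topic and payload values are hypothetical; in the module itself the records are produced by the reader object.

```python
def make_record(elapsed_time, topic, payload):
    """Build one replay record in the dictionary layout described above.

    Hypothetical helper for illustration only; it is not part of the module.
    """
    return {'elapsed_time': float(elapsed_time),
            'topic': str(topic),
            'payload': payload}


# Example values are made up for the purpose of the sketch.
record = make_record(0.25, 'imu', {'x': 1.0, 'y': 2.0})
print(sorted(record))
```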

A second process is launched to read the data (stored as dictionaries) from the multiprocess queue and broadcast it as if it were occurring in real-time. This is done using the ScheduleBroadcasts object. The broadcasts are scheduled such that they occur in time order and follow the timing specified by elapsed_time. A summary of the Replay object is shown below:

        Log files                         Broadcasts
            |                                 ^
 ___________|_________________________________|___________
|___________|_________________________________|___________|
|           |                                 |           |
|           |           Replay()              |           |
|   ________V____________      _______________|________   |
|  |_____________________|    |________________________|  |
|  |                     |    |                        |  |
|  |                     |    |                        |  |
|  |     BufferData()    |    |  ScheduleBroadcasts()  |  |
|  |                     |    |                        |  |
|  |_____________________|    |________________________|  |
|           |                                 ^           |
|           |                                 |           |
|           |  -----------------------------  |           |
|           -->|  multiprocessing.Queue()  |---           |
|              -----------------------------              |
|_________________________________________________________|
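
The two-stage pipeline in the diagram can be sketched with the standard library alone. This is a minimal illustration, not the module's implementation: threads and queue.Queue stand in for the separate processes and multiprocessing.Queue so that the sketch stays short, the names buffer_records and schedule_records are hypothetical, and the speed multiplier is assumed to divide the logged elapsed time.

```python
import queue
import threading
import time

_DONE = object()  # Sentinel marking the end of the logged data.


def buffer_records(records, q):
    """Producer stage: push logged records into the queue, then the sentinel."""
    for record in records:
        q.put(record)  # Blocks while the bounded queue is full.
    q.put(_DONE)


def schedule_records(q, broadcast, speed=1.0):
    """Consumer stage: hand each record to `broadcast` at its scheduled time."""
    start = time.monotonic()
    while True:
        record = q.get()
        if record is _DONE:
            break
        # Wait until the record's elapsed time (scaled by speed) has passed.
        delay = record['elapsed_time'] / speed - (time.monotonic() - start)
        if delay > 0:
            time.sleep(delay)
        broadcast(record)


# Made-up log contents, already in time order.
log = [{'elapsed_time': 0.00, 'topic': 'a', 'payload': {'n': 1}},
       {'elapsed_time': 0.05, 'topic': 'b', 'payload': {'n': 2}},
       {'elapsed_time': 0.10, 'topic': 'a', 'payload': {'n': 3}}]

received = []
q = queue.Queue(maxsize=2)
producer = threading.Thread(target=buffer_records, args=(log, q))
consumer = threading.Thread(target=schedule_records, args=(q, received.append))
producer.start()
consumer.start()
producer.join()
consumer.join()
print([r['topic'] for r in received])
```

Here the broadcast step is just a callback that appends to a list; in the module, the broadcast goes out over the network.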

Classes


class BufferData(reader, length=5000)

Asynchronously buffer historic data to a queue.

The BufferData object asynchronously reads historic data and inserts each message into a multiprocessing.Queue:

       ReaderObject()               BufferData.queue.get()
            |                               ^
 ___________|_______________________________|__________
|___________|_______________________________|__________|
|           |                               |          |
|           |          BufferData()         |          |
|   ________V____________                   |          |
|  |_____________________|                  |          |
|  |                     |                  |          |
|  |   Load data using   |                  |          |
|  |    slow calls to    |                  |          |
|  | ReaderObject.read() |                  |          |
|  |_____________________|                  |          |
|           |                               |          |
|           |                               |          |
|           |  ---------------------------  |          |
|           -->| multiprocessing.Queue() |---          |
|              ---------------------------             |
|______________________________________________________|

The purpose of this object is to continually cache a small amount of data from a slow resource (e.g. a hard disk) in memory for faster access. It is designed to continually load data into a multiprocessing.Queue so that the queue is kept full.

Parameters:
  • reader (obj) – Data reader object.
  • length (int) – Sets the upper-bound limit on the number of items that can be placed in the queue.
queue

multiprocessing.Queue

Queue used to buffer data loaded from the log files.

length

int

Sets the upper-bound limit on the number of items that can be placed in the queue.

Raises: TypeError – If any of the inputs are the wrong type.
is_alive()

Return whether data is being buffered.

Returns: True if the object is buffering data, otherwise False.
Return type: bool
is_data_pending()

Return whether data is available for buffering.

Returns: True if more data is available. False if all data has been read and buffered.
Return type: bool
is_ready()

Return whether the queue is full or all data has been read.

This method returns True when all data has been read from the source or the queue is full, whichever occurs first. Otherwise it returns False. Once set to True, the flag does not reset until the reset() method is called.

This method can be used to delay processes that read from the buffer until the buffer is full.

Returns: True when all data has been read from the source or the queue is full, otherwise False.
Return type: bool
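
One way to use this flag is to poll it before starting a consumer. The sketch below is an assumption about usage rather than part of the module: wait_until_ready, its timeout and poll parameters, and the FakeBuffer and NeverReady stand-ins are all hypothetical. Any object with an is_ready() method, such as a BufferData instance, could be passed in.

```python
import time


def wait_until_ready(buffer, timeout=5.0, poll=0.01):
    """Block until buffer.is_ready() returns True or the timeout expires.

    Returns True if the buffer became ready, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if buffer.is_ready():
            return True
        time.sleep(poll)
    # One final check in case the buffer became ready during the last sleep.
    return buffer.is_ready()


class FakeBuffer:
    """Stand-in for a buffer object: reports ready on the third poll."""

    def __init__(self):
        self.polls = 0

    def is_ready(self):
        self.polls += 1
        return self.polls >= 3


class NeverReady:
    """Stand-in for a buffer object that never fills."""

    def is_ready(self):
        return False


became_ready = wait_until_ready(FakeBuffer(), timeout=1.0)
timed_out = wait_until_ready(NeverReady(), timeout=0.05)
print(became_ready, timed_out)
```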
reset()

Reset the log file read locations and flush the buffer.

This method is used to reset the buffering process and re-start buffering data from the beginning of the log files.

start()

Start buffering logged data on a new process.

The start() method starts buffering data from log files to a multiprocessing.Queue. Data is retrieved using a ReadDirectory object. The format of data inserted into the queue is documented in ReadDirectory.read().

Note

This method will start buffering data to the end of the queue. If the process has been stopped (stop()) and restarted, it will recommence from where it left off. To start buffering data from the beginning of the log files, the queue must be cleared by calling reset().

Returns: True if buffering of logged data was started. If the logged data is already being buffered, the request is ignored and the method returns False.
Return type: bool
stop()

Stop buffering logged data.

The stop() method stops data from being buffered to the queue. It can be used to pause buffering. The read location in the log files and items in the queue are not reset when this method is called.

To reset the buffer and start buffering from the beginning, call the reset() method.

Returns: True if buffering of logged data was stopped. If the logged data was not being buffered, the request is ignored and the method returns False.
Return type: bool
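
The pause-and-resume behaviour of start() and stop() can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the BufferData implementation: the SketchBuffer class is hypothetical, it uses a thread and queue.Queue in place of a process and multiprocessing.Queue, it reads from an in-memory sequence instead of log files, and it omits is_ready() and reset().

```python
import queue
import threading
import time


class SketchBuffer:
    """Thread-based stand-in mimicking the documented start/stop semantics."""

    def __init__(self, records, length=5000):
        self._records = list(records)
        self._index = 0                      # Read location in the "log".
        self.queue = queue.Queue(maxsize=length)
        self._run = threading.Event()
        self._thread = None

    def is_alive(self):
        return self._thread is not None and self._thread.is_alive()

    def is_data_pending(self):
        return self._index < len(self._records)

    def _worker(self):
        while self._run.is_set() and self.is_data_pending():
            try:
                # Short timeout so a stop() request is noticed promptly.
                self.queue.put(self._records[self._index], timeout=0.01)
                self._index += 1
            except queue.Full:
                pass

    def start(self):
        if self.is_alive():
            return False                     # Already buffering: ignored.
        self._run.set()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()
        return True

    def stop(self):
        if not self.is_alive():
            return False                     # Not buffering: ignored.
        self._run.clear()
        self._thread.join()                  # Read location is left untouched.
        return True


buf = SketchBuffer(range(10), length=3)
first_start = buf.start()
second_start = buf.start()                   # Ignored while already running.
while not buf.queue.full():
    time.sleep(0.001)
stopped = buf.stop()
oldest = buf.queue.get()                     # Frees one slot in the queue.
resumed = buf.start()                        # Continues from where it left off.
while not buf.queue.full():
    time.sleep(0.001)
buf.stop()
remaining = [buf.queue.get() for _ in range(3)]
print(first_start, second_start, stopped, oldest, remaining)
```

Because the read location survives stop(), the item buffered after resuming is the next unread record rather than the first one.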

class Replay(reader, speed=1.0)

Re-broadcast historic data.

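The Replay object replays MCL messages in real-time. The example below replays data from log files; note that it uses the NetworkReplay name and a source argument rather than the Replay(reader, speed=1.0) signature documented above: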

# Initialise object and commence replay from files.
from mcl.logging.network_dump_replay import NetworkReplay
replay = NetworkReplay(source='./dataset/')
replay.start()

# Pause replay.
replay.pause()

# Stop replay (resets object to read data from beginning).
replay.stop()
Parameters:
  • reader (obj) – Data reader object.
  • speed (float) – Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
speed

float

Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
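
As a worked example of the multiplier, the sketch below assumes that speed simply divides the logged elapsed time; the module's internal scheduling is not shown on this page, so this is an interpretation. The function name scheduled_delay is hypothetical.

```python
def scheduled_delay(elapsed_time, speed=1.0):
    """Time after replay start at which a record would be rebroadcast.

    Assumes the multiplier simply divides the logged elapsed time.
    """
    if speed <= 0:
        raise ValueError('speed must be positive')
    return elapsed_time / speed


# A record logged 10 time units into the recording, at three speeds.
for speed in (0.5, 1.0, 2.0):
    print(speed, scheduled_delay(10.0, speed))
```

Under this assumption a speed of 2.0 halves the wait before each message and a speed of 0.5 doubles it.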

Raises:
  • IOError – If the log directory does not exist.
  • TypeError – If any of the inputs are an incorrect type.
is_alive()

Return whether the object is replaying data.

Returns: True if the object is replaying data, otherwise False.
Return type: bool
is_data_pending()

Return whether data is available for buffering before replay.

Note: Replay can still occur once all data has been read and buffered.

Returns: True if more data is available. False if all data has been read and buffered.
Return type: bool
pause()

Pause replay of data.

Returns: True if replay was paused. If replay is inactive or already paused, the request is ignored and the method returns False.
Return type: bool
start()

Start replaying data.

Returns: True if replay was started. If replay could not be started or if replay is currently active, the request is ignored and the method returns False.
Return type: bool
stop()

Stop replaying data and reset to beginning.

Returns: True if replay was stopped. If replay is inactive, the request is ignored and the method returns False.
Return type: bool
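
The return values of start(), pause() and stop() can be summarised as a small state machine. The toy model below is a reading of the descriptions above, not the Replay implementation: the class name ReplayStateSketch is hypothetical, and it assumes that a paused replay counts as stoppable (so stop() returns True when paused) and that start() resumes a paused replay.

```python
class ReplayStateSketch:
    """Toy model of the start/pause/stop return values documented above."""

    def __init__(self):
        self.state = 'inactive'   # One of: 'inactive', 'active', 'paused'.

    def start(self):
        if self.state == 'active':
            return False          # Already replaying: request ignored.
        self.state = 'active'
        return True

    def pause(self):
        if self.state != 'active':
            return False          # Inactive or already paused: ignored.
        self.state = 'paused'
        return True

    def stop(self):
        if self.state == 'inactive':
            return False          # Nothing to stop: request ignored.
        self.state = 'inactive'   # Assumed: stopping resets to the beginning.
        return True

    def is_alive(self):
        return self.state == 'active'


r = ReplayStateSketch()
results = [r.start(), r.start(), r.pause(), r.pause(),
           r.start(), r.stop(), r.stop()]
print(results)
```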

class ScheduleBroadcasts(queue, speed=1.0)

Re-broadcast messages in a queue on a new process.

The ScheduleBroadcasts object reads messages from a multiprocessing queue and rebroadcasts the data in simulated real-time (on a new process). If the hooks option is specified, the messages will not be re-broadcast over the network. Instead they will be replayed through the callbacks specified in the hooks option (on a new thread).

Parameters:
  • queue (multiprocessing.Queue) – Queue from which buffered messages are loaded.
  • speed (float) – Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.
queue

multiprocessing.Queue

Queue from which buffered messages are loaded.

speed

float

Speed multiplier for data replay. Values greater than 1.0 will result in a faster than real-time playback. Values less than 1.0 will result in a slower than real-time playback.

Raises: TypeError – If any of the inputs are an incorrect type.
is_alive()

Return whether the messages are being broadcast from the queue.

Returns: True if the object is broadcasting data, otherwise False.
Return type: bool
start()

Start scheduling queued broadcasts on a new process.

Returns: True if scheduling of broadcasts was started. If the broadcasts are already being scheduled, the request is ignored and the method returns False.
Return type: bool
stop()

Stop scheduling queued broadcasts.

Returns: True if scheduled broadcasts were stopped. If the broadcasts are not being scheduled, the request is ignored and the method returns False.
Return type: bool