network
Generic interface for publishing and receiving data in MCL.
This module provides generic methods and objects for creating network
broadcasters and listeners from Connection or
Message objects. Since the details of interface connections are
encapsulated by Connection objects, the specific
implementation of broadcasters and listeners can be abstracted
away. Broadcasting and receiving data from MCL networks can be handled by
generic functions and objects.
The functions and objects in this module are generic tools for interacting
with MCL network interfaces. The only implementation-specific objects needed
when writing applications are Connection objects (e.g. udp.Connection).
Broadcasters and listeners can be created from a Connection object with the
RawBroadcaster() and RawListener() functions.
Example usage:
import os
import time
from mcl import RawListener
from mcl import RawBroadcaster
from mcl.network.udp import Connection
# Create raw listener and broadcaster from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:a000'
connection = Connection(URL)
listener = RawListener(connection)
broadcaster = RawBroadcaster(connection)
# Print received data to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))
# Broadcast data.
broadcaster.publish('hello world')
time.sleep(0.1)
# Close connections.
listener.close()
broadcaster.close()
Similarly, message broadcasters and listeners can be created from a
Message object with the MessageBroadcaster and MessageListener objects.
Example usage:
import os
import time
from mcl import Message
from mcl import MessageListener
from mcl import MessageBroadcaster
from mcl.network.udp import Connection
# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:b000')
# Create message listener and broadcaster from the message definition.
listener = MessageListener(ExampleMessage)
broadcaster = MessageBroadcaster(ExampleMessage)
# Print received message to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))
# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))
time.sleep(0.1)
# Close connections.
listener.close()
broadcaster.close()
The QueuedListener object can operate as a RawListener or a
MessageListener, depending on its input. It differs from the other
listener objects by receiving network data on a separate process and
inserting the data into a multiprocessing queue. Callbacks are issued on a
separate thread. The intention is to use a light-weight process for receiving
data so as to achieve more accurate timing by avoiding the restrictions of the
GIL; resource allocation is left to the operating system. This object
maintains the same interface as the other listener objects.
Functions
RawBroadcaster(connection, topic=None)
Return an object for sending data over a network interface.
Example usage:
from mcl import RawBroadcaster
from mcl.network.udp import Connection
# Create raw broadcaster from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:0a00'
connection = Connection(URL)
broadcaster = RawBroadcaster(connection)
# Broadcast data.
broadcaster.publish('hello world')
# Close connection.
broadcaster.close()
Parameters:
- connection (Connection) – Connection object.
- topic (str) – Topic associated with the network interface.
Attributes:
- connection – Connection object.
- topic (str) – Topic associated with the network interface broadcasts.
Raises: TypeError – If any of the inputs are ill-specified.
RawListener(connection, topics=None)
Return an object for receiving data over a network interface.
Objects returned by RawListener() make network data available to subscribers
by issuing callbacks, when data arrives, in the following format:
{'topic': str, 'payload': obj()}
where:
- <topic> is a string containing the topic associated with the received data.
- <payload> is the received (serialisable) data.
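As a minimal sketch of consuming this format, the callback below filters on the topic before handling the payload. The name make_topic_filter, the topic strings and the hand-built dictionaries are hypothetical; they stand in for data a listener would deliver, and no MCL objects are involved.

```python
def make_topic_filter(wanted_topic, handler):
    """Return a callback that forwards payloads for one topic only."""

    def callback(data):
        # 'data' is assumed to follow the documented format:
        # {'topic': str, 'payload': obj}
        if data['topic'] == wanted_topic:
            handler(data['payload'])

    return callback


received = []
callback = make_topic_filter('status', received.append)

# Hand-built dictionaries standing in for data issued by a listener.
callback({'topic': 'status', 'payload': 'hello world'})
callback({'topic': 'debug', 'payload': 'ignored'})
```

A callback built this way could presumably be passed to subscribe() in place of the lambdas used in the usage examples.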
Example usage:
import os
from mcl import RawListener
from mcl.network.udp import Connection
# Create raw listener from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:0b00'
connection = Connection(URL)
listener = RawListener(connection)
# Print received data to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))
# Close connection.
listener.close()
Parameters:
- connection (Connection) – Connection object.
- topics (str or list) – Topics associated with the network interface represented as either a string or list of strings.
Attributes:
- connection – Connection object.
- topics (str or list) – Topics associated with the network interface.
Raises: TypeError – If any of the inputs are ill-specified.
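The "string or list of strings" rule for topics can be illustrated with a small validation helper. This is only a sketch: normalise_topics is a hypothetical name, not an MCL function, and how MCL itself validates its inputs is not shown in this documentation.

```python
def normalise_topics(topics):
    """Return topics as a list of strings, or None if no topics are given.

    Hypothetical helper: illustrates rejecting ill-specified input with a
    TypeError. It is not part of MCL.
    """
    if topics is None:
        return None
    if isinstance(topics, str):
        return [topics]
    if isinstance(topics, list) and all(isinstance(t, str) for t in topics):
        return list(topics)
    raise TypeError("'topics' must be a string or a list of strings.")
```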
Classes
class MessageBroadcaster
Send messages over a network interface.
The MessageBroadcaster object is a factory which manufactures objects for
broadcasting MCL Message objects over a network. The returned object overloads
the publish() method of a RawBroadcaster object to serialise the contents of a
Message before transmission. Message pack is used to serialise Message objects
into a byte string.
Example usage:
from mcl import Message
from mcl import MessageBroadcaster
from mcl.network.udp import Connection
# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00a0')
# Create message broadcaster from the message definition.
broadcaster = MessageBroadcaster(ExampleMessage)
# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))
# Close connection.
broadcaster.close()
For a list of available methods and attributes in the returned object, see
RawBroadcaster.
class MessageListener
Receive messages over a network interface.
The MessageListener object is a factory which manufactures objects for
receiving MCL Message objects over a network. The returned object inherits
from the RawListener class. When data is received, it is decoded into a
Message object before an event is raised to forward the received data to
subscribed callbacks in the following format:
{'topic': str, 'payload': Message()}
where:
- <topic> is a string containing the topic associated with the received data.
- <payload> is the received Message object.
Example usage:
import os
from mcl import Message
from mcl import MessageListener
from mcl.network.udp import Connection
# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00b0')
# Create message listener from the message definition.
listener = MessageListener(ExampleMessage)
# Print received message to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))
# Close connection.
listener.close()
For a list of available methods and attributes in the returned object, see
RawListener.
Warning
MessageListener objects expect the transmitted data to be formatted as MCL
Message objects. If the received data cannot be converted into an MCL Message,
an exception will be raised on the I/O loop (thread) of the base RawListener.
This will prevent exceptions from being raised on the main thread and messages
from being published. If no messages are being received, check the
connections, ensure the data is being formatted correctly prior to
transmission and refer to any stack-traces being printed on stdout.
Parameters:
- message (Message) – MCL message object.
- topics (str) – List of strings containing topics MessageListener will receive and process.
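The behaviour described in the warning follows from how Python threads handle exceptions: an exception raised inside a worker thread ends that thread but is not re-raised on the main thread. The sketch below shows this with the standard library only (Python 3.8 or later for threading.excepthook). The io_loop function is a hypothetical stand-in for a listener's I/O loop and is not MCL code.

```python
import threading

errors = []


def record_thread_error(args):
    # Called by the threading module when a thread dies from an exception.
    errors.append(args.exc_type.__name__)


def io_loop():
    # Stand-in for a listener's I/O loop failing to decode received data.
    raise ValueError('received data is not a valid message')


previous_hook = threading.excepthook
threading.excepthook = record_thread_error
try:
    worker = threading.Thread(target=io_loop)
    worker.start()
    worker.join()
finally:
    threading.excepthook = previous_hook

# The main thread reaches this point without seeing the exception.
main_thread_continued = True
```

Without a custom hook, the default behaviour is to print the traceback, which is why checking printed stack-traces is a sensible first step when no messages arrive.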
class QueuedListener(connection, topics=None, open_init=True)
Open a broadcast address and listen for data.
The QueuedListener object subscribes to a network broadcast and issues publish
events when data is received.
The difference between this object and other network listeners is that network
data is received on a separate process and written to a multiprocessing queue.
The intention is to use a light-weight process to achieve more accurate timing
by avoiding the GIL. Resource allocation is left to the operating system.
A summary of the QueuedListener object is shown below:

       Data broadcast              Data republished
       (over network)              (local callbacks)
              |                            ^
 _____________|____________________________|______________
|_____________|____________________________|______________|
|             |                            |              |
|             |      QueuedListener()      |              |
|             |                            |              |
|     ________V_________           ________|_________     |
|    |__________________|         |__________________|    |
|    | Process          |         | Thread           |    |
|    |                  |         |                  |    |
|    |     Add data     |         |    Read data     |    |
|    |     to queue     |         |    from queue    |    |
|    |__________________|         |__________________|    |
|             |                            ^              |
|             v                            |              |
|             ------------------------------              |
|             |  multiprocessing.Queue()   |              |
|             ------------------------------              |
|_________________________________________________________|

If network data is handled immediately upon reception, long callbacks may
cause data packets to be lost. By inserting data into a queue on a separate
process, it is less likely data will be dropped. A separate thread can read
buffered network data from the queue and issue lengthy callbacks with minimal
impact on the reception process. If data is received faster than it can be
processed, the queue will grow.
Data are published as a dictionary in the following format:
{'topic': str(), 'payload': obj(), 'time_received': datetime}
where:
- <topic> is a string representing the topic associated with the current data packet. This can be used for filtering broadcasts.
- <payload> contains the contents of the data transmission.
- <time_received> is a datetime object containing the time the data was received and queued.
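The decoupling of reception from callbacks, and the dictionary format above, can be sketched with the standard library. To keep the example short and runnable anywhere, it swaps in queue.Queue and a plain function call for the multiprocessing queue and the separate receiving process that QueuedListener is described as using. The names receive and publish_loop are hypothetical.

```python
import datetime
import queue
import threading

buffer = queue.Queue()
published = []


def receive(topic, payload):
    """Queue a packet immediately, stamping the time it was received."""
    buffer.put({'topic': topic,
                'payload': payload,
                'time_received': datetime.datetime.now()})


def publish_loop(callback):
    """Read queued packets and issue (possibly slow) callbacks."""
    while True:
        data = buffer.get()
        if data is None:      # Sentinel: stop the loop.
            break
        callback(data)


worker = threading.Thread(target=publish_loop, args=(published.append,))
worker.start()

# Reception is not blocked by the callbacks; packets simply queue up.
for i in range(3):
    receive('example', 'packet %d' % i)

buffer.put(None)
worker.join()
```

Because reception only performs a put(), a slow callback delays delivery to subscribers rather than reception itself, at the cost of a growing queue.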
Example usage emulating objects returned from RawListener():
import os
import time
from mcl import QueuedListener
from mcl import RawBroadcaster
from mcl.network.udp import Connection
# Define MCL connection.
URL = 'ff15::c75d:ce41:ea8e:00c0'
connection = Connection(URL)
# Print received data to screen.
listener = QueuedListener(connection)
broadcaster = RawBroadcaster(connection)
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))
# Broadcast data.
broadcaster.publish('hello world')
time.sleep(0.1)
# Close connections.
listener.close()
broadcaster.close()
Example usage emulating objects returned from MessageListener(). Note that the
interface is the same as in the previous example; only the input object has
changed:
import os
import time
from mcl import Message
from mcl import QueuedListener
from mcl import MessageBroadcaster
from mcl.network.udp import Connection
# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00c0')
# Print received message to screen.
listener = QueuedListener(ExampleMessage)
broadcaster = MessageBroadcaster(ExampleMessage)
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))
# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))
time.sleep(0.1)
# Close connections.
listener.close()
broadcaster.close()
Parameters:
- connection (Connection or Message) – An instance of an MCL connection object or a reference to an MCL message type.
- topics (str or list) – Topics associated with the network interface represented as either a string or list of strings.
- open_init (bool) – Open connection immediately after initialisation.
close()
Close connection to queued listener.
Returns: True if the queued listener was closed. If the queued listener was already closed, the request is ignored and the method returns False.
Return type: bool
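The return convention of close() can be illustrated with a small stand-in class. ClosableSketch is hypothetical and only mimics the documented return values; it does not open or close any network resource.

```python
class ClosableSketch(object):
    """Hypothetical object mimicking the documented close() return values."""

    def __init__(self):
        self.is_open = True

    def close(self):
        if not self.is_open:
            return False      # Already closed: request ignored.
        self.is_open = False
        return True


listener = ClosableSketch()
first = listener.close()    # Closes the object.
second = listener.close()   # Repeated request is ignored.
```

Under this convention, calling close() more than once is harmless, and the return value tells the caller whether the call actually changed the state.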