network

Generic interface for publishing and receiving data in MCL.

This module provides generic methods and objects for creating network broadcasters and listeners from Connection or Message objects. Since the details of interface connections are encapsulated by Connection objects, the specific implementation of broadcasters and listeners can be abstracted away. Broadcasting and receiving data from MCL networks can be handled by generic functions and objects.

When writing applications, the only implementation-specific objects that need to be used are Connection objects (e.g. udp.Connection).

Raw broadcasters and listeners can be created from a Connection object using the RawBroadcaster() and RawListener() functions.

Example usage:

import os
import time
from mcl import RawListener
from mcl import RawBroadcaster
from mcl.network.udp import Connection

# Create raw listener and broadcaster from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:a000'
connection = Connection(URL)
listener = RawListener(connection)
broadcaster = RawBroadcaster(connection)

# Print received data to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))

# Broadcast data.
broadcaster.publish('hello world')
time.sleep(0.1)

# Close connections.
listener.close()
broadcaster.close()

Similarly, message broadcasters and listeners can be created from a Message object using the MessageBroadcaster and MessageListener objects.

Example usage:

import os
import time
from mcl import Message
from mcl import MessageListener
from mcl import MessageBroadcaster
from mcl.network.udp import Connection

# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:b000')

# Create message listener and broadcaster from the message definition.
listener = MessageListener(ExampleMessage)
broadcaster = MessageBroadcaster(ExampleMessage)

# Print received message to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))

# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))
time.sleep(0.1)

# Close connections.
listener.close()
broadcaster.close()

The QueuedListener object can operate as either a RawListener or a MessageListener, depending on its input. It differs from the other listener objects by receiving network data on a separate process and inserting the data into a multiprocessing queue. Callbacks are issued on a separate thread. The intention is to use a light-weight process for receiving data, achieving more accurate timing by avoiding the restrictions of the GIL; resource allocation is left to the operating system. QueuedListener maintains the same interface as the other listener objects.

Functions


RawBroadcaster(connection, topic=None)

Return an object for sending data over a network interface.

Example usage:

from mcl import RawBroadcaster
from mcl.network.udp import Connection

# Create raw broadcaster from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:0a00'
connection = Connection(URL)
broadcaster = RawBroadcaster(connection)

# Broadcast data.
broadcaster.publish('hello world')

# Close connection.
broadcaster.close()

Parameters:
  • connection (Connection) – Connection object.
  • topic (str) – Topic associated with the network interface.
Attributes:
  • connection (Connection) – Connection object.
  • topic (str) – Topic associated with the network interface broadcasts.
  • is_open (bool) – True if the network interface is open, otherwise False.

Raises: TypeError – If any of the inputs are ill-specified.

RawListener(connection, topics=None)

Return an object for receiving data over a network interface.

Objects returned by RawListener() make network data available to subscribers by issuing callbacks, when data arrives, in the following format:

{'topic': str,
 'payload': obj()}

where:

  • <topic> is a string containing the topic associated with the received data.
  • <payload> is the received (serialisable) data.
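
Since every callback receives the same two-key dictionary, a subscriber can be a plain function that reads the topic and payload entries. The following is a minimal sketch that needs no network. The only_topic helper is hypothetical (it is not part of MCL), and the dictionaries are built by hand to mimic the format above.

```python
def only_topic(topic, handler):
    """Wrap `handler` so it only sees payloads published under `topic`.

    Hypothetical helper for illustration; this is not an MCL function.
    """
    def callback(data):
        # `data` follows the documented format: {'topic': str, 'payload': obj}.
        if data['topic'] == topic:
            handler(data['payload'])
    return callback


received = []
callback = only_topic('status', received.append)

# Hand-built dictionaries standing in for data delivered by a listener.
callback({'topic': 'status', 'payload': 'ready'})
callback({'topic': 'debug', 'payload': 'ignored'})
callback({'topic': 'status', 'payload': 'done'})
```

A wrapper of this kind could be passed to listener.subscribe() in place of the lambda used in the examples.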

Example usage:

import os
from mcl import RawListener
from mcl.network.udp import Connection

# Create raw listener from IPv6 connection.
URL = 'ff15::c75d:ce41:ea8e:0b00'
connection = Connection(URL)
listener = RawListener(connection)

# Print received data to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))

# Close connection.
listener.close()

Parameters:
  • connection (Connection) – Connection object.
  • topics (str or list) – Topics associated with the network interface represented as either a string or list of strings.
Attributes:
  • connection (Connection) – Connection object.
  • topics (str or list) – Topics associated with the network interface.
  • is_open (bool) – True if the network interface is open, otherwise False.

Raises: TypeError – If any of the inputs are ill-specified.

Classes


class MessageBroadcaster

Send messages over a network interface.

The MessageBroadcaster object is a factory which manufactures objects for broadcasting MCL Message objects over a network. The returned object overrides the publish() method of a RawBroadcaster object to serialise the contents of a Message before transmission. MessagePack is used to serialise Message objects into a byte string.
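
The serialise-then-publish pattern described above can be pictured with a small stand-alone sketch. It is not MCL's implementation: SketchRawBroadcaster and SketchMessageBroadcaster are hypothetical classes, messages are plain dictionaries, and the standard-library json module stands in for MessagePack so that the example has no third-party dependencies.

```python
import json


class SketchRawBroadcaster(object):
    """Hypothetical stand-in for a raw broadcaster: records bytes instead of sending."""

    def __init__(self):
        self.sent = []

    def publish(self, data):
        # A real broadcaster would write `data` to the network here.
        self.sent.append(data)


class SketchMessageBroadcaster(SketchRawBroadcaster):
    """Hypothetical subclass that serialises a message before publishing it."""

    def publish(self, message):
        # json is used purely as a placeholder for MessagePack.
        packed = json.dumps(message, sort_keys=True).encode('utf-8')
        super(SketchMessageBroadcaster, self).publish(packed)


broadcaster = SketchMessageBroadcaster()
broadcaster.publish({'text': 'hello world'})
```

Callers of the subclass hand over a message and never deal with bytes directly, which is the convenience the override is meant to provide.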

Example usage:

from mcl import Message
from mcl import MessageBroadcaster
from mcl.network.udp import Connection

# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00a0')

# Create message broadcaster from IPv6 connection.
broadcaster = MessageBroadcaster(ExampleMessage)

# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))

# Close connection.
broadcaster.close()

For a list of available methods and attributes in the returned object, see RawBroadcaster.

Parameters:
  • message (Message) – MCL message object.
  • topic (str) – Topic associated with the network interface.

class MessageListener

Receive messages over a network interface.

The MessageListener object is a factory which manufactures objects for receiving MCL Message objects over a network. The returned object inherits from the RawListener class. When data is received, it is decoded into a Message object before an event is raised to forward the received data to subscribed callbacks in the following format:

{'topic': str,
 'payload': Message()}

where:

  • <topic> is a string containing the topic associated with the received data.
  • <payload> is the received Message object.

Example usage:

import os
from mcl import Message
from mcl import MessageListener
from mcl.network.udp import Connection

# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00b0')

# Create message listener from IPv6 connection.
listener = MessageListener(ExampleMessage)

# Print received message to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))

# Close connection.
listener.close()

For a list of available methods and attributes in the returned object, see RawListener.

Warning

MessageListener objects expect the transmitted data to be formatted as MCL Message objects. If the received data cannot be converted into an MCL Message, an exception is raised on the I/O loop (thread) of the base RawListener. The exception therefore does not propagate to the main thread, and the message is not published to subscribers. If no messages are being received, check the connections, ensure the data is formatted correctly prior to transmission, and refer to any stack traces printed on stdout.
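
This behaviour is consistent with how Python threads work in general: an exception raised inside a worker thread ends that thread and is reported through threading.excepthook, but it is never re-raised in the main thread. The sketch below reproduces this without MCL. The io_loop function and the packet list are hypothetical, json stands in for the real message decoding, and threading.excepthook requires Python 3.8 or later.

```python
import json
import threading

delivered = []   # payloads that reached the "subscriber"
failures = []    # exception types seen on the worker thread


def record_failure(args):
    # Replaces the default hook, which would print a stack trace instead.
    failures.append(args.exc_type)


def io_loop(packets):
    # Hypothetical I/O loop: decode each packet, then deliver it.
    for packet in packets:
        delivered.append(json.loads(packet))   # raises on malformed input


original_hook = threading.excepthook
threading.excepthook = record_failure

packets = ['{"text": "ok"}', 'not json', '{"text": "lost"}']
worker = threading.Thread(target=io_loop, args=(packets,))
worker.start()
worker.join()   # returns normally: the error stayed on the worker thread

threading.excepthook = original_hook
```

In this sketch the loop stops at the first malformed packet, so the packet after it is never delivered. Whether MCL's I/O loop continues after such an error is not stated here.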

Parameters:
  • message (Message) – MCL message object.
  • topics (str or list) – Topic, or list of topics, that the MessageListener will receive and process.

class QueuedListener(connection, topics=None, open_init=True)

Open a broadcast address and listen for data.

The QueuedListener object subscribes to a network broadcast and issues publish events when data is received.

The difference between this object and other network listeners is that network data is received on a separate process and written to a multi-processing queue. The intention is to use a light-weight process to achieve more accurate timing by avoiding the GIL. Resource allocation is left to the operating system.

A summary of the QueuedListener object is shown below:

         Data broadcast           Data republished
         (over network)           (local callbacks)
              |                            ^
 _____________|____________________________|______________
|_____________|____________________________|______________|
|             |                            |              |
|             |      QueuedListener()      |              |
|             |                            |              |
|     ________V_________           ________|_________     |
|    |__________________|         |__________________|    |
|    | Process          |         | Thread           |    |
|    |                  |         |                  |    |
|    |     Add data     |         |    Read data     |    |
|    |     to queue     |         |    from queue    |    |
|    |__________________|         |__________________|    |
|             |                            ^              |
|             v                            |              |
|             ------------------------------              |
|             |   multiprocessing.Queue()  |              |
|             ------------------------------              |
|_________________________________________________________|

If network data is handled immediately upon reception, long callbacks may cause data packets to be lost. By inserting data into a queue on a separate process, it is less likely data will be dropped. A separate thread can read buffered network data from the queue and issue lengthy callbacks with minimal impact to the reception process. If data is received faster than it can be processed the queue will grow.
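
The buffering idea in this paragraph can be sketched with the standard library alone. In this simplified sketch a thread stands in for the reception process and queue.Queue stands in for multiprocessing.Queue, which keeps the example short and portable; QueuedListener itself uses a separate process as described above. The names receive, dispatch and slow_callback are hypothetical.

```python
import queue
import threading
import time

pending = queue.Queue()   # buffer between reception and callbacks
handled = []
STOP = object()           # sentinel telling the dispatcher to exit


def receive(packets):
    # Reception side: only enqueues, so slow callbacks never block it.
    for packet in packets:
        pending.put(packet)
    pending.put(STOP)


def slow_callback(packet):
    time.sleep(0.01)      # simulate a lengthy subscriber
    handled.append(packet)


def dispatch():
    # Callback side: drains the queue at its own pace.
    while True:
        packet = pending.get()
        if packet is STOP:
            break
        slow_callback(packet)


receiver = threading.Thread(target=receive, args=(range(5),))
dispatcher = threading.Thread(target=dispatch)
dispatcher.start()
receiver.start()
receiver.join()     # reception completes without waiting for callbacks
dispatcher.join()   # callbacks finish later, with no packets dropped
```

The queue here is unbounded, so it grows whenever packets arrive faster than the callback can handle them.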

Data are published as a dictionary in the following format:

{'topic': str(),
 'payload': obj(),
 'time_received': datetime}

where:

  • <topic> is a string representing the topic associated with the current data packet. This can be used for filtering broadcasts.
  • <payload> contains the contents of the data transmission.
  • <time_received> is a datetime object containing the time the data was received and queued.
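
Because each dictionary carries the time of reception, a callback can estimate how long a packet waited in the queue before it was handled. A minimal sketch follows. The queue_delay helper is hypothetical, the sample dictionary is hand-built, and the code assumes that time_received is a naive datetime taken from the same clock as datetime.now(), which should be checked against the library.

```python
from datetime import datetime, timedelta


def queue_delay(data, now=None):
    """Return the seconds elapsed between reception and handling of `data`.

    Hypothetical helper; assumes `time_received` is comparable with `now`.
    """
    if now is None:
        now = datetime.now()
    return (now - data['time_received']).total_seconds()


# Hand-built sample mimicking the documented format.
handled_at = datetime(2020, 1, 1, 12, 0, 0)
sample = {'topic': 'status',
          'payload': 'ready',
          'time_received': handled_at - timedelta(milliseconds=250)}
delay = queue_delay(sample, now=handled_at)
```

A steadily increasing delay would suggest that callbacks are not keeping up with incoming data and that the queue is growing.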

Example usage emulating objects returned from RawListener():

import os
import time
from mcl import QueuedListener
from mcl import RawBroadcaster
from mcl.network.udp import Connection

# Define MCL connection.
URL = 'ff15::c75d:ce41:ea8e:00c0'
connection = Connection(URL)

# Print received data to screen.
listener = QueuedListener(connection)
broadcaster = RawBroadcaster(connection)
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))

# Broadcast data.
broadcaster.publish('hello world')
time.sleep(0.1)

# Close connections.
listener.close()
broadcaster.close()

Example usage emulating objects returned from MessageListener(). Note that the interface is the same as in the previous example; only the input object has changed:

import os
import time
from mcl import Message
from mcl import QueuedListener
from mcl import MessageBroadcaster
from mcl.network.udp import Connection

# Define MCL message.
class ExampleMessage(Message):
    mandatory = ('text',)
    connection = Connection('ff15::c75d:ce41:ea8e:00c0')

# Print received message to screen.
listener = QueuedListener(ExampleMessage)
broadcaster = MessageBroadcaster(ExampleMessage)
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']['text']))

# Broadcast message.
broadcaster.publish(ExampleMessage(text='hello world'))
time.sleep(0.1)

# Close connections.
listener.close()
broadcaster.close()

Parameters:
  • connection (Connection or Message) – An instance of an MCL connection object or a reference to an MCL message type.
  • topics (str or list) – Topics associated with the network interface represented as either a string or list of strings.
  • open_init (bool) – Open the connection immediately after initialisation.

close()

Close connection to queued listener.

Returns: True if the queued listener was closed. If the queued listener was already closed, the request is ignored and the method returns False.
Return type: bool

is_open()

Return whether the object is listening for broadcasts.

Returns: True if the object is listening for broadcasts, otherwise False.
Return type: bool

open()

Open connection to queued listener and start publishing broadcasts.

Returns: True if a connection to the queued listener was opened. If the queued listener is already open, the request is ignored and the method returns False.
Return type: bool