"""Publish and receive data using UDP sockets.
This module provides an interface for publishing data over UDP sockets. The
main objects responsible for transmitting data over UDP sockets are:
- :class:`~.udp.Connection`
- :class:`~.udp.RawBroadcaster`
- :class:`~.udp.RawListener`
Data are transmitted using IPv6 `multicasts
<http://en.wikipedia.org/wiki/Multicast>`_. Note that this module inherits the
advantages and disadvantages of UDP. That is; UDP allows connectionless,
low-latency broadcasts with no guarantee of delivery, ordering, or duplicate
protection.
Example usage:
.. testcode:: send-receive
import os
import time
from mcl.network.udp import Connection
from mcl.network.udp import RawListener
from mcl.network.udp import RawBroadcaster
# Create UDP connection.
connection = Connection('ff15::c73d:cf41:ea8e:b0a0')
# Create raw listener and broadcaster from IPv6 connection.
listener = RawListener(connection)
broadcaster = RawBroadcaster(connection)
# Print received data to screen.
listener.subscribe(lambda d: os.sys.stdout.write(d['payload']))
# Broadcast data.
broadcaster.publish('hello world')
time.sleep(0.1)
# Close connections.
listener.close()
broadcaster.close()
.. testoutput:: send-receive
:hide:
hello world
.. note::
To use IPv6 multicast, a connected network interface must exist on the
system. If no interface is available, linux systems can be configured to
work offline with the following:
.. code-block:: bash
sudo modprobe dummy
sudo ip -6 addr add fd34::1/64 dev dummy0
.. note::
It is advised to increase the UDP kernel buffer size:
http://lcm.googlecode.com/svn/www/reference/lcm/multicast.html
In linux, a temporary method (does not persist across reboots) of
increasing the UDP kernel buffer size to 2MB can be achieved by issuing:
.. code-block:: bash
sudo sysctl -w net.core.rmem_max=2097152
sudo sysctl -w net.core.rmem_default=2097152
A permanent solution is to add the following lines to
``/etc/sysctl.conf``::
net.core.rmem_max=2097152
net.core.rmem_default=2097152
.. warning::
A bug introduced into the `linux kernel
<https://github.com/torvalds/linux/commit/efe4208f47f907>`_ prevents IPv6
multicast packets from reaching the destination sockets under certain
conditions. This is believed to affect linux `kernels 3.13 to 3.15`. The
regression was `fixed
<https://github.com/torvalds/linux/commit/3bfdc59a6c24608ed23e903f670aaf5f58c7a6f3>`_
and should not be present in recent kernels.
.. codeauthor:: Asher Bender <a.bender@acfr.usyd.edu.au>
.. codeauthor:: Stewart Worrall <s.worrall@acfr.usyd.edu.au>
"""
import time
import select
import socket
import struct
import msgpack
import threading
import mcl.network.abstract
# Use a fixed port number for all UDP messages. Specify maximum transmission
# unit (MTU) to determine transmission fragmentation.
UDP_PORT = 26000
ALLOWED_MULTICAST_HOPS = 3
MTU = 60000
MTU_MAX = 65000
# Time in milliseconds to break out of I/O loop. This number determines the
# responsiveness of RawListeners to stop signals.
READ_TIMEOUT = 200
[docs]class RawBroadcaster(mcl.network.abstract.RawBroadcaster):
"""Send data over the network using a UDP socket.
The :class:`~.udp.RawBroadcaster` object allows data to be published over a
UDP socket. The object marshalls broadcasts and ensures large items will be
fragmented into smaller sub-packets which obey the network maximum
transmission unit (MTU) constraints.
Args:
connection (:class:`.Connection`): Connection object.
topic (str): Default topic associated with the IPv6 interface.
Attributes:
connection (:class:`.Connection`): Connection object.
topic (str): Default topic associated with the IPv6 interface.
is_open (bool): Return whether the UDP socket is open.
Raises:
TypeError: If any of the inputs are ill-specified.
"""
def __init__(self, connection, topic=None):
"""Document the __init__ method at the class level."""
# Ensure the connection object is properly specified.
if not isinstance(connection, Connection):
msg = "The argument 'connection' must be an instance of a "
msg += "UDP Connection()."
raise TypeError(msg)
# Attempt to initialise broadcaster base-class.
else:
try:
super(RawBroadcaster, self).__init__(connection, topic=topic)
except:
raise
# Create objects for handling UDP broadcasts.
self.__socket = None
self.__sockaddr = None
self.__is_open = False
# Attempt to connect to UDP interface.
success = self._open()
if not success:
msg = "Could not connect to '%s'." % str(self.connection)
raise IOError(msg)
@property
def is_open(self):
return self.__is_open
def _open(self):
"""Open connection to UDP broadcast interface.
Returns:
:class:`bool`: Returns :data:`True` if the socket was created. If
the socket already exists, the request is ignored and the
method returns :data:`False`.
"""
if not self.is_open:
# Fetch address information.
addrinfo = socket.getaddrinfo(self.connection.url, None)[0]
self.__sockaddr = (addrinfo[4][0], self.connection.port)
# Number of hops to allow.
self.__socket = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
# Set Time-to-live (optional).
ttl_message = struct.pack('@i', ALLOWED_MULTICAST_HOPS)
self.__socket.setsockopt(socket.IPPROTO_IPV6,
socket.IPV6_MULTICAST_HOPS,
ttl_message)
self.__is_open = True
return True
else:
return False
[docs] def publish(self, data, topic=None):
"""Send data over UDP interface.
Large data is fragmented into smaller MTU sized packets. The protocol
used during publishing is documented in :class:`~.udp.RawBroadcaster`.
Args:
data (obj): Serialisable object to broadcast over UDP.
topic (str): Topic associated with published data. This option will
temporarily override the topic specified during instantiation.
"""
# Note:
#
# Data is published as a msgpack-serialised tuple using the
# following protocol::
#
# (topic, payload)
#
# where:
#
# - Topic is a string representing the topic associated with
# the current data packet. This can be used for filtering
# broadcasts.
# - Payload is the transmitted data.
#
# If the msgpack-serialised tuple is larger than the MTU, the
# payload is serialised using msgpacked and split into multiple
# fragments. The payload is then transmitted by publishing several
# msgpack-serialised tuples using the following protocol::
#
# (topic, packet number, total packets, payload)
#
# where:
#
# - Packet number is an integer indicating that the current
# packet is the Nth packet out of M packets in a sequence.
# - Total packets is an integer indicating that the current
# packet is a member of a sequence of M packets.
#
# Note that 'payload' is a fragment of the serialised data. To
# remarshall the payload, the fragments must be joined in the
# correct order and unpacked.
#
if self.is_open:
# Validate input arguments.
try:
super(RawBroadcaster, self).publish(data, topic=topic)
if topic is None:
topic = self.topic
except:
raise
# Optimise for 'small' messages - assume data can be sent in a
# single packet.
packet = msgpack.dumps((topic, data))
# Send data in single packet.
if len(packet) <= MTU:
self.__socket.sendto(packet, self.__sockaddr)
# Fragment data into multiple packets.
else:
# Accept inefficiency of double-encoding for 'large' messages.
data = msgpack.dumps(data)
# Calculate number of MTU-sized packets required to send input
# data over the network.
#
# Note: Integer math is used to calculate the number of
# packets. This calculation does not account for the
# corner case the data is the same length as the MTU.
data_len = len(data)
packets = (data_len / MTU) + 1
for packet in range(packets):
start_ptr = packet * MTU
end_ptr = min(data_len, (packet + 1) * MTU)
fragment = msgpack.dumps((topic,
packet + 1,
packets,
data[start_ptr:end_ptr]))
self.__socket.sendto(fragment, self.__sockaddr)
else:
msg = 'Connection must be opened before publishing.'
raise IOError(msg)
[docs] def close(self):
"""Close connection to UDP broadcast interface.
Returns:
:class:`bool`: Returns :data:`True` if the socket was closed. If
the socket was already closed, the request is ignored and the
method returns :data:`False`.
"""
if self.is_open:
self.__socket.close()
self.__is_open = False
return True
else:
return False
[docs]class RawListener(mcl.network.abstract.RawListener):
"""Receive data from the network using a UDP socket.
The :class:`~.udp.RawListener` object subscribes to a UDP socket and issues
publish events when UDP data are received. Network data are made available
to other objects by issuing callbacks in the following format::
{'topic': str(),
'payload': obj()}
where:
- **<topic>** is a string containing the topic associated with the
received data.
- **<payload>** is the received (serialisable) data.
The :class:`~.udp.RawListener` marshalls broadcasts and ensures multiple,
fragmented packets will be recomposed into a single packet before issuing a
publish event.
.. note::
:class:`~.udp.RawListener` does not interpret the received data in
anyway. Code receiving the data must be aware of how to handle it. A
method for simplifying data handling is to pair a specific data type
with a unique network address. By adopting this paradigm, handling the
data is trivial if the network address is known.
Args:
connection (:class:`.Connection`): Connection object.
topics (str or list): Topics associated with the
:class:`~.udp.RawListener` interface.
Attributes:
connection (:class:`.Connection`): Connection object.
topics (str or list): Topics associated with the
:class:`~.udp.RawListener` interface.
is_open (bool): Return whether the UDP socket is open.
"""
def __init__(self, connection, topics=None):
"""Document the __init__ method at the class level."""
# Ensure the connection object is properly specified.
if not isinstance(connection, Connection):
msg = "The argument 'connection' must be an instance of a "
msg += "UDP Connection()."
raise TypeError(msg)
# Attempt to initialise listener base-class.
else:
try:
super(RawListener, self).__init__(connection, topics=topics)
except:
raise
# Number of messages to buffer.
self.__buffer_size = 5
# Create objects for handling received UDP messages.
self.__socket = None
self.__stop_event = None
self.__listen_thread = None
self.__is_open = False
# Attempt to connect to UDP interface.
success = self._open()
if not success:
msg = "Could not connect to '%s'." % str(self.connection)
raise IOError(msg)
@property
def is_open(self):
return self.__is_open
def _open(self):
"""Open connection to UDP receive interface.
Returns:
:class:`bool`: Returns :data:`True` if the socket was created. If
the socket already exists, the request is ignored and the
method returns :data:`False`.
"""
if not self.__is_open:
try:
# Fetch address information.
addrinfo = socket.getaddrinfo(self.connection.url, None)[0]
# Create socket.
self.__socket = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
# Set to non-blocking mode. In non-blocking mode, if a recv()
# call doesn't find any data, a error exception is raised.
self.__socket.setblocking(False)
# Allow multiple copies of this program on one machine (not
# strictly needed).
self.__socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# Join group.
group_name = socket.inet_pton(addrinfo[0], addrinfo[4][0])
group_addr = group_name + struct.pack('@I', 0)
self.__socket.setsockopt(socket.IPPROTO_IPV6,
socket.IPV6_JOIN_GROUP,
group_addr)
# Bind socket to the address/port.
self.__socket.bind((self.connection.url, self.connection.port))
# Register the socket with the select poller so that incoming
# data triggers an event.
self.__poller = select.poll()
self.__poller.register(self.__socket, select.POLLIN)
# Could not create socket. Raise return failure.
except:
return False
# Start servicing UDP data on a new thread.
self.__stop_event = threading.Event()
self.__stop_event.clear()
self.__listen_thread = threading.Thread(target=self.__read)
self.__listen_thread.daemon = True
self.__listen_thread.start()
# Wait for thread to start.
while not self.__listen_thread.is_alive:
time.sleep(0.1) # pragma: no cover
self.__is_open = True
return True
else:
return False
def __read(self):
"""Read data from UDP socket."""
# Create buffer for receiving fragmented data.
receive_buffer = dict()
# Poll UDP socket and publish data.
while not self.__stop_event.is_set():
# Wait for a data event in the socket.
events = self.__poller.poll(READ_TIMEOUT)
if events and events[0][1] & select.POLLIN:
# Read multiple packets from the socket.
socket_data = list()
while True:
try:
socket_data.append(self.__socket.recvfrom(MTU_MAX))
except:
break
# Remarshal and issue data to callbacks.
self.__remarshal(socket_data, receive_buffer)
else:
continue
# Close socket on exiting thread.
self.__socket.close()
def __remarshal(self, socket_data, receive_buffer):
for frame, sender in socket_data:
# Unpack frame of data.
try:
frame = msgpack.loads(frame)
# Data transmitted in single packets.
if len(frame) == 2:
complete = True
topic, payload = frame
# Data transmitted in multiple packets.
else:
complete = False
topic, packet, packets, payload = frame
except:
continue
# Topic filtering is enabled.
if self.topics:
# White list of topics is a string which does not match the
# current topic. Skip this data frame.
if (isinstance(self.topics, basestring) and
(topic != self.topics)):
continue
# White list of topics must be a list. The list does not
# contain the current topic. Skip this data frame.
elif topic not in self.topics:
continue
# Data fits into one frame. Publish data immediately.
if complete:
try:
self.__trigger__({'topic': topic,
'payload': payload})
continue
except Exception as e:
msg = '\nCould not service UDP recieve callback. The '
msg += 'following exception was raised:\n\n%s\n'
raise Exception(msg % e.message)
# Data fits into multiple frames. The code from this point forwards
# remarshalls the data frames into a single payload.
#
# Frames of a single message are stored in a dictionary buffer.
# Once all frames for a particular message have been received, they
# are recombined and published. Note that the frames can be
# received out of order. The process works by:
#
# 1) Assigning a unique identifier (key) to a message
#
# 2) If the identifier (key) does NOT exist in the dictionary
# buffer:
#
# - If the buffer is full. Drop the oldest incomplete
# message to prevent the buffer from accumulating a
# large history of incomplete messages (memory leak).
#
# - An empty list is associated with the new dictionary
# key. The list contains one element for each frame in
# the message.
#
# - The frame that was received is populated with data.
#
# 3) If the identifier DOES exist:
#
# - If the frame has not been populated with data, the
# frame that was received is populated with data.
#
# - If the frame HAS been populated with data, the
# identifier is not unique and is clobbering cached
# data. Clobber the 'stale' message by re-allocating
# memory and populate the frame that was received with
# data (in theory this should not happen).
#
# 4) If all frames for the message have been received, the
# frames are recombined and published.
# Assign a unique identifier to the sender of the data frame.
frame_identifier = (sender[0], packets, topic)
# The unique identifier does not exist in the buffer. Allocate
# space in the buffer.
if frame_identifier not in receive_buffer:
array = [None] * packets
receive_buffer[frame_identifier] = array
# The unique identifier exists in the buffer and new frame is
# overwriting an existing fragment. Re-allocate space on the stale
# fragment.
elif receive_buffer[frame_identifier][packet - 1]:
receive_buffer[frame_identifier] = [None] * packets
# Store fragment.
receive_buffer[frame_identifier][packet - 1] = payload
# Publish data when all fragments have been received.
if receive_buffer[frame_identifier].count(None) == 0:
# Combine fragments into one payload and publish.
payload = ''.join(receive_buffer[frame_identifier])
payload = msgpack.loads(payload)
self.__trigger__({'topic': topic,
'payload': payload})
# Free space in buffer.
del receive_buffer[frame_identifier]
[docs] def close(self):
"""Close connection to UDP receive interface.
Returns:
:class:`bool`: Returns :data:`True` if the socket was closed. If
the socket was already closed, the request is ignored and the
method returns :data:`False`.
"""
if self.is_open:
# Stop thread and wait for thread to terminate.
self.__stop_event.set()
self.__listen_thread.join()
# Close socket.
self.__socket.close()
self.__is_open = False
return True
else:
return False
[docs]class Connection(mcl.network.abstract.Connection):
"""Object for encapsulating UDP connection parameters.
Args:
url (str): IPv6 address of connection.
port (int): Port to use (between 1024 and 65535).
Attributes:
url (str): IPv6 address of connection.
port (int): Port used in connection.
Raises:
TypeError: If ``url`` is not a string or ``port`` is not an integer
between 1024 and 65536.
"""
mandatory = ('url',)
optional = {'port': UDP_PORT}
broadcaster = RawBroadcaster
listener = RawListener
def __init__(self, url, port=UDP_PORT):
# Check 'url' is a string.
if not isinstance(url, basestring):
msg = "'url' must be a string."
raise TypeError(msg)
# Check 'port' is a positive integer between 1024 and 65535. The port
# numbers in the range from 0 to 1023 are the well-known ports or
# system ports and are avoided.
if not isinstance(port, (int, long)):
msg = 'Port must be an integer value.'
raise TypeError(msg)
elif (port < 1024) or (port > 65535):
msg = 'The port must be a positive integer between 1024 and 65535.'
raise TypeError(msg)
super(Connection, self).__init__(url, port)