Source code for mcl.messages.messages

"""Object specification for creating messages in MCL.

The :mod:`~.messages.messages` module provides a means for implementing MCL
message objects. This is done through the :class:`.Message` object.  Since
:class:`.Message` objects derive from python dictionaries, they operate near
identically.

:class:`.Message` objects are a specification of what structure of data is
being transmitted on a particular :class:`.abstract.Connection`. As a result
:class:`.Message` objects are defined by:

    - mandatory message attributes that must be present when instances of the
      new :class:`.Message` objects are created

    - a :class:`~.abstract.Connection` object instance specifying where the
      message can be broadcast and received

Creating MCL :class:`.Message` objects is simple and is demonstrated in the
following example:

.. testcode::

    from mcl import Message
    from mcl.network.udp import Connection as UdpConnection

    # Define a message.
    class ExampleMessage(Message):
        mandatory = ('text', )
        connection = UdpConnection('ff15::a')

    # Create instance of message.
    msg = ExampleMessage(text='hello world')
    print msg

    # Messages objects contain the a 'timestamp' key which records the UTC time
    # of when the message object was instantiated. To update the timestamp and
    # message attributes, use the update() method.
    msg.update(text="I'm a lumberjack")
    print msg

    # To update a message attribute without updating the timestamp, set it
    # directly.
    msg['text'] = 'Spam! Spam!'
    print msg

    # Serialise message into a msgpack binary string. Hex-ify the string for
    # demonstration and printability.
    print msg.encode().encode('hex')

    # The message can also be encoded as a JSON object.
    print msg.to_json()

.. testoutput::
   :hide:

   {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'hello world'}
   {'timestamp': ..., 'name': 'ExampleMessage', 'text': "I'm a lumberjack"}
   {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'Spam! Spam!'}
   ...

The following functions can be used to retrieve and manipulate
:class:`.Message` objects:

    - :func:`~.messages.get_message_objects` return :class:`.Message` object(s)
      from name(s)

    - :func:`~.messages.list_messages` list message objects derived from
      :class:`.Message`

    - :func:`~.messages.remove_message_object` de-register a :class:`.Message`
      object from the list of known messages

.. codeauthor:: Asher Bender <a.bender@acfr.usyd.edu.au>
.. codeauthor:: James Ward <j.ward@acfr.usyd.edu.au>
.. sectionauthor:: Asher Bender <a.bender@acfr.usyd.edu.au>

"""
import sets
import json
import time
import msgpack
import mcl.network.abstract


# Globally track Message() definitions. The meta-class _RegisterMeta() inserts
# Message() definitions into _MESSAGES when Message() objects are subclassed.
_MESSAGES = list()


class _MessageMeta(type):
    """Meta-class for manufacturing and globally registering Message() objects.

    The :class:`._MessageMeta` object is a meta-class designed to manufacture
    MCL :class:`.Message` classes. The meta-class works by dynamically adding
    mandatory attributes to a class definition at run time if and ONLY if the
    class inherits from :class:`.abstract.Connection`.

    Classes that inherit from :class:`.Message` must implement the `mandatory`
    and `connection` attributes where:

        - `mandatory` is a list of strings defining the names of mandatory
          message attributes that must be present when instances of the new
          :class:`.Message` objects are created. During instantiation the input
          list *args is mapped to the attributes defined by `mandatory`. If
          `mandatory` is not present, a :exc:`.TypeError` will be raised.

        - `connection` is an instance of a :class:`~.abstract.Connection`
          object specifying where the message can be broadcast and received.

    The meta-class also maintains a global register of :class:`.Message`
    sub-classes. :class:`.Message` sub-classes are added to the register when
    they are defined. During this process :class:`._MessageMeta` checks to see
    if a :class:`.Message` class with the same name has already been defined.

    Note that the list of :class:`.Message` sub-classes can be acquired by
    calling::

        messages = Message.__subclasses__()

    The reason the :class:`._MessageMeta` is preferred is that it can provide
    error checking at the time of definition. Note that sub-classes cannot
    easily be removed from the list returned by
    ``Message.__subclasses__()``. By using this meta-class, :class:`.Message`
    objects can be removed from the global register via other methods (see
    :func:`.remove_message_object`).

    Raises:
        TypeError: If a :class:`.Message` object with the same name already
            exists.
        TypeError: If the parent class is a :class:`.Message` object and
            `mandatory` is ill-specified.

    """

    def __new__(cls, name, bases, dct):
        """Manufacture a message class.

        Manufacture a Message class for objects inheriting from
        :class:`.Message`. This is done by searching the input dictionary `dct`
        for the keys `mandatory` and `connection` where:

            - `mandatory` is a list of strings defining the names of mandatory
              message attributes that must be present when instances of the new
              :class:`.Message` object are created. During instantiation the
              input list *args is mapped to the attributes defined by
              `mandatory`. If `mandatory` is not present, a :exc:`.TypeError`
              will be raised.

            - `connection` is an instance of a :class:`~.abstract.Connection`
              object specifying where the message can be broadcast and
              received.

        A new message class is manufactured using the definition specified by
        the attribute `mandatory`. The property 'mandatory' is attached to the
        returned class.

        Args:
          cls (class): is the class being instantiated.
          name (string): is the name of the new class.
          bases (tuple): base classes of the new class.
          dct (dict): dictionary mapping the class attribute names to objects.

        Returns:
            :class:`.Message`: sub-class of :class:`.Message` with mandatory
                attributes defined by the original `mandatory` attribute.

        Raises:
            NameError: If the `name` is message or a :class:`.Message` subclass
                with the same name already exists.
            TypeError: If the `mandatory` or `connection` attributes are
                ill-specified.
            ValueError: If the `mandatory` attribute contains the words
                `mandatory` or `connection`.

        """

        # Do not look for the mandatory attributes in the Message() base class.
        if (name == 'Message') and (bases == (dict,)):
            return super(_MessageMeta, cls).__new__(cls, name, bases, dct)

        # Do not look for the mandatory attributes in sub-classes of the
        # Message() base class.
        elif bases != (Message,):
            return super(_MessageMeta, cls).__new__(cls, name, bases, dct)

        # Cannot call messages 'Message'.
        if name == 'Message':
            raise NameError("Cannot name Message() subclasses 'Message'.")

        # Check that a message with the same name does not exist.
        elif name in [message.__name__ for message in _MESSAGES]:
            msg = "A Message() with the name '%s' already exists."
            raise NameError(msg % name)

        # Objects inheriting from Message() are required to have a 'mandatory'
        # and 'connection' attribute.
        mandatory = dct.get('mandatory', {})
        connection = dct.get('connection', None)

        # Ensure 'mandatory' is a list or tuple of strings.
        if ((not isinstance(mandatory, (list, tuple))) or
            (not all(isinstance(item, basestring) for item in mandatory))):
            msg = "'mandatory' must be a list or tuple or strings."
            raise TypeError(msg)

        # Ensure the connection object is properly specified.
        if not isinstance(connection, mcl.network.abstract.Connection):
            msg = "The argument 'connection' must be an instance of a "
            msg += "Connection() subclass."
            raise TypeError(msg)

        # Check that a message with the same connection does not exist.
        for message in _MESSAGES:
            if connection.to_dict() == message.connection.to_dict():
                msg = 'A Connection() with the same parameters already exists:'
                msg += ' %s' % str(connection)
                raise Exception(msg)

        # Detect duplicate attribute names.
        seen_attr = set()
        for attr in mandatory:
            if (attr == 'mandatory') or (attr == 'connection'):
                msg = "Field names cannot be 'mandatory' or 'connection'."
                raise ValueError(msg)
            if attr in seen_attr:
                raise ValueError('Encountered duplicate field name: %r' % attr)
            seen_attr.add(attr)

        # Add basic message attributes as read-only CLASS attributes. This is
        # done by dynamically manufacturing a meta-class with properties
        # returning the basic message attributes.
        metacls = type('%sMeta' % name, (cls,),
                       {'name': property(lambda cls: name),
                        'mandatory': property(lambda cls: mandatory),
                        'connection': property(lambda cls: connection)})

        # Add basic message attributes as read-only INSTANCE attributes. This
        # is done by adding properties that return the basic message attributes
        # to the manufactured class.
        del(dct['mandatory'])
        del(dct['connection'])
        dct['name'] = property(lambda cls: name)
        dct['mandatory'] = property(lambda cls: mandatory)
        dct['connection'] = property(lambda cls: connection)
        obj = super(_MessageMeta, cls).__new__(metacls, name, bases, dct)

        # Store message definition.
        _MESSAGES.append(obj)
        return obj


[docs]class Message(dict): """Base class for MCL message objects. The :class:`.Message` object provides a base class for defining MCL message objects. Objects inheriting from :class:`.Message` must implement the attribute `mandatory` where: - `mandatory` is a list of strings defining the names of mandatory connection parameters that must be present when instances of the new :class:`~.abstract.Connection` object are created. If `mandatory` is not present, a TypeError will be raised. These attributes define a message format and allow :class:`.Message` to manufacture a message class adhering to the specified definition. Raises: TypeError: If any of the input argument are invalid. """ __metaclass__ = _MessageMeta def __init__(self, *args, **kwargs): # If no inputs were passed into the constructor, initialise the object # with empty fields. if not args and not kwargs: empty = [None] * len(self.mandatory) kwargs = dict(zip(self.mandatory, empty)) # Initialise message object with items. super(Message, self).__init__() self.update(*args, **kwargs) # Ensure the message adheres to specification. if not sets.Set(self.keys()).issuperset(sets.Set(self.mandatory)): msg = "'%s' must have the following items: [" % self['name'] msg += ', '.join(self.mandatory) msg += '].' raise TypeError(msg) def __setitem__(self, key, value): """Set an item to a new value. Prevent write access to the keys 'name'. """ # Prevent write access to Message name. if key == 'name' and key in self: msg = "The key value '%s' in '%s' is read-only." raise ValueError(msg % (key, self.__class__.__name__)) # All other items can be accessed normally. else: super(Message, self).__setitem__(key, value) def __set_time(self): """Update the CPU time-stamp in milliseconds from UTC epoch. """ # Return the time in seconds since the epoch as a floating point # number. Note that even though the time is always returned as a # floating point number, not all systems provide time with a better # precision than 1 second. While this function normally returns # non-decreasing values, it can return a lower value than a previous # call if the system clock has been set back between the two calls. # # Note: The datetime documentation claims datetime.datetime.now() # supplies more precision than can be gotten from time.time() # timestamp if possible. To simplify the code # # From: # https://docs.python.org/2/library/time.html#time.time # https://docs.python.org/2/library/datetime.html#datetime.datetime.now # super(Message, self).__setitem__('timestamp', time.time())
[docs] def to_json(self): """Return the contents of the message as a JSON string. Returns: str: JSON formatted representation of the message contents. """ return json.dumps(self)
[docs] def encode(self): """Return the contents of the message as serialised binary msgpack data. Returns: str: serialised binary msgpack representation of the message contents. """ return msgpack.dumps(self)
def __decode(self, data): """Unpack msgpack serialised binary data. Args: data (str): msgpack serialised message data. Returns: dict: unpacked message contents. Raises: TypeError: If the input binary data could not be unpacked. """ try: dct = msgpack.loads(data) # The transmitted object is a dictionary. if type(dct) is dict: # Check if mandatory attributes are missing. missing = sets.Set(self.mandatory) - sets.Set(dct.keys()) # Decode was successful. if not missing: return dct # Transmitted object is missing mandatory fields. else: msg = 'The transmitted object was missing the following ' msg += 'mandatory items: [' + ', '.join(missing) + '].' # Transmitted object was decoded but is not a dictionary. else: msg = "Serialised object is of type '%s' and not a dictionary." msg = msg % str(type(dct)) # Decoding was unsuccessful. except Exception as e: msg = "Could not unpack message. Error encountered:\n\n%s" % str(e) # Raise error encountered during unpacking. raise TypeError(msg)
[docs] def update(self, *args, **kwargs): """Update message contents with new values. Update message contents from an optional positional argument and/or a set of keyword arguments. If a positional argument is given and it is a serialised binary msgpack representation of the message contents, it is unpacked and used to update the contents of the message. .. testcode:: serialised = ExampleMessage(text='hello world') print ExampleMessage(serialised) .. testoutput:: :hide: {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'hello world'} If a positional argument is given and it is a mapping object, the message is updated with the same key-value pairs as the mapping object. .. testcode:: print ExampleMessage({'text': 'hello world'}) .. testoutput:: :hide: {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'hello world'} If the positional argument is an iterable object. Each item in the iterable must itself be an iterable with exactly two objects. The first object of each item becomes a key in the new dictionary, and the second object the corresponding value. If a key occurs more than once, the last value for that key becomes the corresponding value in the message. .. testcode:: print ExampleMessage(zip(('text',), ('hello world',))) .. testoutput:: :hide: {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'hello world'} If keyword arguments are given, the keyword arguments and their values are used to update the contents of the message .. testcode:: print ExampleMessage(text='hello world') .. testoutput:: :hide: {'timestamp': ..., 'name': 'ExampleMessage', 'text': 'hello world'} If the key 'timestamp' is present in the input, the timestamp of the message is set to the input value. If no 'timestamp' value is specified, the CPU time-stamp, in milliseconds from UTC epoch, at the end of the update is recorded. Args: *args (list): positional arguments *kwargs (dict): keyword arguments. Raises: TypeError: If the message contents could not be updated. """ # Set the default timestamp to None. If it is updated by the passed in # arguments, we won't update it automatically. if 'timestamp' not in self: self['timestamp'] = None original_time = self['timestamp'] if len(args) > 1: msg = 'Input argument must be a msgpack serialised dictionary, ' msg += 'a mapping object or iterable object.' raise TypeError(msg) # Update message with a serialised dictionary: # # msg.update(binary) # if (len(args) == 1) and (type(args[0]) is str): super(Message, self).update(self.__decode(args[0])) return # Update message with a dictionary (and keyword arguments): # # msg.update(one=1, two=2, three=3) # msg.update(zip(['one', 'two', 'three'], [1, 2, 3])) # msg.update([('two', 2), ('one', 1), ('three', 3)]) # msg.update({'three': 3, 'one': 1, 'two': 2}) # else: try: super(Message, self).update(*args, **kwargs) except Exception as e: msg = "Could not update message. Error encountered:\n\n%s" raise TypeError(msg % str(e)) # Populate the name key with the message name. if 'name' not in self: super(Message, self).__setitem__('name', self.__class__.__name__) # The name parameter was modified. elif self['name'] != self.__class__.__name__: msg = "Attempted to set the read-only key value %s['%s'] = '%s'." raise ValueError(msg % (self.__class__.__name__, 'name', self['name'])) # Record the time of update if the 'timestamp' field was not # specified. By checking for changes to the 'timestamp' field, users # can set null values (None) or falsy values (a timestamp of 0). if self['timestamp'] == original_time: self.__set_time()
[docs]def remove_message_object(name): """De-register a :class:`.Message` object from the list of known messages. Args: name (string): Name of the :class:`.Message` object to de-register. Returns: bool: :data:`.True` if the :class:`.Message` object was de-registered. :data:`.False` if the :class:`.Message` object does not exist. """ # Create name of available messages. names = [msg.__name__ for msg in _MESSAGES] # The message exists, remove it from the list. if name in names: index = names.index(name) del _MESSAGES[index] return True # The message does not exist. No action required. else: return False
[docs]def list_messages(include=None, exclude=None): """List objects derived from :class:`.Message`. Args: include (list): list of message object names to include. exclude (list): list of message object names to exclude. Returns: list: a list of message objects derived from :class:`.Message` is returned. """ # Save includes. if isinstance(include, basestring): include = [include, ] elif include is not None: if ((not hasattr(include, '__iter__')) or (not all([isinstance(itm, basestring) for itm in include]))): msg = "'include' must be a string or a list of strings.'" raise TypeError(msg) # Save excludes. if isinstance(exclude, basestring): exclude = [exclude, ] elif exclude is not None: if ((not hasattr(exclude, '__iter__')) or (not all([isinstance(itm, basestring) for itm in exclude]))): msg = "'exclude' must be a string or a list of strings.'" raise TypeError(msg) # Filter available messages. messages = list() for message in _MESSAGES: # Do not include messages in the black list. if exclude and message.name in exclude: continue # Only include messages in the white list (if it exists). if include and message.name not in include: continue messages.append(message) return messages
[docs]def get_message_objects(names): """Return :class:`.Message` object(s) from name(s). Args: name (:obj:`python:string` or :obj:`python:list`): The name (as a string) of a single message object to retrieve. To retrieve multiple message objects, input a list containing the object names. Returns: Message or list: If a single message object is requested (string input), the requested py:class:`.Message` is returned. If multiple message objects are requested (list input), a list of message objects is returned. Raises: TypeError: If `names` is not a string or list/tuple of strings. NameError: If `names` does not exist or multiple message objects are found. """ # Input is a string. if isinstance(names, basestring): # Create name of available messages. messages = [(msg, msg.__name__) for msg in _MESSAGES] # Cache messages with a matching name. matches = list() for message in messages: if message[1] == names: matches.append(message) # Message does not exist. if len(matches) == 0: raise NameError("Could locate the message named: '%s'." % names) # Multiple messages with the same name exist. elif len(matches) > 1: msg = "Multiple messages named '%s' found including:\n" % names for message in matches: msg += ' %s.%s\n' % (message[0].__module__, message[1]) raise NameError(msg) # Return unique message. return matches[0][0] # Input is a list or tuple. elif ((isinstance(names, (list, tuple))) and (all([isinstance(itm, basestring) for itm in names]))): messages = list() for name in names: try: messages.append(get_message_objects(name)) except: raise return messages # Invalid input type. else: msg = "The input 'names' must be a string or a list/tuple of strings." raise TypeError(msg)