Source code for cnxman.serial

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
.. currentmodule:: cnxman.serial
.. moduleauthor:: Pat Daburu <pat@daburu.net>

Let's manage serial port connections!
"""

from .logging import loggable_class as loggable
from cnxman.basics import Connection
from enum import Enum
from pydispatch import dispatcher
import serial as pyserial
from serial import Serial, SerialException
import threading


[docs]class SerialListener(threading.Thread): """ This is a thread object that listens for incoming data from a serial connection. """
[docs] class Signals(Enum): """ These are the used by serial listener objects. :seealso: :py:func:`pydispatch.dispatcher` """ DATA_RECEIVED = 'data-received' # We received some data! Hooray! READ_ERROR = 'read-error' # We couldn't read from the connection.
[docs] def __init__(self, serial: pyserial.Serial): super().__init__() # Threads of this type run as daemons. self.daemon = True self._serial = serial # the serial connection we're monitoring self._terminate_event = threading.Event() # a threading event to tell us when its time to stop
@property def serial(self) -> pyserial.Serial: """ This is the serial object we're monitoring. :rtype: :py:class:`pyserial.Serial` """ return self._serial
[docs] def terminate(self): """ Terminate the listener. """ try: self._serial.close() except: pass # TODO: Log this properly. # Set the termination event. self._terminate_event.set()
[docs] def run(self): """ Start listening for data on the serial connection. """ # If the serial port hasn't been opened... if not self._serial.is_open: # ...let's try to do that now. try: self._serial.open() except: # TODO: Improve the exception handling! print("couldn't open the serial port.") # Let any interested parties know something went wrong. dispatcher.send(signal=SerialListener.Signals.READ_ERROR, sender=self) # We're finished now. self.terminate() return while not self._terminate_event.is_set(): try: data = self._serial.read() # Notify interested parties that we got something! dispatcher.send(signal=SerialListener.Signals.DATA_RECEIVED, sender=self, data=data) #print("got some data: ", data) except: # TODO: Improve the exception handling! print("an error occurred while we were reading!") # Any error results in immediate termination of the listener. self.terminate() # Let any interested parties know. dispatcher.send(signal=SerialListener.Signals.READ_ERROR, sender=self) # Bail out. return
[docs]@loggable() class SerialConnection(Connection):
[docs] class Signals(Enum): """ These are the used by serial listener objects. :seealso: :py:func:`pydispatch.dispatcher` """ DATA_RECEIVED = 'data-received' # We received some data! Hooray!
[docs] def __init__(self, port: str, baudrate: int=9600, bytesize: int = pyserial.EIGHTBITS, parity: str=pyserial.PARITY_NONE, stopbits: int=pyserial.STOPBITS_ONE, timeout=None): super().__init__() # Make copies of the port parameters so that we may construct serial ports. self._port = port # To what port are we connecting? self._baudrate = baudrate # What's the rate on the port? self._bytesize = bytesize # How much is a byte? self._parity = parity # What's the parity on the port? self._stopbits = stopbits # How many stop bits? self._timeout = timeout # What's the timeout interval. self._listener: SerialListener = None # the background thread serial monitor
[docs] def try_connect(self) -> bool: """ Attempt to connect to the serial port. :return: ``True`` if and only if the connection attempt is successful, otherwise ``False``. :rtype: ``bool`` """ # If we're already listening and everything is all right... if self._listener is not None and self._listener.serial.is_open: # ...there's nothing more to do here. return True try: serial = Serial(port=self._port, baudrate=self._baudrate, bytesize=self._bytesize, parity=self._parity, stopbits=self._stopbits, timeout=self._timeout) # If the serial connection didn't open automatically... if not serial.is_open: # ...open it now. serial.open() # So far so good. Set up a background thread. self._listener = SerialListener(serial=serial) # We want to be notified if the connection sends data. dispatcher.connect(self._handle_listener_data_received, signal=SerialListener.Signals.DATA_RECEIVED, sender=self._listener) # We want to be notified if there is a read error, also. dispatcher.connect(self._handle_listener_read_error, signal=SerialListener.Signals.READ_ERROR, sender=self._listener) self._listener.run() # If we got this far, the connection succeeded. return True except SerialException as sex: print("boom boom", sex) # TODO: Proper logging! return False
[docs] def disconnect(self): """ Disconnect from the serial port. """ if self._listener is not None: # Disconnect from any further signals sent by the listener. dispatcher.disconnect(dispatcher.Any, self._listener) # Stop the listener. self._listener.stop() self._listener = None
[docs] def teardown(self): """ Release the serial port entirely. """ self.disconnect()
def _handle_listener_data_received(self, data): print("The SerialConnection heard: ", data) def _handle_listener_read_error(self): # Raise the alarm! self.raise_alarm()