| #!/usr/bin/env python3 |
| # |
| # Working with threading and pySerial |
| # |
| # This file is part of pySerial. https://github.com/pyserial/pyserial |
| # (C) 2015-2016 Chris Liechti <cliechti@gmx.net> |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| """\ |
| Support threading with serial ports. |
| """ |
| from __future__ import absolute_import |
| |
| import serial |
| import threading |
| |
| |
| class Protocol(object): |
| """\ |
| Protocol as used by the ReaderThread. This base class provides empty |
| implementations of all methods. |
| """ |
| |
| def connection_made(self, transport): |
| """Called when reader thread is started""" |
| |
| def data_received(self, data): |
| """Called with snippets received from the serial port""" |
| |
| def connection_lost(self, exc): |
| """\ |
| Called when the serial port is closed or the reader loop terminated |
| otherwise. |
| """ |
| if isinstance(exc, Exception): |
| raise exc |
| |
| |
| class Packetizer(Protocol): |
| """ |
| Read binary packets from serial port. Packets are expected to be terminated |
| with a TERMINATOR byte (null byte by default). |
| |
| The class also keeps track of the transport. |
| """ |
| |
| TERMINATOR = b'\0' |
| |
| def __init__(self): |
| self.buffer = bytearray() |
| self.transport = None |
| |
| def connection_made(self, transport): |
| """Store transport""" |
| self.transport = transport |
| |
| def connection_lost(self, exc): |
| """Forget transport""" |
| self.transport = None |
| super(Packetizer, self).connection_lost(exc) |
| |
| def data_received(self, data): |
| """Buffer received data, find TERMINATOR, call handle_packet""" |
| self.buffer.extend(data) |
| while self.TERMINATOR in self.buffer: |
| packet, self.buffer = self.buffer.split(self.TERMINATOR, 1) |
| self.handle_packet(packet) |
| |
| def handle_packet(self, packet): |
| """Process packets - to be overridden by subclassing""" |
| raise NotImplementedError('please implement functionality in handle_packet') |
| |
| |
| class FramedPacket(Protocol): |
| """ |
| Read binary packets. Packets are expected to have a start and stop marker. |
| |
| The class also keeps track of the transport. |
| """ |
| |
| START = b'(' |
| STOP = b')' |
| |
| def __init__(self): |
| self.packet = bytearray() |
| self.in_packet = False |
| self.transport = None |
| |
| def connection_made(self, transport): |
| """Store transport""" |
| self.transport = transport |
| |
| def connection_lost(self, exc): |
| """Forget transport""" |
| self.transport = None |
| self.in_packet = False |
| del self.packet[:] |
| super(FramedPacket, self).connection_lost(exc) |
| |
| def data_received(self, data): |
| """Find data enclosed in START/STOP, call handle_packet""" |
| for byte in serial.iterbytes(data): |
| if byte == self.START: |
| self.in_packet = True |
| elif byte == self.STOP: |
| self.in_packet = False |
| self.handle_packet(bytes(self.packet)) # make read-only copy |
| del self.packet[:] |
| elif self.in_packet: |
| self.packet.extend(byte) |
| else: |
| self.handle_out_of_packet_data(byte) |
| |
| def handle_packet(self, packet): |
| """Process packets - to be overridden by subclassing""" |
| raise NotImplementedError('please implement functionality in handle_packet') |
| |
| def handle_out_of_packet_data(self, data): |
| """Process data that is received outside of packets""" |
| pass |
| |
| |
| class LineReader(Packetizer): |
| """ |
| Read and write (Unicode) lines from/to serial port. |
| The encoding is applied. |
| """ |
| |
| TERMINATOR = b'\r\n' |
| ENCODING = 'utf-8' |
| UNICODE_HANDLING = 'replace' |
| |
| def handle_packet(self, packet): |
| self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING)) |
| |
| def handle_line(self, line): |
| """Process one line - to be overridden by subclassing""" |
| raise NotImplementedError('please implement functionality in handle_line') |
| |
| def write_line(self, text): |
| """ |
| Write text to the transport. ``text`` is a Unicode string and the encoding |
| is applied before sending ans also the newline is append. |
| """ |
| # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call |
| self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR) |
| |
| |
| class ReaderThread(threading.Thread): |
| """\ |
| Implement a serial port read loop and dispatch to a Protocol instance (like |
| the asyncio.Protocol) but do it with threads. |
| |
| Calls to close() will close the serial port but it is also possible to just |
| stop() this thread and continue the serial port instance otherwise. |
| """ |
| |
| def __init__(self, serial_instance, protocol_factory): |
| """\ |
| Initialize thread. |
| |
| Note that the serial_instance' timeout is set to one second! |
| Other settings are not changed. |
| """ |
| super(ReaderThread, self).__init__() |
| self.daemon = True |
| self.serial = serial_instance |
| self.protocol_factory = protocol_factory |
| self.alive = True |
| self._lock = threading.Lock() |
| self._connection_made = threading.Event() |
| self.protocol = None |
| |
| def stop(self): |
| """Stop the reader thread""" |
| self.alive = False |
| if hasattr(self.serial, 'cancel_read'): |
| self.serial.cancel_read() |
| self.join(2) |
| |
| def run(self): |
| """Reader loop""" |
| if not hasattr(self.serial, 'cancel_read'): |
| self.serial.timeout = 1 |
| self.protocol = self.protocol_factory() |
| try: |
| self.protocol.connection_made(self) |
| except Exception as e: |
| self.alive = False |
| self.protocol.connection_lost(e) |
| self._connection_made.set() |
| return |
| error = None |
| self._connection_made.set() |
| while self.alive and self.serial.is_open: |
| try: |
| # read all that is there or wait for one byte (blocking) |
| data = self.serial.read(self.serial.in_waiting or 1) |
| except serial.SerialException as e: |
| # probably some I/O problem such as disconnected USB serial |
| # adapters -> exit |
| error = e |
| break |
| else: |
| if data: |
| # make a separated try-except for called user code |
| try: |
| self.protocol.data_received(data) |
| except Exception as e: |
| error = e |
| break |
| self.alive = False |
| self.protocol.connection_lost(error) |
| self.protocol = None |
| |
| def write(self, data): |
| """Thread safe writing (uses lock)""" |
| with self._lock: |
| return self.serial.write(data) |
| |
| def close(self): |
| """Close the serial port and exit reader thread (uses lock)""" |
| # use the lock to let other threads finish writing |
| with self._lock: |
| # first stop reading, so that closing can be done on idle port |
| self.stop() |
| self.serial.close() |
| |
| def connect(self): |
| """ |
| Wait until connection is set up and return the transport and protocol |
| instances. |
| """ |
| if self.alive: |
| self._connection_made.wait() |
| if not self.alive: |
| raise RuntimeError('connection_lost already called') |
| return (self, self.protocol) |
| else: |
| raise RuntimeError('already stopped') |
| |
| # - - context manager, returns protocol |
| |
| def __enter__(self): |
| """\ |
| Enter context handler. May raise RuntimeError in case the connection |
| could not be created. |
| """ |
| self.start() |
| self._connection_made.wait() |
| if not self.alive: |
| raise RuntimeError('connection_lost already called') |
| return self.protocol |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| """Leave context: close port""" |
| self.close() |
| |
| |
| # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - |
| # test |
| if __name__ == '__main__': |
| # pylint: disable=wrong-import-position |
| import sys |
| import time |
| import traceback |
| |
| #~ PORT = 'spy:///dev/ttyUSB0' |
| PORT = 'loop://' |
| |
| class PrintLines(LineReader): |
| def connection_made(self, transport): |
| super(PrintLines, self).connection_made(transport) |
| sys.stdout.write('port opened\n') |
| self.write_line('hello world') |
| |
| def handle_line(self, data): |
| sys.stdout.write('line received: {!r}\n'.format(data)) |
| |
| def connection_lost(self, exc): |
| if exc: |
| traceback.print_exc(exc) |
| sys.stdout.write('port closed\n') |
| |
| ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) |
| with ReaderThread(ser, PrintLines) as protocol: |
| protocol.write_line('hello') |
| time.sleep(2) |
| |
| # alternative usage |
| ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) |
| t = ReaderThread(ser, PrintLines) |
| t.start() |
| transport, protocol = t.connect() |
| protocol.write_line('hello') |
| time.sleep(2) |
| t.close() |