blob: b8940b6d3d18797a0e8d2f7bce017a07f740f6ec [file] [log] [blame]
rjw2e8229f2022-02-15 21:08:12 +08001#!/usr/bin/env python3
2#
3# Working with threading and pySerial
4#
5# This file is part of pySerial. https://github.com/pyserial/pyserial
6# (C) 2015-2016 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier: BSD-3-Clause
9"""\
10Support threading with serial ports.
11"""
12from __future__ import absolute_import
13
14import serial
15import threading
16
17
18class Protocol(object):
19 """\
20 Protocol as used by the ReaderThread. This base class provides empty
21 implementations of all methods.
22 """
23
24 def connection_made(self, transport):
25 """Called when reader thread is started"""
26
27 def data_received(self, data):
28 """Called with snippets received from the serial port"""
29
30 def connection_lost(self, exc):
31 """\
32 Called when the serial port is closed or the reader loop terminated
33 otherwise.
34 """
35 if isinstance(exc, Exception):
36 raise exc
37
38
39class Packetizer(Protocol):
40 """
41 Read binary packets from serial port. Packets are expected to be terminated
42 with a TERMINATOR byte (null byte by default).
43
44 The class also keeps track of the transport.
45 """
46
47 TERMINATOR = b'\0'
48
49 def __init__(self):
50 self.buffer = bytearray()
51 self.transport = None
52
53 def connection_made(self, transport):
54 """Store transport"""
55 self.transport = transport
56
57 def connection_lost(self, exc):
58 """Forget transport"""
59 self.transport = None
60 super(Packetizer, self).connection_lost(exc)
61
62 def data_received(self, data):
63 """Buffer received data, find TERMINATOR, call handle_packet"""
64 self.buffer.extend(data)
65 while self.TERMINATOR in self.buffer:
66 packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
67 self.handle_packet(packet)
68
69 def handle_packet(self, packet):
70 """Process packets - to be overridden by subclassing"""
71 raise NotImplementedError('please implement functionality in handle_packet')
72
73
74class FramedPacket(Protocol):
75 """
76 Read binary packets. Packets are expected to have a start and stop marker.
77
78 The class also keeps track of the transport.
79 """
80
81 START = b'('
82 STOP = b')'
83
84 def __init__(self):
85 self.packet = bytearray()
86 self.in_packet = False
87 self.transport = None
88
89 def connection_made(self, transport):
90 """Store transport"""
91 self.transport = transport
92
93 def connection_lost(self, exc):
94 """Forget transport"""
95 self.transport = None
96 self.in_packet = False
97 del self.packet[:]
98 super(FramedPacket, self).connection_lost(exc)
99
100 def data_received(self, data):
101 """Find data enclosed in START/STOP, call handle_packet"""
102 for byte in serial.iterbytes(data):
103 if byte == self.START:
104 self.in_packet = True
105 elif byte == self.STOP:
106 self.in_packet = False
107 self.handle_packet(bytes(self.packet)) # make read-only copy
108 del self.packet[:]
109 elif self.in_packet:
110 self.packet.extend(byte)
111 else:
112 self.handle_out_of_packet_data(byte)
113
114 def handle_packet(self, packet):
115 """Process packets - to be overridden by subclassing"""
116 raise NotImplementedError('please implement functionality in handle_packet')
117
118 def handle_out_of_packet_data(self, data):
119 """Process data that is received outside of packets"""
120 pass
121
122
123class LineReader(Packetizer):
124 """
125 Read and write (Unicode) lines from/to serial port.
126 The encoding is applied.
127 """
128
129 TERMINATOR = b'\r\n'
130 ENCODING = 'utf-8'
131 UNICODE_HANDLING = 'replace'
132
133 def handle_packet(self, packet):
134 self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
135
136 def handle_line(self, line):
137 """Process one line - to be overridden by subclassing"""
138 raise NotImplementedError('please implement functionality in handle_line')
139
140 def write_line(self, text):
141 """
142 Write text to the transport. ``text`` is a Unicode string and the encoding
143 is applied before sending ans also the newline is append.
144 """
145 # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
146 self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
147
148
149class ReaderThread(threading.Thread):
150 """\
151 Implement a serial port read loop and dispatch to a Protocol instance (like
152 the asyncio.Protocol) but do it with threads.
153
154 Calls to close() will close the serial port but it is also possible to just
155 stop() this thread and continue the serial port instance otherwise.
156 """
157
158 def __init__(self, serial_instance, protocol_factory):
159 """\
160 Initialize thread.
161
162 Note that the serial_instance' timeout is set to one second!
163 Other settings are not changed.
164 """
165 super(ReaderThread, self).__init__()
166 self.daemon = True
167 self.serial = serial_instance
168 self.protocol_factory = protocol_factory
169 self.alive = True
170 self._lock = threading.Lock()
171 self._connection_made = threading.Event()
172 self.protocol = None
173
174 def stop(self):
175 """Stop the reader thread"""
176 self.alive = False
177 if hasattr(self.serial, 'cancel_read'):
178 self.serial.cancel_read()
179 self.join(2)
180
181 def run(self):
182 """Reader loop"""
183 if not hasattr(self.serial, 'cancel_read'):
184 self.serial.timeout = 1
185 self.protocol = self.protocol_factory()
186 try:
187 self.protocol.connection_made(self)
188 except Exception as e:
189 self.alive = False
190 self.protocol.connection_lost(e)
191 self._connection_made.set()
192 return
193 error = None
194 self._connection_made.set()
195 while self.alive and self.serial.is_open:
196 try:
197 # read all that is there or wait for one byte (blocking)
198 data = self.serial.read(self.serial.in_waiting or 1)
199 except serial.SerialException as e:
200 # probably some I/O problem such as disconnected USB serial
201 # adapters -> exit
202 error = e
203 break
204 else:
205 if data:
206 # make a separated try-except for called user code
207 try:
208 self.protocol.data_received(data)
209 except Exception as e:
210 error = e
211 break
212 self.alive = False
213 self.protocol.connection_lost(error)
214 self.protocol = None
215
216 def write(self, data):
217 """Thread safe writing (uses lock)"""
218 with self._lock:
219 return self.serial.write(data)
220
221 def close(self):
222 """Close the serial port and exit reader thread (uses lock)"""
223 # use the lock to let other threads finish writing
224 with self._lock:
225 # first stop reading, so that closing can be done on idle port
226 self.stop()
227 self.serial.close()
228
229 def connect(self):
230 """
231 Wait until connection is set up and return the transport and protocol
232 instances.
233 """
234 if self.alive:
235 self._connection_made.wait()
236 if not self.alive:
237 raise RuntimeError('connection_lost already called')
238 return (self, self.protocol)
239 else:
240 raise RuntimeError('already stopped')
241
242 # - - context manager, returns protocol
243
244 def __enter__(self):
245 """\
246 Enter context handler. May raise RuntimeError in case the connection
247 could not be created.
248 """
249 self.start()
250 self._connection_made.wait()
251 if not self.alive:
252 raise RuntimeError('connection_lost already called')
253 return self.protocol
254
255 def __exit__(self, exc_type, exc_val, exc_tb):
256 """Leave context: close port"""
257 self.close()
258
259
260# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
261# test
262if __name__ == '__main__':
263 # pylint: disable=wrong-import-position
264 import sys
265 import time
266 import traceback
267
268 #~ PORT = 'spy:///dev/ttyUSB0'
269 PORT = 'loop://'
270
271 class PrintLines(LineReader):
272 def connection_made(self, transport):
273 super(PrintLines, self).connection_made(transport)
274 sys.stdout.write('port opened\n')
275 self.write_line('hello world')
276
277 def handle_line(self, data):
278 sys.stdout.write('line received: {!r}\n'.format(data))
279
280 def connection_lost(self, exc):
281 if exc:
282 traceback.print_exc(exc)
283 sys.stdout.write('port closed\n')
284
285 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
286 with ReaderThread(ser, PrintLines) as protocol:
287 protocol.write_line('hello')
288 time.sleep(2)
289
290 # alternative usage
291 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
292 t = ReaderThread(ser, PrintLines)
293 t.start()
294 transport, protocol = t.connect()
295 protocol.write_line('hello')
296 time.sleep(2)
297 t.close()