blob: 67c700b95cc7926dc8350d20ac9209ef5fcafa91 [file] [log] [blame]
rjw2e8229f2022-02-15 21:08:12 +08001#! python
2#
3# This module implements a special URL handler that wraps an other port,
4# print the traffic for debugging purposes. With this, it is possible
5# to debug the serial port traffic on every application that uses
6# serial_for_url.
7#
8# This file is part of pySerial. https://github.com/pyserial/pyserial
9# (C) 2015 Chris Liechti <cliechti@gmx.net>
10#
11# SPDX-License-Identifier: BSD-3-Clause
12#
13# URL format: spy://port[?option[=value][&option[=value]]]
14# options:
15# - dev=X a file or device to write to
16# - color use escape code to colorize output
17# - raw forward raw bytes instead of hexdump
18#
19# example:
20# redirect output to an other terminal window on Posix (Linux):
21# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
22
23from __future__ import absolute_import
24
25import sys
26import time
27
28import serial
29from serial.serialutil import to_bytes
30
31try:
32 import urlparse
33except ImportError:
34 import urllib.parse as urlparse
35
36
37def sixteen(data):
38 """\
39 yield tuples of hex and ASCII display in multiples of 16. Includes a
40 space after 8 bytes and (None, None) after 16 bytes and at the end.
41 """
42 n = 0
43 for b in serial.iterbytes(data):
44 yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.')
45 n += 1
46 if n == 8:
47 yield (' ', '')
48 elif n >= 16:
49 yield (None, None)
50 n = 0
51 if n > 0:
52 while n < 16:
53 n += 1
54 if n == 8:
55 yield (' ', '')
56 yield (' ', ' ')
57 yield (None, None)
58
59
60def hexdump(data):
61 """yield lines with hexdump of data"""
62 values = []
63 ascii = []
64 offset = 0
65 for h, a in sixteen(data):
66 if h is None:
67 yield (offset, ' '.join([''.join(values), ''.join(ascii)]))
68 del values[:]
69 del ascii[:]
70 offset += 0x10
71 else:
72 values.append(h)
73 ascii.append(a)
74
75
76class FormatRaw(object):
77 """Forward only RX and TX data to output."""
78
79 def __init__(self, output, color):
80 self.output = output
81 self.color = color
82 self.rx_color = '\x1b[32m'
83 self.tx_color = '\x1b[31m'
84
85 def rx(self, data):
86 """show received data"""
87 if self.color:
88 self.output.write(self.rx_color)
89 self.output.write(data)
90 self.output.flush()
91
92 def tx(self, data):
93 """show transmitted data"""
94 if self.color:
95 self.output.write(self.tx_color)
96 self.output.write(data)
97 self.output.flush()
98
99 def control(self, name, value):
100 """(do not) show control calls"""
101 pass
102
103
104class FormatHexdump(object):
105 """\
106 Create a hex dump of RX ad TX data, show when control lines are read or
107 written.
108
109 output example::
110
111 000000.000 Q-RX flushInput
112 000002.469 RTS inactive
113 000002.773 RTS active
114 000003.001 TX 48 45 4C 4C 4F HELLO
115 000003.102 RX 48 45 4C 4C 4F HELLO
116
117 """
118
119 def __init__(self, output, color):
120 self.start_time = time.time()
121 self.output = output
122 self.color = color
123 self.rx_color = '\x1b[32m'
124 self.tx_color = '\x1b[31m'
125 self.control_color = '\x1b[37m'
126
127 def write_line(self, timestamp, label, value, value2=''):
128 self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2))
129 self.output.flush()
130
131 def rx(self, data):
132 """show received data as hex dump"""
133 if self.color:
134 self.output.write(self.rx_color)
135 if data:
136 for offset, row in hexdump(data):
137 self.write_line(time.time() - self.start_time, 'RX', '{:04X} '.format(offset), row)
138 else:
139 self.write_line(time.time() - self.start_time, 'RX', '<empty>')
140
141 def tx(self, data):
142 """show transmitted data as hex dump"""
143 if self.color:
144 self.output.write(self.tx_color)
145 for offset, row in hexdump(data):
146 self.write_line(time.time() - self.start_time, 'TX', '{:04X} '.format(offset), row)
147
148 def control(self, name, value):
149 """show control calls"""
150 if self.color:
151 self.output.write(self.control_color)
152 self.write_line(time.time() - self.start_time, name, value)
153
154
155class Serial(serial.Serial):
156 """\
157 Inherit the native Serial port implementation and wrap all the methods and
158 attributes.
159 """
160 # pylint: disable=no-member
161
162 def __init__(self, *args, **kwargs):
163 super(Serial, self).__init__(*args, **kwargs)
164 self.formatter = None
165 self.show_all = False
166
167 @serial.Serial.port.setter
168 def port(self, value):
169 if value is not None:
170 serial.Serial.port.__set__(self, self.from_url(value))
171
172 def from_url(self, url):
173 """extract host and port from an URL string"""
174 parts = urlparse.urlsplit(url)
175 if parts.scheme != 'spy':
176 raise serial.SerialException(
177 'expected a string in the form '
178 '"spy://port[?option[=value][&option[=value]]]": '
179 'not starting with spy:// ({!r})'.format(parts.scheme))
180 # process options now, directly altering self
181 formatter = FormatHexdump
182 color = False
183 output = sys.stderr
184 try:
185 for option, values in urlparse.parse_qs(parts.query, True).items():
186 if option == 'file':
187 output = open(values[0], 'w')
188 elif option == 'color':
189 color = True
190 elif option == 'raw':
191 formatter = FormatRaw
192 elif option == 'all':
193 self.show_all = True
194 else:
195 raise ValueError('unknown option: {!r}'.format(option))
196 except ValueError as e:
197 raise serial.SerialException(
198 'expected a string in the form '
199 '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
200 self.formatter = formatter(output, color)
201 return ''.join([parts.netloc, parts.path])
202
203 def write(self, tx):
204 tx = to_bytes(tx)
205 self.formatter.tx(tx)
206 return super(Serial, self).write(tx)
207
208 def read(self, size=1):
209 rx = super(Serial, self).read(size)
210 if rx or self.show_all:
211 self.formatter.rx(rx)
212 return rx
213
214 if hasattr(serial.Serial, 'cancel_read'):
215 def cancel_read(self):
216 self.formatter.control('Q-RX', 'cancel_read')
217 super(Serial, self).cancel_read()
218
219 if hasattr(serial.Serial, 'cancel_write'):
220 def cancel_write(self):
221 self.formatter.control('Q-TX', 'cancel_write')
222 super(Serial, self).cancel_write()
223
224 @property
225 def in_waiting(self):
226 n = super(Serial, self).in_waiting
227 if self.show_all:
228 self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
229 return n
230
231 def flush(self):
232 self.formatter.control('Q-TX', 'flush')
233 super(Serial, self).flush()
234
235 def reset_input_buffer(self):
236 self.formatter.control('Q-RX', 'reset_input_buffer')
237 super(Serial, self).reset_input_buffer()
238
239 def reset_output_buffer(self):
240 self.formatter.control('Q-TX', 'reset_output_buffer')
241 super(Serial, self).reset_output_buffer()
242
243 def send_break(self, duration=0.25):
244 self.formatter.control('BRK', 'send_break {}s'.format(duration))
245 super(Serial, self).send_break(duration)
246
247 @serial.Serial.break_condition.setter
248 def break_condition(self, level):
249 self.formatter.control('BRK', 'active' if level else 'inactive')
250 serial.Serial.break_condition.__set__(self, level)
251
252 @serial.Serial.rts.setter
253 def rts(self, level):
254 self.formatter.control('RTS', 'active' if level else 'inactive')
255 serial.Serial.rts.__set__(self, level)
256
257 @serial.Serial.dtr.setter
258 def dtr(self, level):
259 self.formatter.control('DTR', 'active' if level else 'inactive')
260 serial.Serial.dtr.__set__(self, level)
261
262 @serial.Serial.cts.getter
263 def cts(self):
264 level = super(Serial, self).cts
265 self.formatter.control('CTS', 'active' if level else 'inactive')
266 return level
267
268 @serial.Serial.dsr.getter
269 def dsr(self):
270 level = super(Serial, self).dsr
271 self.formatter.control('DSR', 'active' if level else 'inactive')
272 return level
273
274 @serial.Serial.ri.getter
275 def ri(self):
276 level = super(Serial, self).ri
277 self.formatter.control('RI', 'active' if level else 'inactive')
278 return level
279
280 @serial.Serial.cd.getter
281 def cd(self):
282 level = super(Serial, self).cd
283 self.formatter.control('CD', 'active' if level else 'inactive')
284 return level
285
286# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
287if __name__ == '__main__':
288 ser = Serial(None)
289 ser.port = 'spy:///dev/ttyS0'
290 print(ser)