blob: 2aeebfc746396b6bbf1d8b272d2ee25c15621f73 [file] [log] [blame]
rjw2e8229f2022-02-15 21:08:12 +08001#! python
2#
3# This module implements a loop back connection receiving itself what it sent.
4#
5# The purpose of this module is.. well... You can run the unit tests with it.
6# and it was so easy to implement ;-)
7#
8# This file is part of pySerial. https://github.com/pyserial/pyserial
9# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
10#
11# SPDX-License-Identifier: BSD-3-Clause
12#
# URL format: loop://[?logging={debug|info|warning|error}]
# options:
# - "logging=<level>" enable diagnostic log messages at the given level
16from __future__ import absolute_import
17
18import logging
19import numbers
20import time
21try:
22 import urlparse
23except ImportError:
24 import urllib.parse as urlparse
25try:
26 import queue
27except ImportError:
28 import Queue as queue
29
30from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError
31
# Translate the level names accepted by the "logging" URL option into the
# numeric constants of the logging module. Used in from_url().
LOGGER_LEVELS = dict(
    debug=logging.DEBUG,
    info=logging.INFO,
    warning=logging.WARNING,
    error=logging.ERROR,
)
39
40
class Serial(SerialBase):
    """\
    Serial port implementation that simulates a loop back connection in
    plain software.

    Everything passed to write() is put, item by item, into a bounded
    in-memory queue; read() takes the items out of that queue again and
    counts each of them as one byte. A ``None`` entry in the queue is a
    wake-up marker: close() and cancel_read() insert it to end a pending
    read().
    """

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """\
        Prepare the loop back state, then pass all arguments on to
        SerialBase.
        """
        # These attributes are set before the base class constructor runs.
        # NOTE(review): presumably the base constructor may already call
        # open() when a port is given (the client test at the end of this
        # file uses the port without calling open()) - confirm in SerialBase.
        self.buffer_size = 4096     # capacity of the loop back queue
        self.queue = None           # created in open()
        self.logger = None          # set by from_url() when logging is requested
        self._cancel_write = False  # set by cancel_write(), polled in write()
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened.
        """
        if self.is_open:
            raise SerialException("Port is already open.")
        # validate first, so that a failing call does not replace any state
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        self.logger = None
        self.queue = queue.Queue(self.buffer_size)
        # not that there is anything to open, but the function applies the
        # options found in the URL
        self.from_url(self.port)

        # not that there is anything to configure...
        self._reconfigure_port()
        # everything is set up, now get a clean start
        self.is_open = True
        if not self._dsrdtr:
            self._update_dtr_state()
        if not self._rtscts:
            self._update_rts_state()
        self.reset_input_buffer()
        self.reset_output_buffer()

    def close(self):
        """\
        Close the port. A ``None`` marker is queued so that a read() that
        is waiting for data returns.
        """
        if self.is_open:
            self.is_open = False
            try:
                self.queue.put_nowait(None)
            except queue.Full:
                # a full queue means that no reader is waiting for data
                pass
        super(Serial, self).close()

    def _reconfigure_port(self):
        """\
        Set communication parameters on opened port. For the loop://
        protocol all settings are ignored!

        Raises ValueError if the baud rate is not an integer in the range
        1 .. 2**32 - 1.
        """
        # not that it is of any real use, but it helps in the unit tests
        if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32:
            raise ValueError("invalid baudrate: {!r}".format(self._baudrate))
        if self.logger:
            self.logger.info('_reconfigure_port()')

    def from_url(self, url):
        """\
        Apply the options found in a ``loop://`` URL to this instance.

        The only supported option is the query parameter ``logging`` with
        one of the level names of LOGGER_LEVELS as value, e.g.
        ``loop://?logging=debug``. It enables the 'pySerial.loop' logger.

        Raises SerialException if the scheme is not ``loop``, if an unknown
        option is given or if the logging level is not known.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # report a bad level like any other malformed URL
                    # instead of leaking a KeyError from the table lookup
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError('unknown logging level: {!r}'.format(level_name))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    @property
    def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # query the size only once so that the logged and the returned
        # value are the same, also in threaded environments
        count = self.queue.qsize()
        if self.logger:
            self.logger.debug('in_waiting -> {:d}'.format(count))
        return count

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return fewer characters than requested. With no timeout it will
        block until the requested number of bytes is read.

        A timeout of 0 returns what is available without waiting. Any other
        timeout limits the duration of the whole call, not of each single
        byte. The read also ends early when the port is closed or
        cancel_read() is called. Raises PortNotOpenError if the port is
        not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self._timeout is not None and self._timeout != 0:
            deadline = time.time() + self._timeout
        else:
            deadline = None
        data = bytearray()
        while size > 0 and self.is_open:
            if deadline is None:
                # None waits until data arrives, 0 polls without waiting
                wait = self._timeout
            else:
                # only wait for the time that is left; never pass a
                # negative value, Queue.get() rejects it
                wait = max(0, deadline - time.time())
            try:
                b = self.queue.get(timeout=wait)
            except queue.Empty:
                if deadline is not None and self.logger:
                    self.logger.info('read timeout')
                break
            if b is None:
                # wake-up marker from close() or cancel_read()
                break
            data += b
            size -= 1
            # check for timeout now, after data has been read.
            if deadline is not None and time.time() > deadline:
                if self.logger:
                    self.logger.info('read timeout')
                break
        return bytes(data)

    def cancel_read(self):
        """Make a read() that is waiting for data return early."""
        try:
            self.queue.put_nowait(None)
        except queue.Full:
            # with a full queue no read() can be waiting for data
            pass

    def cancel_write(self):
        """Make a write() that is sitting out its write timeout return."""
        self._cancel_write = True

    def write(self, data):
        """\
        Output the given byte string over the serial port. Can block if the
        connection is blocked. May raise SerialException if the connection is
        closed.

        Returns the number of bytes written, or 0 if the write was
        cancelled with cancel_write(). Raises SerialTimeoutException if a
        write timeout is set and the data cannot be sent in that time, and
        PortNotOpenError if the port is not open.
        """
        self._cancel_write = False
        if not self.is_open:
            raise PortNotOpenError()
        data = to_bytes(data)
        # calculate approx. time that would be used to send the data
        # (the factor 10 presumably stands for start bit, 8 data bits and
        # stop bit per byte)
        time_used_to_send = 10.0 * len(data) / self._baudrate
        # when a write timeout is configured check if we would be successful
        # (not sending anything, not even the part that would have time)
        if self._write_timeout is not None and time_used_to_send > self._write_timeout:
            # must wait so that unit test succeeds
            time_left = self._write_timeout
            while time_left > 0 and not self._cancel_write:
                time.sleep(min(time_left, 0.5))
                time_left -= 0.5
            if self._cancel_write:
                return 0  # XXX
            raise SerialTimeoutException('Write timeout')
        try:
            for byte in iterbytes(data):
                self.queue.put(byte, timeout=self._write_timeout)
        except queue.Full:
            # the queue stayed full for the whole write timeout: report it
            # as a write timeout instead of leaking the queue exception
            raise SerialTimeoutException('Write timeout')
        return len(data)

    def _discard_queued_data(self):
        """Remove everything that is currently in the loop back queue."""
        try:
            while self.queue.qsize():
                self.queue.get_nowait()
        except queue.Empty:
            # another thread emptied the queue in the meantime
            pass

    def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_input_buffer()')
        self._discard_queued_data()

    def reset_output_buffer(self):
        """\
        Clear output buffer, aborting the current output and
        discarding all that is in the buffer.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_output_buffer()')
        # input and output share the same queue
        self._discard_queued_data()

    @property
    def out_waiting(self):
        """Return how many bytes are in the outgoing buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # query the size only once so that the logged and the returned
        # value are the same, also in threaded environments
        count = self.queue.qsize()
        if self.logger:
            self.logger.debug('out_waiting -> {:d}'.format(count))
        return count

    def _update_break_state(self):
        """\
        Set break: Controls TXD. When active, no transmitting is
        possible. Here the new state is only logged.
        """
        if self.logger:
            self.logger.info('_update_break_state({!r})'.format(self._break_state))

    def _update_rts_state(self):
        """Set terminal status line: Request To Send"""
        if self.logger:
            self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state))

    def _update_dtr_state(self):
        """Set terminal status line: Data Terminal Ready"""
        if self.logger:
            self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state))

    @property
    def cts(self):
        """Read terminal status line: Clear To Send (mirrors RTS)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
        return self._rts_state

    @property
    def dsr(self):
        """Read terminal status line: Data Set Ready (mirrors DTR)"""
        # same check as for the other status lines (cts, ri, cd)
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state))
        return self._dtr_state

    @property
    def ri(self):
        """Read terminal status line: Ring Indicator (always False)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for RI')
        return False

    @property
    def cd(self):
        """Read terminal status line: Carrier Detect (always True)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for CD')
        return True

    # - - - platform specific - - -
    # None so far
295
296
# simple client test: write a few bytes to the loop back port and read
# them back
if __name__ == '__main__':
    import sys
    s = Serial('loop://')
    try:
        sys.stdout.write('{}\n'.format(s))

        sys.stdout.write("write...\n")
        # write() takes bytes; pySerial asks for text to be encoded first.
        # A bytes literal works on Python 2 and on Python 3.
        s.write(b"hello\n")
        s.flush()
        sys.stdout.write("read: {!r}\n".format(s.read(5)))
    finally:
        # close the port even if one of the steps above fails
        s.close()