|  | #! python | 
|  | # | 
|  | # This module implements a loop back connection receiving itself what it sent. | 
|  | # | 
|  | # The purpose of this module is.. well... You can run the unit tests with it. | 
|  | # and it was so easy to implement ;-) | 
|  | # | 
|  | # This file is part of pySerial. https://github.com/pyserial/pyserial | 
|  | # (C) 2001-2020 Chris Liechti <cliechti@gmx.net> | 
|  | # | 
|  | # SPDX-License-Identifier:    BSD-3-Clause | 
|  | # | 
# URL format:    loop://[?logging={debug|info|warning|error}]
# options:
# - "logging" enable diagnostic messages at the given log level
|  | from __future__ import absolute_import | 
|  |  | 
|  | import logging | 
|  | import numbers | 
|  | import time | 
|  | try: | 
|  | import urlparse | 
|  | except ImportError: | 
|  | import urllib.parse as urlparse | 
|  | try: | 
|  | import queue | 
|  | except ImportError: | 
|  | import Queue as queue | 
|  |  | 
|  | from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError | 
|  |  | 
# Translation table from the level names accepted by the "logging" URL
# option to the numeric constants of the logging module. Consulted by
# from_url().
LOGGER_LEVELS = dict(
    debug=logging.DEBUG,
    info=logging.INFO,
    warning=logging.WARNING,
    error=logging.ERROR,
)
|  |  | 
|  |  | 
class Serial(SerialBase):
    """Serial port implementation that simulates a loop back connection in plain software.

    Every byte handed to write() is placed in a bounded queue and handed out
    again by read(). A ``None`` entry in the queue is used as a wake-up marker
    that makes a pending read() return early (see close() and cancel_read()).
    """

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """Initialise the loop back state, then delegate to SerialBase."""
        # Assigned before the base class initialiser runs so that the
        # attributes exist whatever it does with the arguments
        # (presumably it may already call open() when a port is given).
        self.buffer_size = 4096     # capacity of the loop back queue in bytes
        self.queue = None           # created by open()
        self.logger = None          # set by from_url() when logging is requested
        self._cancel_write = False  # set by cancel_write(), polled by write()
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened.
        """
        if self.is_open:
            raise SerialException("Port is already open.")
        self.logger = None
        self.queue = queue.Queue(self.buffer_size)

        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        # not that there is anything to open, but the function applies the
        # options found in the URL
        self.from_url(self.port)

        # not that there is anything to configure...
        self._reconfigure_port()
        # everything is set up, now get a clean start
        self.is_open = True
        if not self._dsrdtr:
            self._update_dtr_state()
        if not self._rtscts:
            self._update_rts_state()
        self.reset_input_buffer()
        self.reset_output_buffer()

    def close(self):
        """Close the port and wake up a read() that may be waiting for data."""
        if self.is_open:
            self.is_open = False
            try:
                # the None marker makes a pending read() leave its loop
                self.queue.put_nowait(None)
            except queue.Full:
                # a full queue means a reader is not starved for data,
                # so there is nobody that needs to be woken up
                pass
        super(Serial, self).close()

    def _reconfigure_port(self):
        """\
        Set communication parameters on opened port. For the loop://
        protocol all settings are ignored!

        Raises ValueError if the baudrate is not an integer in the range
        1 .. 2**32 - 1.
        """
        # not that it's of any real use, but it helps in the unit tests
        if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32:
            raise ValueError("invalid baudrate: {!r}".format(self._baudrate))
        if self.logger:
            self.logger.info('_reconfigure_port()')

    def from_url(self, url):
        """\
        Check the URL and apply the options found in its query string.

        The only supported option is "logging", which takes one of the level
        names in LOGGER_LEVELS. Raises SerialException if the scheme is not
        "loop", if an unknown option is given or if the logging level is not
        one of the supported names.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # validate first: an unknown (or blank) level used to
                    # escape as a bare KeyError instead of a SerialException
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError('unknown logging level: {!r}'.format(level_name))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -

    def _drain_queue(self):
        """Throw away everything that is currently stored in the queue."""
        try:
            while True:
                self.queue.get_nowait()
        except queue.Empty:
            pass

    @property
    def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # sample the size once so that the logged and the returned value agree
        waiting = self.queue.qsize()
        if self.logger:
            self.logger.debug('in_waiting -> {:d}'.format(waiting))
        return waiting

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        The timeout covers the whole call. The read also ends early when the
        port is closed or cancel_read() is called.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self._timeout is not None and self._timeout != 0:
            deadline = time.time() + self._timeout
        else:
            # None: block until all data arrived, 0: never wait
            deadline = None
        data = bytearray()
        while size > 0 and self.is_open:
            if deadline is not None:
                # only wait for what is left of the total timeout, not for
                # the full timeout again on every single byte
                wait = max(0, deadline - time.time())
            else:
                wait = self._timeout
            try:
                b = self.queue.get(timeout=wait)
            except queue.Empty:
                # nothing (more) arrived within the allowed time
                if deadline is not None and self.logger:
                    self.logger.info('read timeout')
                break
            if b is None:
                # wake-up marker from close() or cancel_read()
                break
            data += b
            size -= 1
            # check for timeout now, after data has been read.
            if deadline is not None and time.time() > deadline:
                if self.logger:
                    self.logger.info('read timeout')
                break
        return bytes(data)

    def cancel_read(self):
        """Make a read() that is waiting for data return early."""
        if self.queue is None:
            # port was never opened, there is nothing to cancel
            return
        try:
            self.queue.put_nowait(None)
        except queue.Full:
            # with a full queue a reader is not waiting for data, so there
            # is nothing to interrupt (same reasoning as in close())
            pass

    def cancel_write(self):
        """Ask a write() that is waiting for its timeout to give up."""
        self._cancel_write = True

    def write(self, data):
        """\
        Output the given byte string over the serial port. Can block if the
        connection is blocked. May raise SerialException if the connection is
        closed.

        Returns the number of bytes written. Raises SerialTimeoutException
        when a write timeout is set and either the simulated transmission
        time exceeds it or the loop back buffer stays full for that long.
        """
        self._cancel_write = False
        if not self.is_open:
            raise PortNotOpenError()
        data = to_bytes(data)
        # calculate approximate time that would be used to send the data
        # (10 bit times per byte at the configured baudrate)
        time_used_to_send = 10.0 * len(data) / self._baudrate
        # when a write timeout is configured check if we would be successful
        # (not sending anything, not even the part that would have time)
        if self._write_timeout is not None and time_used_to_send > self._write_timeout:
            # must wait so that unit test succeeds
            time_left = self._write_timeout
            while time_left > 0 and not self._cancel_write:
                time.sleep(min(time_left, 0.5))
                time_left -= 0.5
            if self._cancel_write:
                return 0  # XXX
            raise SerialTimeoutException('Write timeout')
        for byte in iterbytes(data):
            try:
                self.queue.put(byte, timeout=self._write_timeout)
            except queue.Full:
                # buffer stayed full for the whole write timeout: report it
                # as a serial timeout instead of leaking queue.Full
                raise SerialTimeoutException('Write timeout')
        return len(data)

    def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_input_buffer()')
        self._drain_queue()

    def reset_output_buffer(self):
        """\
        Clear output buffer, aborting the current output and
        discarding all that is in the buffer.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_output_buffer()')
        # input and output share the one loop back queue
        self._drain_queue()

    @property
    def out_waiting(self):
        """Return the number of bytes currently in the outgoing buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # sample the size once so that the logged and the returned value agree
        waiting = self.queue.qsize()
        if self.logger:
            self.logger.debug('out_waiting -> {:d}'.format(waiting))
        return waiting

    def _update_break_state(self):
        """\
        Set break: Controls TXD. When active, no transmitting is
        possible. The loop back port only logs the request.
        """
        if self.logger:
            self.logger.info('_update_break_state({!r})'.format(self._break_state))

    def _update_rts_state(self):
        """Set terminal status line: Request To Send (only logged here)."""
        if self.logger:
            self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state))

    def _update_dtr_state(self):
        """Set terminal status line: Data Terminal Ready (only logged here)."""
        if self.logger:
            self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state))

    @property
    def cts(self):
        """Read terminal status line: Clear To Send (mirrors RTS)."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
        return self._rts_state

    @property
    def dsr(self):
        """Read terminal status line: Data Set Ready (mirrors DTR)."""
        # same open-port guard as the other status line properties
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state))
        return self._dtr_state

    @property
    def ri(self):
        """Read terminal status line: Ring Indicator (always False)."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for RI')
        return False

    @property
    def cd(self):
        """Read terminal status line: Carrier Detect (always True)."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for CD')
        return True

    # - - - platform specific - - -
    # None so far
|  |  | 
|  |  | 
# simple client test: send a few bytes through the loop back port and
# print what comes back
if __name__ == '__main__':
    import sys
    s = Serial('loop://')
    try:
        sys.stdout.write('{}\n'.format(s))

        sys.stdout.write("write...\n")
        # write() passes its argument through to_bytes(); hand it real bytes
        # rather than a text string, which is not byte data on Python 3
        s.write(b"hello\n")
        s.flush()
        sys.stdout.write("read: {!r}\n".format(s.read(5)))
    finally:
        # release the port even if one of the steps above fails
        s.close()