| #! python | 
 | # | 
 | # This module implements a loop back connection receiving itself what it sent. | 
 | # | 
 | # The purpose of this module is.. well... You can run the unit tests with it. | 
 | # and it was so easy to implement ;-) | 
 | # | 
 | # This file is part of pySerial. https://github.com/pyserial/pyserial | 
 | # (C) 2001-2020 Chris Liechti <cliechti@gmx.net> | 
 | # | 
 | # SPDX-License-Identifier:    BSD-3-Clause | 
 | # | 
 | # URL format:    loop://[option[/option...]] | 
 | # options: | 
 | # - "debug" print diagnostic messages | 
 | from __future__ import absolute_import | 
 |  | 
 | import logging | 
 | import numbers | 
 | import time | 
 | try: | 
 |     import urlparse | 
 | except ImportError: | 
 |     import urllib.parse as urlparse | 
 | try: | 
 |     import queue | 
 | except ImportError: | 
 |     import Queue as queue | 
 |  | 
 | from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError | 
 |  | 
 | # map log level names to constants. used in from_url() | 
 | LOGGER_LEVELS = { | 
 |     'debug': logging.DEBUG, | 
 |     'info': logging.INFO, | 
 |     'warning': logging.WARNING, | 
 |     'error': logging.ERROR, | 
 | } | 
 |  | 
 |  | 
 | class Serial(SerialBase): | 
 |     """Serial port implementation that simulates a loop back connection in plain software.""" | 
 |  | 
 |     BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, | 
 |                  9600, 19200, 38400, 57600, 115200) | 
 |  | 
 |     def __init__(self, *args, **kwargs): | 
 |         self.buffer_size = 4096 | 
 |         self.queue = None | 
 |         self.logger = None | 
 |         self._cancel_write = False | 
 |         super(Serial, self).__init__(*args, **kwargs) | 
 |  | 
 |     def open(self): | 
 |         """\ | 
 |         Open port with current settings. This may throw a SerialException | 
 |         if the port cannot be opened. | 
 |         """ | 
 |         if self.is_open: | 
 |             raise SerialException("Port is already open.") | 
 |         self.logger = None | 
 |         self.queue = queue.Queue(self.buffer_size) | 
 |  | 
 |         if self._port is None: | 
 |             raise SerialException("Port must be configured before it can be used.") | 
 |         # not that there is anything to open, but the function applies the | 
 |         # options found in the URL | 
 |         self.from_url(self.port) | 
 |  | 
 |         # not that there anything to configure... | 
 |         self._reconfigure_port() | 
 |         # all things set up get, now a clean start | 
 |         self.is_open = True | 
 |         if not self._dsrdtr: | 
 |             self._update_dtr_state() | 
 |         if not self._rtscts: | 
 |             self._update_rts_state() | 
 |         self.reset_input_buffer() | 
 |         self.reset_output_buffer() | 
 |  | 
 |     def close(self): | 
 |         if self.is_open: | 
 |             self.is_open = False | 
 |             try: | 
 |                 self.queue.put_nowait(None) | 
 |             except queue.Full: | 
 |                 pass | 
 |         super(Serial, self).close() | 
 |  | 
 |     def _reconfigure_port(self): | 
 |         """\ | 
 |         Set communication parameters on opened port. For the loop:// | 
 |         protocol all settings are ignored! | 
 |         """ | 
 |         # not that's it of any real use, but it helps in the unit tests | 
 |         if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32: | 
 |             raise ValueError("invalid baudrate: {!r}".format(self._baudrate)) | 
 |         if self.logger: | 
 |             self.logger.info('_reconfigure_port()') | 
 |  | 
 |     def from_url(self, url): | 
 |         """extract host and port from an URL string""" | 
 |         parts = urlparse.urlsplit(url) | 
 |         if parts.scheme != "loop": | 
 |             raise SerialException( | 
 |                 'expected a string in the form ' | 
 |                 '"loop://[?logging={debug|info|warning|error}]": not starting ' | 
 |                 'with loop:// ({!r})'.format(parts.scheme)) | 
 |         try: | 
 |             # process options now, directly altering self | 
 |             for option, values in urlparse.parse_qs(parts.query, True).items(): | 
 |                 if option == 'logging': | 
 |                     logging.basicConfig()   # XXX is that good to call it here? | 
 |                     self.logger = logging.getLogger('pySerial.loop') | 
 |                     self.logger.setLevel(LOGGER_LEVELS[values[0]]) | 
 |                     self.logger.debug('enabled logging') | 
 |                 else: | 
 |                     raise ValueError('unknown option: {!r}'.format(option)) | 
 |         except ValueError as e: | 
 |             raise SerialException( | 
 |                 'expected a string in the form ' | 
 |                 '"loop://[?logging={debug|info|warning|error}]": {}'.format(e)) | 
 |  | 
 |     #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  - | 
 |  | 
 |     @property | 
 |     def in_waiting(self): | 
 |         """Return the number of bytes currently in the input buffer.""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             # attention the logged value can differ from return value in | 
 |             # threaded environments... | 
 |             self.logger.debug('in_waiting -> {:d}'.format(self.queue.qsize())) | 
 |         return self.queue.qsize() | 
 |  | 
 |     def read(self, size=1): | 
 |         """\ | 
 |         Read size bytes from the serial port. If a timeout is set it may | 
 |         return less characters as requested. With no timeout it will block | 
 |         until the requested number of bytes is read. | 
 |         """ | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self._timeout is not None and self._timeout != 0: | 
 |             timeout = time.time() + self._timeout | 
 |         else: | 
 |             timeout = None | 
 |         data = bytearray() | 
 |         while size > 0 and self.is_open: | 
 |             try: | 
 |                 b = self.queue.get(timeout=self._timeout)  # XXX inter char timeout | 
 |             except queue.Empty: | 
 |                 if self._timeout == 0: | 
 |                     break | 
 |             else: | 
 |                 if b is not None: | 
 |                     data += b | 
 |                     size -= 1 | 
 |                 else: | 
 |                     break | 
 |             # check for timeout now, after data has been read. | 
 |             # useful for timeout = 0 (non blocking) read | 
 |             if timeout and time.time() > timeout: | 
 |                 if self.logger: | 
 |                     self.logger.info('read timeout') | 
 |                 break | 
 |         return bytes(data) | 
 |  | 
 |     def cancel_read(self): | 
 |         self.queue.put_nowait(None) | 
 |  | 
 |     def cancel_write(self): | 
 |         self._cancel_write = True | 
 |  | 
 |     def write(self, data): | 
 |         """\ | 
 |         Output the given byte string over the serial port. Can block if the | 
 |         connection is blocked. May raise SerialException if the connection is | 
 |         closed. | 
 |         """ | 
 |         self._cancel_write = False | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         data = to_bytes(data) | 
 |         # calculate aprox time that would be used to send the data | 
 |         time_used_to_send = 10.0 * len(data) / self._baudrate | 
 |         # when a write timeout is configured check if we would be successful | 
 |         # (not sending anything, not even the part that would have time) | 
 |         if self._write_timeout is not None and time_used_to_send > self._write_timeout: | 
 |             # must wait so that unit test succeeds | 
 |             time_left = self._write_timeout | 
 |             while time_left > 0 and not self._cancel_write: | 
 |                 time.sleep(min(time_left, 0.5)) | 
 |                 time_left -= 0.5 | 
 |             if self._cancel_write: | 
 |                 return 0  # XXX | 
 |             raise SerialTimeoutException('Write timeout') | 
 |         for byte in iterbytes(data): | 
 |             self.queue.put(byte, timeout=self._write_timeout) | 
 |         return len(data) | 
 |  | 
 |     def reset_input_buffer(self): | 
 |         """Clear input buffer, discarding all that is in the buffer.""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             self.logger.info('reset_input_buffer()') | 
 |         try: | 
 |             while self.queue.qsize(): | 
 |                 self.queue.get_nowait() | 
 |         except queue.Empty: | 
 |             pass | 
 |  | 
 |     def reset_output_buffer(self): | 
 |         """\ | 
 |         Clear output buffer, aborting the current output and | 
 |         discarding all that is in the buffer. | 
 |         """ | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             self.logger.info('reset_output_buffer()') | 
 |         try: | 
 |             while self.queue.qsize(): | 
 |                 self.queue.get_nowait() | 
 |         except queue.Empty: | 
 |             pass | 
 |  | 
 |     @property | 
 |     def out_waiting(self): | 
 |         """Return how many bytes the in the outgoing buffer""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             # attention the logged value can differ from return value in | 
 |             # threaded environments... | 
 |             self.logger.debug('out_waiting -> {:d}'.format(self.queue.qsize())) | 
 |         return self.queue.qsize() | 
 |  | 
 |     def _update_break_state(self): | 
 |         """\ | 
 |         Set break: Controls TXD. When active, to transmitting is | 
 |         possible. | 
 |         """ | 
 |         if self.logger: | 
 |             self.logger.info('_update_break_state({!r})'.format(self._break_state)) | 
 |  | 
 |     def _update_rts_state(self): | 
 |         """Set terminal status line: Request To Send""" | 
 |         if self.logger: | 
 |             self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state)) | 
 |  | 
 |     def _update_dtr_state(self): | 
 |         """Set terminal status line: Data Terminal Ready""" | 
 |         if self.logger: | 
 |             self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state)) | 
 |  | 
 |     @property | 
 |     def cts(self): | 
 |         """Read terminal status line: Clear To Send""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state)) | 
 |         return self._rts_state | 
 |  | 
 |     @property | 
 |     def dsr(self): | 
 |         """Read terminal status line: Data Set Ready""" | 
 |         if self.logger: | 
 |             self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state)) | 
 |         return self._dtr_state | 
 |  | 
 |     @property | 
 |     def ri(self): | 
 |         """Read terminal status line: Ring Indicator""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             self.logger.info('returning dummy for RI') | 
 |         return False | 
 |  | 
 |     @property | 
 |     def cd(self): | 
 |         """Read terminal status line: Carrier Detect""" | 
 |         if not self.is_open: | 
 |             raise PortNotOpenError() | 
 |         if self.logger: | 
 |             self.logger.info('returning dummy for CD') | 
 |         return True | 
 |  | 
 |     # - - - platform specific - - - | 
 |     # None so far | 
 |  | 
 |  | 
 | # simple client test | 
 | if __name__ == '__main__': | 
 |     import sys | 
 |     s = Serial('loop://') | 
 |     sys.stdout.write('{}\n'.format(s)) | 
 |  | 
 |     sys.stdout.write("write...\n") | 
 |     s.write("hello\n") | 
 |     s.flush() | 
 |     sys.stdout.write("read: {!r}\n".format(s.read(5))) | 
 |  | 
 |     s.close() |