|  | #! python | 
|  | # | 
|  | # Backend for Silicon Labs CP2110/4 HID-to-UART devices. | 
|  | # | 
|  | # This file is part of pySerial. https://github.com/pyserial/pyserial | 
|  | # (C) 2001-2015 Chris Liechti <cliechti@gmx.net> | 
|  | # (C) 2019 Google LLC | 
|  | # | 
|  | # SPDX-License-Identifier:    BSD-3-Clause | 
|  |  | 
|  | # This backend implements support for HID-to-UART devices manufactured | 
|  | # by Silicon Labs and marketed as CP2110 and CP2114. The | 
|  | # implementation is (mostly) OS-independent and in userland. It relies | 
|  | # on cython-hidapi (https://github.com/trezor/cython-hidapi). | 
|  |  | 
|  | # The HID-to-UART protocol implemented by CP2110/4 is described in the | 
|  | # AN434 document from Silicon Labs: | 
|  | # https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf | 
|  |  | 
|  | # TODO items: | 
|  |  | 
|  | # - rtscts support is configured for hardware flow control, but the | 
|  | #   signaling is missing (AN434 suggests this is done through GPIO). | 
|  | # - Cancelling reads and writes is not supported. | 
|  | # - Baudrate validation is not implemented, as it depends on model and configuration. | 
|  |  | 
|  | import struct | 
|  | import threading | 
|  |  | 
|  | try: | 
|  | import urlparse | 
|  | except ImportError: | 
|  | import urllib.parse as urlparse | 
|  |  | 
|  | try: | 
|  | import Queue | 
|  | except ImportError: | 
|  | import queue as Queue | 
|  |  | 
|  | import hid  # hidapi | 
|  |  | 
|  | import serial | 
|  | from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout | 
|  |  | 
|  |  | 
|  | # Report IDs and related constant | 
|  | _REPORT_GETSET_UART_ENABLE = 0x41 | 
|  | _DISABLE_UART = 0x00 | 
|  | _ENABLE_UART = 0x01 | 
|  |  | 
|  | _REPORT_SET_PURGE_FIFOS = 0x43 | 
|  | _PURGE_TX_FIFO = 0x01 | 
|  | _PURGE_RX_FIFO = 0x02 | 
|  |  | 
|  | _REPORT_GETSET_UART_CONFIG = 0x50 | 
|  |  | 
|  | _REPORT_SET_TRANSMIT_LINE_BREAK = 0x51 | 
|  | _REPORT_SET_STOP_LINE_BREAK = 0x52 | 
|  |  | 
|  |  | 
|  | class Serial(SerialBase): | 
|  | # This is not quite correct. AN343 specifies that the minimum | 
|  | # baudrate is different between CP2110 and CP2114, and it's halved | 
|  | # when using non-8-bit symbols. | 
|  | BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200, | 
|  | 38400, 57600, 115200, 230400, 460800, 500000, 576000, | 
|  | 921600, 1000000) | 
|  |  | 
|  | def __init__(self, *args, **kwargs): | 
|  | self._hid_handle = None | 
|  | self._read_buffer = None | 
|  | self._thread = None | 
|  | super(Serial, self).__init__(*args, **kwargs) | 
|  |  | 
|  | def open(self): | 
|  | if self._port is None: | 
|  | raise SerialException("Port must be configured before it can be used.") | 
|  | if self.is_open: | 
|  | raise SerialException("Port is already open.") | 
|  |  | 
|  | self._read_buffer = Queue.Queue() | 
|  |  | 
|  | self._hid_handle = hid.device() | 
|  | try: | 
|  | portpath = self.from_url(self.portstr) | 
|  | self._hid_handle.open_path(portpath) | 
|  | except OSError as msg: | 
|  | raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg)) | 
|  |  | 
|  | try: | 
|  | self._reconfigure_port() | 
|  | except: | 
|  | try: | 
|  | self._hid_handle.close() | 
|  | except: | 
|  | pass | 
|  | self._hid_handle = None | 
|  | raise | 
|  | else: | 
|  | self.is_open = True | 
|  | self._thread = threading.Thread(target=self._hid_read_loop) | 
|  | self._thread.setDaemon(True) | 
|  | self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port)) | 
|  | self._thread.start() | 
|  |  | 
|  | def from_url(self, url): | 
|  | parts = urlparse.urlsplit(url) | 
|  | if parts.scheme != "cp2110": | 
|  | raise SerialException( | 
|  | 'expected a string in the forms ' | 
|  | '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": ' | 
|  | 'not starting with cp2110:// {{!r}}'.format(parts.scheme)) | 
|  | if parts.netloc:  # cp2100://BUS:DEVICE:ENDPOINT, for libusb | 
|  | return parts.netloc.encode('utf-8') | 
|  | return parts.path.encode('utf-8') | 
|  |  | 
|  | def close(self): | 
|  | self.is_open = False | 
|  | if self._thread: | 
|  | self._thread.join(1)  # read timeout is 0.1 | 
|  | self._thread = None | 
|  | self._hid_handle.close() | 
|  | self._hid_handle = None | 
|  |  | 
|  | def _reconfigure_port(self): | 
|  | parity_value = None | 
|  | if self._parity == serial.PARITY_NONE: | 
|  | parity_value = 0x00 | 
|  | elif self._parity == serial.PARITY_ODD: | 
|  | parity_value = 0x01 | 
|  | elif self._parity == serial.PARITY_EVEN: | 
|  | parity_value = 0x02 | 
|  | elif self._parity == serial.PARITY_MARK: | 
|  | parity_value = 0x03 | 
|  | elif self._parity == serial.PARITY_SPACE: | 
|  | parity_value = 0x04 | 
|  | else: | 
|  | raise ValueError('Invalid parity: {!r}'.format(self._parity)) | 
|  |  | 
|  | if self.rtscts: | 
|  | flow_control_value = 0x01 | 
|  | else: | 
|  | flow_control_value = 0x00 | 
|  |  | 
|  | data_bits_value = None | 
|  | if self._bytesize == 5: | 
|  | data_bits_value = 0x00 | 
|  | elif self._bytesize == 6: | 
|  | data_bits_value = 0x01 | 
|  | elif self._bytesize == 7: | 
|  | data_bits_value = 0x02 | 
|  | elif self._bytesize == 8: | 
|  | data_bits_value = 0x03 | 
|  | else: | 
|  | raise ValueError('Invalid char len: {!r}'.format(self._bytesize)) | 
|  |  | 
|  | stop_bits_value = None | 
|  | if self._stopbits == serial.STOPBITS_ONE: | 
|  | stop_bits_value = 0x00 | 
|  | elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: | 
|  | stop_bits_value = 0x01 | 
|  | elif self._stopbits == serial.STOPBITS_TWO: | 
|  | stop_bits_value = 0x01 | 
|  | else: | 
|  | raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits)) | 
|  |  | 
|  | configuration_report = struct.pack( | 
|  | '>BLBBBB', | 
|  | _REPORT_GETSET_UART_CONFIG, | 
|  | self._baudrate, | 
|  | parity_value, | 
|  | flow_control_value, | 
|  | data_bits_value, | 
|  | stop_bits_value) | 
|  |  | 
|  | self._hid_handle.send_feature_report(configuration_report) | 
|  |  | 
|  | self._hid_handle.send_feature_report( | 
|  | bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART))) | 
|  | self._update_break_state() | 
|  |  | 
|  | @property | 
|  | def in_waiting(self): | 
|  | return self._read_buffer.qsize() | 
|  |  | 
|  | def reset_input_buffer(self): | 
|  | if not self.is_open: | 
|  | raise PortNotOpenError() | 
|  | self._hid_handle.send_feature_report( | 
|  | bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO))) | 
|  | # empty read buffer | 
|  | while self._read_buffer.qsize(): | 
|  | self._read_buffer.get(False) | 
|  |  | 
|  | def reset_output_buffer(self): | 
|  | if not self.is_open: | 
|  | raise PortNotOpenError() | 
|  | self._hid_handle.send_feature_report( | 
|  | bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO))) | 
|  |  | 
|  | def _update_break_state(self): | 
|  | if not self._hid_handle: | 
|  | raise PortNotOpenError() | 
|  |  | 
|  | if self._break_state: | 
|  | self._hid_handle.send_feature_report( | 
|  | bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0))) | 
|  | else: | 
|  | # Note that while AN434 states "There are no data bytes in | 
|  | # the payload other than the Report ID", either hidapi or | 
|  | # Linux does not seem to send the report otherwise. | 
|  | self._hid_handle.send_feature_report( | 
|  | bytes((_REPORT_SET_STOP_LINE_BREAK, 0))) | 
|  |  | 
|  | def read(self, size=1): | 
|  | if not self.is_open: | 
|  | raise PortNotOpenError() | 
|  |  | 
|  | data = bytearray() | 
|  | try: | 
|  | timeout = Timeout(self._timeout) | 
|  | while len(data) < size: | 
|  | if self._thread is None: | 
|  | raise SerialException('connection failed (reader thread died)') | 
|  | buf = self._read_buffer.get(True, timeout.time_left()) | 
|  | if buf is None: | 
|  | return bytes(data) | 
|  | data += buf | 
|  | if timeout.expired(): | 
|  | break | 
|  | except Queue.Empty:  # -> timeout | 
|  | pass | 
|  | return bytes(data) | 
|  |  | 
|  | def write(self, data): | 
|  | if not self.is_open: | 
|  | raise PortNotOpenError() | 
|  | data = to_bytes(data) | 
|  | tx_len = len(data) | 
|  | while tx_len > 0: | 
|  | to_be_sent = min(tx_len, 0x3F) | 
|  | report = to_bytes([to_be_sent]) + data[:to_be_sent] | 
|  | self._hid_handle.write(report) | 
|  |  | 
|  | data = data[to_be_sent:] | 
|  | tx_len = len(data) | 
|  |  | 
|  | def _hid_read_loop(self): | 
|  | try: | 
|  | while self.is_open: | 
|  | data = self._hid_handle.read(64, timeout_ms=100) | 
|  | if not data: | 
|  | continue | 
|  | data_len = data.pop(0) | 
|  | assert data_len == len(data) | 
|  | self._read_buffer.put(bytearray(data)) | 
|  | finally: | 
|  | self._thread = None |