|  | #!/usr/bin/env python | 
|  | # | 
|  | # Very simple serial terminal | 
|  | # | 
|  | # This file is part of pySerial. https://github.com/pyserial/pyserial | 
|  | # (C)2002-2020 Chris Liechti <cliechti@gmx.net> | 
|  | # | 
|  | # SPDX-License-Identifier:    BSD-3-Clause | 
|  |  | 
|  | from __future__ import absolute_import | 
|  |  | 
|  | import codecs | 
|  | import os | 
|  | import sys | 
|  | import threading | 
|  |  | 
|  | import serial | 
|  | from serial.tools.list_ports import comports | 
|  | from serial.tools import hexlify_codec | 
|  |  | 
|  | # pylint: disable=wrong-import-order,wrong-import-position | 
|  |  | 
|  | codecs.register(lambda c: hexlify_codec.getregentry() if c == 'hexlify' else None) | 
|  |  | 
|  | try: | 
|  | raw_input | 
|  | except NameError: | 
|  | # pylint: disable=redefined-builtin,invalid-name | 
|  | raw_input = input   # in python3 it's "raw" | 
|  | unichr = chr | 
|  |  | 
|  |  | 
|  | def key_description(character): | 
|  | """generate a readable description for a key""" | 
|  | ascii_code = ord(character) | 
|  | if ascii_code < 32: | 
|  | return 'Ctrl+{:c}'.format(ord('@') + ascii_code) | 
|  | else: | 
|  | return repr(character) | 
|  |  | 
|  |  | 
|  | # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | 
|  | class ConsoleBase(object): | 
|  | """OS abstraction for console (input/output codec, no echo)""" | 
|  |  | 
|  | def __init__(self): | 
|  | if sys.version_info >= (3, 0): | 
|  | self.byte_output = sys.stdout.buffer | 
|  | else: | 
|  | self.byte_output = sys.stdout | 
|  | self.output = sys.stdout | 
|  |  | 
|  | def setup(self): | 
|  | """Set console to read single characters, no echo""" | 
|  |  | 
|  | def cleanup(self): | 
|  | """Restore default console settings""" | 
|  |  | 
|  | def getkey(self): | 
|  | """Read a single key from the console""" | 
|  | return None | 
|  |  | 
|  | def write_bytes(self, byte_string): | 
|  | """Write bytes (already encoded)""" | 
|  | self.byte_output.write(byte_string) | 
|  | self.byte_output.flush() | 
|  |  | 
|  | def write(self, text): | 
|  | """Write string""" | 
|  | self.output.write(text) | 
|  | self.output.flush() | 
|  |  | 
|  | def cancel(self): | 
|  | """Cancel getkey operation""" | 
|  |  | 
|  | #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  - | 
|  | # context manager: | 
|  | # switch terminal temporary to normal mode (e.g. to get user input) | 
|  |  | 
|  | def __enter__(self): | 
|  | self.cleanup() | 
|  | return self | 
|  |  | 
|  | def __exit__(self, *args, **kwargs): | 
|  | self.setup() | 
|  |  | 
|  |  | 
|  | if os.name == 'nt':  # noqa | 
|  | import msvcrt | 
|  | import ctypes | 
|  | import platform | 
|  |  | 
|  | class Out(object): | 
|  | """file-like wrapper that uses os.write""" | 
|  |  | 
|  | def __init__(self, fd): | 
|  | self.fd = fd | 
|  |  | 
|  | def flush(self): | 
|  | pass | 
|  |  | 
|  | def write(self, s): | 
|  | os.write(self.fd, s) | 
|  |  | 
|  | class Console(ConsoleBase): | 
|  | fncodes = { | 
|  | ';': '\1bOP',  # F1 | 
|  | '<': '\1bOQ',  # F2 | 
|  | '=': '\1bOR',  # F3 | 
|  | '>': '\1bOS',  # F4 | 
|  | '?': '\1b[15~',  # F5 | 
|  | '@': '\1b[17~',  # F6 | 
|  | 'A': '\1b[18~',  # F7 | 
|  | 'B': '\1b[19~',  # F8 | 
|  | 'C': '\1b[20~',  # F9 | 
|  | 'D': '\1b[21~',  # F10 | 
|  | } | 
|  | navcodes = { | 
|  | 'H': '\x1b[A',  # UP | 
|  | 'P': '\x1b[B',  # DOWN | 
|  | 'K': '\x1b[D',  # LEFT | 
|  | 'M': '\x1b[C',  # RIGHT | 
|  | 'G': '\x1b[H',  # HOME | 
|  | 'O': '\x1b[F',  # END | 
|  | 'R': '\x1b[2~',  # INSERT | 
|  | 'S': '\x1b[3~',  # DELETE | 
|  | 'I': '\x1b[5~',  # PGUP | 
|  | 'Q': '\x1b[6~',  # PGDN | 
|  | } | 
|  |  | 
|  | def __init__(self): | 
|  | super(Console, self).__init__() | 
|  | self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP() | 
|  | self._saved_icp = ctypes.windll.kernel32.GetConsoleCP() | 
|  | ctypes.windll.kernel32.SetConsoleOutputCP(65001) | 
|  | ctypes.windll.kernel32.SetConsoleCP(65001) | 
|  | # ANSI handling available through SetConsoleMode since Windows 10 v1511 | 
|  | # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1 | 
|  | if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586: | 
|  | ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 | 
|  | import ctypes.wintypes as wintypes | 
|  | if not hasattr(wintypes, 'LPDWORD'): # PY2 | 
|  | wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) | 
|  | SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode | 
|  | GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode | 
|  | GetStdHandle = ctypes.windll.kernel32.GetStdHandle | 
|  | mode = wintypes.DWORD() | 
|  | GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode)) | 
|  | if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: | 
|  | SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) | 
|  | self._saved_cm = mode | 
|  | self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace') | 
|  | # the change of the code page is not propagated to Python, manually fix it | 
|  | sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace') | 
|  | sys.stdout = self.output | 
|  | self.output.encoding = 'UTF-8'  # needed for input | 
|  |  | 
|  | def __del__(self): | 
|  | ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp) | 
|  | ctypes.windll.kernel32.SetConsoleCP(self._saved_icp) | 
|  | try: | 
|  | ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm) | 
|  | except AttributeError: # in case no _saved_cm | 
|  | pass | 
|  |  | 
|  | def getkey(self): | 
|  | while True: | 
|  | z = msvcrt.getwch() | 
|  | if z == unichr(13): | 
|  | return unichr(10) | 
|  | elif z is unichr(0) or z is unichr(0xe0): | 
|  | try: | 
|  | code = msvcrt.getwch() | 
|  | if z is unichr(0): | 
|  | return self.fncodes[code] | 
|  | else: | 
|  | return self.navcodes[code] | 
|  | except KeyError: | 
|  | pass | 
|  | else: | 
|  | return z | 
|  |  | 
|  | def cancel(self): | 
|  | # CancelIo, CancelSynchronousIo do not seem to work when using | 
|  | # getwch, so instead, send a key to the window with the console | 
|  | hwnd = ctypes.windll.kernel32.GetConsoleWindow() | 
|  | ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0) | 
|  |  | 
|  | elif os.name == 'posix': | 
|  | import atexit | 
|  | import termios | 
|  | import fcntl | 
|  |  | 
|  | class Console(ConsoleBase): | 
|  | def __init__(self): | 
|  | super(Console, self).__init__() | 
|  | self.fd = sys.stdin.fileno() | 
|  | self.old = termios.tcgetattr(self.fd) | 
|  | atexit.register(self.cleanup) | 
|  | if sys.version_info < (3, 0): | 
|  | self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin) | 
|  | else: | 
|  | self.enc_stdin = sys.stdin | 
|  |  | 
|  | def setup(self): | 
|  | new = termios.tcgetattr(self.fd) | 
|  | new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG | 
|  | new[6][termios.VMIN] = 1 | 
|  | new[6][termios.VTIME] = 0 | 
|  | termios.tcsetattr(self.fd, termios.TCSANOW, new) | 
|  |  | 
|  | def getkey(self): | 
|  | c = self.enc_stdin.read(1) | 
|  | if c == unichr(0x7f): | 
|  | c = unichr(8)    # map the BS key (which yields DEL) to backspace | 
|  | return c | 
|  |  | 
|  | def cancel(self): | 
|  | fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0') | 
|  |  | 
|  | def cleanup(self): | 
|  | termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) | 
|  |  | 
|  | else: | 
|  | raise NotImplementedError( | 
|  | 'Sorry no implementation for your platform ({}) available.'.format(sys.platform)) | 
|  |  | 
|  |  | 
|  | # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | 
|  |  | 
|  | class Transform(object): | 
|  | """do-nothing: forward all data unchanged""" | 
|  | def rx(self, text): | 
|  | """text received from serial port""" | 
|  | return text | 
|  |  | 
|  | def tx(self, text): | 
|  | """text to be sent to serial port""" | 
|  | return text | 
|  |  | 
|  | def echo(self, text): | 
|  | """text to be sent but displayed on console""" | 
|  | return text | 
|  |  | 
|  |  | 
|  | class CRLF(Transform): | 
|  | """ENTER sends CR+LF""" | 
|  |  | 
|  | def tx(self, text): | 
|  | return text.replace('\n', '\r\n') | 
|  |  | 
|  |  | 
|  | class CR(Transform): | 
|  | """ENTER sends CR""" | 
|  |  | 
|  | def rx(self, text): | 
|  | return text.replace('\r', '\n') | 
|  |  | 
|  | def tx(self, text): | 
|  | return text.replace('\n', '\r') | 
|  |  | 
|  |  | 
|  | class LF(Transform): | 
|  | """ENTER sends LF""" | 
|  |  | 
|  |  | 
|  | class NoTerminal(Transform): | 
|  | """remove typical terminal control codes from input""" | 
|  |  | 
|  | REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t') | 
|  | REPLACEMENT_MAP.update( | 
|  | { | 
|  | 0x7F: 0x2421,  # DEL | 
|  | 0x9B: 0x2425,  # CSI | 
|  | }) | 
|  |  | 
|  | def rx(self, text): | 
|  | return text.translate(self.REPLACEMENT_MAP) | 
|  |  | 
|  | echo = rx | 
|  |  | 
|  |  | 
|  | class NoControls(NoTerminal): | 
|  | """Remove all control codes, incl. CR+LF""" | 
|  |  | 
|  | REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32)) | 
|  | REPLACEMENT_MAP.update( | 
|  | { | 
|  | 0x20: 0x2423,  # visual space | 
|  | 0x7F: 0x2421,  # DEL | 
|  | 0x9B: 0x2425,  # CSI | 
|  | }) | 
|  |  | 
|  |  | 
|  | class Printable(Transform): | 
|  | """Show decimal code for all non-ASCII characters and replace most control codes""" | 
|  |  | 
|  | def rx(self, text): | 
|  | r = [] | 
|  | for c in text: | 
|  | if ' ' <= c < '\x7f' or c in '\r\n\b\t': | 
|  | r.append(c) | 
|  | elif c < ' ': | 
|  | r.append(unichr(0x2400 + ord(c))) | 
|  | else: | 
|  | r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(c))) | 
|  | r.append(' ') | 
|  | return ''.join(r) | 
|  |  | 
|  | echo = rx | 
|  |  | 
|  |  | 
|  | class Colorize(Transform): | 
|  | """Apply different colors for received and echo""" | 
|  |  | 
|  | def __init__(self): | 
|  | # XXX make it configurable, use colorama? | 
|  | self.input_color = '\x1b[37m' | 
|  | self.echo_color = '\x1b[31m' | 
|  |  | 
|  | def rx(self, text): | 
|  | return self.input_color + text | 
|  |  | 
|  | def echo(self, text): | 
|  | return self.echo_color + text | 
|  |  | 
|  |  | 
|  | class DebugIO(Transform): | 
|  | """Print what is sent and received""" | 
|  |  | 
|  | def rx(self, text): | 
|  | sys.stderr.write(' [RX:{!r}] '.format(text)) | 
|  | sys.stderr.flush() | 
|  | return text | 
|  |  | 
|  | def tx(self, text): | 
|  | sys.stderr.write(' [TX:{!r}] '.format(text)) | 
|  | sys.stderr.flush() | 
|  | return text | 
|  |  | 
|  |  | 
|  | # other ideas: | 
|  | # - add date/time for each newline | 
|  | # - insert newline after: a) timeout b) packet end character | 
|  |  | 
|  | EOL_TRANSFORMATIONS = { | 
|  | 'crlf': CRLF, | 
|  | 'cr': CR, | 
|  | 'lf': LF, | 
|  | } | 
|  |  | 
|  | TRANSFORMATIONS = { | 
|  | 'direct': Transform,    # no transformation | 
|  | 'default': NoTerminal, | 
|  | 'nocontrol': NoControls, | 
|  | 'printable': Printable, | 
|  | 'colorize': Colorize, | 
|  | 'debug': DebugIO, | 
|  | } | 
|  |  | 
|  |  | 
|  | # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | 
|  | def ask_for_port(): | 
|  | """\ | 
|  | Show a list of ports and ask the user for a choice. To make selection | 
|  | easier on systems with long device names, also allow the input of an | 
|  | index. | 
|  | """ | 
|  | sys.stderr.write('\n--- Available ports:\n') | 
|  | ports = [] | 
|  | for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): | 
|  | sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc)) | 
|  | ports.append(port) | 
|  | while True: | 
|  | port = raw_input('--- Enter port index or full name: ') | 
|  | try: | 
|  | index = int(port) - 1 | 
|  | if not 0 <= index < len(ports): | 
|  | sys.stderr.write('--- Invalid index!\n') | 
|  | continue | 
|  | except ValueError: | 
|  | pass | 
|  | else: | 
|  | port = ports[index] | 
|  | return port | 
|  |  | 
|  |  | 
|  | class Miniterm(object): | 
|  | """\ | 
|  | Terminal application. Copy data from serial port to console and vice versa. | 
|  | Handle special keys from the console to show menu etc. | 
|  | """ | 
|  |  | 
|  | def __init__(self, serial_instance, echo=False, eol='crlf', filters=()): | 
|  | self.console = Console() | 
|  | self.serial = serial_instance | 
|  | self.echo = echo | 
|  | self.raw = False | 
|  | self.input_encoding = 'UTF-8' | 
|  | self.output_encoding = 'UTF-8' | 
|  | self.eol = eol | 
|  | self.filters = filters | 
|  | self.update_transformations() | 
|  | self.exit_character = unichr(0x1d)  # GS/CTRL+] | 
|  | self.menu_character = unichr(0x14)  # Menu: CTRL+T | 
|  | self.alive = None | 
|  | self._reader_alive = None | 
|  | self.receiver_thread = None | 
|  | self.rx_decoder = None | 
|  | self.tx_decoder = None | 
|  |  | 
|  | def _start_reader(self): | 
|  | """Start reader thread""" | 
|  | self._reader_alive = True | 
|  | # start serial->console thread | 
|  | self.receiver_thread = threading.Thread(target=self.reader, name='rx') | 
|  | self.receiver_thread.daemon = True | 
|  | self.receiver_thread.start() | 
|  |  | 
|  | def _stop_reader(self): | 
|  | """Stop reader thread only, wait for clean exit of thread""" | 
|  | self._reader_alive = False | 
|  | if hasattr(self.serial, 'cancel_read'): | 
|  | self.serial.cancel_read() | 
|  | self.receiver_thread.join() | 
|  |  | 
|  | def start(self): | 
|  | """start worker threads""" | 
|  | self.alive = True | 
|  | self._start_reader() | 
|  | # enter console->serial loop | 
|  | self.transmitter_thread = threading.Thread(target=self.writer, name='tx') | 
|  | self.transmitter_thread.daemon = True | 
|  | self.transmitter_thread.start() | 
|  | self.console.setup() | 
|  |  | 
|  | def stop(self): | 
|  | """set flag to stop worker threads""" | 
|  | self.alive = False | 
|  |  | 
|  | def join(self, transmit_only=False): | 
|  | """wait for worker threads to terminate""" | 
|  | self.transmitter_thread.join() | 
|  | if not transmit_only: | 
|  | if hasattr(self.serial, 'cancel_read'): | 
|  | self.serial.cancel_read() | 
|  | self.receiver_thread.join() | 
|  |  | 
|  | def close(self): | 
|  | self.serial.close() | 
|  |  | 
|  | def update_transformations(self): | 
|  | """take list of transformation classes and instantiate them for rx and tx""" | 
|  | transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f] | 
|  | for f in self.filters] | 
|  | self.tx_transformations = [t() for t in transformations] | 
|  | self.rx_transformations = list(reversed(self.tx_transformations)) | 
|  |  | 
|  | def set_rx_encoding(self, encoding, errors='replace'): | 
|  | """set encoding for received data""" | 
|  | self.input_encoding = encoding | 
|  | self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors) | 
|  |  | 
|  | def set_tx_encoding(self, encoding, errors='replace'): | 
|  | """set encoding for transmitted data""" | 
|  | self.output_encoding = encoding | 
|  | self.tx_encoder = codecs.getincrementalencoder(encoding)(errors) | 
|  |  | 
|  | def dump_port_settings(self): | 
|  | """Write current settings to sys.stderr""" | 
|  | sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format( | 
|  | p=self.serial)) | 
|  | sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format( | 
|  | ('active' if self.serial.rts else 'inactive'), | 
|  | ('active' if self.serial.dtr else 'inactive'), | 
|  | ('active' if self.serial.break_condition else 'inactive'))) | 
|  | try: | 
|  | sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format( | 
|  | ('active' if self.serial.cts else 'inactive'), | 
|  | ('active' if self.serial.dsr else 'inactive'), | 
|  | ('active' if self.serial.ri else 'inactive'), | 
|  | ('active' if self.serial.cd else 'inactive'))) | 
|  | except serial.SerialException: | 
|  | # on RFC 2217 ports, it can happen if no modem state notification was | 
|  | # yet received. ignore this error. | 
|  | pass | 
|  | sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive')) | 
|  | sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive')) | 
|  | sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) | 
|  | sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) | 
|  | sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper())) | 
|  | sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) | 
|  |  | 
|  | def reader(self): | 
|  | """loop and copy serial->console""" | 
|  | try: | 
|  | while self.alive and self._reader_alive: | 
|  | # read all that is there or wait for one byte | 
|  | data = self.serial.read(self.serial.in_waiting or 1) | 
|  | if data: | 
|  | if self.raw: | 
|  | self.console.write_bytes(data) | 
|  | else: | 
|  | text = self.rx_decoder.decode(data) | 
|  | for transformation in self.rx_transformations: | 
|  | text = transformation.rx(text) | 
|  | self.console.write(text) | 
|  | except serial.SerialException: | 
|  | self.alive = False | 
|  | self.console.cancel() | 
|  | raise       # XXX handle instead of re-raise? | 
|  |  | 
|  | def writer(self): | 
|  | """\ | 
|  | Loop and copy console->serial until self.exit_character character is | 
|  | found. When self.menu_character is found, interpret the next key | 
|  | locally. | 
|  | """ | 
|  | menu_active = False | 
|  | try: | 
|  | while self.alive: | 
|  | try: | 
|  | c = self.console.getkey() | 
|  | except KeyboardInterrupt: | 
|  | c = '\x03' | 
|  | if not self.alive: | 
|  | break | 
|  | if menu_active: | 
|  | self.handle_menu_key(c) | 
|  | menu_active = False | 
|  | elif c == self.menu_character: | 
|  | menu_active = True      # next char will be for menu | 
|  | elif c == self.exit_character: | 
|  | self.stop()             # exit app | 
|  | break | 
|  | else: | 
|  | #~ if self.raw: | 
|  | text = c | 
|  | for transformation in self.tx_transformations: | 
|  | text = transformation.tx(text) | 
|  | self.serial.write(self.tx_encoder.encode(text)) | 
|  | if self.echo: | 
|  | echo_text = c | 
|  | for transformation in self.tx_transformations: | 
|  | echo_text = transformation.echo(echo_text) | 
|  | self.console.write(echo_text) | 
|  | except: | 
|  | self.alive = False | 
|  | raise | 
|  |  | 
|  | def handle_menu_key(self, c): | 
|  | """Implement a simple menu / settings""" | 
|  | if c == self.menu_character or c == self.exit_character: | 
|  | # Menu/exit character again -> send itself | 
|  | self.serial.write(self.tx_encoder.encode(c)) | 
|  | if self.echo: | 
|  | self.console.write(c) | 
|  | elif c == '\x15':                       # CTRL+U -> upload file | 
|  | self.upload_file() | 
|  | elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help | 
|  | sys.stderr.write(self.get_help_text()) | 
|  | elif c == '\x12':                       # CTRL+R -> Toggle RTS | 
|  | self.serial.rts = not self.serial.rts | 
|  | sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive')) | 
|  | elif c == '\x04':                       # CTRL+D -> Toggle DTR | 
|  | self.serial.dtr = not self.serial.dtr | 
|  | sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive')) | 
|  | elif c == '\x02':                       # CTRL+B -> toggle BREAK condition | 
|  | self.serial.break_condition = not self.serial.break_condition | 
|  | sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive')) | 
|  | elif c == '\x05':                       # CTRL+E -> toggle local echo | 
|  | self.echo = not self.echo | 
|  | sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive')) | 
|  | elif c == '\x06':                       # CTRL+F -> edit filters | 
|  | self.change_filter() | 
|  | elif c == '\x0c':                       # CTRL+L -> EOL mode | 
|  | modes = list(EOL_TRANSFORMATIONS)   # keys | 
|  | eol = modes.index(self.eol) + 1 | 
|  | if eol >= len(modes): | 
|  | eol = 0 | 
|  | self.eol = modes[eol] | 
|  | sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper())) | 
|  | self.update_transformations() | 
|  | elif c == '\x01':                       # CTRL+A -> set encoding | 
|  | self.change_encoding() | 
|  | elif c == '\x09':                       # CTRL+I -> info | 
|  | self.dump_port_settings() | 
|  | #~ elif c == '\x01':                       # CTRL+A -> cycle escape mode | 
|  | #~ elif c == '\x0c':                       # CTRL+L -> cycle linefeed mode | 
|  | elif c in 'pP':                         # P -> change port | 
|  | self.change_port() | 
|  | elif c in 'zZ':                         # S -> suspend / open port temporarily | 
|  | self.suspend_port() | 
|  | elif c in 'bB':                         # B -> change baudrate | 
|  | self.change_baudrate() | 
|  | elif c == '8':                          # 8 -> change to 8 bits | 
|  | self.serial.bytesize = serial.EIGHTBITS | 
|  | self.dump_port_settings() | 
|  | elif c == '7':                          # 7 -> change to 8 bits | 
|  | self.serial.bytesize = serial.SEVENBITS | 
|  | self.dump_port_settings() | 
|  | elif c in 'eE':                         # E -> change to even parity | 
|  | self.serial.parity = serial.PARITY_EVEN | 
|  | self.dump_port_settings() | 
|  | elif c in 'oO':                         # O -> change to odd parity | 
|  | self.serial.parity = serial.PARITY_ODD | 
|  | self.dump_port_settings() | 
|  | elif c in 'mM':                         # M -> change to mark parity | 
|  | self.serial.parity = serial.PARITY_MARK | 
|  | self.dump_port_settings() | 
|  | elif c in 'sS':                         # S -> change to space parity | 
|  | self.serial.parity = serial.PARITY_SPACE | 
|  | self.dump_port_settings() | 
|  | elif c in 'nN':                         # N -> change to no parity | 
|  | self.serial.parity = serial.PARITY_NONE | 
|  | self.dump_port_settings() | 
|  | elif c == '1':                          # 1 -> change to 1 stop bits | 
|  | self.serial.stopbits = serial.STOPBITS_ONE | 
|  | self.dump_port_settings() | 
|  | elif c == '2':                          # 2 -> change to 2 stop bits | 
|  | self.serial.stopbits = serial.STOPBITS_TWO | 
|  | self.dump_port_settings() | 
|  | elif c == '3':                          # 3 -> change to 1.5 stop bits | 
|  | self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE | 
|  | self.dump_port_settings() | 
|  | elif c in 'xX':                         # X -> change software flow control | 
|  | self.serial.xonxoff = (c == 'X') | 
|  | self.dump_port_settings() | 
|  | elif c in 'rR':                         # R -> change hardware flow control | 
|  | self.serial.rtscts = (c == 'R') | 
|  | self.dump_port_settings() | 
|  | elif c in 'qQ': | 
|  | self.stop()                         # Q -> exit app | 
|  | else: | 
|  | sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c))) | 
|  |  | 
|  | def upload_file(self): | 
|  | """Ask user for filenname and send its contents""" | 
|  | sys.stderr.write('\n--- File to upload: ') | 
|  | sys.stderr.flush() | 
|  | with self.console: | 
|  | filename = sys.stdin.readline().rstrip('\r\n') | 
|  | if filename: | 
|  | try: | 
|  | with open(filename, 'rb') as f: | 
|  | sys.stderr.write('--- Sending file {} ---\n'.format(filename)) | 
|  | while True: | 
|  | block = f.read(1024) | 
|  | if not block: | 
|  | break | 
|  | self.serial.write(block) | 
|  | # Wait for output buffer to drain. | 
|  | self.serial.flush() | 
|  | sys.stderr.write('.')   # Progress indicator. | 
|  | sys.stderr.write('\n--- File {} sent ---\n'.format(filename)) | 
|  | except IOError as e: | 
|  | sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e)) | 
|  |  | 
|  | def change_filter(self): | 
|  | """change the i/o transformations""" | 
|  | sys.stderr.write('\n--- Available Filters:\n') | 
|  | sys.stderr.write('\n'.join( | 
|  | '---   {:<10} = {.__doc__}'.format(k, v) | 
|  | for k, v in sorted(TRANSFORMATIONS.items()))) | 
|  | sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters))) | 
|  | with self.console: | 
|  | new_filters = sys.stdin.readline().lower().split() | 
|  | if new_filters: | 
|  | for f in new_filters: | 
|  | if f not in TRANSFORMATIONS: | 
|  | sys.stderr.write('--- unknown filter: {!r}\n'.format(f)) | 
|  | break | 
|  | else: | 
|  | self.filters = new_filters | 
|  | self.update_transformations() | 
|  | sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters))) | 
|  |  | 
|  | def change_encoding(self): | 
|  | """change encoding on the serial port""" | 
|  | sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding)) | 
|  | with self.console: | 
|  | new_encoding = sys.stdin.readline().strip() | 
|  | if new_encoding: | 
|  | try: | 
|  | codecs.lookup(new_encoding) | 
|  | except LookupError: | 
|  | sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding)) | 
|  | else: | 
|  | self.set_rx_encoding(new_encoding) | 
|  | self.set_tx_encoding(new_encoding) | 
|  | sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding)) | 
|  | sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding)) | 
|  |  | 
|  | def change_baudrate(self): | 
|  | """change the baudrate""" | 
|  | sys.stderr.write('\n--- Baudrate: ') | 
|  | sys.stderr.flush() | 
|  | with self.console: | 
|  | backup = self.serial.baudrate | 
|  | try: | 
|  | self.serial.baudrate = int(sys.stdin.readline().strip()) | 
|  | except ValueError as e: | 
|  | sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e)) | 
|  | self.serial.baudrate = backup | 
|  | else: | 
|  | self.dump_port_settings() | 
|  |  | 
|  | def change_port(self): | 
|  | """Have a conversation with the user to change the serial port""" | 
|  | with self.console: | 
|  | try: | 
|  | port = ask_for_port() | 
|  | except KeyboardInterrupt: | 
|  | port = None | 
|  | if port and port != self.serial.port: | 
|  | # reader thread needs to be shut down | 
|  | self._stop_reader() | 
|  | # save settings | 
|  | settings = self.serial.getSettingsDict() | 
|  | try: | 
|  | new_serial = serial.serial_for_url(port, do_not_open=True) | 
|  | # restore settings and open | 
|  | new_serial.applySettingsDict(settings) | 
|  | new_serial.rts = self.serial.rts | 
|  | new_serial.dtr = self.serial.dtr | 
|  | new_serial.open() | 
|  | new_serial.break_condition = self.serial.break_condition | 
|  | except Exception as e: | 
|  | sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e)) | 
|  | new_serial.close() | 
|  | else: | 
|  | self.serial.close() | 
|  | self.serial = new_serial | 
|  | sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port)) | 
|  | # and restart the reader thread | 
|  | self._start_reader() | 
|  |  | 
|  | def suspend_port(self): | 
|  | """\ | 
|  | open port temporarily, allow reconnect, exit and port change to get | 
|  | out of the loop | 
|  | """ | 
|  | # reader thread needs to be shut down | 
|  | self._stop_reader() | 
|  | self.serial.close() | 
|  | sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port)) | 
|  | do_change_port = False | 
|  | while not self.serial.is_open: | 
|  | sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format( | 
|  | exit=key_description(self.exit_character))) | 
|  | k = self.console.getkey() | 
|  | if k == self.exit_character: | 
|  | self.stop()             # exit app | 
|  | break | 
|  | elif k in 'pP': | 
|  | do_change_port = True | 
|  | break | 
|  | try: | 
|  | self.serial.open() | 
|  | except Exception as e: | 
|  | sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e)) | 
|  | if do_change_port: | 
|  | self.change_port() | 
|  | else: | 
|  | # and restart the reader thread | 
|  | self._start_reader() | 
|  | sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port)) | 
|  |  | 
|  | def get_help_text(self): | 
|  | """return the help text""" | 
|  | # help text, starts with blank line! | 
|  | return """ | 
|  | --- pySerial ({version}) - miniterm - help | 
|  | --- | 
|  | --- {exit:8} Exit program (alias {menu} Q) | 
|  | --- {menu:8} Menu escape key, followed by: | 
|  | --- Menu keys: | 
|  | ---    {menu:7} Send the menu character itself to remote | 
|  | ---    {exit:7} Send the exit character itself to remote | 
|  | ---    {info:7} Show info | 
|  | ---    {upload:7} Upload file (prompt will be shown) | 
|  | ---    {repr:7} encoding | 
|  | ---    {filter:7} edit filters | 
|  | --- Toggles: | 
|  | ---    {rts:7} RTS   {dtr:7} DTR   {brk:7} BREAK | 
|  | ---    {echo:7} echo  {eol:7} EOL | 
|  | --- | 
|  | --- Port settings ({menu} followed by the following): | 
|  | ---    p          change port | 
|  | ---    7 8        set data bits | 
|  | ---    N E O S M  change parity (None, Even, Odd, Space, Mark) | 
|  | ---    1 2 3      set stop bits (1, 2, 1.5) | 
|  | ---    b          change baud rate | 
|  | ---    x X        disable/enable software flow control | 
|  | ---    r R        disable/enable hardware flow control | 
|  | """.format(version=getattr(serial, 'VERSION', 'unknown version'), | 
|  | exit=key_description(self.exit_character), | 
|  | menu=key_description(self.menu_character), | 
|  | rts=key_description('\x12'), | 
|  | dtr=key_description('\x04'), | 
|  | brk=key_description('\x02'), | 
|  | echo=key_description('\x05'), | 
|  | info=key_description('\x09'), | 
|  | upload=key_description('\x15'), | 
|  | repr=key_description('\x01'), | 
|  | filter=key_description('\x06'), | 
|  | eol=key_description('\x0c')) | 
|  |  | 
|  |  | 
|  | # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | 
|  | # default args can be used to override when calling main() from an other script | 
|  | # e.g to create a miniterm-my-device.py | 
|  | def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None): | 
|  | """Command line tool, entry point""" | 
|  |  | 
|  | import argparse | 
|  |  | 
|  | parser = argparse.ArgumentParser( | 
|  | description='Miniterm - A simple terminal program for the serial port.') | 
|  |  | 
|  | parser.add_argument( | 
|  | 'port', | 
|  | nargs='?', | 
|  | help='serial port name ("-" to show port list)', | 
|  | default=default_port) | 
|  |  | 
|  | parser.add_argument( | 
|  | 'baudrate', | 
|  | nargs='?', | 
|  | type=int, | 
|  | help='set baud rate, default: %(default)s', | 
|  | default=default_baudrate) | 
|  |  | 
|  | group = parser.add_argument_group('port settings') | 
|  |  | 
|  | group.add_argument( | 
|  | '--parity', | 
|  | choices=['N', 'E', 'O', 'S', 'M'], | 
|  | type=lambda c: c.upper(), | 
|  | help='set parity, one of {N E O S M}, default: N', | 
|  | default='N') | 
|  |  | 
|  | group.add_argument( | 
|  | '--rtscts', | 
|  | action='store_true', | 
|  | help='enable RTS/CTS flow control (default off)', | 
|  | default=False) | 
|  |  | 
|  | group.add_argument( | 
|  | '--xonxoff', | 
|  | action='store_true', | 
|  | help='enable software flow control (default off)', | 
|  | default=False) | 
|  |  | 
|  | group.add_argument( | 
|  | '--rts', | 
|  | type=int, | 
|  | help='set initial RTS line state (possible values: 0, 1)', | 
|  | default=default_rts) | 
|  |  | 
|  | group.add_argument( | 
|  | '--dtr', | 
|  | type=int, | 
|  | help='set initial DTR line state (possible values: 0, 1)', | 
|  | default=default_dtr) | 
|  |  | 
|  | group.add_argument( | 
|  | '--non-exclusive', | 
|  | dest='exclusive', | 
|  | action='store_false', | 
|  | help='disable locking for native ports', | 
|  | default=True) | 
|  |  | 
|  | group.add_argument( | 
|  | '--ask', | 
|  | action='store_true', | 
|  | help='ask again for port when open fails', | 
|  | default=False) | 
|  |  | 
|  | group = parser.add_argument_group('data handling') | 
|  |  | 
|  | group.add_argument( | 
|  | '-e', '--echo', | 
|  | action='store_true', | 
|  | help='enable local echo (default off)', | 
|  | default=False) | 
|  |  | 
|  | group.add_argument( | 
|  | '--encoding', | 
|  | dest='serial_port_encoding', | 
|  | metavar='CODEC', | 
|  | help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s', | 
|  | default='UTF-8') | 
|  |  | 
|  | group.add_argument( | 
|  | '-f', '--filter', | 
|  | action='append', | 
|  | metavar='NAME', | 
|  | help='add text transformation', | 
|  | default=[]) | 
|  |  | 
|  | group.add_argument( | 
|  | '--eol', | 
|  | choices=['CR', 'LF', 'CRLF'], | 
|  | type=lambda c: c.upper(), | 
|  | help='end of line mode', | 
|  | default='CRLF') | 
|  |  | 
|  | group.add_argument( | 
|  | '--raw', | 
|  | action='store_true', | 
|  | help='Do no apply any encodings/transformations', | 
|  | default=False) | 
|  |  | 
|  | group = parser.add_argument_group('hotkeys') | 
|  |  | 
|  | group.add_argument( | 
|  | '--exit-char', | 
|  | type=int, | 
|  | metavar='NUM', | 
|  | help='Unicode of special character that is used to exit the application, default: %(default)s', | 
|  | default=0x1d)  # GS/CTRL+] | 
|  |  | 
|  | group.add_argument( | 
|  | '--menu-char', | 
|  | type=int, | 
|  | metavar='NUM', | 
|  | help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s', | 
|  | default=0x14)  # Menu: CTRL+T | 
|  |  | 
|  | group = parser.add_argument_group('diagnostics') | 
|  |  | 
|  | group.add_argument( | 
|  | '-q', '--quiet', | 
|  | action='store_true', | 
|  | help='suppress non-error messages', | 
|  | default=False) | 
|  |  | 
|  | group.add_argument( | 
|  | '--develop', | 
|  | action='store_true', | 
|  | help='show Python traceback on error', | 
|  | default=False) | 
|  |  | 
|  | args = parser.parse_args() | 
|  |  | 
|  | if args.menu_char == args.exit_char: | 
|  | parser.error('--exit-char can not be the same as --menu-char') | 
|  |  | 
|  | if args.filter: | 
|  | if 'help' in args.filter: | 
|  | sys.stderr.write('Available filters:\n') | 
|  | sys.stderr.write('\n'.join( | 
|  | '{:<10} = {.__doc__}'.format(k, v) | 
|  | for k, v in sorted(TRANSFORMATIONS.items()))) | 
|  | sys.stderr.write('\n') | 
|  | sys.exit(1) | 
|  | filters = args.filter | 
|  | else: | 
|  | filters = ['default'] | 
|  |  | 
|  | while True: | 
|  | # no port given on command line -> ask user now | 
|  | if args.port is None or args.port == '-': | 
|  | try: | 
|  | args.port = ask_for_port() | 
|  | except KeyboardInterrupt: | 
|  | sys.stderr.write('\n') | 
|  | parser.error('user aborted and port is not given') | 
|  | else: | 
|  | if not args.port: | 
|  | parser.error('port is not given') | 
|  | try: | 
|  | serial_instance = serial.serial_for_url( | 
|  | args.port, | 
|  | args.baudrate, | 
|  | parity=args.parity, | 
|  | rtscts=args.rtscts, | 
|  | xonxoff=args.xonxoff, | 
|  | do_not_open=True) | 
|  |  | 
|  | if not hasattr(serial_instance, 'cancel_read'): | 
|  | # enable timeout for alive flag polling if cancel_read is not available | 
|  | serial_instance.timeout = 1 | 
|  |  | 
|  | if args.dtr is not None: | 
|  | if not args.quiet: | 
|  | sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive')) | 
|  | serial_instance.dtr = args.dtr | 
|  | if args.rts is not None: | 
|  | if not args.quiet: | 
|  | sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive')) | 
|  | serial_instance.rts = args.rts | 
|  |  | 
|  | if isinstance(serial_instance, serial.Serial): | 
|  | serial_instance.exclusive = args.exclusive | 
|  |  | 
|  | serial_instance.open() | 
|  | except serial.SerialException as e: | 
|  | sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e)) | 
|  | if args.develop: | 
|  | raise | 
|  | if not args.ask: | 
|  | sys.exit(1) | 
|  | else: | 
|  | args.port = '-' | 
|  | else: | 
|  | break | 
|  |  | 
|  | miniterm = Miniterm( | 
|  | serial_instance, | 
|  | echo=args.echo, | 
|  | eol=args.eol.lower(), | 
|  | filters=filters) | 
|  | miniterm.exit_character = unichr(args.exit_char) | 
|  | miniterm.menu_character = unichr(args.menu_char) | 
|  | miniterm.raw = args.raw | 
|  | miniterm.set_rx_encoding(args.serial_port_encoding) | 
|  | miniterm.set_tx_encoding(args.serial_port_encoding) | 
|  |  | 
|  | if not args.quiet: | 
|  | sys.stderr.write('--- Miniterm on {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format( | 
|  | p=miniterm.serial)) | 
|  | sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format( | 
|  | key_description(miniterm.exit_character), | 
|  | key_description(miniterm.menu_character), | 
|  | key_description(miniterm.menu_character), | 
|  | key_description('\x08'))) | 
|  |  | 
|  | miniterm.start() | 
|  | try: | 
|  | miniterm.join(True) | 
|  | except KeyboardInterrupt: | 
|  | pass | 
|  | if not args.quiet: | 
|  | sys.stderr.write('\n--- exit ---\n') | 
|  | miniterm.join() | 
|  | miniterm.close() | 
|  |  | 
|  | # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | 
|  | if __name__ == '__main__': | 
|  | main() |