#!/usr/bin/env python
#
# Very simple serial terminal
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C)2002-2020 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
9
10from __future__ import absolute_import
11
12import codecs
13import os
14import sys
15import threading
16
17import serial
18from serial.tools.list_ports import comports
19from serial.tools import hexlify_codec
20
21# pylint: disable=wrong-import-order,wrong-import-position
22
def _hexlify_codec_search(codec_name):
    """Codec search function that only resolves the name 'hexlify'."""
    if codec_name == 'hexlify':
        return hexlify_codec.getregentry()
    return None


codecs.register(_hexlify_codec_search)

try:
    raw_input
except NameError:
    # pylint: disable=redefined-builtin,invalid-name
    # Python 3 has neither name: input() is already "raw" and chr() covers
    # the full unicode range, so alias them under the Python 2 names.
    raw_input, unichr = input, chr
31
32
def key_description(character):
    """Return a human readable name for a single key.

    Control characters (code below 32) are shown as ``Ctrl+<letter>``,
    everything else as its ``repr()``.
    """
    code_point = ord(character)
    if code_point >= 32:
        return repr(character)
    # control codes map onto the characters starting at '@' (0 -> '@', 1 -> 'A', ...)
    return 'Ctrl+{:c}'.format(code_point + ord('@'))
40
41
42# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class ConsoleBase(object):
    """OS abstraction for console (input/output codec, no echo)"""

    def __init__(self):
        # On Python 3 raw bytes have to go to the binary buffer behind
        # sys.stdout; on Python 2 sys.stdout is used for bytes directly.
        is_py3 = sys.version_info >= (3, 0)
        self.byte_output = sys.stdout.buffer if is_py3 else sys.stdout
        self.output = sys.stdout

    def setup(self):
        """Switch the console to single character, no echo mode (no-op here)"""

    def cleanup(self):
        """Put the console back to its default settings (no-op here)"""

    def getkey(self):
        """Read one key from the console; the base class has none to offer"""
        return None

    def write_bytes(self, byte_string):
        """Write already encoded bytes and flush immediately"""
        stream = self.byte_output
        stream.write(byte_string)
        stream.flush()

    def write(self, text):
        """Write a text string and flush immediately"""
        stream = self.output
        stream.write(text)
        stream.flush()

    def cancel(self):
        """Abort a pending getkey() call (no-op here)"""

    # - - - - - - - - - - - - - - - - - - - - - - - -
    # context manager:
    # temporarily switch the terminal back to normal mode (e.g. to read a
    # whole line of user input) and re-enter key mode afterwards

    def __enter__(self):
        self.cleanup()
        return self

    def __exit__(self, *args, **kwargs):
        self.setup()
86
87
if os.name == 'nt':  # noqa
    import msvcrt
    import ctypes
    import platform

    class Out(object):
        """file-like wrapper that uses os.write"""

        def __init__(self, fd):
            self.fd = fd

        def flush(self):
            """Nothing to do: this wrapper keeps no buffer of its own."""
            pass

        def write(self, s):
            """Write the (already encoded) data straight to the descriptor."""
            os.write(self.fd, s)

    class Console(ConsoleBase):
        """Windows console: UTF-8 code pages, keys read through msvcrt."""

        # second character of a two-character key code (prefix NUL) -> escape
        # sequence that is passed on for the function key
        fncodes = {
            ';': '\x1bOP',  # F1
            '<': '\x1bOQ',  # F2
            '=': '\x1bOR',  # F3
            '>': '\x1bOS',  # F4
            '?': '\x1b[15~',  # F5
            '@': '\x1b[17~',  # F6
            'A': '\x1b[18~',  # F7
            'B': '\x1b[19~',  # F8
            'C': '\x1b[20~',  # F9
            'D': '\x1b[21~',  # F10
        }
        # second character of a two-character key code (prefix 0xe0) -> escape
        # sequence that is passed on for the navigation key
        navcodes = {
            'H': '\x1b[A',  # UP
            'P': '\x1b[B',  # DOWN
            'K': '\x1b[D',  # LEFT
            'M': '\x1b[C',  # RIGHT
            'G': '\x1b[H',  # HOME
            'O': '\x1b[F',  # END
            'R': '\x1b[2~',  # INSERT
            'S': '\x1b[3~',  # DELETE
            'I': '\x1b[5~',  # PGUP
            'Q': '\x1b[6~',  # PGDN
        }

        def __init__(self):
            super(Console, self).__init__()
            # remember the current code pages (restored in __del__) and
            # switch input and output to code page 65001
            self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
            self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
            ctypes.windll.kernel32.SetConsoleOutputCP(65001)
            ctypes.windll.kernel32.SetConsoleCP(65001)
            # ANSI handling available through SetConsoleMode since Windows 10 v1511
            # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1
            if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586:
                ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
                import ctypes.wintypes as wintypes
                if not hasattr(wintypes, 'LPDWORD'):  # PY2
                    wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
                SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
                GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
                GetStdHandle = ctypes.windll.kernel32.GetStdHandle
                mode = wintypes.DWORD()
                GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode))
                if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
                    SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
                    # only stored when the mode was changed; __del__ copes
                    # with the attribute being absent
                    self._saved_cm = mode
            self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
            # the change of the code page is not propagated to Python, manually fix it
            sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
            sys.stdout = self.output
            self.output.encoding = 'UTF-8'  # needed for input

        def __del__(self):
            """Restore the code pages and, if it was changed, the console mode."""
            ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
            ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)
            try:
                ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm)
            except AttributeError:  # in case no _saved_cm
                pass

        def getkey(self):
            """Read one key; returns a character or an escape sequence string.

            CR is converted to LF. Two-character key codes (prefix NUL or
            0xe0) are translated through ``fncodes``/``navcodes``; unknown
            ones are dropped and the next key is read instead.
            """
            while True:
                z = msvcrt.getwch()
                if z == unichr(13):
                    return unichr(10)
                elif z in (unichr(0), unichr(0xe0)):
                    # compare by value: identity ('is') of equal strings is
                    # an implementation detail and must not be relied upon
                    code = msvcrt.getwch()
                    table = self.fncodes if z == unichr(0) else self.navcodes
                    try:
                        return table[code]
                    except KeyError:
                        pass    # unmapped special key: ignore it, read on
                else:
                    return z

        def cancel(self):
            """Unblock a pending getkey() by posting a key to the console window."""
            # CancelIo, CancelSynchronousIo do not seem to work when using
            # getwch, so instead, send a key to the window with the console
            hwnd = ctypes.windll.kernel32.GetConsoleWindow()
            ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0)

elif os.name == 'posix':
    import atexit
    import termios
    import fcntl

    class Console(ConsoleBase):
        """POSIX console: termios based single key input on stdin."""

        def __init__(self):
            super(Console, self).__init__()
            self.fd = sys.stdin.fileno()
            # terminal attributes as found at start; restored by cleanup()
            self.old = termios.tcgetattr(self.fd)
            atexit.register(self.cleanup)
            if sys.version_info < (3, 0):
                # Python 2: wrap stdin so that read() yields decoded text
                self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
            else:
                self.enc_stdin = sys.stdin

        def setup(self):
            """Disable canonical mode, echo and signal keys; read byte-wise."""
            new = termios.tcgetattr(self.fd)
            # index 3 holds the local mode flags
            new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
            new[6][termios.VMIN] = 1
            new[6][termios.VTIME] = 0
            termios.tcsetattr(self.fd, termios.TCSANOW, new)

        def getkey(self):
            """Read a single character from stdin."""
            c = self.enc_stdin.read(1)
            if c == unichr(0x7f):
                c = unichr(8)    # map the BS key (which yields DEL) to backspace
            return c

        def cancel(self):
            """Unblock a pending getkey() by injecting a NUL byte via TIOCSTI."""
            fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')

        def cleanup(self):
            """Restore the terminal attributes saved in __init__."""
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)

else:
    raise NotImplementedError(
        'Sorry no implementation for your platform ({}) available.'.format(sys.platform))
227
228
229# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
230
class Transform(object):
    """do-nothing: forward all data unchanged"""

    # Base class of all filters. Each hook receives text and returns the
    # (possibly modified) text; here all three are plain pass-throughs.

    def rx(self, text):
        """hook for text that was received from the serial port"""
        return text

    def tx(self, text):
        """hook for text that is about to be sent to the serial port"""
        return text

    def echo(self, text):
        """hook for sent text that is shown locally on the console"""
        return text
244
245
class CRLF(Transform):
    """ENTER sends CR+LF"""

    def tx(self, text):
        """expand every LF in outgoing text to CR+LF"""
        return '\r\n'.join(text.split('\n'))
251
252
class CR(Transform):
    """ENTER sends CR"""

    def rx(self, text):
        """show every received CR as LF"""
        return '\n'.join(text.split('\r'))

    def tx(self, text):
        """send every LF as CR"""
        return '\r'.join(text.split('\n'))
261
262
class LF(Transform):
    """ENTER sends LF"""

    # no overrides: the inherited pass-through rx()/tx()/echo() are used as is
265
266
class NoTerminal(Transform):
    """remove typical terminal control codes from input"""

    # translation table (code point -> code point): control codes below 32,
    # except CR, LF, BS and TAB, are shifted to 0x2400 + code
    REPLACEMENT_MAP = {
        code: 0x2400 + code
        for code in range(32)
        if unichr(code) not in '\r\n\b\t'
    }
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI

    def rx(self, text):
        """replace the mapped control codes in the given text"""
        table = self.REPLACEMENT_MAP
        return text.translate(table)

    # locally echoed text gets the very same treatment
    echo = rx
281
282
class NoControls(NoTerminal):
    """Remove all control codes, incl. CR+LF"""

    # same scheme as the parent class, but without exceptions for
    # CR/LF/BS/TAB and with an additional replacement for the space
    REPLACEMENT_MAP = {code: 0x2400 + code for code in range(32)}
    REPLACEMENT_MAP[0x20] = 0x2423  # visual space
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI
293
294
class Printable(Transform):
    """Show decimal code for all non-ASCII characters and replace most control codes"""

    def rx(self, text):
        """rewrite text character by character, see class description"""
        pieces = []
        for char in text:
            if char in '\r\n\b\t' or ' ' <= char < '\x7f':
                # printable ASCII and the basic layout controls stay as they are
                pieces.append(char)
            elif char < ' ':
                # remaining control codes: shifted to 0x2400 + code
                pieces.append(unichr(0x2400 + ord(char)))
            else:
                # everything else: decimal code, each digit shifted to
                # 0x2080 + digit value, followed by a space
                digits = '{:d}'.format(ord(char))
                pieces.extend(unichr(0x2080 + ord(digit) - 48) for digit in digits)
                pieces.append(' ')
        return ''.join(pieces)

    # locally echoed text gets the very same treatment
    echo = rx
311
312
class Colorize(Transform):
    """Apply different colors for received and echo"""

    def __init__(self):
        # XXX make it configurable, use colorama?
        # escape sequences that are put in front of the respective text
        self.echo_color = '\x1b[31m'
        self.input_color = '\x1b[37m'

    def rx(self, text):
        """prefix received text with the input color sequence"""
        return ''.join((self.input_color, text))

    def echo(self, text):
        """prefix locally echoed text with the echo color sequence"""
        return ''.join((self.echo_color, text))
326
327
class DebugIO(Transform):
    """Print what is sent and received"""

    def rx(self, text):
        """trace received text on stderr and pass it on unchanged"""
        trace = ' [RX:{!r}] '.format(text)
        sys.stderr.write(trace)
        sys.stderr.flush()
        return text

    def tx(self, text):
        """trace text to be sent on stderr and pass it on unchanged"""
        trace = ' [TX:{!r}] '.format(text)
        sys.stderr.write(trace)
        sys.stderr.flush()
        return text
340
341
342# other ideas:
343# - add date/time for each newline
344# - insert newline after: a) timeout b) packet end character
345
# end-of-line mode name -> transformation class (exactly one is active)
EOL_TRANSFORMATIONS = {
    'crlf': CRLF,
    'cr': CR,
    'lf': LF,
}

# filter name -> transformation class (any number can be stacked)
TRANSFORMATIONS = {
    'direct': Transform,    # no transformation
    'default': NoTerminal,
    'nocontrol': NoControls,
    'printable': Printable,
    'colorize': Colorize,
    'debug': DebugIO,
}
360
361
362# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ask_for_port():
    """\
    Print the available ports to stderr and prompt for a selection.

    The answer may be either the index shown in the list (convenient when
    device names are long) or a port name typed in full. An index outside
    of the list is rejected and the prompt is repeated; any answer that is
    not a number is returned as is.
    """
    sys.stderr.write('\n--- Available ports:\n')
    ports = []
    for n, (port, desc, _hwid) in enumerate(sorted(comports()), 1):
        sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
        ports.append(port)
    while True:
        answer = raw_input('--- Enter port index or full name: ')
        try:
            index = int(answer) - 1
        except ValueError:
            # not a number: take the input as the port name itself
            return answer
        if 0 <= index < len(ports):
            return ports[index]
        sys.stderr.write('--- Invalid index!\n')
386
387
class Miniterm(object):
    """\
    Terminal application. Copy data from serial port to console and vice versa.
    Handle special keys from the console to show menu etc.

    Two daemon threads do the work: ``reader`` (serial -> console) and
    ``writer`` (console -> serial). ``set_rx_encoding`` and
    ``set_tx_encoding`` have to be called before ``start``.
    """

    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        """\
        :param serial_instance: serial port object to talk to
        :param echo: True to show sent characters locally
        :param eol: key of EOL_TRANSFORMATIONS
        :param filters: sequence of keys of TRANSFORMATIONS
        """
        self.console = Console()
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        self.update_transformations()
        self.exit_character = unichr(0x1d)  # GS/CTRL+]
        self.menu_character = unichr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.transmitter_thread = None
        self.rx_decoder = None
        # the attribute used everywhere else is tx_encoder; it used to be
        # left undefined here because of the misspelling below
        self.tx_encoder = None
        self.tx_decoder = None  # misspelled legacy name, kept for compatibility

    @staticmethod
    def _state(flag):
        """Return 'active' or 'inactive' for a boolean line/option state"""
        return 'active' if flag else 'inactive'

    def _start_reader(self):
        """Start reader thread"""
        self._reader_alive = True
        # start serial->console thread
        self.receiver_thread = threading.Thread(target=self.reader, name='rx')
        self.receiver_thread.daemon = True
        self.receiver_thread.start()

    def _stop_reader(self):
        """Stop reader thread only, wait for clean exit of thread"""
        self._reader_alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        self.receiver_thread.join()

    def start(self):
        """start worker threads"""
        self.alive = True
        self._start_reader()
        # enter console->serial loop
        self.transmitter_thread = threading.Thread(target=self.writer, name='tx')
        self.transmitter_thread.daemon = True
        self.transmitter_thread.start()
        self.console.setup()

    def stop(self):
        """set flag to stop worker threads"""
        self.alive = False

    def join(self, transmit_only=False):
        """wait for worker threads to terminate"""
        self.transmitter_thread.join()
        if not transmit_only:
            if hasattr(self.serial, 'cancel_read'):
                self.serial.cancel_read()
            self.receiver_thread.join()

    def close(self):
        """close the serial port"""
        self.serial.close()

    def update_transformations(self):
        """take list of transformation classes and instantiate them for rx and tx"""
        transformations = [EOL_TRANSFORMATIONS[self.eol]]
        transformations.extend(TRANSFORMATIONS[f] for f in self.filters)
        self.tx_transformations = [t() for t in transformations]
        # received data runs through the same instances in opposite order
        self.rx_transformations = list(reversed(self.tx_transformations))

    def set_rx_encoding(self, encoding, errors='replace'):
        """set encoding for received data"""
        self.input_encoding = encoding
        self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors)

    def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)

    def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        state = self._state
        sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
            state(self.serial.rts),
            state(self.serial.dtr),
            state(self.serial.break_condition)))
        try:
            sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
                state(self.serial.cts),
                state(self.serial.dsr),
                state(self.serial.ri),
                state(self.serial.cd)))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format(state(self.serial.xonxoff)))
        sys.stderr.write('--- hardware flow control: {}\n'.format(state(self.serial.rtscts)))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # get the writer out of its blocking getkey() so it can terminate
            self.console.cancel()
            raise       # XXX handle instead of re-raise?

    def writer(self):
        """\
        Loop and copy console->serial until self.exit_character character is
        found. When self.menu_character is found, interpret the next key
        locally.
        """
        menu_active = False
        try:
            while self.alive:
                try:
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    c = '\x03'
                if not self.alive:
                    break
                if menu_active:
                    self.handle_menu_key(c)
                    menu_active = False
                elif c == self.menu_character:
                    menu_active = True      # next char will be for menu
                elif c == self.exit_character:
                    self.stop()             # exit app
                    break
                else:
                    #~ if self.raw:
                    text = c
                    for transformation in self.tx_transformations:
                        text = transformation.tx(text)
                    self.serial.write(self.tx_encoder.encode(text))
                    if self.echo:
                        echo_text = c
                        for transformation in self.tx_transformations:
                            echo_text = transformation.echo(echo_text)
                        self.console.write(echo_text)
        except BaseException:
            # whatever ends this thread must also end the reader loop
            self.alive = False
            raise

    def handle_menu_key(self, c):
        """Implement a simple menu / settings"""
        if c == self.menu_character or c == self.exit_character:
            # Menu/exit character again -> send itself
            self.serial.write(self.tx_encoder.encode(c))
            if self.echo:
                self.console.write(c)
        elif c == '\x15':                       # CTRL+U -> upload file
            self.upload_file()
        elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help
            sys.stderr.write(self.get_help_text())
        elif c == '\x12':                       # CTRL+R -> Toggle RTS
            self.serial.rts = not self.serial.rts
            sys.stderr.write('--- RTS {} ---\n'.format(self._state(self.serial.rts)))
        elif c == '\x04':                       # CTRL+D -> Toggle DTR
            self.serial.dtr = not self.serial.dtr
            sys.stderr.write('--- DTR {} ---\n'.format(self._state(self.serial.dtr)))
        elif c == '\x02':                       # CTRL+B -> toggle BREAK condition
            self.serial.break_condition = not self.serial.break_condition
            sys.stderr.write('--- BREAK {} ---\n'.format(self._state(self.serial.break_condition)))
        elif c == '\x05':                       # CTRL+E -> toggle local echo
            self.echo = not self.echo
            sys.stderr.write('--- local echo {} ---\n'.format(self._state(self.echo)))
        elif c == '\x06':                       # CTRL+F -> edit filters
            self.change_filter()
        elif c == '\x0c':                       # CTRL+L -> EOL mode
            # advance to the next mode, wrapping around at the end
            modes = list(EOL_TRANSFORMATIONS)   # keys
            eol = modes.index(self.eol) + 1
            if eol >= len(modes):
                eol = 0
            self.eol = modes[eol]
            sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
            self.update_transformations()
        elif c == '\x01':                       # CTRL+A -> set encoding
            self.change_encoding()
        elif c == '\x09':                       # CTRL+I -> info
            self.dump_port_settings()
        #~ elif c == '\x01':                       # CTRL+A -> cycle escape mode
        #~ elif c == '\x0c':                       # CTRL+L -> cycle linefeed mode
        elif c in 'pP':                         # P -> change port
            self.change_port()
        elif c in 'zZ':                         # Z -> suspend / open port temporarily
            self.suspend_port()
        elif c in 'bB':                         # B -> change baudrate
            self.change_baudrate()
        elif c == '8':                          # 8 -> change to 8 bits
            self.serial.bytesize = serial.EIGHTBITS
            self.dump_port_settings()
        elif c == '7':                          # 7 -> change to 7 bits
            self.serial.bytesize = serial.SEVENBITS
            self.dump_port_settings()
        elif c in 'eE':                         # E -> change to even parity
            self.serial.parity = serial.PARITY_EVEN
            self.dump_port_settings()
        elif c in 'oO':                         # O -> change to odd parity
            self.serial.parity = serial.PARITY_ODD
            self.dump_port_settings()
        elif c in 'mM':                         # M -> change to mark parity
            self.serial.parity = serial.PARITY_MARK
            self.dump_port_settings()
        elif c in 'sS':                         # S -> change to space parity
            self.serial.parity = serial.PARITY_SPACE
            self.dump_port_settings()
        elif c in 'nN':                         # N -> change to no parity
            self.serial.parity = serial.PARITY_NONE
            self.dump_port_settings()
        elif c == '1':                          # 1 -> change to 1 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE
            self.dump_port_settings()
        elif c == '2':                          # 2 -> change to 2 stop bits
            self.serial.stopbits = serial.STOPBITS_TWO
            self.dump_port_settings()
        elif c == '3':                          # 3 -> change to 1.5 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
            self.dump_port_settings()
        elif c in 'xX':                         # X -> change software flow control
            self.serial.xonxoff = (c == 'X')    # upper case enables, lower case disables
            self.dump_port_settings()
        elif c in 'rR':                         # R -> change hardware flow control
            self.serial.rtscts = (c == 'R')     # upper case enables, lower case disables
            self.dump_port_settings()
        elif c in 'qQ':
            self.stop()                         # Q -> exit app
        else:
            sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))

    def upload_file(self):
        """Ask user for filename and send its contents"""
        sys.stderr.write('\n--- File to upload: ')
        sys.stderr.flush()
        with self.console:
            filename = sys.stdin.readline().rstrip('\r\n')
            if filename:
                try:
                    with open(filename, 'rb') as f:
                        sys.stderr.write('--- Sending file {} ---\n'.format(filename))
                        while True:
                            block = f.read(1024)
                            if not block:
                                break
                            self.serial.write(block)
                            # Wait for output buffer to drain.
                            self.serial.flush()
                            sys.stderr.write('.')   # Progress indicator.
                    sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
                except IOError as e:
                    sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))

    def change_filter(self):
        """change the i/o transformations"""
        sys.stderr.write('\n--- Available Filters:\n')
        sys.stderr.write('\n'.join(
            '--- {:<10} = {.__doc__}'.format(k, v)
            for k, v in sorted(TRANSFORMATIONS.items())))
        sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
        with self.console:
            new_filters = sys.stdin.readline().lower().split()
        if new_filters:
            # all or nothing: a single unknown name keeps the current filters
            for f in new_filters:
                if f not in TRANSFORMATIONS:
                    sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
                    break
            else:
                self.filters = new_filters
                self.update_transformations()
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def change_encoding(self):
        """change encoding on the serial port"""
        sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
        with self.console:
            new_encoding = sys.stdin.readline().strip()
        if new_encoding:
            try:
                codecs.lookup(new_encoding)
            except LookupError:
                sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
            else:
                self.set_rx_encoding(new_encoding)
                self.set_tx_encoding(new_encoding)
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))

    def change_baudrate(self):
        """change the baudrate"""
        sys.stderr.write('\n--- Baudrate: ')
        sys.stderr.flush()
        with self.console:
            backup = self.serial.baudrate
            try:
                self.serial.baudrate = int(sys.stdin.readline().strip())
            except ValueError as e:
                sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
                self.serial.baudrate = backup
            else:
                self.dump_port_settings()

    def change_port(self):
        """Have a conversation with the user to change the serial port"""
        with self.console:
            try:
                port = ask_for_port()
            except KeyboardInterrupt:
                port = None
        if port and port != self.serial.port:
            # reader thread needs to be shut down
            self._stop_reader()
            # save settings
            settings = self.serial.getSettingsDict()
            # stays None when creating the port object itself fails, so the
            # error handler below does not trip over an unbound name
            new_serial = None
            try:
                new_serial = serial.serial_for_url(port, do_not_open=True)
                # restore settings and open
                new_serial.applySettingsDict(settings)
                new_serial.rts = self.serial.rts
                new_serial.dtr = self.serial.dtr
                new_serial.open()
                new_serial.break_condition = self.serial.break_condition
            except Exception as e:
                sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
                if new_serial is not None:
                    new_serial.close()
            else:
                self.serial.close()
                self.serial = new_serial
                sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
            # and restart the reader thread
            self._start_reader()

    def suspend_port(self):
        """\
        open port temporarily, allow reconnect, exit and port change to get
        out of the loop
        """
        # reader thread needs to be shut down
        self._stop_reader()
        self.serial.close()
        sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
        do_change_port = False
        while not self.serial.is_open:
            sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
                exit=key_description(self.exit_character)))
            k = self.console.getkey()
            if k == self.exit_character:
                self.stop()             # exit app
                break
            elif k in 'pP':
                do_change_port = True
                break
            try:
                self.serial.open()
            except Exception as e:
                sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
        if do_change_port:
            self.change_port()
        elif self.serial.is_open:
            # and restart the reader thread
            self._start_reader()
            sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
        # else: the user chose to quit while the port is still closed, so
        # there is nothing to read from and nothing to announce

    def get_help_text(self):
        """return the help text"""
        # help text, starts with blank line!
        return """
--- pySerial ({version}) - miniterm - help
---
--- {exit:8} Exit program (alias {menu} Q)
--- {menu:8} Menu escape key, followed by:
--- Menu keys:
--- {menu:7} Send the menu character itself to remote
--- {exit:7} Send the exit character itself to remote
--- {info:7} Show info
--- {upload:7} Upload file (prompt will be shown)
--- {repr:7} encoding
--- {filter:7} edit filters
--- Toggles:
--- {rts:7} RTS {dtr:7} DTR {brk:7} BREAK
--- {echo:7} echo {eol:7} EOL
---
--- Port settings ({menu} followed by the following):
--- p change port
--- 7 8 set data bits
--- N E O S M change parity (None, Even, Odd, Space, Mark)
--- 1 2 3 set stop bits (1, 2, 1.5)
--- b change baud rate
--- x X disable/enable software flow control
--- r R disable/enable hardware flow control
""".format(version=getattr(serial, 'VERSION', 'unknown version'),
           exit=key_description(self.exit_character),
           menu=key_description(self.menu_character),
           rts=key_description('\x12'),
           dtr=key_description('\x04'),
           brk=key_description('\x02'),
           echo=key_description('\x05'),
           info=key_description('\x09'),
           upload=key_description('\x15'),
           repr=key_description('\x01'),
           filter=key_description('\x06'),
           eol=key_description('\x0c'))
808
809
810# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
811# default args can be used to override when calling main() from an other script
812# e.g to create a miniterm-my-device.py
def _build_argument_parser(default_port, default_baudrate, default_rts, default_dtr):
    """Create the command line parser of miniterm with the given defaults"""
    import argparse

    parser = argparse.ArgumentParser(
        description='Miniterm - A simple terminal program for the serial port.')

    parser.add_argument(
        'port',
        nargs='?',
        help='serial port name ("-" to show port list)',
        default=default_port)

    parser.add_argument(
        'baudrate',
        nargs='?',
        type=int,
        help='set baud rate, default: %(default)s',
        default=default_baudrate)

    group = parser.add_argument_group('port settings')

    group.add_argument(
        '--parity',
        choices=['N', 'E', 'O', 'S', 'M'],
        type=lambda c: c.upper(),
        help='set parity, one of {N E O S M}, default: N',
        default='N')

    group.add_argument(
        '--rtscts',
        action='store_true',
        help='enable RTS/CTS flow control (default off)',
        default=False)

    group.add_argument(
        '--xonxoff',
        action='store_true',
        help='enable software flow control (default off)',
        default=False)

    group.add_argument(
        '--rts',
        type=int,
        help='set initial RTS line state (possible values: 0, 1)',
        default=default_rts)

    group.add_argument(
        '--dtr',
        type=int,
        help='set initial DTR line state (possible values: 0, 1)',
        default=default_dtr)

    group.add_argument(
        '--non-exclusive',
        dest='exclusive',
        action='store_false',
        help='disable locking for native ports',
        default=True)

    group.add_argument(
        '--ask',
        action='store_true',
        help='ask again for port when open fails',
        default=False)

    group = parser.add_argument_group('data handling')

    group.add_argument(
        '-e', '--echo',
        action='store_true',
        help='enable local echo (default off)',
        default=False)

    group.add_argument(
        '--encoding',
        dest='serial_port_encoding',
        metavar='CODEC',
        help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
        default='UTF-8')

    group.add_argument(
        '-f', '--filter',
        action='append',
        metavar='NAME',
        help='add text transformation',
        default=[])

    group.add_argument(
        '--eol',
        choices=['CR', 'LF', 'CRLF'],
        type=lambda c: c.upper(),
        help='end of line mode',
        default='CRLF')

    group.add_argument(
        '--raw',
        action='store_true',
        help='Do no apply any encodings/transformations',
        default=False)

    group = parser.add_argument_group('hotkeys')

    group.add_argument(
        '--exit-char',
        type=int,
        metavar='NUM',
        help='Unicode of special character that is used to exit the application, default: %(default)s',
        default=0x1d)  # GS/CTRL+]

    group.add_argument(
        '--menu-char',
        type=int,
        metavar='NUM',
        help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
        default=0x14)  # Menu: CTRL+T

    group = parser.add_argument_group('diagnostics')

    group.add_argument(
        '-q', '--quiet',
        action='store_true',
        help='suppress non-error messages',
        default=False)

    group.add_argument(
        '--develop',
        action='store_true',
        help='show Python traceback on error',
        default=False)

    return parser


def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None):
    """Command line tool, entry point

    The ``default_*`` arguments replace the built-in defaults of the
    corresponding command line options, so that a wrapper script can
    preconfigure miniterm for a specific device.
    """
    parser = _build_argument_parser(default_port, default_baudrate, default_rts, default_dtr)
    args = parser.parse_args()

    if args.menu_char == args.exit_char:
        parser.error('--exit-char can not be the same as --menu-char')

    if args.filter:
        if 'help' in args.filter:
            sys.stderr.write('Available filters:\n')
            sys.stderr.write('\n'.join(
                '{:<10} = {.__doc__}'.format(k, v)
                for k, v in sorted(TRANSFORMATIONS.items())))
            sys.stderr.write('\n')
            sys.exit(1)
        # reject unknown names here with a usage error; otherwise they only
        # surface as a KeyError once the port has already been opened
        unknown = [f for f in args.filter if f not in TRANSFORMATIONS]
        if unknown:
            parser.error('unknown filter(s): {}'.format(', '.join(unknown)))
        filters = args.filter
    else:
        filters = ['default']

    while True:
        # no port given on command line -> ask user now
        if args.port is None or args.port == '-':
            try:
                args.port = ask_for_port()
            except KeyboardInterrupt:
                sys.stderr.write('\n')
                parser.error('user aborted and port is not given')
            else:
                if not args.port:
                    parser.error('port is not given')
        try:
            serial_instance = serial.serial_for_url(
                args.port,
                args.baudrate,
                parity=args.parity,
                rtscts=args.rtscts,
                xonxoff=args.xonxoff,
                do_not_open=True)

            if not hasattr(serial_instance, 'cancel_read'):
                # enable timeout for alive flag polling if cancel_read is not available
                serial_instance.timeout = 1

            if args.dtr is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive'))
                serial_instance.dtr = args.dtr
            if args.rts is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
                serial_instance.rts = args.rts

            if isinstance(serial_instance, serial.Serial):
                serial_instance.exclusive = args.exclusive

            serial_instance.open()
        except serial.SerialException as e:
            sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
            if args.develop:
                raise
            if not args.ask:
                sys.exit(1)
            else:
                # makes the next loop iteration prompt for a port again
                args.port = '-'
        else:
            break

    miniterm = Miniterm(
        serial_instance,
        echo=args.echo,
        eol=args.eol.lower(),
        filters=filters)
    miniterm.exit_character = unichr(args.exit_char)
    miniterm.menu_character = unichr(args.menu_char)
    miniterm.raw = args.raw
    miniterm.set_rx_encoding(args.serial_port_encoding)
    miniterm.set_tx_encoding(args.serial_port_encoding)

    if not args.quiet:
        sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format(
            p=miniterm.serial))
        sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format(
            key_description(miniterm.exit_character),
            key_description(miniterm.menu_character),
            key_description(miniterm.menu_character),
            key_description('\x08')))

    miniterm.start()
    try:
        # wait for the console->serial thread only; it ends on the exit key
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    if not args.quiet:
        sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()
1039
1040# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1041if __name__ == '__main__':
1042 main()