| rjw | 2e8229f | 2022-02-15 21:08:12 +0800 | [diff] [blame] | 1 | #!/usr/bin/env python | 
|  | 2 |  | 
|  | 3 | # RS485 support | 
|  | 4 | # | 
|  | 5 | # This file is part of pySerial. https://github.com/pyserial/pyserial | 
|  | 6 | # (C) 2015 Chris Liechti <cliechti@gmx.net> | 
|  | 7 | # | 
|  | 8 | # SPDX-License-Identifier:    BSD-3-Clause | 
|  | 9 |  | 
|  | 10 | """\ | 
|  | 11 | The settings for RS485 are stored in a dedicated object that can be applied to | 
|  | 12 | serial ports (where supported). | 
|  | 13 | NOTE: Some implementations may only support a subset of the settings. | 
|  | 14 | """ | 
|  | 15 |  | 
|  | 16 | from __future__ import absolute_import | 
|  | 17 |  | 
|  | 18 | import time | 
|  | 19 | import serial | 
|  | 20 |  | 
|  | 21 |  | 
class RS485Settings(object):
    """\
    Value object bundling the RS485 options that can be applied to a port.

    The constructor stores every argument unchanged under an attribute of
    the same name:

    rts_level_for_tx -- RTS level applied before transmitting (default True)
    rts_level_for_rx -- RTS level applied after transmitting (default False)
    loopback         -- loopback flag (default False); only stored here
    delay_before_tx  -- pause in seconds after RTS is set for TX, or None
    delay_before_rx  -- pause in seconds before RTS is set for RX, or None
    """

    def __init__(self, rts_level_for_tx=True, rts_level_for_rx=False,
                 loopback=False, delay_before_tx=None, delay_before_rx=None):
        # RTS levels for the two transfer directions
        self.rts_level_for_tx, self.rts_level_for_rx = (
            rts_level_for_tx, rts_level_for_rx)
        self.loopback = loopback
        # optional pauses around a transmission (None means "no pause")
        self.delay_before_tx, self.delay_before_rx = (
            delay_before_tx, delay_before_rx)
|  | 35 |  | 
|  | 36 |  | 
class RS485(serial.Serial):
    """\
    A subclass that replaces the write method with one that toggles RTS
    according to the RS485 settings.

    NOTE: This may work unreliably on some serial ports (control signals not
          synchronized or delayed compared to data). Using delays may be
          unreliable (varying times, larger than expected) as the OS may not
          support very fine grained delays (no smaller than in the order of
          tens of milliseconds).

    NOTE: Some implementations support this natively. Better performance
          can be expected when the native version is used.

    NOTE: The loopback property is ignored by this implementation. The actual
          behavior depends on the used hardware.

    Usage:

        ser = RS485(...)
        ser.rs485_mode = RS485Settings(...)
        ser.write(b'hello')
    """

    def __init__(self, *args, **kwargs):
        """\
        Forward all arguments to serial.Serial; RS485 handling starts out
        disabled (rs485_mode is None).
        """
        super(RS485, self).__init__(*args, **kwargs)
        self._alternate_rs485_settings = None

    def write(self, b):
        """\
        Write to port, controlling RTS before and after transmitting.

        Without RS485 settings this is a plain pass-through to the base
        class. With settings, RTS is set to the TX level (followed by the
        optional delay_before_tx), the data is written and flushed, and
        finally RTS is set to the RX level (preceded by the optional
        delay_before_rx).

        Returns the value returned by the base class write().
        Exceptions raised by the base class write()/flush() propagate; RTS
        is still switched back to the RX level in that case so that a
        failed write does not leave the line in transmit state.
        """
        # take a snapshot so that the whole write uses one consistent
        # settings object
        settings = self._alternate_rs485_settings
        if settings is None:
            return super(RS485, self).write(b)

        # apply level for TX and optional delay
        self.setRTS(settings.rts_level_for_tx)
        try:
            if settings.delay_before_tx is not None:
                time.sleep(settings.delay_before_tx)
            # write and wait for data to be written
            written = super(RS485, self).write(b)
            super(RS485, self).flush()
        finally:
            # optional delay and apply level for RX; done in "finally" so
            # that RTS does not stay at the TX level when writing fails
            if settings.delay_before_rx is not None:
                time.sleep(settings.delay_before_rx)
            self.setRTS(settings.rts_level_for_rx)
        return written

    # redirect where the property stores the settings so that underlying Serial
    # instance does not see them
    @property
    def rs485_mode(self):
        """\
        Enable RS485 mode and apply new settings, set to None to disable.
        See serial.rs485.RS485Settings for more info about the value.
        """
        return self._alternate_rs485_settings

    @rs485_mode.setter
    def rs485_mode(self, rs485_settings):
        # only remembered locally; write() reads it on every call
        self._alternate_rs485_settings = rs485_settings