blob: c79cd7b7b226afa0893b0e221b89a7ed70b46f09 [file] [log] [blame]
rjw1f884582022-01-06 17:20:42 +08001#! python
2#
3# Backend for .NET/Mono (IronPython), .NET >= 2
4#
5# This file is part of pySerial. https://github.com/pyserial/pyserial
6# (C) 2008-2015 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier: BSD-3-Clause
9
10import clr
11import System
12import System.IO.Ports
13from pyserial.serialutil import *
14
15
16#~ def device(portnum):
17 #~ """Turn a port number into a device name"""
18 #~ return System.IO.Ports.SerialPort.GetPortNames()[portnum]
19
20
21# must invoke function with byte array, make a helper to convert strings
22# to byte arrays
23sab = System.Array[System.Byte]
24def as_byte_array(string):
25 return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython
26
27class Serial(SerialBase):
28 """Serial port implementation for .NET/Mono."""
29
30 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
31 9600, 19200, 38400, 57600, 115200)
32
33 def open(self):
34 """\
35 Open port with current settings. This may throw a SerialException
36 if the port cannot be opened.
37 """
38 if self._port is None:
39 raise SerialException("Port must be configured before it can be used.")
40 if self.is_open:
41 raise SerialException("Port is already open.")
42 try:
43 self._port_handle = System.IO.Ports.SerialPort(self.portstr)
44 except Exception as msg:
45 self._port_handle = None
46 raise SerialException("could not open port %s: %s" % (self.portstr, msg))
47
48 self._reconfigurePort()
49 self._port_handle.Open()
50 self.is_open = True
51 if not self._dsrdtr:
52 self._update_dtr_state()
53 if not self._rtscts:
54 self._update_rts_state()
55 self.reset_input_buffer()
56
57 def _reconfigurePort(self):
58 """Set communication parameters on opened port."""
59 if not self._port_handle:
60 raise SerialException("Can only operate on a valid port handle")
61
62 #~ self._port_handle.ReceivedBytesThreshold = 1
63
64 if self._timeout is None:
65 self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
66 else:
67 self._port_handle.ReadTimeout = int(self._timeout*1000)
68
69 # if self._timeout != 0 and self._interCharTimeout is not None:
70 # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
71
72 if self._write_timeout is None:
73 self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
74 else:
75 self._port_handle.WriteTimeout = int(self._write_timeout*1000)
76
77
78 # Setup the connection info.
79 try:
80 self._port_handle.BaudRate = self._baudrate
81 except IOError as e:
82 # catch errors from illegal baudrate settings
83 raise ValueError(str(e))
84
85 if self._bytesize == FIVEBITS:
86 self._port_handle.DataBits = 5
87 elif self._bytesize == SIXBITS:
88 self._port_handle.DataBits = 6
89 elif self._bytesize == SEVENBITS:
90 self._port_handle.DataBits = 7
91 elif self._bytesize == EIGHTBITS:
92 self._port_handle.DataBits = 8
93 else:
94 raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
95
96 if self._parity == PARITY_NONE:
97 self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
98 elif self._parity == PARITY_EVEN:
99 self._port_handle.Parity = System.IO.Ports.Parity.Even
100 elif self._parity == PARITY_ODD:
101 self._port_handle.Parity = System.IO.Ports.Parity.Odd
102 elif self._parity == PARITY_MARK:
103 self._port_handle.Parity = System.IO.Ports.Parity.Mark
104 elif self._parity == PARITY_SPACE:
105 self._port_handle.Parity = System.IO.Ports.Parity.Space
106 else:
107 raise ValueError("Unsupported parity mode: %r" % self._parity)
108
109 if self._stopbits == STOPBITS_ONE:
110 self._port_handle.StopBits = System.IO.Ports.StopBits.One
111 elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
112 self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
113 elif self._stopbits == STOPBITS_TWO:
114 self._port_handle.StopBits = System.IO.Ports.StopBits.Two
115 else:
116 raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
117
118 if self._rtscts and self._xonxoff:
119 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
120 elif self._rtscts:
121 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
122 elif self._xonxoff:
123 self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
124 else:
125 self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k
126
127 #~ def __del__(self):
128 #~ self.close()
129
130 def close(self):
131 """Close port"""
132 if self.is_open:
133 if self._port_handle:
134 try:
135 self._port_handle.Close()
136 except System.IO.Ports.InvalidOperationException:
137 # ignore errors. can happen for unplugged USB serial devices
138 pass
139 self._port_handle = None
140 self.is_open = False
141
142 # - - - - - - - - - - - - - - - - - - - - - - - -
143
144 @property
145 def in_waiting(self):
146 """Return the number of characters currently in the input buffer."""
147 if not self._port_handle:
148 raise portNotOpenError
149 return self._port_handle.BytesToRead
150
151 def read(self, size=1):
152 """\
153 Read size bytes from the serial port. If a timeout is set it may
154 return less characters as requested. With no timeout it will block
155 until the requested number of bytes is read.
156 """
157 if not self._port_handle:
158 raise portNotOpenError
159 # must use single byte reads as this is the only way to read
160 # without applying encodings
161 data = bytearray()
162 while size:
163 try:
164 data.append(self._port_handle.ReadByte())
165 except System.TimeoutException as e:
166 break
167 else:
168 size -= 1
169 return bytes(data)
170
171 def write(self, data):
172 """Output the given string over the serial port."""
173 if not self._port_handle:
174 raise portNotOpenError
175 #~ if not isinstance(data, (bytes, bytearray)):
176 #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
177 try:
178 # must call overloaded method with byte array argument
179 # as this is the only one not applying encodings
180 self._port_handle.Write(as_byte_array(data), 0, len(data))
181 except System.TimeoutException as e:
182 raise writeTimeoutError
183 return len(data)
184
185 def reset_input_buffer(self):
186 """Clear input buffer, discarding all that is in the buffer."""
187 if not self._port_handle:
188 raise portNotOpenError
189 self._port_handle.DiscardInBuffer()
190
191 def reset_output_buffer(self):
192 """\
193 Clear output buffer, aborting the current output and
194 discarding all that is in the buffer.
195 """
196 if not self._port_handle:
197 raise portNotOpenError
198 self._port_handle.DiscardOutBuffer()
199
200 def _update_break_state(self):
201 """
202 Set break: Controls TXD. When active, to transmitting is possible.
203 """
204 if not self._port_handle:
205 raise portNotOpenError
206 self._port_handle.BreakState = bool(self._break_state)
207
208 def _update_rts_state(self):
209 """Set terminal status line: Request To Send"""
210 if not self._port_handle:
211 raise portNotOpenError
212 self._port_handle.RtsEnable = bool(self._rts_state)
213
214 def _update_dtr_state(self):
215 """Set terminal status line: Data Terminal Ready"""
216 if not self._port_handle:
217 raise portNotOpenError
218 self._port_handle.DtrEnable = bool(self._dtr_state)
219
220 @property
221 def cts(self):
222 """Read terminal status line: Clear To Send"""
223 if not self._port_handle:
224 raise portNotOpenError
225 return self._port_handle.CtsHolding
226
227 @property
228 def dsr(self):
229 """Read terminal status line: Data Set Ready"""
230 if not self._port_handle:
231 raise portNotOpenError
232 return self._port_handle.DsrHolding
233
234 @property
235 def ri(self):
236 """Read terminal status line: Ring Indicator"""
237 if not self._port_handle:
238 raise portNotOpenError
239 #~ return self._port_handle.XXX
240 return False #XXX an error would be better
241
242 @property
243 def cd(self):
244 """Read terminal status line: Carrier Detect"""
245 if not self._port_handle:
246 raise portNotOpenError
247 return self._port_handle.CDHolding
248
249 # - - platform specific - - - -
250 # none
251
252
253# Nur Testfunktion!!
254if __name__ == '__main__':
255 import sys
256
257 s = Serial(0)
258 sys.stdio.write('%s\n' % s)
259
260 s = Serial()
261 sys.stdio.write('%s\n' % s)
262
263
264 s.baudrate = 19200
265 s.databits = 7
266 s.close()
267 s.port = 0
268 s.open()
269 sys.stdio.write('%s\n' % s)
270