#! python
#
# backend for Windows ("win32" incl. 32/64 bit support)
#
# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# SPDX-License-Identifier: BSD-3-Clause
#
# Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
11
12from __future__ import absolute_import
13
14# pylint: disable=invalid-name,too-few-public-methods
15import ctypes
16import time
17from serial import win32
18
19import serial
20from serial.serialutil import SerialBase, SerialException, to_bytes, PortNotOpenError, SerialTimeoutException
21
22
class Serial(SerialBase):
    """\
    Serial port implementation for Win32 based on ctypes.

    The port is opened with FILE_FLAG_OVERLAPPED and all I/O goes through
    the ``win32`` ctypes bindings. One OVERLAPPED structure is kept for
    reads and one for writes.
    """

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """Set up the win32 specific state, then defer to SerialBase."""
        # These attributes are assigned before the base constructor runs so
        # that they exist whatever it does (presumably it may already call
        # open() -- verify against SerialBase).
        self._port_handle = None
        self._overlapped_read = None
        self._overlapped_write = None
        # original COMMTIMEOUTS of the device, saved by open() and restored
        # by _close(); None while nothing has been saved
        self._orgTimeouts = None
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened.

        Raises SerialException if no port is configured, if the port is
        already open or if CreateFile fails. Exceptions raised while
        configuring the freshly opened port are re-raised after the handles
        have been released again.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        # the "\\.\COMx" format is required for devices other than COM1-COM8
        # not all versions of windows seem to support this properly
        # so that the first few ports are used with the DOS device name
        port = self.name
        try:
            if port.upper().startswith('COM') and int(port[3:]) > 8:
                port = '\\\\.\\' + port
        except ValueError:
            # for like COMnotanumber
            pass
        self._port_handle = win32.CreateFile(
            port,
            win32.GENERIC_READ | win32.GENERIC_WRITE,
            0,  # exclusive access
            None,  # no security
            win32.OPEN_EXISTING,
            win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
            0)
        if self._port_handle == win32.INVALID_HANDLE_VALUE:
            self._port_handle = None    # 'cause __del__ is called anyway
            raise SerialException("could not open port {!r}: {!r}".format(self.portstr, ctypes.WinError()))

        try:
            # read event: created with 2nd argument 1, write event with 0
            self._overlapped_read = win32.OVERLAPPED()
            self._overlapped_read.hEvent = win32.CreateEvent(None, 1, 0, None)
            self._overlapped_write = win32.OVERLAPPED()
            self._overlapped_write.hEvent = win32.CreateEvent(None, 0, 0, None)

            # Setup a 4k buffer
            win32.SetupComm(self._port_handle, 4096, 4096)

            # Save original timeout values:
            self._orgTimeouts = win32.COMMTIMEOUTS()
            win32.GetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts))

            self._reconfigure_port()

            # Clear buffers:
            # Remove anything that was there
            win32.PurgeComm(
                self._port_handle,
                win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
                win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
        except:  # noqa: E722 -- release the handles on *any* error, then re-raise
            try:
                self._close()
            except Exception:
                # ignore errors when closing the port, to keep the original
                # exception that happened when setting up; interpreter-level
                # exceptions (KeyboardInterrupt, SystemExit) are not swallowed
                pass
            self._port_handle = None
            raise
        else:
            self.is_open = True

    def _reconfigure_port(self):
        """\
        Set communication parameters on opened port.

        Applies timeouts, comm mask and the DCB (baud rate, byte size,
        parity, stop bits, flow control) taken from the instance settings.
        Raises SerialException without a valid handle or if SetCommState
        fails, ValueError for settings that cannot be mapped.
        """
        if not self._port_handle:
            raise SerialException("Can only operate on a valid port handle")

        # Set Windows timeout values. The COMMTIMEOUTS fields used are
        # ReadIntervalTimeout, ReadTotalTimeoutConstant and
        # WriteTotalTimeoutConstant; everything else stays at zero.
        timeouts = win32.COMMTIMEOUTS()
        if self._timeout is None:
            pass    # default of all zeros is OK
        elif self._timeout == 0:
            timeouts.ReadIntervalTimeout = win32.MAXDWORD
        else:
            # seconds -> milliseconds, but never less than 1 ms
            timeouts.ReadTotalTimeoutConstant = max(int(self._timeout * 1000), 1)
        if self._timeout != 0 and self._inter_byte_timeout is not None:
            timeouts.ReadIntervalTimeout = max(int(self._inter_byte_timeout * 1000), 1)

        if self._write_timeout is None:
            pass
        elif self._write_timeout == 0:
            timeouts.WriteTotalTimeoutConstant = win32.MAXDWORD
        else:
            timeouts.WriteTotalTimeoutConstant = max(int(self._write_timeout * 1000), 1)
        win32.SetCommTimeouts(self._port_handle, ctypes.byref(timeouts))

        win32.SetCommMask(self._port_handle, win32.EV_ERR)

        # Setup the connection info.
        # Get state and modify it:
        comDCB = win32.DCB()
        win32.GetCommState(self._port_handle, ctypes.byref(comDCB))
        comDCB.BaudRate = self._baudrate

        if self._bytesize == serial.FIVEBITS:
            comDCB.ByteSize = 5
        elif self._bytesize == serial.SIXBITS:
            comDCB.ByteSize = 6
        elif self._bytesize == serial.SEVENBITS:
            comDCB.ByteSize = 7
        elif self._bytesize == serial.EIGHTBITS:
            comDCB.ByteSize = 8
        else:
            raise ValueError("Unsupported number of data bits: {!r}".format(self._bytesize))

        # fParity enables the parity check; it is only off for PARITY_NONE
        if self._parity == serial.PARITY_NONE:
            comDCB.Parity = win32.NOPARITY
            comDCB.fParity = 0
        elif self._parity == serial.PARITY_EVEN:
            comDCB.Parity = win32.EVENPARITY
            comDCB.fParity = 1
        elif self._parity == serial.PARITY_ODD:
            comDCB.Parity = win32.ODDPARITY
            comDCB.fParity = 1
        elif self._parity == serial.PARITY_MARK:
            comDCB.Parity = win32.MARKPARITY
            comDCB.fParity = 1
        elif self._parity == serial.PARITY_SPACE:
            comDCB.Parity = win32.SPACEPARITY
            comDCB.fParity = 1
        else:
            raise ValueError("Unsupported parity mode: {!r}".format(self._parity))

        if self._stopbits == serial.STOPBITS_ONE:
            comDCB.StopBits = win32.ONESTOPBIT
        elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
            comDCB.StopBits = win32.ONE5STOPBITS
        elif self._stopbits == serial.STOPBITS_TWO:
            comDCB.StopBits = win32.TWOSTOPBITS
        else:
            raise ValueError("Unsupported number of stop bits: {!r}".format(self._stopbits))

        comDCB.fBinary = 1  # Enable Binary Transmission
        # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
        if self._rs485_mode is None:
            if self._rtscts:
                comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
            else:
                comDCB.fRtsControl = win32.RTS_CONTROL_ENABLE if self._rts_state else win32.RTS_CONTROL_DISABLE
            comDCB.fOutxCtsFlow = self._rtscts
        else:
            # checks for unsupported settings
            # XXX verify if platform really does not have a setting for those
            if not self._rs485_mode.rts_level_for_tx:
                raise ValueError(
                    'Unsupported value for RS485Settings.rts_level_for_tx: {!r} (only True is allowed)'.format(
                        self._rs485_mode.rts_level_for_tx,))
            if self._rs485_mode.rts_level_for_rx:
                raise ValueError(
                    'Unsupported value for RS485Settings.rts_level_for_rx: {!r} (only False is allowed)'.format(
                        self._rs485_mode.rts_level_for_rx,))
            if self._rs485_mode.delay_before_tx is not None:
                raise ValueError(
                    'Unsupported value for RS485Settings.delay_before_tx: {!r} (only None is allowed)'.format(
                        self._rs485_mode.delay_before_tx,))
            if self._rs485_mode.delay_before_rx is not None:
                raise ValueError(
                    'Unsupported value for RS485Settings.delay_before_rx: {!r} (only None is allowed)'.format(
                        self._rs485_mode.delay_before_rx,))
            if self._rs485_mode.loopback:
                raise ValueError(
                    'Unsupported value for RS485Settings.loopback: {!r} (only False is allowed)'.format(
                        self._rs485_mode.loopback,))
            comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
            comDCB.fOutxCtsFlow = 0

        if self._dsrdtr:
            comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
        else:
            comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE
        comDCB.fOutxDsrFlow = self._dsrdtr
        comDCB.fOutX = self._xonxoff
        comDCB.fInX = self._xonxoff
        comDCB.fNull = 0
        comDCB.fErrorChar = 0
        comDCB.fAbortOnError = 0
        comDCB.XonChar = serial.XON
        comDCB.XoffChar = serial.XOFF

        if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)):
            raise SerialException(
                'Cannot configure port, something went wrong. '
                'Original message: {!r}'.format(ctypes.WinError()))

    def _close(self):
        """\
        Internal close port helper.

        Restores the saved timeouts (if any were saved), cancels pending
        overlapped I/O, closes both event handles and the port handle.
        Does nothing when there is no port handle.
        """
        if self._port_handle is not None:
            # Restore original timeout values. They are missing when open()
            # failed before it could save them; skipping the restore in that
            # case lets the handles below still be released.
            if self._orgTimeouts is not None:
                win32.SetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts))
                self._orgTimeouts = None
            if self._overlapped_read is not None:
                self.cancel_read()
                win32.CloseHandle(self._overlapped_read.hEvent)
                self._overlapped_read = None
            if self._overlapped_write is not None:
                self.cancel_write()
                win32.CloseHandle(self._overlapped_write.hEvent)
                self._overlapped_write = None
            win32.CloseHandle(self._port_handle)
            self._port_handle = None

    def close(self):
        """Close port. Does nothing if the port is not open."""
        if self.is_open:
            self._close()
            self.is_open = False

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def _get_comstat(self):
        """\
        Call ClearCommError on the port and return the COMSTAT it filled in.
        Raises SerialException if the call fails.
        """
        flags = win32.DWORD()
        comstat = win32.COMSTAT()
        if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
            raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
        return comstat

    @property
    def in_waiting(self):
        """\
        Return the number of bytes currently in the input buffer.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        return self._get_comstat().cbInQue

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        Returns a bytes object; it is empty for size <= 0, and for a
        timeout of 0 when the input queue is empty. A read that ended with
        ERROR_OPERATION_ABORTED returns what was received so far instead of
        raising. Raises PortNotOpenError if the port is not open and
        SerialException if a Win32 call fails.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if size <= 0:
            return bytes()
        win32.ResetEvent(self._overlapped_read.hEvent)
        comstat = self._get_comstat()
        # with timeout 0 only ask for what is already queued
        n = min(comstat.cbInQue, size) if self.timeout == 0 else size
        if n <= 0:
            return bytes()
        buf = ctypes.create_string_buffer(n)
        rc = win32.DWORD()
        read_ok = win32.ReadFile(
            self._port_handle,
            buf,
            n,
            ctypes.byref(rc),
            ctypes.byref(self._overlapped_read))
        if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
            raise SerialException("ReadFile failed ({!r})".format(ctypes.WinError()))
        # last argument True: wait for the overlapped operation
        result_ok = win32.GetOverlappedResult(
            self._port_handle,
            ctypes.byref(self._overlapped_read),
            ctypes.byref(rc),
            True)
        if not result_ok:
            if win32.GetLastError() != win32.ERROR_OPERATION_ABORTED:
                raise SerialException("GetOverlappedResult failed ({!r})".format(ctypes.WinError()))
        return bytes(buf.raw[:rc.value])

    def write(self, data):
        """\
        Output the given byte string over the serial port.

        Returns the number of bytes written (0 for empty data). With a
        write timeout of None or > 0 the call waits for the overlapped write
        and raises SerialTimeoutException if fewer bytes than requested were
        written; a canceled write returns the count reported so far. With a
        write timeout of 0 the call does not wait and reports len(data)
        unless WriteFile failed. Raises PortNotOpenError if the port is not
        open and SerialException on other WriteFile errors.
        """
        if not self.is_open:
            raise PortNotOpenError()
        # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
        data = to_bytes(data)
        if not data:
            return 0
        n = win32.DWORD()
        success = win32.WriteFile(
            self._port_handle, data, len(data), ctypes.byref(n),
            ctypes.byref(self._overlapped_write))
        if self._write_timeout != 0:  # if blocking (None) or w/ write timeout (>0)
            if not success and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
                raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))

            # Wait for the write to complete.
            completed = win32.GetOverlappedResult(
                self._port_handle,
                ctypes.byref(self._overlapped_write),
                ctypes.byref(n),
                True)
            # Only consult the last error when the call reported failure;
            # otherwise a stale error code left by an earlier call could
            # hide a short (timed out) write.
            if not completed and win32.GetLastError() == win32.ERROR_OPERATION_ABORTED:
                return n.value  # canceled IO is no error
            if n.value != len(data):
                raise SerialTimeoutException('Write timeout')
            return n.value
        else:
            errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError()
            if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY,
                             win32.ERROR_OPERATION_ABORTED):
                return 0
            elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
                # no info on true length provided by OS function in async mode
                return len(data)
            else:
                raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))

    def flush(self):
        """\
        Flush of file like objects. In this case, wait until all data
        is written.

        Polls out_waiting every 50 ms until it reports zero.
        """
        while self.out_waiting:
            time.sleep(0.05)
        # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
        #     require overlapped IO and it's also only possible to set a single mask
        #     on the port---

    def reset_input_buffer(self):
        """\
        Clear input buffer, discarding all that is in the buffer.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)

    def reset_output_buffer(self):
        """\
        Clear output buffer, aborting the current output and discarding all
        that is in the buffer.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)

    def _update_break_state(self):
        """\
        Set break: Controls TXD. When active, no transmitting is possible.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self._break_state:
            win32.SetCommBreak(self._port_handle)
        else:
            win32.ClearCommBreak(self._port_handle)

    def _update_rts_state(self):
        """Set terminal status line: Request To Send"""
        # NOTE(review): no is_open check here; presumably the caller only
        # invokes this on an open port -- confirm against SerialBase.
        if self._rts_state:
            win32.EscapeCommFunction(self._port_handle, win32.SETRTS)
        else:
            win32.EscapeCommFunction(self._port_handle, win32.CLRRTS)

    def _update_dtr_state(self):
        """Set terminal status line: Data Terminal Ready"""
        # NOTE(review): no is_open check here; presumably the caller only
        # invokes this on an open port -- confirm against SerialBase.
        if self._dtr_state:
            win32.EscapeCommFunction(self._port_handle, win32.SETDTR)
        else:
            win32.EscapeCommFunction(self._port_handle, win32.CLRDTR)

    def _GetCommModemStatus(self):
        """\
        Return the modem status bits as reported by GetCommModemStatus.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        stat = win32.DWORD()
        win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat))
        return stat.value

    @property
    def cts(self):
        """Read terminal status line: Clear To Send"""
        return win32.MS_CTS_ON & self._GetCommModemStatus() != 0

    @property
    def dsr(self):
        """Read terminal status line: Data Set Ready"""
        return win32.MS_DSR_ON & self._GetCommModemStatus() != 0

    @property
    def ri(self):
        """Read terminal status line: Ring Indicator"""
        return win32.MS_RING_ON & self._GetCommModemStatus() != 0

    @property
    def cd(self):
        """Read terminal status line: Carrier Detect"""
        return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0

    # - - platform specific - - - -

    def set_buffer_size(self, rx_size=4096, tx_size=None):
        """\
        Recommend a buffer size to the driver (device driver can ignore this
        value). Must be called after the port is opened.

        tx_size defaults to rx_size when it is None.
        """
        if tx_size is None:
            tx_size = rx_size
        win32.SetupComm(self._port_handle, rx_size, tx_size)

    def set_output_flow_control(self, enable=True):
        """\
        Manually control flow - when software flow control is enabled.
        This will do the same as if XON (true) or XOFF (false) are received
        from the other device and control the transmission accordingly.
        WARNING: this function is not portable to different platforms!

        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if enable:
            win32.EscapeCommFunction(self._port_handle, win32.SETXON)
        else:
            win32.EscapeCommFunction(self._port_handle, win32.SETXOFF)

    @property
    def out_waiting(self):
        """\
        Return the number of bytes currently in the output buffer.
        Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        return self._get_comstat().cbOutQue

    def _cancel_overlapped_io(self, overlapped):
        """\
        Cancel the I/O operation tracked by the given OVERLAPPED structure
        if it is still pending; shared by cancel_read and cancel_write.
        """
        # check if the operation is pending (last argument False: do not wait)
        rc = win32.DWORD()
        completed = win32.GetOverlappedResult(
            self._port_handle,
            ctypes.byref(overlapped),
            ctypes.byref(rc),
            False)
        if not completed and win32.GetLastError() in (win32.ERROR_IO_PENDING, win32.ERROR_IO_INCOMPLETE):
            # cancel, ignoring any errors (e.g. it may just have finished on its own)
            win32.CancelIoEx(self._port_handle, ctypes.byref(overlapped))

    def cancel_read(self):
        """Cancel a blocking read operation, may be called from other thread"""
        self._cancel_overlapped_io(self._overlapped_read)

    def cancel_write(self):
        """Cancel a blocking write operation, may be called from other thread"""
        self._cancel_overlapped_io(self._overlapped_write)

    @SerialBase.exclusive.setter
    def exclusive(self, exclusive):
        """\
        Change the exclusive access setting.

        None and true values are passed on to the SerialBase property;
        any other false value raises ValueError.
        """
        if exclusive is not None and not exclusive:
            raise ValueError('win32 only supports exclusive access (not: {})'.format(exclusive))
        serial.SerialBase.exclusive.__set__(self, exclusive)