#!/usr/bin/python
#
# wpa_supplicant/hostapd control interface using Python
# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
8
9import os
10import stat
11import socket
12import select
13
# Number of UNIX-domain control sockets opened so far in this process.
# Ctrl appends the current value to the name of the local socket file it
# binds (after the PID) and then increments it, so successive instances get
# distinct paths.
counter = 0
15
class Ctrl:
    """Client endpoint for the wpa_supplicant/hostapd control interface.

    Two transports are supported:

    * UNIX-domain datagram socket: ``path`` names the server's control
      socket.  A private local socket file is created under ``/tmp`` and
      removed again by close().
    * UDP: ``path`` is a host name/address and ``port`` the UDP port.  A
      cookie is fetched from the server with ``GET_COOKIE`` and prefixed to
      every command sent afterwards.

    Instances can be used as context managers; leaving the ``with`` block
    calls close().
    """

    def __init__(self, path, port=9877):
        """Open the control connection.

        path -- control socket path, or host name for the UDP transport.
                UDP is selected when ``path`` does not start with '/' and
                does not name an existing UNIX socket.
        port -- UDP port; only used for the UDP transport.

        Raises whatever the underlying socket calls raise; no socket is
        left open when construction fails.
        """
        # Set the state flags first so that __del__/close() are safe even
        # if anything below raises.
        self.started = False
        self.attached = False
        self.path = path
        self.port = port

        self.udp = False
        if not path.startswith('/'):
            try:
                mode = os.stat(path).st_mode
            except (OSError, ValueError):
                # Not an existing file system object: treat as a host name.
                self.udp = True
            else:
                self.udp = not stat.S_ISSOCK(mode)

        if self.udp:
            self._open_udp(path, port)
        else:
            self._open_unix(path)
        self.started = True

    def _open_unix(self, path):
        """Bind a uniquely named local datagram socket and connect it to path."""
        global counter
        self.s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.dest = path
        self.local = "/tmp/wpa_ctrl_" + str(os.getpid()) + '-' + str(counter)
        counter += 1
        try:
            self.s.bind(self.local)
        except Exception:
            # bind() did not create the socket file, so only the descriptor
            # has to be released here.
            self.s.close()
            raise
        try:
            self.s.connect(self.dest)
        except Exception:
            self.s.close()
            os.unlink(self.local)
            raise

    def _open_udp(self, host, port):
        """Create the UDP socket and fetch the session cookie from the server."""
        self.s = None
        try:
            ai_list = socket.getaddrinfo(host, port, socket.AF_INET,
                                         socket.SOCK_DGRAM)
            if not ai_list:
                raise socket.error("no address found for " + str(host))
            # Only the first resolved address is used.
            af, socktype, _proto, _cn, sockaddr = ai_list[0]
            self.sockaddr = sockaddr
            self.s = socket.socket(af, socktype)
            self.s.settimeout(5)
            self.s.sendto(b"GET_COOKIE", sockaddr)
            reply, _server = self.s.recvfrom(4096)
            self.cookie = reply
            self.port = port
        except BaseException:
            # Always re-raised; caught broadly only so that the socket is
            # released no matter what interrupted the handshake.
            print("connect exception ", host, str(port))
            if self.s is not None:
                self.s.close()
            raise

    @staticmethod
    def _to_text(raw):
        """Return raw decoded to a native string, or raw itself if undecodable."""
        try:
            # str() keeps the result a native string on Python 2 as well.
            return str(raw.decode())
        except UnicodeError:
            return raw

    def __enter__(self):
        """Return self so the instance can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        """Release the socket if the owner forgot to call close()."""
        self.close()

    def close(self):
        """Detach if needed, close the socket and remove the local socket file.

        Safe to call more than once.  A failing DETACH is ignored so that the
        socket is closed regardless.
        """
        if self.attached:
            try:
                self.detach()
            except Exception:
                # Need to ignore this to allow the socket to be closed
                self.attached = False
        if not self.started:
            return
        # Clear the flag before cleaning up so that an error below cannot
        # make a later close() (e.g. from __del__) repeat the cleanup.
        self.started = False
        try:
            self.s.close()
        finally:
            if not self.udp:
                try:
                    os.unlink(self.local)
                except OSError:
                    # A file that is already gone is fine; anything else
                    # is a real failure worth reporting.
                    if os.path.exists(self.local):
                        raise

    def request(self, cmd, timeout=10):
        """Send one command and return the next datagram received.

        cmd     -- command as text or bytes; text is encoded before sending.
        timeout -- seconds to wait for the reply.

        Returns the reply as a string, or as raw bytes when it cannot be
        decoded.  Raises Exception("Timeout on waiting response") if nothing
        arrives in time.
        """
        if isinstance(cmd, str):
            try:
                cmd = cmd.encode()
            except UnicodeDecodeError:
                # Python 2 only: a byte string holding non-ASCII data is
                # already in wire format, so send it unchanged.
                pass
        if self.udp:
            self.s.sendto(self.cookie + cmd, self.sockaddr)
        else:
            self.s.send(cmd)
        readable, _, _ = select.select([self.s], [], [], timeout)
        if not readable:
            raise Exception("Timeout on waiting response")
        return self._to_text(self.s.recv(4096))

    def attach(self):
        """Send ATTACH (the control-interface event subscription command).

        Does nothing when already attached.  Raises Exception("ATTACH failed")
        unless the reply contains "OK".
        """
        if self.attached:
            return None
        res = self.request("ATTACH")
        if "OK" in res:
            self.attached = True
            return None
        raise Exception("ATTACH failed")

    def detach(self):
        """Send DETACH, discarding any messages still queued on the socket.

        Does nothing when not attached; if the socket is already closed the
        attached flag is simply cleared.  Raises Exception("DETACH failed")
        when the reply contains "FAIL".
        """
        if not self.attached:
            return None
        if self.s.fileno() == -1:
            self.attached = False
            return None
        # Drain queued messages so that the next datagram read by request()
        # is the DETACH reply rather than an older message.
        while self.pending():
            self.recv()
        res = self.request("DETACH")
        if "FAIL" not in res:
            self.attached = False
            return None
        raise Exception("DETACH failed")

    def terminate(self):
        """Send TERMINATE to the server and close this connection.

        The connection is closed even if the TERMINATE request fails; the
        failure is still raised to the caller.
        """
        if self.attached:
            try:
                self.detach()
            except Exception:
                # Need to ignore this to allow the socket to be closed
                self.attached = False
        try:
            self.request("TERMINATE")
        finally:
            self.close()

    def pending(self, timeout=0):
        """Return True if a message can be read within timeout seconds."""
        readable, _, _ = select.select([self.s], [], [], timeout)
        return bool(readable)

    def recv(self):
        """Read one message (blocking) and return it.

        Returns a string, or the raw bytes when the data cannot be decoded.
        """
        return self._to_text(self.s.recv(4096))