blob: 69926e4e54851d5b10da2152e93136a85bd8dde1 [file] [log] [blame]
# Testing utilities
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
6
7import binascii
8import os
9import socket
10import struct
11import subprocess
12import time
13import remotehost
14import logging
15logger = logging.getLogger()
16import hostapd
17
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Only lines containing a ':' contribute; the text before the first ':'
    (with surrounding spaces removed) is taken as the interface name.
    """
    with open("/proc/net/dev", "r") as f:
        entries = [line.partition(':') for line in f.readlines()]
    return [name.strip(' ') for name, sep, _ in entries if sep]
27
class HwsimSkip(Exception):
    """Exception raised by the helpers in this file when a required
    capability or environment is not available.

    The human-readable explanation is available as the 'reason' attribute
    and as str(exception).
    """

    def __init__(self, reason):
        # Hand the reason to Exception as well so that args and repr()
        # carry it instead of being empty.
        super(HwsimSkip, self).__init__(reason)
        self.reason = reason

    def __str__(self):
        return self.reason
33
def long_duration_test(func):
    """Decorator that tags func with long_duration_test = True.

    The function object itself is returned (it is not wrapped).
    """
    setattr(func, "long_duration_test", True)
    return func
37
class alloc_fail(object):
    """Context manager around the TEST_ALLOC_FAIL control interface command.

    On entry "TEST_ALLOC_FAIL <count>:<funcs>" is sent through
    dev.request(); HwsimSkip is raised if the reply does not contain "OK".
    When the with-body finishes without an exception, GET_ALLOC_FAIL must
    report "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" % (self._count,
                                                            self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # Do not mask an exception from the with-body with our own check.
        if type is not None:
            return
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Allocation failure did not trigger")
51
class fail_test(object):
    """Context manager around the TEST_FAIL control interface command.

    On entry "TEST_FAIL <count>:<funcs>" is sent through dev.request();
    HwsimSkip is raised if the reply does not contain "OK". When the
    with-body finishes without an exception, GET_FAIL must report
    "0:<funcs>", otherwise an Exception is raised.
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request("TEST_FAIL %d:%s" % (self._count,
                                                      self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # Do not mask an exception from the with-body with our own check.
        if type is not None:
            return
        state = self._dev.request("GET_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Test failure did not trigger")
65
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the reply starts with "0:".

    At most max_iter requests are made, sleeping timeout seconds between
    them. If the last allowed request still does not start with "0:",
    Exception(note) is raised (without a further sleep).
    """
    for attempts_left in range(max_iter, 0, -1):
        if dev.request(cmd).startswith("0:"):
            return
        if attempts_left == 1:
            raise Exception(note)
        time.sleep(timeout)
74
def require_under_vm():
    """Raise HwsimSkip unless the command line of PID 1 mentions inside.sh.

    The check reads /proc/1/cmdline and looks for the substring
    "inside.sh".
    """
    with open('/proc/1/cmdline', 'r') as f:
        init_cmdline = f.read()
    if "inside.sh" in init_cmdline:
        return
    raise HwsimSkip("Not running under VM")
80
def iface_is_in_bridge(bridge, ifname):
    """Return True if ifname is a port of the bridge named bridge.

    The decision is based on the /sys/class/net/<ifname>/brport/bridge
    symlink: it must exist, be a symlink, and the last component of its
    target must equal bridge. In every other case False is returned.
    """
    link = "/sys/class/net/" + ifname + "/brport/bridge"
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    return bridge == os.path.basename(os.readlink(link))
91
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) if dev reports 'FIPS' in its "fips" capability.

    Nothing happens when the capability is missing/empty or does not
    contain 'FIPS'.
    """
    fips_capa = dev.get_capability("fips")
    if not fips_capa:
        return
    if 'FIPS' in fips_capa:
        raise HwsimSkip(reason)
96
def check_ext_key_id_capa(dev):
    """Raise HwsimSkip unless the Extended Key ID bit is set in capa.flags.

    The 'capa.flags' driver status field is parsed with int(..., 0) and
    tested against the mask 0x8000000000000000.
    """
    ext_key_id_mask = 0x8000000000000000
    flags = int(dev.get_driver_status_field('capa.flags'), 0)
    if not flags & ext_key_id_mask:
        raise HwsimSkip("Extended Key ID not supported")
101
def skip_without_tkip(dev):
    """Raise HwsimSkip unless dev lists TKIP as both a pairwise and a group
    cipher.

    Only the "pairwise" and "group" capabilities are queried.
    """
    if "TKIP" not in dev.get_capability("pairwise") or \
       "TKIP" not in dev.get_capability("group"):
        raise HwsimSkip("Cipher TKIP not supported")
107
def check_wep_capa(dev):
    """Raise HwsimSkip unless "WEP40" is in dev's "group" capability."""
    group_ciphers = dev.get_capability("group")
    if "WEP40" in group_ciphers:
        return
    raise HwsimSkip("WEP not supported")
111
def check_sae_capab(dev):
    """Raise HwsimSkip unless "SAE" is in dev's "auth_alg" capability."""
    auth_algs = dev.get_capability("auth_alg")
    if "SAE" in auth_algs:
        return
    raise HwsimSkip("SAE not supported")
115
def check_sae_pk_capab(dev):
    """Raise HwsimSkip unless dev's "sae" capability exists and has "PK"."""
    sae_capa = dev.get_capability("sae")
    supported = sae_capa is not None and "PK" in sae_capa
    if not supported:
        raise HwsimSkip("SAE-PK not supported")
120
def check_erp_capa(dev):
    """Raise HwsimSkip unless dev's "erp" capability is non-empty and
    contains 'ERP'."""
    erp_capa = dev.get_capability("erp")
    if erp_capa and 'ERP' in erp_capa:
        return
    raise HwsimSkip("ERP not supported in the build")
125
def check_fils_capa(dev):
    """Raise HwsimSkip unless dev's "fils" capability exists and has "FILS"."""
    fils_capa = dev.get_capability("fils")
    supported = fils_capa is not None and "FILS" in fils_capa
    if not supported:
        raise HwsimSkip("FILS not supported")
130
def check_fils_sk_pfs_capa(dev):
    """Raise HwsimSkip unless dev's "fils" capability exists and has
    "FILS-SK-PFS"."""
    fils_capa = dev.get_capability("fils")
    supported = fils_capa is not None and "FILS-SK-PFS" in fils_capa
    if not supported:
        raise HwsimSkip("FILS-SK-PFS not supported")
135
def check_imsi_privacy_support(dev):
    """Raise HwsimSkip unless dev's TLS library name starts with "OpenSSL".

    The library name is the reply to the "GET tls_library" request.
    """
    tls_lib = dev.request("GET tls_library")
    if not tls_lib.startswith("OpenSSL"):
        raise HwsimSkip("IMSI privacy not supported with this TLS library: " + tls_lib)
141
def check_tls_tod(dev):
    """Raise HwsimSkip unless dev's TLS library is one of the accepted ones.

    The reply to "GET tls_library" must start with "OpenSSL", "internal"
    or "mbed TLS".
    """
    accepted_prefixes = ("OpenSSL", "internal", "mbed TLS")
    tls_lib = dev.request("GET tls_library")
    if not tls_lib.startswith(accepted_prefixes):
        raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls_lib)
152
def vht_supported():
    """Return True if "iw reg get" reports a rule with 80 or 160 MHz width.

    The output of "iw reg get" is searched for the substrings "@ 80)" and
    "@ 160)". The exit status of iw is not checked.
    """
    cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    # communicate() reads until EOF, closes the pipe and waits for the
    # child, so neither the file descriptor nor the process is left behind.
    out, _ = cmd.communicate()
    reg = out.decode()
    return "@ 80)" in reg or "@ 160)" in reg
159
# This function checks whether the provided dev, which may be either
# WpaSupplicant or Hostapd, supports CSA.
def csa_supported(dev):
    """Raise HwsimSkip unless bit 0x80000000 is set in the driver's
    'capa.flags' status field.

    Nothing is returned; lack of support is signalled only by the
    exception.
    """
    flags = int(dev.get_driver_status()['capa.flags'], 0)
    if not flags & 0x80000000:
        raise HwsimSkip("CSA not supported")
166
def get_phy(ap, ifname=None):
    """Return the "phyN" name of the wiphy behind an interface.

    ap is indexed for 'hostname' (optional) and, when ifname is not given,
    for 'ifname'. "iw dev <ifname> info" is run through
    remotehost.Host(hostname) and the word following "wiphy" in its output
    is appended to "phy". If no line mentions "wiphy", "phy3" is returned.

    Raises Exception if the iw command exits with a non-zero status.
    """
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        # No hostname entry (or ap is not subscriptable); pass None to
        # remotehost.Host in that case.
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            return "phy" + line.split()[1]
    return "phy3"
187
def parse_ie(buf):
    """Parse a hex string of (id, length, value) elements into a dict.

    buf is hex-decoded and walked as a sequence of elements, each with a
    one-byte id, a one-byte length and that many value bytes. The result
    maps id -> value bytes; a repeated id keeps the last value. Parsing
    stops silently at the first element whose length exceeds the remaining
    data, or when fewer than two bytes remain.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elems = {}
    pos = 0
    while total - pos >= 2:
        eid, length = struct.unpack_from('BB', raw, pos)
        pos += 2
        if length > total - pos:
            break
        elems[eid] = raw[pos:pos + length]
        pos += length
    return elems
199
def wait_regdom_changes(dev):
    """Consume pending CTRL-EVENT-REGDOM-CHANGE events from dev.

    Waits (0.1 s timeout each) for up to ten such events and stops as soon
    as a wait returns None.
    """
    for _ in range(10):
        if dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) is None:
            return
205
def clear_country(dev):
    """Try to clear the country code using a temporary network on dev[1].

    dev[1] is configured with an open mode=2 network "country-clear" on
    2412 MHz. If it reports CTRL-EVENT-CONNECTED, dev[0] connects to that
    network, both sides are disconnected again, any scan on dev[0] is
    aborted and, after a one second pause, both event monitors are
    drained.
    """
    logger.info("Try to clear country")
    ap_side, sta_side = dev[1], dev[0]

    net_id = ap_side.add_network()
    ap_side.set_network(net_id, "mode", "2")
    ap_side.set_network_quoted(net_id, "ssid", "country-clear")
    ap_side.set_network(net_id, "key_mgmt", "NONE")
    ap_side.set_network(net_id, "frequency", "2412")
    ap_side.set_network(net_id, "scan_freq", "2412")
    ap_side.select_network(net_id)

    if not ap_side.wait_event(["CTRL-EVENT-CONNECTED"]):
        return

    sta_side.connect("country-clear", key_mgmt="NONE", scan_freq="2412")
    ap_side.request("DISCONNECT")
    sta_side.wait_disconnected()
    sta_side.request("DISCONNECT")
    sta_side.request("ABORT_SCAN")
    time.sleep(1)
    sta_side.dump_monitor()
    ap_side.dump_monitor()
225
def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and then reset the regulatory domain on the
    first count entries of dev via clear_regdom_dev()."""
    disable_hapd(hapd)
    clear_regdom_dev(dev, count=count)
229
def disable_hapd(hapd):
    """Send DISABLE to hapd and pause for 0.1 s; do nothing if hapd is
    falsy."""
    if not hapd:
        return
    hapd.request("DISABLE")
    time.sleep(0.1)
234
def clear_regdom_dev(dev, count=1):
    """Reset the regulatory domain to "00" using the first count stations.

    dev[0..count-1] are disconnected and their scans stopped, then
    "iw reg set 00" is run through dev[0]. If the driver still reports a
    country other than "00" afterwards, clear_country(dev) is tried.
    Finally the scan cache of each of the count stations is flushed.
    """
    indexes = range(count)
    for idx in indexes:
        dev[idx].request("DISCONNECT")
    for idx in indexes:
        dev[idx].disconnect_and_stop_scan()

    first = dev[0]
    first.cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(first)
    country = first.get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)

    for idx in indexes:
        dev[idx].flush_scan_cache()
248
def radiotap_build():
    """Return a fixed 14-byte radiotap header as bytes.

    Layout: an 8-byte little-endian header packed as '<BBHL' with the
    values 0, 0, total length and 0xc002, followed by six payload bytes
    (0x08 and five zeros).
    """
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    # The length field covers the 8-byte header plus the payload.
    header = struct.pack('<BBHL', 0, 0, 8 + len(payload), 0xc002)
    return header + payload
256
def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode on freq and return a raw socket on it.

    Runs "iw <ifname> set type monitor", "ip link set dev <ifname> up" and
    "iw <ifname> set freq <freq>"; the two iw commands must succeed
    (subprocess.CalledProcessError otherwise), while the result of the ip
    command is ignored. The returned AF_PACKET/SOCK_RAW socket is bound to
    ifname and has a 0.5 second timeout.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                         socket.htons(ETH_P_ALL))
    try:
        sock.bind((ifname, 0))
        sock.settimeout(0.5)
    except Exception:
        # Do not leave the descriptor open when setup fails.
        sock.close()
        raise
    return sock
268
def stop_monitor(ifname):
    """Bring ifname down and switch it back to managed mode.

    The exit status of both commands is ignored.
    """
    commands = (
        ["ip", "link", "set", "dev", ifname, "down"],
        ["iw", ifname, "set", "type", "managed"],
    )
    for argv in commands:
        subprocess.call(argv)
272
def clear_scan_cache(apdev):
    """Flush the scan cache of the interface named by apdev['ifname'].

    Through hostapd.cmd_execute() the interface is brought up, a scan on
    2412 MHz with the flush option is triggered, and after 0.1 s the
    interface is brought down again.
    """
    iface = apdev['ifname']
    run = hostapd.cmd_execute
    run(apdev, ['ifconfig', iface, 'up'])
    run(apdev, ['iw', iface, 'scan', 'trigger', 'freq', '2412', 'flush'])
    time.sleep(0.1)
    run(apdev, ['ifconfig', iface, 'down'])
280
def set_world_reg(apdev0=None, apdev1=None, dev0=None):
    """Run "iw reg set 00" for each given device, then pause for 0.1 s.

    apdev0 and apdev1 are handled through hostapd.cmd_execute(), dev0
    through its own cmd_execute() method. Falsy arguments are skipped.
    """
    for apdev in (apdev0, apdev1):
        if apdev:
            hostapd.cmd_execute(apdev, ['iw', 'reg', 'set', '00'])
    if dev0:
        dev0.cmd_execute(['iw', 'reg', 'set', '00'])
    time.sleep(0.1)
289
def sysctl_write(val):
    """Run "sysctl -w <val>" with its standard output discarded.

    val is passed as a single argument, e.g. "net.ipv6.conf.all.disable_ipv6=1".
    The exit status of sysctl is ignored.
    """
    # Close the /dev/null handle once the command has finished.
    with open('/dev/null', 'w') as devnull:
        subprocess.call(['sysctl', '-w', val], stdout=devnull)
292
def var_arg_call(fn, dev, apdev, params):
    """Call fn with as many of (dev, apdev, params) as it declares.

    The count is taken from fn.__code__.co_argcount: more than two
    arguments -> fn(dev, apdev, params), exactly two -> fn(dev, apdev),
    anything less -> fn(dev). The return value of fn is passed through.
    """
    declared = fn.__code__.co_argcount
    positional = (dev, apdev, params)
    # Always pass at least dev and never more than all three.
    return fn(*positional[:min(max(declared, 1), 3)])
299
def cloned_wrapper(wrapper, fn):
    """Copy the identity of fn onto wrapper and return wrapper.

    __name__ and __doc__ are copied because the name needs to be right for
    selecting / printing etc.; __module__ is copied to reparent the wrapper
    to the right module for module filtering.
    """
    for attr in ("__name__", "__doc__", "__module__"):
        setattr(wrapper, attr, getattr(fn, attr))
    return wrapper
307
def disable_ipv6(fn):
    """Decorator that runs a test function with IPv6 disabled.

    The wrapper first calls require_under_vm(), then sets the
    net.ipv6.conf.{all,default}.disable_ipv6 sysctls to 1, calls fn through
    var_arg_call() and returns its result. The sysctls are set back to 0
    afterwards, also when fn raises. The wrapper takes over fn's name,
    docstring and module via cloned_wrapper().
    """
    def wrapper(dev, apdev, params):
        require_under_vm()
        try:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=1')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=1')
            # Hand the wrapped function's result back to the caller
            # instead of dropping it.
            return var_arg_call(fn, dev, apdev, params)
        finally:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=0')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=0')
    return cloned_wrapper(wrapper, fn)