b.liu | e958203 | 2025-04-17 19:18:16 +0800 | [diff] [blame^] | 1 | # Testing utilities |
| 2 | # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi> |
| 3 | # |
| 4 | # This software may be distributed under the terms of the BSD license. |
| 5 | # See README for more details. |
| 6 | |
| 7 | import binascii |
| 8 | import os |
| 9 | import socket |
| 10 | import struct |
| 11 | import subprocess |
| 12 | import time |
| 13 | import remotehost |
| 14 | import logging |
| 15 | logger = logging.getLogger() |
| 16 | import hostapd |
| 17 | |
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Each line is split once on ':'; only lines that actually contain the
    separator contribute a name (the text before it, with surrounding
    spaces removed).
    """
    with open("/proc/net/dev", "r") as f:
        split_lines = [entry.split(':', 1) for entry in f.readlines()]
    return [parts[0].strip(' ') for parts in split_lines if len(parts) == 2]
| 27 | |
class HwsimSkip(Exception):
    """Exception raised to signal that a test case should be skipped.

    The human-readable explanation is kept in ``reason`` and is also what
    ``str()`` returns.
    """

    def __init__(self, reason):
        # Forward the reason to Exception so that ``args`` is populated;
        # this keeps repr(), logging of the exception and pickling useful.
        super().__init__(reason)
        self.reason = reason

    def __str__(self):
        return self.reason
| 33 | |
def long_duration_test(func):
    """Decorator that tags func with ``long_duration_test = True``.

    The function object itself is returned unchanged apart from the new
    attribute.
    """
    setattr(func, "long_duration_test", True)
    return func
| 37 | |
class alloc_fail(object):
    """Context manager that arms TEST_ALLOC_FAIL on a device.

    On entry it sends "TEST_ALLOC_FAIL <count>:<funcs>" through
    dev.request() and raises HwsimSkip unless the reply contains "OK".
    On a clean exit (no exception inside the with-body) it sends
    "GET_ALLOC_FAIL" and raises Exception unless the reply is exactly
    "0:<funcs>".
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" % (self._count,
                                                             self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # Do not mask an exception raised inside the with-body.
        if type is not None:
            return
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Allocation failure did not trigger")
| 51 | |
class fail_test(object):
    """Context manager that arms TEST_FAIL on a device.

    On entry it sends "TEST_FAIL <count>:<funcs>" through dev.request()
    and raises HwsimSkip unless the reply contains "OK". On a clean exit
    (no exception inside the with-body) it sends "GET_FAIL" and raises
    Exception unless the reply is exactly "0:<funcs>".
    """

    def __init__(self, dev, count, funcs):
        self._dev, self._count, self._funcs = dev, count, funcs

    def __enter__(self):
        reply = self._dev.request("TEST_FAIL %d:%s" % (self._count,
                                                       self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        # Do not mask an exception raised inside the with-body.
        if type is not None:
            return
        state = self._dev.request("GET_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("Test failure did not trigger")
| 65 | |
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the reply starts with "0:".

    At most max_iter requests are made, sleeping timeout seconds between
    them. If the last permitted request still does not start with "0:",
    Exception(note) is raised.
    """
    for attempt in range(1, max_iter + 1):
        if dev.request(cmd).startswith("0:"):
            return
        if attempt == max_iter:
            raise Exception(note)
        time.sleep(timeout)
| 74 | |
def require_under_vm():
    """Raise HwsimSkip unless the command line of PID 1 mentions "inside.sh".

    The check reads /proc/1/cmdline and looks for that substring.
    """
    with open('/proc/1/cmdline', 'r') as f:
        init_cmdline = f.read()
    if "inside.sh" in init_cmdline:
        return
    raise HwsimSkip("Not running under VM")
| 80 | |
def iface_is_in_bridge(bridge, ifname):
    """Return True if ifname is a port of the bridge named bridge.

    The answer comes from the sysfs symlink
    /sys/class/net/<ifname>/brport/bridge: its target's basename must
    equal bridge. A missing path or a non-symlink yields False.
    """
    link = "/sys/class/net/" + ifname + "/brport/bridge"
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    return bridge == os.path.basename(os.readlink(link))
| 91 | |
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) when dev's "fips" capability contains 'FIPS'.

    An empty or None capability value never skips.
    """
    fips_capa = dev.get_capability("fips")
    if not fips_capa or 'FIPS' not in fips_capa:
        return
    raise HwsimSkip(reason)
| 96 | |
def check_ext_key_id_capa(dev):
    """Raise HwsimSkip unless bit 0x8000000000000000 is set in capa.flags.

    The flags value is read with dev.get_driver_status_field() and parsed
    with int(value, 0), so a "0x"-prefixed string is accepted.
    """
    flags = int(dev.get_driver_status_field('capa.flags'), 0)
    if flags & 0x8000000000000000:
        return
    raise HwsimSkip("Extended Key ID not supported")
| 101 | |
def skip_without_tkip(dev):
    """Raise HwsimSkip unless TKIP is listed for both cipher capabilities.

    The "pairwise" capability is checked first and "group" second; the
    second query is only made when the first one lists TKIP. A capability
    reported as None is treated as "TKIP not supported" rather than
    causing a TypeError, matching how the other capability checks in this
    file handle a missing value.
    """
    # The former unused "fips" capability query was dropped; its result
    # was never read.
    for field in ("pairwise", "group"):
        ciphers = dev.get_capability(field)
        if ciphers is None or "TKIP" not in ciphers:
            raise HwsimSkip("Cipher TKIP not supported")
| 107 | |
def check_wep_capa(dev):
    """Raise HwsimSkip unless "WEP40" is listed in dev's "group" capability.

    A capability reported as None is treated as "not supported" instead of
    raising TypeError, consistent with the other capability checks in this
    file.
    """
    group = dev.get_capability("group")
    if group is None or "WEP40" not in group:
        raise HwsimSkip("WEP not supported")
| 111 | |
def check_sae_capab(dev):
    """Raise HwsimSkip unless "SAE" is listed in dev's "auth_alg" capability.

    A capability reported as None is treated as "not supported" instead of
    raising TypeError, consistent with the other capability checks in this
    file.
    """
    auth_algs = dev.get_capability("auth_alg")
    if auth_algs is None or "SAE" not in auth_algs:
        raise HwsimSkip("SAE not supported")
| 115 | |
def check_sae_pk_capab(dev):
    """Raise HwsimSkip unless "PK" is listed in dev's "sae" capability.

    A None capability value also results in a skip.
    """
    sae_capa = dev.get_capability("sae")
    if sae_capa is not None and "PK" in sae_capa:
        return
    raise HwsimSkip("SAE-PK not supported")
| 120 | |
def check_erp_capa(dev):
    """Raise HwsimSkip unless 'ERP' is listed in dev's "erp" capability.

    An empty or None capability value also results in a skip.
    """
    erp_capa = dev.get_capability("erp")
    if erp_capa and 'ERP' in erp_capa:
        return
    raise HwsimSkip("ERP not supported in the build")
| 125 | |
def check_fils_capa(dev):
    """Raise HwsimSkip unless "FILS" is listed in dev's "fils" capability.

    A None capability value also results in a skip.
    """
    fils_capa = dev.get_capability("fils")
    if fils_capa is not None and "FILS" in fils_capa:
        return
    raise HwsimSkip("FILS not supported")
| 130 | |
def check_fils_sk_pfs_capa(dev):
    """Raise HwsimSkip unless "FILS-SK-PFS" is in dev's "fils" capability.

    A None capability value also results in a skip.
    """
    fils_capa = dev.get_capability("fils")
    if fils_capa is not None and "FILS-SK-PFS" in fils_capa:
        return
    raise HwsimSkip("FILS-SK-PFS not supported")
| 135 | |
def check_imsi_privacy_support(dev):
    """Raise HwsimSkip unless dev reports a TLS library starting with "OpenSSL".

    The library name is obtained with the "GET tls_library" request and is
    appended to the skip message.
    """
    tls_library = dev.request("GET tls_library")
    if not tls_library.startswith("OpenSSL"):
        raise HwsimSkip("IMSI privacy not supported with this TLS library: " + tls_library)
| 141 | |
def check_tls_tod(dev):
    """Raise HwsimSkip unless dev's TLS library is one of the accepted ones.

    The name returned by the "GET tls_library" request must start with
    "OpenSSL", "internal" or "mbed TLS"; otherwise the test is skipped and
    the library name is included in the message.
    """
    accepted_prefixes = ("OpenSSL", "internal", "mbed TLS")
    tls_library = dev.request("GET tls_library")
    if tls_library.startswith(accepted_prefixes):
        return
    raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls_library)
| 152 | |
def vht_supported():
    """Return True if "iw reg get" output mentions an 80 or 160 entry.

    The command output is searched for the substrings "@ 80)" and
    "@ 160)".
    """
    proc = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    # communicate() reads all output, closes the pipe and waits for the
    # child, so neither a zombie process nor an open descriptor is left
    # behind.
    output, _ = proc.communicate()
    reg = output.decode()
    return "@ 80)" in reg or "@ 160)" in reg
| 159 | |
# This function checks whether the provided dev, which may be either
# WpaSupplicant or Hostapd, supports CSA.
def csa_supported(dev):
    """Raise HwsimSkip unless bit 0x80000000 is set in dev's capa.flags.

    Nothing is returned; lack of support is reported only through the
    exception.
    """
    status = dev.get_driver_status()
    flags = int(status['capa.flags'], 0)
    if not flags & 0x80000000:
        raise HwsimSkip("CSA not supported")
| 166 | |
def get_phy(ap, ifname=None):
    """Return the "phyN" name of the radio behind an interface.

    Runs "iw dev <ifname> info" through remotehost.Host and builds the
    name from the second word of the first output line containing "wiphy".

    ap: mapping describing the AP; 'hostname' is optional, 'ifname' is
        used when the ifname argument is None.
    ifname: interface to query instead of ap['ifname'].

    Returns "phy3" if no "wiphy" line is found. Raises Exception if the
    iw command exits with a non-zero status.
    """
    phy = "phy3"
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        # No hostname entry (or ap is not subscriptable): fall back to
        # Host(None). Narrowed from a bare except so that
        # KeyboardInterrupt/SystemExit are no longer swallowed here.
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            words = line.split()
            phy = "phy" + words[1]
            break
    return phy
| 187 | |
def parse_ie(buf):
    """Parse a hex string of (id, length, value) elements into a dict.

    Each element starts with a one-byte id and a one-byte length followed
    by that many value bytes. Parsing stops at the first element whose
    declared length exceeds the remaining data. When an id occurs more
    than once, the last value wins.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elements = {}
    pos = 0
    while total - pos >= 2:
        ie, elen = struct.unpack_from('BB', raw, pos)
        pos += 2
        if elen > total - pos:
            # Truncated element: keep what was parsed so far.
            break
        elements[ie] = raw[pos:pos + elen]
        pos += elen
    return elements
| 199 | |
def wait_regdom_changes(dev):
    """Consume pending CTRL-EVENT-REGDOM-CHANGE events from dev.

    Waits up to 0.1 s per event and stops at the first wait that returns
    None, or after ten events.
    """
    attempts_left = 10
    while attempts_left:
        attempts_left -= 1
        if dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) is None:
            return
| 205 | |
def clear_country(dev):
    """Try to clear the country code using a temporary open network.

    dev[1] is configured with a network (mode 2, SSID "country-clear",
    key_mgmt NONE, 2412 MHz) and selected. If it reports
    CTRL-EVENT-CONNECTED, dev[0] connects to that network, after which
    both sides are disconnected and any scan on dev[0] is aborted.
    Pending monitor events on both devices are dumped at the end.
    """
    logger.info("Try to clear country")
    creator = dev[1]
    netid = creator.add_network()
    creator.set_network(netid, "mode", "2")
    creator.set_network_quoted(netid, "ssid", "country-clear")
    creator.set_network(netid, "key_mgmt", "NONE")
    creator.set_network(netid, "frequency", "2412")
    creator.set_network(netid, "scan_freq", "2412")
    creator.select_network(netid)
    connected = creator.wait_event(["CTRL-EVENT-CONNECTED"])
    if connected:
        dev[0].connect("country-clear", key_mgmt="NONE", scan_freq="2412")
        creator.request("DISCONNECT")
        dev[0].wait_disconnected()
        dev[0].request("DISCONNECT")
        dev[0].request("ABORT_SCAN")
        time.sleep(1)
    dev[0].dump_monitor()
    creator.dump_monitor()
| 225 | |
def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and then clear the regdom on the stations.

    Delegates to disable_hapd() followed by clear_regdom_dev() for the
    first count entries of dev.
    """
    disable_hapd(hapd)
    clear_regdom_dev(dev, count=count)
| 229 | |
def disable_hapd(hapd):
    """Send "DISABLE" to hapd and pause for 0.1 s; do nothing if hapd is falsy."""
    if not hapd:
        return
    hapd.request("DISABLE")
    time.sleep(0.1)
| 234 | |
def clear_regdom_dev(dev, count=1):
    """Disconnect the first count devices and reset the regdom to "00".

    After every device has been disconnected and stopped scanning, dev[0]
    runs "iw reg set 00" and pending regdom-change events are consumed.
    If dev[0] still reports a country other than "00", clear_country() is
    used as a fallback. Finally the scan cache of each device is flushed.
    """
    indices = range(count)
    for idx in indices:
        dev[idx].request("DISCONNECT")
    for idx in indices:
        dev[idx].disconnect_and_stop_scan()

    primary = dev[0]
    primary.cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(primary)
    country = primary.get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)

    for idx in indices:
        dev[idx].flush_scan_cache()
| 248 | |
def radiotap_build():
    """Return a fixed 14-byte radiotap header as bytes.

    The result is an 8-byte little-endian header (two zero bytes, the
    total length, and the value 0xc002) followed by six payload bytes, the
    first of which is 0x08 and the rest zero.
    """
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    # The length field covers the 8-byte header plus the payload.
    header = struct.pack('<BBHL', 0, 0, 8 + len(payload), 0xc002)
    return header + payload
| 256 | |
def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode on freq and return a raw socket on it.

    The interface type and frequency are set with "iw" (failures raise
    via check_call); bringing the link up with "ip" is best-effort. The
    returned AF_PACKET/SOCK_RAW socket is bound to ifname and has a 0.5 s
    timeout.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    proto = socket.htons(ETH_P_ALL)
    monitor_sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, proto)
    monitor_sock.bind((ifname, 0))
    monitor_sock.settimeout(0.5)
    return monitor_sock
| 268 | |
def stop_monitor(ifname):
    """Bring ifname down and set its type back to managed.

    Both commands are best-effort: their exit status is ignored.
    """
    commands = (
        ["ip", "link", "set", "dev", ifname, "down"],
        ["iw", ifname, "set", "type", "managed"],
    )
    for argv in commands:
        subprocess.call(argv)
| 272 | |
def clear_scan_cache(apdev):
    """Flush the scan cache of the interface named in apdev['ifname'].

    Using hostapd.cmd_execute(), the interface is brought up, a flushing
    scan on 2412 MHz is triggered, and after 0.1 s the interface is
    brought down again.
    """
    ifname = apdev['ifname']
    trigger_flush = ['iw', ifname, 'scan', 'trigger', 'freq', '2412', 'flush']
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'up'])
    hostapd.cmd_execute(apdev, trigger_flush)
    time.sleep(0.1)
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'down'])
| 280 | |
def set_world_reg(apdev0=None, apdev1=None, dev0=None):
    """Run "iw reg set 00" for each given device, then pause for 0.1 s.

    apdev0 and apdev1 are handled through hostapd.cmd_execute(); dev0
    through its own cmd_execute() method. Falsy arguments are skipped.
    """
    reg_cmd = ('iw', 'reg', 'set', '00')
    for apdev in (apdev0, apdev1):
        if apdev:
            # A fresh list is passed on every call.
            hostapd.cmd_execute(apdev, list(reg_cmd))
    if dev0:
        dev0.cmd_execute(list(reg_cmd))
    time.sleep(0.1)
| 289 | |
def sysctl_write(val):
    """Run "sysctl -w <val>" with its standard output discarded.

    val is the "name=value" assignment passed to sysctl. The command's
    exit status is ignored.
    """
    # Open /dev/null in a with-block so the descriptor is closed once the
    # command finishes instead of being leaked on every call.
    with open('/dev/null', 'w') as devnull:
        subprocess.call(['sysctl', '-w', val], stdout=devnull)
| 292 | |
def var_arg_call(fn, dev, apdev, params):
    """Call fn with as many of (dev, apdev, params) as it declares.

    The positional argument count of fn decides the call: more than two
    passes all three values, exactly two passes (dev, apdev), anything
    less passes only dev. The result of fn is returned.
    """
    candidates = (dev, apdev, params)
    # Clamp to 1..3 so that fn always receives at least dev and never
    # more than the three available values.
    wanted = min(max(fn.__code__.co_argcount, 1), 3)
    return fn(*candidates[:wanted])
| 299 | |
def cloned_wrapper(wrapper, fn):
    """Copy the identifying attributes of fn onto wrapper and return wrapper.

    __name__ and __doc__ are needed for selecting and printing the test;
    __module__ reparents the wrapper to the right module for module
    filtering.
    """
    for attr in ('__name__', '__doc__', '__module__'):
        setattr(wrapper, attr, getattr(fn, attr))
    return wrapper
| 307 | |
def disable_ipv6(fn):
    """Decorator that runs fn with IPv6 disabled through sysctl.

    The wrapped test first requires a VM environment (require_under_vm),
    then sets the "all" and "default" disable_ipv6 sysctls to 1, calls fn
    via var_arg_call(), and always sets both sysctls back to 0 afterwards.
    The wrapper carries the name, docstring and module of fn.
    """
    turn_off = ('net.ipv6.conf.all.disable_ipv6=1',
                'net.ipv6.conf.default.disable_ipv6=1')
    turn_on = ('net.ipv6.conf.all.disable_ipv6=0',
               'net.ipv6.conf.default.disable_ipv6=0')

    def wrapper(dev, apdev, params):
        require_under_vm()
        try:
            for setting in turn_off:
                sysctl_write(setting)
            var_arg_call(fn, dev, apdev, params)
        finally:
            # Re-enable IPv6 even if the test or a sysctl write failed.
            for setting in turn_on:
                sysctl_write(setting)

    return cloned_wrapper(wrapper, fn)