blob: 57175750641286792ca6b3bbb9b9d591399ac40f [file] [log] [blame]
xjb04a4022021-11-25 15:01:52 +08001# Python class for controlling hostapd
2# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
you.chen2b010c72022-04-08 17:10:16 +08008import re
xjb04a4022021-11-25 15:01:52 +08009import time
10import logging
11import binascii
12import struct
you.chen2b010c72022-04-08 17:10:16 +080013import tempfile
xjb04a4022021-11-25 15:01:52 +080014import wpaspy
15import remotehost
16import utils
17import subprocess
18
19logger = logging.getLogger()
20hapd_ctrl = '/var/run/hostapd'
21hapd_global = '/var/run/hostapd-global'
22
23def mac2tuple(mac):
24 return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
25
26class HostapdGlobal:
27 def __init__(self, apdev=None, global_ctrl_override=None):
28 try:
29 hostname = apdev['hostname']
30 port = apdev['port']
31 except:
32 hostname = None
33 port = 8878
34 self.host = remotehost.Host(hostname)
35 self.hostname = hostname
36 self.port = port
37 if hostname is None:
38 global_ctrl = hapd_global
39 if global_ctrl_override:
40 global_ctrl = global_ctrl_override
41 self.ctrl = wpaspy.Ctrl(global_ctrl)
42 self.mon = wpaspy.Ctrl(global_ctrl)
43 self.dbg = ""
44 else:
45 self.ctrl = wpaspy.Ctrl(hostname, port)
46 self.mon = wpaspy.Ctrl(hostname, port)
47 self.dbg = hostname + "/" + str(port)
48 self.mon.attach()
49
50 def cmd_execute(self, cmd_array, shell=False):
51 if self.hostname is None:
52 if shell:
53 cmd = ' '.join(cmd_array)
54 else:
55 cmd = cmd_array
56 proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
57 stdout=subprocess.PIPE, shell=shell)
58 out = proc.communicate()[0]
59 ret = proc.returncode
60 return ret, out.decode()
61 else:
62 return self.host.execute(cmd_array)
63
64 def request(self, cmd, timeout=10):
65 logger.debug(self.dbg + ": CTRL(global): " + cmd)
66 return self.ctrl.request(cmd, timeout)
67
68 def wait_event(self, events, timeout):
69 start = os.times()[4]
70 while True:
71 while self.mon.pending():
72 ev = self.mon.recv()
73 logger.debug(self.dbg + "(global): " + ev)
74 for event in events:
75 if event in ev:
76 return ev
77 now = os.times()[4]
78 remaining = start + timeout - now
79 if remaining <= 0:
80 break
81 if not self.mon.pending(timeout=remaining):
82 break
83 return None
84
85 def add(self, ifname, driver=None):
86 cmd = "ADD " + ifname + " " + hapd_ctrl
87 if driver:
88 cmd += " " + driver
89 res = self.request(cmd)
90 if "OK" not in res:
91 raise Exception("Could not add hostapd interface " + ifname)
92
93 def add_iface(self, ifname, confname):
94 res = self.request("ADD " + ifname + " config=" + confname)
95 if "OK" not in res:
96 raise Exception("Could not add hostapd interface")
97
98 def add_bss(self, phy, confname, ignore_error=False):
99 res = self.request("ADD bss_config=" + phy + ":" + confname)
100 if "OK" not in res:
101 if not ignore_error:
102 raise Exception("Could not add hostapd BSS")
103
104 def remove(self, ifname):
105 self.request("REMOVE " + ifname, timeout=30)
106
107 def relog(self):
108 self.request("RELOG")
109
110 def flush(self):
111 self.request("FLUSH")
112
113 def get_ctrl_iface_port(self, ifname):
114 if self.hostname is None:
115 return None
116
117 res = self.request("INTERFACES ctrl")
118 lines = res.splitlines()
119 found = False
120 for line in lines:
121 words = line.split()
122 if words[0] == ifname:
123 found = True
124 break
125 if not found:
126 raise Exception("Could not find UDP port for " + ifname)
127 res = line.find("ctrl_iface=udp:")
128 if res == -1:
129 raise Exception("Wrong ctrl_interface format")
130 words = line.split(":")
131 return int(words[1])
132
133 def terminate(self):
134 self.mon.detach()
135 self.mon.close()
136 self.mon = None
137 self.ctrl.terminate()
138 self.ctrl = None
139
you.chen2b010c72022-04-08 17:10:16 +0800140 def send_file(self, src, dst):
141 self.host.send_file(src, dst)
142
xjb04a4022021-11-25 15:01:52 +0800143class Hostapd:
144 def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
145 self.hostname = hostname
146 self.host = remotehost.Host(hostname, ifname)
147 self.ifname = ifname
148 if hostname is None:
149 self.ctrl = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
150 self.mon = wpaspy.Ctrl(os.path.join(hapd_ctrl, ifname))
151 self.dbg = ifname
152 else:
153 self.ctrl = wpaspy.Ctrl(hostname, port)
154 self.mon = wpaspy.Ctrl(hostname, port)
155 self.dbg = hostname + "/" + ifname
156 self.mon.attach()
157 self.bssid = None
158 self.bssidx = bssidx
159
160 def cmd_execute(self, cmd_array, shell=False):
161 if self.hostname is None:
162 if shell:
163 cmd = ' '.join(cmd_array)
164 else:
165 cmd = cmd_array
166 proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
167 stdout=subprocess.PIPE, shell=shell)
168 out = proc.communicate()[0]
169 ret = proc.returncode
170 return ret, out.decode()
171 else:
172 return self.host.execute(cmd_array)
173
174 def close_ctrl(self):
175 if self.mon is not None:
176 self.mon.detach()
177 self.mon.close()
178 self.mon = None
179 self.ctrl.close()
180 self.ctrl = None
181
182 def own_addr(self):
183 if self.bssid is None:
184 self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
185 return self.bssid
186
you.chen2b010c72022-04-08 17:10:16 +0800187 def get_addr(self, group=False):
188 return self.own_addr()
189
xjb04a4022021-11-25 15:01:52 +0800190 def request(self, cmd):
191 logger.debug(self.dbg + ": CTRL: " + cmd)
192 return self.ctrl.request(cmd)
193
194 def ping(self):
195 return "PONG" in self.request("PING")
196
197 def set(self, field, value):
198 if "OK" not in self.request("SET " + field + " " + value):
you.chen2b010c72022-04-08 17:10:16 +0800199 if "TKIP" in value and (field == "wpa_pairwise" or \
200 field == "rsn_pairwise"):
201 raise utils.HwsimSkip("Cipher TKIP not supported")
xjb04a4022021-11-25 15:01:52 +0800202 raise Exception("Failed to set hostapd parameter " + field)
203
204 def set_defaults(self):
205 self.set("driver", "nl80211")
206 self.set("hw_mode", "g")
207 self.set("channel", "1")
208 self.set("ieee80211n", "1")
209 self.set("logger_stdout", "-1")
210 self.set("logger_stdout_level", "0")
211
212 def set_open(self, ssid):
213 self.set_defaults()
214 self.set("ssid", ssid)
215
216 def set_wpa2_psk(self, ssid, passphrase):
217 self.set_defaults()
218 self.set("ssid", ssid)
219 self.set("wpa_passphrase", passphrase)
220 self.set("wpa", "2")
221 self.set("wpa_key_mgmt", "WPA-PSK")
222 self.set("rsn_pairwise", "CCMP")
223
224 def set_wpa_psk(self, ssid, passphrase):
225 self.set_defaults()
226 self.set("ssid", ssid)
227 self.set("wpa_passphrase", passphrase)
228 self.set("wpa", "1")
229 self.set("wpa_key_mgmt", "WPA-PSK")
230 self.set("wpa_pairwise", "TKIP")
231
232 def set_wpa_psk_mixed(self, ssid, passphrase):
233 self.set_defaults()
234 self.set("ssid", ssid)
235 self.set("wpa_passphrase", passphrase)
236 self.set("wpa", "3")
237 self.set("wpa_key_mgmt", "WPA-PSK")
238 self.set("wpa_pairwise", "TKIP")
239 self.set("rsn_pairwise", "CCMP")
240
241 def set_wep(self, ssid, key):
242 self.set_defaults()
243 self.set("ssid", ssid)
244 self.set("wep_key0", key)
245
246 def enable(self):
247 if "OK" not in self.request("ENABLE"):
248 raise Exception("Failed to enable hostapd interface " + self.ifname)
249
250 def disable(self):
251 if "OK" not in self.request("DISABLE"):
252 raise Exception("Failed to disable hostapd interface " + self.ifname)
253
254 def dump_monitor(self):
255 while self.mon.pending():
256 ev = self.mon.recv()
257 logger.debug(self.dbg + ": " + ev)
258
259 def wait_event(self, events, timeout):
260 if not isinstance(events, list):
261 raise Exception("Hostapd.wait_event() called with incorrect events argument type")
262 start = os.times()[4]
263 while True:
264 while self.mon.pending():
265 ev = self.mon.recv()
266 logger.debug(self.dbg + ": " + ev)
267 for event in events:
268 if event in ev:
269 return ev
270 now = os.times()[4]
271 remaining = start + timeout - now
272 if remaining <= 0:
273 break
274 if not self.mon.pending(timeout=remaining):
275 break
276 return None
277
278 def wait_sta(self, addr=None, timeout=2):
279 ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
280 if ev is None:
281 raise Exception("AP did not report STA connection")
282 if addr and addr not in ev:
283 raise Exception("Unexpected STA address in connection event: " + ev)
284
you.chen2b010c72022-04-08 17:10:16 +0800285 def wait_ptkinitdone(self, addr, timeout=2):
286 while timeout > 0:
287 sta = self.get_sta(addr)
288 if 'hostapdWPAPTKState' not in sta:
289 raise Exception("GET_STA did not return hostapdWPAPTKState")
290 state = sta['hostapdWPAPTKState']
291 if state == "11":
292 return
293 time.sleep(0.1)
294 timeout -= 0.1
295 raise Exception("Timeout while waiting for PTKINITDONE")
296
xjb04a4022021-11-25 15:01:52 +0800297 def get_status(self):
298 res = self.request("STATUS")
299 lines = res.splitlines()
300 vals = dict()
301 for l in lines:
302 [name, value] = l.split('=', 1)
303 vals[name] = value
304 return vals
305
306 def get_status_field(self, field):
307 vals = self.get_status()
308 if field in vals:
309 return vals[field]
310 return None
311
312 def get_driver_status(self):
313 res = self.request("STATUS-DRIVER")
314 lines = res.splitlines()
315 vals = dict()
316 for l in lines:
317 [name, value] = l.split('=', 1)
318 vals[name] = value
319 return vals
320
321 def get_driver_status_field(self, field):
322 vals = self.get_driver_status()
323 if field in vals:
324 return vals[field]
325 return None
326
327 def get_config(self):
328 res = self.request("GET_CONFIG")
329 lines = res.splitlines()
330 vals = dict()
331 for l in lines:
332 [name, value] = l.split('=', 1)
333 vals[name] = value
334 return vals
335
336 def mgmt_rx(self, timeout=5):
337 ev = self.wait_event(["MGMT-RX"], timeout=timeout)
338 if ev is None:
339 return None
340 msg = {}
341 frame = binascii.unhexlify(ev.split(' ')[1])
342 msg['frame'] = frame
343
344 hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
345 msg['fc'] = hdr[0]
346 msg['subtype'] = (hdr[0] >> 4) & 0xf
347 hdr = hdr[1:]
348 msg['duration'] = hdr[0]
349 hdr = hdr[1:]
350 msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
351 hdr = hdr[6:]
352 msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
353 hdr = hdr[6:]
354 msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
355 hdr = hdr[6:]
356 msg['seq_ctrl'] = hdr[0]
357 msg['payload'] = frame[24:]
358
359 return msg
360
361 def mgmt_tx(self, msg):
362 t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
363 hdr = struct.pack('<HH6B6B6BH', *t)
364 res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
365 if "OK" not in res:
366 raise Exception("MGMT_TX command to hostapd failed")
367
368 def get_sta(self, addr, info=None, next=False):
369 cmd = "STA-NEXT " if next else "STA "
370 if addr is None:
371 res = self.request("STA-FIRST")
372 elif info:
373 res = self.request(cmd + addr + " " + info)
374 else:
375 res = self.request(cmd + addr)
376 lines = res.splitlines()
377 vals = dict()
378 first = True
379 for l in lines:
380 if first and '=' not in l:
381 vals['addr'] = l
382 first = False
383 else:
384 [name, value] = l.split('=', 1)
385 vals[name] = value
386 return vals
387
388 def get_mib(self, param=None):
389 if param:
390 res = self.request("MIB " + param)
391 else:
392 res = self.request("MIB")
393 lines = res.splitlines()
394 vals = dict()
395 for l in lines:
396 name_val = l.split('=', 1)
397 if len(name_val) > 1:
398 vals[name_val[0]] = name_val[1]
399 return vals
400
401 def get_pmksa(self, addr):
402 res = self.request("PMKSA")
403 lines = res.splitlines()
404 for l in lines:
405 if addr not in l:
406 continue
407 vals = dict()
408 [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
409 vals['index'] = index
410 vals['pmkid'] = pmkid
411 vals['expiration'] = expiration
412 vals['opportunistic'] = opportunistic
413 return vals
414 return None
415
416 def dpp_qr_code(self, uri):
417 res = self.request("DPP_QR_CODE " + uri)
418 if "FAIL" in res:
419 raise Exception("Failed to parse QR Code URI")
420 return int(res)
421
422 def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
423 curve=None, key=None):
424 cmd = "DPP_BOOTSTRAP_GEN type=" + type
425 if chan:
426 cmd += " chan=" + chan
427 if mac:
428 if mac is True:
429 mac = self.own_addr()
430 cmd += " mac=" + mac.replace(':', '')
431 if info:
432 cmd += " info=" + info
433 if curve:
434 cmd += " curve=" + curve
435 if key:
436 cmd += " key=" + key
437 res = self.request(cmd)
438 if "FAIL" in res:
439 raise Exception("Failed to generate bootstrapping info")
440 return int(res)
441
442 def dpp_listen(self, freq, netrole=None, qr=None, role=None):
443 cmd = "DPP_LISTEN " + str(freq)
444 if netrole:
445 cmd += " netrole=" + netrole
446 if qr:
447 cmd += " qr=" + qr
448 if role:
449 cmd += " role=" + role
450 if "OK" not in self.request(cmd):
451 raise Exception("Failed to start listen operation")
452
453 def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
454 extra=None, own=None, role=None, neg_freq=None,
455 ssid=None, passphrase=None, expect_fail=False):
456 cmd = "DPP_AUTH_INIT"
457 if peer is None:
458 peer = self.dpp_qr_code(uri)
459 cmd += " peer=%d" % peer
460 if own is not None:
461 cmd += " own=%d" % own
462 if role:
463 cmd += " role=" + role
464 if extra:
465 cmd += " " + extra
466 if conf:
467 cmd += " conf=" + conf
468 if configurator is not None:
469 cmd += " configurator=%d" % configurator
470 if neg_freq:
471 cmd += " neg_freq=%d" % neg_freq
472 if ssid:
473 cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
474 if passphrase:
475 cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
476 res = self.request(cmd)
477 if expect_fail:
478 if "FAIL" not in res:
479 raise Exception("DPP authentication started unexpectedly")
480 return
481 if "OK" not in res:
482 raise Exception("Failed to initiate DPP Authentication")
483
484 def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
485 extra=None, use_id=None):
486 if use_id is None:
487 id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
488 else:
489 id1 = use_id
490 cmd = "own=%d " % id1
491 if identifier:
492 cmd += "identifier=%s " % identifier
493 cmd += "init=1 "
494 if role:
495 cmd += "role=%s " % role
496 if extra:
497 cmd += extra + " "
498 cmd += "code=%s" % code
499 res = self.request("DPP_PKEX_ADD " + cmd)
500 if "FAIL" in res:
501 raise Exception("Failed to set PKEX data (initiator)")
502 return id1
503
504 def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
505 listen_role=None):
506 id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
507 cmd = "own=%d " % id0
508 if identifier:
509 cmd += "identifier=%s " % identifier
510 cmd += "code=%s" % code
511 res = self.request("DPP_PKEX_ADD " + cmd)
512 if "FAIL" in res:
513 raise Exception("Failed to set PKEX data (responder)")
514 self.dpp_listen(freq, role=listen_role)
515
516 def dpp_configurator_add(self, curve=None, key=None):
517 cmd = "DPP_CONFIGURATOR_ADD"
518 if curve:
519 cmd += " curve=" + curve
520 if key:
521 cmd += " key=" + key
522 res = self.request(cmd)
523 if "FAIL" in res:
524 raise Exception("Failed to add configurator")
525 return int(res)
526
527 def dpp_configurator_remove(self, conf_id):
528 res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
529 if "OK" not in res:
530 raise Exception("DPP_CONFIGURATOR_REMOVE failed")
531
532 def note(self, txt):
533 self.request("NOTE " + txt)
534
you.chen2b010c72022-04-08 17:10:16 +0800535 def send_file(self, src, dst):
536 self.host.send_file(src, dst)
537
xjb04a4022021-11-25 15:01:52 +0800538def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
539 global_ctrl_override=None, driver=False):
540 if isinstance(apdev, dict):
541 ifname = apdev['ifname']
542 try:
543 hostname = apdev['hostname']
544 port = apdev['port']
545 logger.info("Starting AP " + hostname + "/" + port + " " + ifname)
546 except:
547 logger.info("Starting AP " + ifname)
548 hostname = None
549 port = 8878
550 else:
551 ifname = apdev
552 logger.info("Starting AP " + ifname + " (old add_ap argument type)")
553 hostname = None
554 port = 8878
555 hapd_global = HostapdGlobal(apdev,
556 global_ctrl_override=global_ctrl_override)
557 hapd_global.remove(ifname)
558 hapd_global.add(ifname, driver=driver)
559 port = hapd_global.get_ctrl_iface_port(ifname)
560 hapd = Hostapd(ifname, hostname=hostname, port=port)
561 if not hapd.ping():
562 raise Exception("Could not ping hostapd")
563 hapd.set_defaults()
564 fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
you.chen2b010c72022-04-08 17:10:16 +0800565 "wpa", "wpa_deny_ptk0_rekey",
xjb04a4022021-11-25 15:01:52 +0800566 "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
567 "acct_server_addr", "osu_server_uri"]
568 for field in fields:
569 if field in params:
570 hapd.set(field, params[field])
571 for f, v in list(params.items()):
572 if f in fields:
573 continue
574 if isinstance(v, list):
575 for val in v:
576 hapd.set(f, val)
577 else:
578 hapd.set(f, v)
579 if no_enable:
580 return hapd
581 hapd.enable()
582 if wait_enabled:
583 ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
584 if ev is None:
585 raise Exception("AP startup timed out")
586 if "AP-ENABLED" not in ev:
587 raise Exception("AP startup failed")
588 return hapd
589
590def add_bss(apdev, ifname, confname, ignore_error=False):
591 phy = utils.get_phy(apdev)
592 try:
593 hostname = apdev['hostname']
594 port = apdev['port']
595 logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname)
596 except:
597 logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
598 hostname = None
599 port = 8878
600 hapd_global = HostapdGlobal(apdev)
you.chen2b010c72022-04-08 17:10:16 +0800601 confname = cfg_file(apdev, confname, ifname)
602 hapd_global.send_file(confname, confname)
xjb04a4022021-11-25 15:01:52 +0800603 hapd_global.add_bss(phy, confname, ignore_error)
604 port = hapd_global.get_ctrl_iface_port(ifname)
605 hapd = Hostapd(ifname, hostname=hostname, port=port)
606 if not hapd.ping():
607 raise Exception("Could not ping hostapd")
608 return hapd
609
610def add_iface(apdev, confname):
611 ifname = apdev['ifname']
612 try:
613 hostname = apdev['hostname']
614 port = apdev['port']
615 logger.info("Starting interface " + hostname + "/" + port + " " + ifname)
616 except:
617 logger.info("Starting interface " + ifname)
618 hostname = None
619 port = 8878
620 hapd_global = HostapdGlobal(apdev)
you.chen2b010c72022-04-08 17:10:16 +0800621 confname = cfg_file(apdev, confname, ifname)
622 hapd_global.send_file(confname, confname)
xjb04a4022021-11-25 15:01:52 +0800623 hapd_global.add_iface(ifname, confname)
624 port = hapd_global.get_ctrl_iface_port(ifname)
625 hapd = Hostapd(ifname, hostname=hostname, port=port)
626 if not hapd.ping():
627 raise Exception("Could not ping hostapd")
628 return hapd
629
630def remove_bss(apdev, ifname=None):
631 if ifname == None:
632 ifname = apdev['ifname']
633 try:
634 hostname = apdev['hostname']
635 port = apdev['port']
636 logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
637 except:
638 logger.info("Removing BSS " + ifname)
639 hapd_global = HostapdGlobal(apdev)
640 hapd_global.remove(ifname)
641
642def terminate(apdev):
643 try:
644 hostname = apdev['hostname']
645 port = apdev['port']
646 logger.info("Terminating hostapd " + hostname + "/" + port)
647 except:
648 logger.info("Terminating hostapd")
649 hapd_global = HostapdGlobal(apdev)
650 hapd_global.terminate()
651
you.chen2b010c72022-04-08 17:10:16 +0800652def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
653 ieee80211w=None):
xjb04a4022021-11-25 15:01:52 +0800654 params = {"wpa": "2",
you.chen2b010c72022-04-08 17:10:16 +0800655 "wpa_key_mgmt": wpa_key_mgmt,
xjb04a4022021-11-25 15:01:52 +0800656 "rsn_pairwise": "CCMP"}
657 if ssid:
658 params["ssid"] = ssid
659 if passphrase:
660 params["wpa_passphrase"] = passphrase
you.chen2b010c72022-04-08 17:10:16 +0800661 if ieee80211w is not None:
662 params["ieee80211w"] = ieee80211w
xjb04a4022021-11-25 15:01:52 +0800663 return params
664
665def wpa_params(ssid=None, passphrase=None):
666 params = {"wpa": "1",
667 "wpa_key_mgmt": "WPA-PSK",
668 "wpa_pairwise": "TKIP"}
669 if ssid:
670 params["ssid"] = ssid
671 if passphrase:
672 params["wpa_passphrase"] = passphrase
673 return params
674
675def wpa_mixed_params(ssid=None, passphrase=None):
676 params = {"wpa": "3",
677 "wpa_key_mgmt": "WPA-PSK",
678 "wpa_pairwise": "TKIP",
679 "rsn_pairwise": "CCMP"}
680 if ssid:
681 params["ssid"] = ssid
682 if passphrase:
683 params["wpa_passphrase"] = passphrase
684 return params
685
686def radius_params():
687 params = {"auth_server_addr": "127.0.0.1",
688 "auth_server_port": "1812",
689 "auth_server_shared_secret": "radius",
690 "nas_identifier": "nas.w1.fi"}
691 return params
692
693def wpa_eap_params(ssid=None):
694 params = radius_params()
695 params["wpa"] = "1"
696 params["wpa_key_mgmt"] = "WPA-EAP"
697 params["wpa_pairwise"] = "TKIP"
698 params["ieee8021x"] = "1"
699 if ssid:
700 params["ssid"] = ssid
701 return params
702
703def wpa2_eap_params(ssid=None):
704 params = radius_params()
705 params["wpa"] = "2"
706 params["wpa_key_mgmt"] = "WPA-EAP"
707 params["rsn_pairwise"] = "CCMP"
708 params["ieee8021x"] = "1"
709 if ssid:
710 params["ssid"] = ssid
711 return params
712
713def b_only_params(channel="1", ssid=None, country=None):
714 params = {"hw_mode": "b",
715 "channel": channel}
716 if ssid:
717 params["ssid"] = ssid
718 if country:
719 params["country_code"] = country
720 return params
721
722def g_only_params(channel="1", ssid=None, country=None):
723 params = {"hw_mode": "g",
724 "channel": channel}
725 if ssid:
726 params["ssid"] = ssid
727 if country:
728 params["country_code"] = country
729 return params
730
731def a_only_params(channel="36", ssid=None, country=None):
732 params = {"hw_mode": "a",
733 "channel": channel}
734 if ssid:
735 params["ssid"] = ssid
736 if country:
737 params["country_code"] = country
738 return params
739
740def ht20_params(channel="1", ssid=None, country=None):
741 params = {"ieee80211n": "1",
742 "channel": channel,
743 "hw_mode": "g"}
744 if int(channel) > 14:
745 params["hw_mode"] = "a"
746 if ssid:
747 params["ssid"] = ssid
748 if country:
749 params["country_code"] = country
750 return params
751
752def ht40_plus_params(channel="1", ssid=None, country=None):
753 params = ht20_params(channel, ssid, country)
754 params['ht_capab'] = "[HT40+]"
755 return params
756
757def ht40_minus_params(channel="1", ssid=None, country=None):
758 params = ht20_params(channel, ssid, country)
759 params['ht_capab'] = "[HT40-]"
760 return params
761
762def cmd_execute(apdev, cmd, shell=False):
763 hapd_global = HostapdGlobal(apdev)
764 return hapd_global.cmd_execute(cmd, shell=shell)
you.chen2b010c72022-04-08 17:10:16 +0800765
766def send_file(apdev, src, dst):
767 hapd_global = HostapdGlobal(apdev)
768 return hapd_global.send_file(src, dst)
769
770def acl_file(dev, apdev, conf):
771 fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
772 f = os.fdopen(fd, 'w')
773
774 if conf == 'hostapd.macaddr':
775 mac0 = dev[0].get_status_field("address")
776 f.write(mac0 + '\n')
777 f.write("02:00:00:00:00:12\n")
778 f.write("02:00:00:00:00:34\n")
779 f.write("-02:00:00:00:00:12\n")
780 f.write("-02:00:00:00:00:34\n")
781 f.write("01:01:01:01:01:01\n")
782 f.write("03:01:01:01:01:03\n")
783 elif conf == 'hostapd.accept':
784 mac0 = dev[0].get_status_field("address")
785 mac1 = dev[1].get_status_field("address")
786 f.write(mac0 + " 1\n")
787 f.write(mac1 + " 2\n")
788 elif conf == 'hostapd.accept2':
789 mac0 = dev[0].get_status_field("address")
790 mac1 = dev[1].get_status_field("address")
791 mac2 = dev[2].get_status_field("address")
792 f.write(mac0 + " 1\n")
793 f.write(mac1 + " 2\n")
794 f.write(mac2 + " 3\n")
795 else:
796 f.close()
797 os.unlink(filename)
798 return conf
799
800 return filename
801
802def bssid_inc(apdev, inc=1):
803 parts = apdev['bssid'].split(':')
804 parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
805 bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
806 parts[3], parts[4], parts[5])
807 return bssid
808
809def cfg_file(apdev, conf, ifname=None):
810 match = re.search(r'^bss-.+', conf)
811 if match:
812 # put cfg file in /tmp directory
813 fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
814 f = os.fdopen(fd, 'w')
815 idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
816 if ifname is None:
817 ifname = apdev['ifname']
818 if idx != '1':
819 ifname = ifname + '-' + idx
820
821 f.write("driver=nl80211\n")
822 f.write("ctrl_interface=/var/run/hostapd\n")
823 f.write("hw_mode=g\n")
824 f.write("channel=1\n")
825 f.write("ieee80211n=1\n")
826 if conf.startswith('bss-ht40-'):
827 f.write("ht_capab=[HT40+]\n")
828 f.write("interface=%s\n" % ifname)
829
830 f.write("ssid=bss-%s\n" % idx)
831 if conf == 'bss-2-dup.conf':
832 bssid = apdev['bssid']
833 else:
834 bssid = bssid_inc(apdev, int(idx) - 1)
835 f.write("bssid=%s\n" % bssid)
836
837 return fname
838
839 return conf