blob: b93d5caed5908434ab5d3609597c3efc8f97e72b [file] [log] [blame]
b.liue9582032025-04-17 19:18:16 +08001# Python class for controlling hostapd
2# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
8import re
9import time
10import logging
11import binascii
12import struct
13import tempfile
14import wpaspy
15import remotehost
16import utils
17import subprocess
18
19logger = logging.getLogger()
20hapd_ctrl = '/var/run/hostapd'
21hapd_global = '/var/run/hostapd-global'
22
23def mac2tuple(mac):
24 return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
25
26class HostapdGlobal:
27 def __init__(self, apdev=None, global_ctrl_override=None):
28 try:
29 hostname = apdev['hostname']
30 port = apdev['port']
31 except:
32 hostname = None
33 port = 8878
34 self.host = remotehost.Host(hostname)
35 self.hostname = hostname
36 self.port = port
37 if hostname is None:
38 global_ctrl = hapd_global
39 if global_ctrl_override:
40 global_ctrl = global_ctrl_override
41 self.ctrl = wpaspy.Ctrl(global_ctrl)
42 self.mon = wpaspy.Ctrl(global_ctrl)
43 self.dbg = ""
44 else:
45 self.ctrl = wpaspy.Ctrl(hostname, port)
46 self.mon = wpaspy.Ctrl(hostname, port)
47 self.dbg = hostname + "/" + str(port)
48 self.mon.attach()
49
50 def cmd_execute(self, cmd_array, shell=False):
51 if self.hostname is None:
52 if shell:
53 cmd = ' '.join(cmd_array)
54 else:
55 cmd = cmd_array
56 proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
57 stdout=subprocess.PIPE, shell=shell)
58 out = proc.communicate()[0]
59 ret = proc.returncode
60 return ret, out.decode()
61 else:
62 return self.host.execute(cmd_array)
63
64 def request(self, cmd, timeout=10):
65 logger.debug(self.dbg + ": CTRL(global): " + cmd)
66 return self.ctrl.request(cmd, timeout)
67
68 def wait_event(self, events, timeout):
69 start = os.times()[4]
70 while True:
71 while self.mon.pending():
72 ev = self.mon.recv()
73 logger.debug(self.dbg + "(global): " + ev)
74 for event in events:
75 if event in ev:
76 return ev
77 now = os.times()[4]
78 remaining = start + timeout - now
79 if remaining <= 0:
80 break
81 if not self.mon.pending(timeout=remaining):
82 break
83 return None
84
85 def add(self, ifname, driver=None):
86 cmd = "ADD " + ifname + " " + hapd_ctrl
87 if driver:
88 cmd += " " + driver
89 res = self.request(cmd)
90 if "OK" not in res:
91 raise Exception("Could not add hostapd interface " + ifname)
92
93 def add_iface(self, ifname, confname):
94 res = self.request("ADD " + ifname + " config=" + confname)
95 if "OK" not in res:
96 raise Exception("Could not add hostapd interface")
97
98 def add_bss(self, phy, confname, ignore_error=False):
99 res = self.request("ADD bss_config=" + phy + ":" + confname)
100 if "OK" not in res:
101 if not ignore_error:
102 raise Exception("Could not add hostapd BSS")
103
104 def add_link(self, ifname, confname):
105 res = self.request("ADD " + ifname + " config=" + confname)
106 if "OK" not in res:
107 raise Exception("Could not add hostapd link")
108
109 def remove(self, ifname):
110 self.request("REMOVE " + ifname, timeout=30)
111
112 def relog(self):
113 self.request("RELOG")
114
115 def flush(self):
116 self.request("FLUSH")
117
118 def get_ctrl_iface_port(self, ifname):
119 if self.hostname is None:
120 return None
121
122 res = self.request("INTERFACES ctrl")
123 lines = res.splitlines()
124 found = False
125 for line in lines:
126 words = line.split()
127 if words[0] == ifname:
128 found = True
129 break
130 if not found:
131 raise Exception("Could not find UDP port for " + ifname)
132 res = line.find("ctrl_iface=udp:")
133 if res == -1:
134 raise Exception("Wrong ctrl_interface format")
135 words = line.split(":")
136 return int(words[1])
137
138 def terminate(self):
139 self.mon.detach()
140 self.mon.close()
141 self.mon = None
142 self.ctrl.terminate()
143 self.ctrl = None
144
145 def send_file(self, src, dst):
146 self.host.send_file(src, dst)
147
148class Hostapd:
149 def __init__(self, ifname, bssidx=0, hostname=None, ctrl=hapd_ctrl,
150 port=8877):
151 self.hostname = hostname
152 self.host = remotehost.Host(hostname, ifname)
153 self.ifname = ifname
154 if hostname is None:
155 self.ctrl = wpaspy.Ctrl(os.path.join(ctrl, ifname))
156 self.mon = wpaspy.Ctrl(os.path.join(ctrl, ifname))
157 self.dbg = ifname
158 else:
159 self.ctrl = wpaspy.Ctrl(hostname, port)
160 self.mon = wpaspy.Ctrl(hostname, port)
161 self.dbg = hostname + "/" + ifname
162 self.mon.attach()
163 self.bssid = None
164 self.bssidx = bssidx
165 self.mld_addr = None
166
167 def cmd_execute(self, cmd_array, shell=False):
168 if self.hostname is None:
169 if shell:
170 cmd = ' '.join(cmd_array)
171 else:
172 cmd = cmd_array
173 proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
174 stdout=subprocess.PIPE, shell=shell)
175 out = proc.communicate()[0]
176 ret = proc.returncode
177 return ret, out.decode()
178 else:
179 return self.host.execute(cmd_array)
180
181 def close_ctrl(self):
182 if self.mon is not None:
183 self.mon.detach()
184 self.mon.close()
185 self.mon = None
186 self.ctrl.close()
187 self.ctrl = None
188
189 def own_addr(self):
190 if self.bssid is None:
191 self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
192 return self.bssid
193
194 def own_mld_addr(self):
195 if self.mld_addr is None:
196 self.mld_addr = self.get_status_field('mld_addr[%d]' % self.bssidx)
197 return self.mld_addr
198
199 def get_addr(self, group=False):
200 if self.own_mld_addr() is None:
201 return self.own_addr()
202 return self.own_mld_addr()
203
204 def request(self, cmd):
205 logger.debug(self.dbg + ": CTRL: " + cmd)
206 return self.ctrl.request(cmd)
207
208 def ping(self):
209 return "PONG" in self.request("PING")
210
211 def set(self, field, value):
212 if "OK" not in self.request("SET " + field + " " + value):
213 if "TKIP" in value and (field == "wpa_pairwise" or \
214 field == "rsn_pairwise"):
215 raise utils.HwsimSkip("Cipher TKIP not supported")
216 raise Exception("Failed to set hostapd parameter " + field)
217
218 def set_defaults(self, set_channel=True):
219 self.set("driver", "nl80211")
220 if set_channel:
221 self.set("hw_mode", "g")
222 self.set("channel", "1")
223 self.set("ieee80211n", "1")
224 self.set("logger_stdout", "-1")
225 self.set("logger_stdout_level", "0")
226
227 def set_open(self, ssid):
228 self.set_defaults()
229 self.set("ssid", ssid)
230
231 def set_wpa2_psk(self, ssid, passphrase):
232 self.set_defaults()
233 self.set("ssid", ssid)
234 self.set("wpa_passphrase", passphrase)
235 self.set("wpa", "2")
236 self.set("wpa_key_mgmt", "WPA-PSK")
237 self.set("rsn_pairwise", "CCMP")
238
239 def set_wpa_psk(self, ssid, passphrase):
240 self.set_defaults()
241 self.set("ssid", ssid)
242 self.set("wpa_passphrase", passphrase)
243 self.set("wpa", "1")
244 self.set("wpa_key_mgmt", "WPA-PSK")
245 self.set("wpa_pairwise", "TKIP")
246
247 def set_wpa_psk_mixed(self, ssid, passphrase):
248 self.set_defaults()
249 self.set("ssid", ssid)
250 self.set("wpa_passphrase", passphrase)
251 self.set("wpa", "3")
252 self.set("wpa_key_mgmt", "WPA-PSK")
253 self.set("wpa_pairwise", "TKIP")
254 self.set("rsn_pairwise", "CCMP")
255
256 def set_wep(self, ssid, key):
257 self.set_defaults()
258 self.set("ssid", ssid)
259 self.set("wep_key0", key)
260
261 def enable(self):
262 if "OK" not in self.request("ENABLE"):
263 raise Exception("Failed to enable hostapd interface " + self.ifname)
264
265 def disable(self):
266 if "OK" not in self.request("DISABLE"):
267 raise Exception("Failed to disable hostapd interface " + self.ifname)
268
269 def dump_monitor(self):
270 while self.mon.pending():
271 ev = self.mon.recv()
272 logger.debug(self.dbg + ": " + ev)
273
274 def wait_event(self, events, timeout):
275 if not isinstance(events, list):
276 raise Exception("Hostapd.wait_event() called with incorrect events argument type")
277 start = os.times()[4]
278 while True:
279 while self.mon.pending():
280 ev = self.mon.recv()
281 logger.debug(self.dbg + ": " + ev)
282 for event in events:
283 if event in ev:
284 return ev
285 now = os.times()[4]
286 remaining = start + timeout - now
287 if remaining <= 0:
288 break
289 if not self.mon.pending(timeout=remaining):
290 break
291 return None
292
293 def wait_sta(self, addr=None, timeout=2):
294 ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
295 if ev is None:
296 raise Exception("AP did not report STA connection")
297 if addr and addr not in ev:
298 raise Exception("Unexpected STA address in connection event: " + ev)
299 return ev
300
301 def wait_ptkinitdone(self, addr, timeout=2):
302 while timeout > 0:
303 sta = self.get_sta(addr)
304 if 'hostapdWPAPTKState' not in sta:
305 raise Exception("GET_STA did not return hostapdWPAPTKState")
306 state = sta['hostapdWPAPTKState']
307 if state == "11":
308 return
309 time.sleep(0.1)
310 timeout -= 0.1
311 raise Exception("Timeout while waiting for PTKINITDONE")
312
313 def get_status(self):
314 res = self.request("STATUS")
315 lines = res.splitlines()
316 vals = dict()
317 for l in lines:
318 [name, value] = l.split('=', 1)
319 vals[name] = value
320 return vals
321
322 def get_status_field(self, field):
323 vals = self.get_status()
324 if field in vals:
325 return vals[field]
326 return None
327
328 def get_driver_status(self):
329 res = self.request("STATUS-DRIVER")
330 lines = res.splitlines()
331 vals = dict()
332 for l in lines:
333 [name, value] = l.split('=', 1)
334 vals[name] = value
335 return vals
336
337 def get_driver_status_field(self, field):
338 vals = self.get_driver_status()
339 if field in vals:
340 return vals[field]
341 return None
342
343 def get_config(self):
344 res = self.request("GET_CONFIG")
345 lines = res.splitlines()
346 vals = dict()
347 for l in lines:
348 [name, value] = l.split('=', 1)
349 vals[name] = value
350 return vals
351
352 def mgmt_rx(self, timeout=5):
353 ev = self.wait_event(["MGMT-RX"], timeout=timeout)
354 if ev is None:
355 return None
356 msg = {}
357 frame = binascii.unhexlify(ev.split(' ')[1])
358 msg['frame'] = frame
359
360 hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
361 msg['fc'] = hdr[0]
362 msg['subtype'] = (hdr[0] >> 4) & 0xf
363 hdr = hdr[1:]
364 msg['duration'] = hdr[0]
365 hdr = hdr[1:]
366 msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
367 hdr = hdr[6:]
368 msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
369 hdr = hdr[6:]
370 msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6]
371 hdr = hdr[6:]
372 msg['seq_ctrl'] = hdr[0]
373 msg['payload'] = frame[24:]
374
375 return msg
376
377 def mgmt_tx(self, msg):
378 t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
379 hdr = struct.pack('<HH6B6B6BH', *t)
380 res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
381 if "OK" not in res:
382 raise Exception("MGMT_TX command to hostapd failed")
383
384 def get_sta(self, addr, info=None, next=False):
385 cmd = "STA-NEXT " if next else "STA "
386 if addr is None:
387 res = self.request("STA-FIRST")
388 elif info:
389 res = self.request(cmd + addr + " " + info)
390 else:
391 res = self.request(cmd + addr)
392 lines = res.splitlines()
393 vals = dict()
394 first = True
395 for l in lines:
396 if first and '=' not in l:
397 vals['addr'] = l
398 first = False
399 else:
400 [name, value] = l.split('=', 1)
401 vals[name] = value
402 return vals
403
404 def get_mib(self, param=None):
405 if param:
406 res = self.request("MIB " + param)
407 else:
408 res = self.request("MIB")
409 lines = res.splitlines()
410 vals = dict()
411 for l in lines:
412 name_val = l.split('=', 1)
413 if len(name_val) > 1:
414 vals[name_val[0]] = name_val[1]
415 return vals
416
417 def get_pmksa(self, addr):
418 res = self.request("PMKSA")
419 lines = res.splitlines()
420 for l in lines:
421 if addr not in l:
422 continue
423 vals = dict()
424 [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
425 vals['index'] = index
426 vals['pmkid'] = pmkid
427 vals['expiration'] = expiration
428 vals['opportunistic'] = opportunistic
429 return vals
430 return None
431
432 def dpp_qr_code(self, uri):
433 res = self.request("DPP_QR_CODE " + uri)
434 if "FAIL" in res:
435 raise Exception("Failed to parse QR Code URI")
436 return int(res)
437
438 def dpp_nfc_uri(self, uri):
439 res = self.request("DPP_NFC_URI " + uri)
440 if "FAIL" in res:
441 raise Exception("Failed to parse NFC URI")
442 return int(res)
443
444 def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
445 curve=None, key=None, supported_curves=None,
446 host=None):
447 cmd = "DPP_BOOTSTRAP_GEN type=" + type
448 if chan:
449 cmd += " chan=" + chan
450 if mac:
451 if mac is True:
452 mac = self.own_addr()
453 cmd += " mac=" + mac.replace(':', '')
454 if info:
455 cmd += " info=" + info
456 if curve:
457 cmd += " curve=" + curve
458 if key:
459 cmd += " key=" + key
460 if supported_curves:
461 cmd += " supported_curves=" + supported_curves
462 if host:
463 cmd += " host=" + host
464 res = self.request(cmd)
465 if "FAIL" in res:
466 raise Exception("Failed to generate bootstrapping info")
467 return int(res)
468
469 def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None,
470 extra=None):
471 cmd = "DPP_BOOTSTRAP_SET %d" % id
472 if ssid:
473 cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
474 if extra:
475 cmd += " " + extra
476 if conf:
477 cmd += " conf=" + conf
478 if configurator is not None:
479 cmd += " configurator=%d" % configurator
480 if "OK" not in self.request(cmd):
481 raise Exception("Failed to set bootstrapping parameters")
482
483 def dpp_listen(self, freq, netrole=None, qr=None, role=None):
484 cmd = "DPP_LISTEN " + str(freq)
485 if netrole:
486 cmd += " netrole=" + netrole
487 if qr:
488 cmd += " qr=" + qr
489 if role:
490 cmd += " role=" + role
491 if "OK" not in self.request(cmd):
492 raise Exception("Failed to start listen operation")
493
494 def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
495 extra=None, own=None, role=None, neg_freq=None,
496 ssid=None, passphrase=None, expect_fail=False,
497 conn_status=False, nfc_uri=None):
498 cmd = "DPP_AUTH_INIT"
499 if peer is None:
500 if nfc_uri:
501 peer = self.dpp_nfc_uri(nfc_uri)
502 else:
503 peer = self.dpp_qr_code(uri)
504 cmd += " peer=%d" % peer
505 if own is not None:
506 cmd += " own=%d" % own
507 if role:
508 cmd += " role=" + role
509 if extra:
510 cmd += " " + extra
511 if conf:
512 cmd += " conf=" + conf
513 if configurator is not None:
514 cmd += " configurator=%d" % configurator
515 if neg_freq:
516 cmd += " neg_freq=%d" % neg_freq
517 if ssid:
518 cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
519 if passphrase:
520 cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
521 if conn_status:
522 cmd += " conn_status=1"
523 res = self.request(cmd)
524 if expect_fail:
525 if "FAIL" not in res:
526 raise Exception("DPP authentication started unexpectedly")
527 return
528 if "OK" not in res:
529 raise Exception("Failed to initiate DPP Authentication")
530
531 def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
532 extra=None, use_id=None, ver=None):
533 if use_id is None:
534 id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
535 else:
536 id1 = use_id
537 cmd = "own=%d " % id1
538 if identifier:
539 cmd += "identifier=%s " % identifier
540 cmd += "init=1 "
541 if ver is not None:
542 cmd += "ver=" + str(ver) + " "
543 if role:
544 cmd += "role=%s " % role
545 if extra:
546 cmd += extra + " "
547 cmd += "code=%s" % code
548 res = self.request("DPP_PKEX_ADD " + cmd)
549 if "FAIL" in res:
550 raise Exception("Failed to set PKEX data (initiator)")
551 return id1
552
553 def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
554 listen_role=None):
555 id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
556 cmd = "own=%d " % id0
557 if identifier:
558 cmd += "identifier=%s " % identifier
559 cmd += "code=%s" % code
560 res = self.request("DPP_PKEX_ADD " + cmd)
561 if "FAIL" in res:
562 raise Exception("Failed to set PKEX data (responder)")
563 self.dpp_listen(freq, role=listen_role)
564
565 def dpp_configurator_add(self, curve=None, key=None,
566 net_access_key_curve=None):
567 cmd = "DPP_CONFIGURATOR_ADD"
568 if curve:
569 cmd += " curve=" + curve
570 if net_access_key_curve:
571 cmd += " net_access_key_curve=" + curve
572 if key:
573 cmd += " key=" + key
574 res = self.request(cmd)
575 if "FAIL" in res:
576 raise Exception("Failed to add configurator")
577 return int(res)
578
579 def dpp_configurator_remove(self, conf_id):
580 res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
581 if "OK" not in res:
582 raise Exception("DPP_CONFIGURATOR_REMOVE failed")
583
584 def note(self, txt):
585 self.request("NOTE " + txt)
586
587 def send_file(self, src, dst):
588 self.host.send_file(src, dst)
589
590 def get_ptksa(self, bssid, cipher):
591 res = self.request("PTKSA_CACHE_LIST")
592 lines = res.splitlines()
593 for l in lines:
594 if bssid not in l or cipher not in l:
595 continue
596 vals = dict()
597 [index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5)
598 vals['index'] = index
599 vals['addr'] = addr
600 vals['cipher'] = cipher
601 vals['expiration'] = expiration
602 vals['tk'] = tk
603 vals['kdk'] = kdk
604 return vals
605 return None
606
607def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
608 global_ctrl_override=None, driver=False, set_channel=True):
609 if isinstance(apdev, dict):
610 ifname = apdev['ifname']
611 try:
612 hostname = apdev['hostname']
613 port = apdev['port']
614 logger.info("Starting AP " + hostname + "/" + port + " " + ifname)
615 except:
616 logger.info("Starting AP " + ifname)
617 hostname = None
618 port = 8878
619 else:
620 ifname = apdev
621 logger.info("Starting AP " + ifname + " (old add_ap argument type)")
622 hostname = None
623 port = 8878
624 hapd_global = HostapdGlobal(apdev,
625 global_ctrl_override=global_ctrl_override)
626 hapd_global.remove(ifname)
627 hapd_global.add(ifname, driver=driver)
628 port = hapd_global.get_ctrl_iface_port(ifname)
629 hapd = Hostapd(ifname, hostname=hostname, port=port)
630 if not hapd.ping():
631 raise Exception("Could not ping hostapd")
632 hapd.set_defaults(set_channel=set_channel)
633 fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
634 "wpa", "wpa_deny_ptk0_rekey",
635 "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
636 "acct_server_addr", "osu_server_uri"]
637 for field in fields:
638 if field in params:
639 hapd.set(field, params[field])
640 for f, v in list(params.items()):
641 if f in fields:
642 continue
643 if isinstance(v, list):
644 for val in v:
645 hapd.set(f, val)
646 else:
647 hapd.set(f, v)
648 if no_enable:
649 return hapd
650 hapd.enable()
651 if wait_enabled:
652 ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
653 if ev is None:
654 raise Exception("AP startup timed out")
655 if "AP-ENABLED" not in ev:
656 raise Exception("AP startup failed")
657 return hapd
658
659def add_bss(apdev, ifname, confname, ignore_error=False):
660 phy = utils.get_phy(apdev)
661 try:
662 hostname = apdev['hostname']
663 port = apdev['port']
664 logger.info("Starting BSS " + hostname + "/" + port + " phy=" + phy + " ifname=" + ifname)
665 except:
666 logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
667 hostname = None
668 port = 8878
669 hapd_global = HostapdGlobal(apdev)
670 confname = cfg_file(apdev, confname, ifname)
671 hapd_global.send_file(confname, confname)
672 hapd_global.add_bss(phy, confname, ignore_error)
673 port = hapd_global.get_ctrl_iface_port(ifname)
674 hapd = Hostapd(ifname, hostname=hostname, port=port)
675 if not hapd.ping():
676 raise Exception("Could not ping hostapd")
677 return hapd
678
679def add_iface(apdev, confname):
680 ifname = apdev['ifname']
681 try:
682 hostname = apdev['hostname']
683 port = apdev['port']
684 logger.info("Starting interface " + hostname + "/" + port + " " + ifname)
685 except:
686 logger.info("Starting interface " + ifname)
687 hostname = None
688 port = 8878
689 hapd_global = HostapdGlobal(apdev)
690 confname = cfg_file(apdev, confname, ifname)
691 hapd_global.send_file(confname, confname)
692 hapd_global.add_iface(ifname, confname)
693 port = hapd_global.get_ctrl_iface_port(ifname)
694 hapd = Hostapd(ifname, hostname=hostname, port=port)
695 if not hapd.ping():
696 raise Exception("Could not ping hostapd")
697 return hapd
698
699def add_mld_link(apdev, params):
700 if isinstance(apdev, dict):
701 ifname = apdev['ifname']
702 try:
703 hostname = apdev['hostname']
704 port = apdev['port']
705 logger.info("Adding link on: " + hostname + "/" + port + " ifname=" + ifname)
706 except:
707 logger.info("Adding link on: ifname=" + ifname)
708 hostname = None
709 port = 8878
710 else:
711 ifname = apdev
712 logger.info("Adding link on: ifname=" + ifname)
713 hostname = None
714 port = 8878
715
716 hapd_global = HostapdGlobal(apdev)
717 confname, ctrl_iface = cfg_mld_link_file(ifname, params)
718 hapd_global.send_file(confname, confname)
719 try:
720 hapd_global.add_link(ifname, confname)
721 except Exception as e:
722 if str(e) == "Could not add hostapd link":
723 raise utils.HwsimSkip("No MLO support in hostapd")
724 port = hapd_global.get_ctrl_iface_port(ifname)
725 hapd = Hostapd(ifname, hostname=hostname, ctrl=ctrl_iface, port=port)
726 if not hapd.ping():
727 raise Exception("Could not ping hostapd")
728 return hapd
729
730def remove_bss(apdev, ifname=None):
731 if ifname == None:
732 ifname = apdev['ifname']
733 try:
734 hostname = apdev['hostname']
735 port = apdev['port']
736 logger.info("Removing BSS " + hostname + "/" + port + " " + ifname)
737 except:
738 logger.info("Removing BSS " + ifname)
739 hapd_global = HostapdGlobal(apdev)
740 hapd_global.remove(ifname)
741
742def terminate(apdev):
743 try:
744 hostname = apdev['hostname']
745 port = apdev['port']
746 logger.info("Terminating hostapd " + hostname + "/" + port)
747 except:
748 logger.info("Terminating hostapd")
749 hapd_global = HostapdGlobal(apdev)
750 hapd_global.terminate()
751
752def wpa3_params(ssid=None, password=None, wpa_key_mgmt="SAE",
753 ieee80211w="2"):
754 params = {"wpa": "2",
755 "wpa_key_mgmt": wpa_key_mgmt,
756 "ieee80211w": ieee80211w,
757 "rsn_pairwise": "CCMP"}
758 if ssid:
759 params["ssid"] = ssid
760 if password:
761 params["sae_password"] = password
762 return params
763
764def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
765 ieee80211w=None):
766 params = {"wpa": "2",
767 "wpa_key_mgmt": wpa_key_mgmt,
768 "rsn_pairwise": "CCMP"}
769 if ssid:
770 params["ssid"] = ssid
771 if passphrase:
772 params["wpa_passphrase"] = passphrase
773 if ieee80211w is not None:
774 params["ieee80211w"] = ieee80211w
775 return params
776
777def wpa_params(ssid=None, passphrase=None):
778 params = {"wpa": "1",
779 "wpa_key_mgmt": "WPA-PSK",
780 "wpa_pairwise": "TKIP"}
781 if ssid:
782 params["ssid"] = ssid
783 if passphrase:
784 params["wpa_passphrase"] = passphrase
785 return params
786
787def wpa_mixed_params(ssid=None, passphrase=None):
788 params = {"wpa": "3",
789 "wpa_key_mgmt": "WPA-PSK",
790 "wpa_pairwise": "TKIP",
791 "rsn_pairwise": "CCMP"}
792 if ssid:
793 params["ssid"] = ssid
794 if passphrase:
795 params["wpa_passphrase"] = passphrase
796 return params
797
798def radius_params():
799 params = {"auth_server_addr": "127.0.0.1",
800 "auth_server_port": "1812",
801 "auth_server_shared_secret": "radius",
802 "nas_identifier": "nas.w1.fi"}
803 return params
804
805def wpa_eap_params(ssid=None):
806 params = radius_params()
807 params["wpa"] = "1"
808 params["wpa_key_mgmt"] = "WPA-EAP"
809 params["wpa_pairwise"] = "TKIP"
810 params["ieee8021x"] = "1"
811 if ssid:
812 params["ssid"] = ssid
813 return params
814
815def wpa2_eap_params(ssid=None):
816 params = radius_params()
817 params["wpa"] = "2"
818 params["wpa_key_mgmt"] = "WPA-EAP"
819 params["rsn_pairwise"] = "CCMP"
820 params["ieee8021x"] = "1"
821 if ssid:
822 params["ssid"] = ssid
823 return params
824
825def b_only_params(channel="1", ssid=None, country=None):
826 params = {"hw_mode": "b",
827 "channel": channel}
828 if ssid:
829 params["ssid"] = ssid
830 if country:
831 params["country_code"] = country
832 return params
833
834def g_only_params(channel="1", ssid=None, country=None):
835 params = {"hw_mode": "g",
836 "channel": channel}
837 if ssid:
838 params["ssid"] = ssid
839 if country:
840 params["country_code"] = country
841 return params
842
843def a_only_params(channel="36", ssid=None, country=None):
844 params = {"hw_mode": "a",
845 "channel": channel}
846 if ssid:
847 params["ssid"] = ssid
848 if country:
849 params["country_code"] = country
850 return params
851
852def ht20_params(channel="1", ssid=None, country=None):
853 params = {"ieee80211n": "1",
854 "channel": channel,
855 "hw_mode": "g"}
856 if int(channel) > 14:
857 params["hw_mode"] = "a"
858 if ssid:
859 params["ssid"] = ssid
860 if country:
861 params["country_code"] = country
862 return params
863
864def ht40_plus_params(channel="1", ssid=None, country=None):
865 params = ht20_params(channel, ssid, country)
866 params['ht_capab'] = "[HT40+]"
867 return params
868
869def ht40_minus_params(channel="1", ssid=None, country=None):
870 params = ht20_params(channel, ssid, country)
871 params['ht_capab'] = "[HT40-]"
872 return params
873
874def cmd_execute(apdev, cmd, shell=False):
875 hapd_global = HostapdGlobal(apdev)
876 return hapd_global.cmd_execute(cmd, shell=shell)
877
878def send_file(apdev, src, dst):
879 hapd_global = HostapdGlobal(apdev)
880 return hapd_global.send_file(src, dst)
881
882def acl_file(dev, apdev, conf):
883 fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
884 f = os.fdopen(fd, 'w')
885
886 if conf == 'hostapd.macaddr':
887 mac0 = dev[0].get_status_field("address")
888 f.write(mac0 + '\n')
889 f.write("02:00:00:00:00:12\n")
890 f.write("02:00:00:00:00:34\n")
891 f.write("-02:00:00:00:00:12\n")
892 f.write("-02:00:00:00:00:34\n")
893 f.write("01:01:01:01:01:01\n")
894 f.write("03:01:01:01:01:03\n")
895 elif conf == 'hostapd.accept':
896 mac0 = dev[0].get_status_field("address")
897 mac1 = dev[1].get_status_field("address")
898 f.write(mac0 + " 1\n")
899 f.write(mac1 + " 2\n")
900 elif conf == 'hostapd.accept2':
901 mac0 = dev[0].get_status_field("address")
902 mac1 = dev[1].get_status_field("address")
903 mac2 = dev[2].get_status_field("address")
904 f.write(mac0 + " 1\n")
905 f.write(mac1 + " 2\n")
906 f.write(mac2 + " 3\n")
907 else:
908 f.close()
909 os.unlink(filename)
910 return conf
911
912 return filename
913
914def bssid_inc(apdev, inc=1):
915 parts = apdev['bssid'].split(':')
916 parts[5] = '%02x' % (int(parts[5], 16) + int(inc))
917 bssid = '%s:%s:%s:%s:%s:%s' % (parts[0], parts[1], parts[2],
918 parts[3], parts[4], parts[5])
919 return bssid
920
921def cfg_file(apdev, conf, ifname=None):
922 match = re.search(r'^bss-.+', conf)
923 if match:
924 # put cfg file in /tmp directory
925 fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
926 f = os.fdopen(fd, 'w')
927 idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
928 if ifname is None:
929 ifname = apdev['ifname']
930 if idx != '1':
931 ifname = ifname + '-' + idx
932
933 f.write("driver=nl80211\n")
934 f.write("ctrl_interface=/var/run/hostapd\n")
935 f.write("hw_mode=g\n")
936 f.write("channel=1\n")
937 f.write("ieee80211n=1\n")
938 if conf.startswith('bss-ht40-'):
939 f.write("ht_capab=[HT40+]\n")
940 f.write("interface=%s\n" % ifname)
941
942 f.write("ssid=bss-%s\n" % idx)
943 if conf == 'bss-2-dup.conf':
944 bssid = apdev['bssid']
945 else:
946 bssid = bssid_inc(apdev, int(idx) - 1)
947 f.write("bssid=%s\n" % bssid)
948
949 return fname
950
951 return conf
952
953idx = 0
954def cfg_mld_link_file(ifname, params):
955 global idx
956 ctrl_iface="/var/run/hostapd"
957 conf = "link-%d.conf" % idx
958
959 fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
960 f = os.fdopen(fd, 'w')
961
962 if idx != 0:
963 ctrl_iface="/var/run/hostapd_%d" % idx
964
965 f.write("ctrl_interface=%s\n" % ctrl_iface)
966 f.write("driver=nl80211\n")
967 f.write("ieee80211n=1\n")
968 f.write("ieee80211ac=1\n")
969 f.write("ieee80211ax=1\n")
970 f.write("ieee80211be=1\n")
971 f.write("interface=%s\n" % ifname)
972 f.write("mld_ap=1\n")
973 f.write("mld_id=0\n")
974
975 for k, v in list(params.items()):
976 f.write("{}={}\n".format(k,v))
977
978 idx = idx + 1
979
980 return fname, ctrl_iface