| # Python class for controlling wpa_supplicant |
| # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi> |
| # |
| # This software may be distributed under the terms of the BSD license. |
| # See README for more details. |
| |
| import os |
| import time |
| import logging |
| import binascii |
| import re |
| import struct |
| import wpaspy |
| import remotehost |
| import subprocess |
| |
| logger = logging.getLogger() |
| wpas_ctrl = '/var/run/wpa_supplicant' |
| |
| class WpaSupplicant: |
| def __init__(self, ifname=None, global_iface=None, hostname=None, |
| port=9877, global_port=9878, monitor=True): |
| self.monitor = monitor |
| self.hostname = hostname |
| self.group_ifname = None |
| self.global_mon = None |
| self.global_ctrl = None |
| self.gctrl_mon = None |
| self.ctrl = None |
| self.mon = None |
| self.ifname = None |
| self.host = remotehost.Host(hostname, ifname) |
| self._group_dbg = None |
| if ifname: |
| self.set_ifname(ifname, hostname, port) |
| res = self.get_driver_status() |
| if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000: |
| self.p2p_dev_ifname = 'p2p-dev-' + self.ifname |
| else: |
| self.p2p_dev_ifname = ifname |
| |
| self.global_iface = global_iface |
| if global_iface: |
| if hostname != None: |
| self.global_ctrl = wpaspy.Ctrl(hostname, global_port) |
| if self.monitor: |
| self.global_mon = wpaspy.Ctrl(hostname, global_port) |
| self.global_dbg = hostname + "/" + str(global_port) + "/" |
| else: |
| self.global_ctrl = wpaspy.Ctrl(global_iface) |
| if self.monitor: |
| self.global_mon = wpaspy.Ctrl(global_iface) |
| self.global_dbg = "" |
| if self.monitor: |
| self.global_mon.attach() |
| |
| def __del__(self): |
| self.close_monitor() |
| self.close_control() |
| |
| def close_control_ctrl(self): |
| if self.ctrl: |
| del self.ctrl |
| self.ctrl = None |
| |
| def close_control_global(self): |
| if self.global_ctrl: |
| del self.global_ctrl |
| self.global_ctrl = None |
| |
| def close_control(self): |
| self.close_control_ctrl() |
| self.close_control_global() |
| |
| def close_monitor_mon(self): |
| if not self.mon: |
| return |
| try: |
| while self.mon.pending(): |
| ev = self.mon.recv() |
| logger.debug(self.dbg + ": " + ev) |
| except: |
| pass |
| try: |
| self.mon.detach() |
| except ConnectionRefusedError: |
| pass |
| except Exception as e: |
| if str(e) == "DETACH failed": |
| pass |
| else: |
| raise |
| del self.mon |
| self.mon = None |
| |
| def close_monitor_global(self): |
| if not self.global_mon: |
| return |
| try: |
| while self.global_mon.pending(): |
| ev = self.global_mon.recv() |
| logger.debug(self.global_dbg + ": " + ev) |
| except: |
| pass |
| try: |
| self.global_mon.detach() |
| except ConnectionRefusedError: |
| pass |
| except Exception as e: |
| if str(e) == "DETACH failed": |
| pass |
| else: |
| raise |
| del self.global_mon |
| self.global_mon = None |
| |
| def close_monitor_group(self): |
| if not self.gctrl_mon: |
| return |
| try: |
| while self.gctrl_mon.pending(): |
| ev = self.gctrl_mon.recv() |
| logger.debug(self.dbg + ": " + ev) |
| except: |
| pass |
| try: |
| self.gctrl_mon.detach() |
| except: |
| pass |
| del self.gctrl_mon |
| self.gctrl_mon = None |
| |
| def close_monitor(self): |
| self.close_monitor_mon() |
| self.close_monitor_global() |
| self.close_monitor_group() |
| |
| def cmd_execute(self, cmd_array, shell=False): |
| if self.hostname is None: |
| if shell: |
| cmd = ' '.join(cmd_array) |
| else: |
| cmd = cmd_array |
| proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT, |
| stdout=subprocess.PIPE, shell=shell) |
| out = proc.communicate()[0] |
| ret = proc.returncode |
| return ret, out.decode() |
| else: |
| return self.host.execute(cmd_array) |
| |
| def terminate(self): |
| if self.global_mon: |
| self.close_monitor_global() |
| self.global_ctrl.terminate() |
| self.global_ctrl = None |
| |
| def close_ctrl(self): |
| self.close_monitor_global() |
| self.close_control_global() |
| self.remove_ifname() |
| |
| def set_ifname(self, ifname, hostname=None, port=9877): |
| self.remove_ifname() |
| self.ifname = ifname |
| if hostname != None: |
| self.ctrl = wpaspy.Ctrl(hostname, port) |
| if self.monitor: |
| self.mon = wpaspy.Ctrl(hostname, port) |
| self.host = remotehost.Host(hostname, ifname) |
| self.dbg = hostname + "/" + ifname |
| else: |
| self.ctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) |
| if self.monitor: |
| self.mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, ifname)) |
| self.dbg = ifname |
| if self.monitor: |
| self.mon.attach() |
| |
| def remove_ifname(self): |
| self.close_monitor_mon() |
| self.close_control_ctrl() |
| self.ifname = None |
| |
| def get_ctrl_iface_port(self, ifname): |
| if self.hostname is None: |
| return None |
| |
| res = self.global_request("INTERFACES ctrl") |
| lines = res.splitlines() |
| found = False |
| for line in lines: |
| words = line.split() |
| if words[0] == ifname: |
| found = True |
| break |
| if not found: |
| raise Exception("Could not find UDP port for " + ifname) |
| res = line.find("ctrl_iface=udp:") |
| if res == -1: |
| raise Exception("Wrong ctrl_interface format") |
| words = line.split(":") |
| return int(words[1]) |
| |
| def interface_add(self, ifname, config="", driver="nl80211", |
| drv_params=None, br_ifname=None, create=False, |
| set_ifname=True, all_params=False, if_type=None): |
| status, groups = self.host.execute(["id"]) |
| if status != 0: |
| group = "admin" |
| group = "admin" if "(admin)" in groups else "adm" |
| cmd = "INTERFACE_ADD " + ifname + "\t" + config + "\t" + driver + "\tDIR=/var/run/wpa_supplicant GROUP=" + group |
| if drv_params: |
| cmd = cmd + '\t' + drv_params |
| if br_ifname: |
| if not drv_params: |
| cmd += '\t' |
| cmd += '\t' + br_ifname |
| if create: |
| if not br_ifname: |
| cmd += '\t' |
| if not drv_params: |
| cmd += '\t' |
| cmd += '\tcreate' |
| if if_type: |
| cmd += '\t' + if_type |
| if all_params and not create: |
| if not br_ifname: |
| cmd += '\t' |
| if not drv_params: |
| cmd += '\t' |
| cmd += '\t' |
| if "FAIL" in self.global_request(cmd): |
| raise Exception("Failed to add a dynamic wpa_supplicant interface") |
| if not create and set_ifname: |
| port = self.get_ctrl_iface_port(ifname) |
| self.set_ifname(ifname, self.hostname, port) |
| res = self.get_driver_status() |
| if 'capa.flags' in res and int(res['capa.flags'], 0) & 0x20000000: |
| self.p2p_dev_ifname = 'p2p-dev-' + self.ifname |
| else: |
| self.p2p_dev_ifname = ifname |
| |
| def interface_remove(self, ifname): |
| self.remove_ifname() |
| self.global_request("INTERFACE_REMOVE " + ifname) |
| |
| def request(self, cmd, timeout=10): |
| logger.debug(self.dbg + ": CTRL: " + cmd) |
| return self.ctrl.request(cmd, timeout=timeout) |
| |
| def global_request(self, cmd): |
| if self.global_iface is None: |
| return self.request(cmd) |
| else: |
| ifname = self.ifname or self.global_iface |
| logger.debug(self.global_dbg + ifname + ": CTRL(global): " + cmd) |
| return self.global_ctrl.request(cmd) |
| |
| @property |
| def group_dbg(self): |
| if self._group_dbg is not None: |
| return self._group_dbg |
| if self.group_ifname is None: |
| raise Exception("Cannot have group_dbg without group_ifname") |
| if self.hostname is None: |
| self._group_dbg = self.group_ifname |
| else: |
| self._group_dbg = self.hostname + "/" + self.group_ifname |
| return self._group_dbg |
| |
| def group_request(self, cmd): |
| if self.group_ifname and self.group_ifname != self.ifname: |
| if self.hostname is None: |
| gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname)) |
| else: |
| port = self.get_ctrl_iface_port(self.group_ifname) |
| gctrl = wpaspy.Ctrl(self.hostname, port) |
| logger.debug(self.group_dbg + ": CTRL(group): " + cmd) |
| return gctrl.request(cmd) |
| return self.request(cmd) |
| |
| def ping(self): |
| return "PONG" in self.request("PING") |
| |
| def global_ping(self): |
| return "PONG" in self.global_request("PING") |
| |
| def reset(self): |
| self.dump_monitor() |
| res = self.request("FLUSH") |
| if "OK" not in res: |
| logger.info("FLUSH to " + self.ifname + " failed: " + res) |
| self.global_request("REMOVE_NETWORK all") |
| self.global_request("SET p2p_no_group_iface 1") |
| self.global_request("P2P_FLUSH") |
| self.close_monitor_group() |
| self.group_ifname = None |
| self.dump_monitor() |
| |
| iter = 0 |
| while iter < 60: |
| state1 = self.get_driver_status_field("scan_state") |
| p2pdev = "p2p-dev-" + self.ifname |
| state2 = self.get_driver_status_field("scan_state", ifname=p2pdev) |
| states = str(state1) + " " + str(state2) |
| if "SCAN_STARTED" in states or "SCAN_REQUESTED" in states: |
| logger.info(self.ifname + ": Waiting for scan operation to complete before continuing") |
| time.sleep(1) |
| else: |
| break |
| iter = iter + 1 |
| if iter == 60: |
| logger.error(self.ifname + ": Driver scan state did not clear") |
| print("Trying to clear cfg80211/mac80211 scan state") |
| status, buf = self.host.execute(["ifconfig", self.ifname, "down"]) |
| if status != 0: |
| logger.info("ifconfig failed: " + buf) |
| logger.info(status) |
| status, buf = self.host.execute(["ifconfig", self.ifname, "up"]) |
| if status != 0: |
| logger.info("ifconfig failed: " + buf) |
| logger.info(status) |
| if iter > 0: |
| # The ongoing scan could have discovered BSSes or P2P peers |
| logger.info("Run FLUSH again since scan was in progress") |
| self.request("FLUSH") |
| self.dump_monitor() |
| |
| if not self.ping(): |
| logger.info("No PING response from " + self.ifname + " after reset") |
| |
| def set(self, field, value, allow_fail=False): |
| if "OK" not in self.request("SET " + field + " " + value): |
| if allow_fail: |
| return |
| raise Exception("Failed to set wpa_supplicant parameter " + field) |
| |
| def add_network(self): |
| id = self.request("ADD_NETWORK") |
| if "FAIL" in id: |
| raise Exception("ADD_NETWORK failed") |
| return int(id) |
| |
| def remove_network(self, id): |
| id = self.request("REMOVE_NETWORK " + str(id)) |
| if "FAIL" in id: |
| raise Exception("REMOVE_NETWORK failed") |
| return None |
| |
| def get_network(self, id, field): |
| res = self.request("GET_NETWORK " + str(id) + " " + field) |
| if res == "FAIL\n": |
| return None |
| return res |
| |
| def set_network(self, id, field, value): |
| res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value) |
| if "FAIL" in res: |
| raise Exception("SET_NETWORK failed") |
| return None |
| |
| def set_network_quoted(self, id, field, value): |
| res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') |
| if "FAIL" in res: |
| raise Exception("SET_NETWORK failed") |
| return None |
| |
| def p2pdev_request(self, cmd): |
| return self.global_request("IFNAME=" + self.p2p_dev_ifname + " " + cmd) |
| |
| def p2pdev_add_network(self): |
| id = self.p2pdev_request("ADD_NETWORK") |
| if "FAIL" in id: |
| raise Exception("p2pdev ADD_NETWORK failed") |
| return int(id) |
| |
| def p2pdev_set_network(self, id, field, value): |
| res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + " " + value) |
| if "FAIL" in res: |
| raise Exception("p2pdev SET_NETWORK failed") |
| return None |
| |
| def p2pdev_set_network_quoted(self, id, field, value): |
| res = self.p2pdev_request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"') |
| if "FAIL" in res: |
| raise Exception("p2pdev SET_NETWORK failed") |
| return None |
| |
| def list_networks(self, p2p=False): |
| if p2p: |
| res = self.global_request("LIST_NETWORKS") |
| else: |
| res = self.request("LIST_NETWORKS") |
| lines = res.splitlines() |
| networks = [] |
| for l in lines: |
| if "network id" in l: |
| continue |
| [id, ssid, bssid, flags] = l.split('\t') |
| network = {} |
| network['id'] = id |
| network['ssid'] = ssid |
| network['bssid'] = bssid |
| network['flags'] = flags |
| networks.append(network) |
| return networks |
| |
| def hs20_enable(self, auto_interworking=False): |
| self.request("SET interworking 1") |
| self.request("SET hs20 1") |
| if auto_interworking: |
| self.request("SET auto_interworking 1") |
| else: |
| self.request("SET auto_interworking 0") |
| |
| def interworking_add_network(self, bssid): |
| id = self.request("INTERWORKING_ADD_NETWORK " + bssid) |
| if "FAIL" in id or "OK" in id: |
| raise Exception("INTERWORKING_ADD_NETWORK failed") |
| return int(id) |
| |
| def add_cred(self): |
| id = self.request("ADD_CRED") |
| if "FAIL" in id: |
| raise Exception("ADD_CRED failed") |
| return int(id) |
| |
| def remove_cred(self, id): |
| id = self.request("REMOVE_CRED " + str(id)) |
| if "FAIL" in id: |
| raise Exception("REMOVE_CRED failed") |
| return None |
| |
| def set_cred(self, id, field, value): |
| res = self.request("SET_CRED " + str(id) + " " + field + " " + value) |
| if "FAIL" in res: |
| raise Exception("SET_CRED failed") |
| return None |
| |
| def set_cred_quoted(self, id, field, value): |
| res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"') |
| if "FAIL" in res: |
| raise Exception("SET_CRED failed") |
| return None |
| |
| def get_cred(self, id, field): |
| return self.request("GET_CRED " + str(id) + " " + field) |
| |
| def add_cred_values(self, params): |
| id = self.add_cred() |
| |
| quoted = ["realm", "username", "password", "domain", "imsi", |
| "excluded_ssid", "milenage", "ca_cert", "client_cert", |
| "private_key", "domain_suffix_match", "provisioning_sp", |
| "roaming_partner", "phase1", "phase2", "private_key_passwd", |
| "roaming_consortiums", "imsi_privacy_cert", |
| "imsi_privacy_attr"] |
| for field in quoted: |
| if field in params: |
| self.set_cred_quoted(id, field, params[field]) |
| |
| not_quoted = ["eap", "roaming_consortium", "priority", |
| "required_roaming_consortium", "sp_priority", |
| "max_bss_load", "update_identifier", "req_conn_capab", |
| "min_dl_bandwidth_home", "min_ul_bandwidth_home", |
| "min_dl_bandwidth_roaming", "min_ul_bandwidth_roaming"] |
| for field in not_quoted: |
| if field in params: |
| self.set_cred(id, field, params[field]) |
| |
| as_list = ["home_ois", "required_home_ois"] |
| for field in as_list: |
| if field in params: |
| self.set_cred_quoted(id, field, ','.join(params[field])) |
| |
| return id |
| |
| def select_network(self, id, freq=None): |
| if freq: |
| extra = " freq=" + str(freq) |
| else: |
| extra = "" |
| id = self.request("SELECT_NETWORK " + str(id) + extra) |
| if "FAIL" in id: |
| raise Exception("SELECT_NETWORK failed") |
| return None |
| |
| def mesh_group_add(self, id): |
| id = self.request("MESH_GROUP_ADD " + str(id)) |
| if "FAIL" in id: |
| raise Exception("MESH_GROUP_ADD failed") |
| return None |
| |
| def mesh_group_remove(self): |
| id = self.request("MESH_GROUP_REMOVE " + str(self.ifname)) |
| if "FAIL" in id: |
| raise Exception("MESH_GROUP_REMOVE failed") |
| return None |
| |
| def connect_network(self, id, timeout=None): |
| if timeout is None: |
| timeout = 10 if self.hostname is None else 60 |
| self.dump_monitor() |
| self.select_network(id) |
| self.wait_connected(timeout=timeout) |
| self.dump_monitor() |
| |
| def get_status(self, extra=None): |
| if extra: |
| extra = "-" + extra |
| else: |
| extra = "" |
| res = self.request("STATUS" + extra) |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| try: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| except ValueError as e: |
| logger.info(self.ifname + ": Ignore unexpected STATUS line: " + l) |
| return vals |
| |
| def get_status_field(self, field, extra=None): |
| vals = self.get_status(extra) |
| if field in vals: |
| return vals[field] |
| return None |
| |
| def get_group_status(self, extra=None): |
| if extra: |
| extra = "-" + extra |
| else: |
| extra = "" |
| res = self.group_request("STATUS" + extra) |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| try: |
| [name, value] = l.split('=', 1) |
| except ValueError: |
| logger.info(self.ifname + ": Ignore unexpected status line: " + l) |
| continue |
| vals[name] = value |
| return vals |
| |
| def get_group_status_field(self, field, extra=None): |
| vals = self.get_group_status(extra) |
| if field in vals: |
| return vals[field] |
| return None |
| |
| def get_driver_status(self, ifname=None): |
| if ifname is None: |
| res = self.request("STATUS-DRIVER") |
| else: |
| res = self.global_request("IFNAME=%s STATUS-DRIVER" % ifname) |
| if res.startswith("FAIL"): |
| return dict() |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| try: |
| [name, value] = l.split('=', 1) |
| except ValueError: |
| logger.info(self.ifname + ": Ignore unexpected status-driver line: " + l) |
| continue |
| vals[name] = value |
| return vals |
| |
| def get_driver_status_field(self, field, ifname=None): |
| vals = self.get_driver_status(ifname) |
| if field in vals: |
| return vals[field] |
| return None |
| |
| def get_mcc(self): |
| mcc = int(self.get_driver_status_field('capa.num_multichan_concurrent')) |
| return 1 if mcc < 2 else mcc |
| |
| def get_mib(self): |
| res = self.request("MIB") |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| try: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| except ValueError as e: |
| logger.info(self.ifname + ": Ignore unexpected MIB line: " + l) |
| return vals |
| |
| def p2p_dev_addr(self): |
| return self.get_status_field("p2p_device_address") |
| |
| def p2p_interface_addr(self): |
| return self.get_group_status_field("address") |
| |
| def own_addr(self): |
| try: |
| res = self.p2p_interface_addr() |
| except: |
| res = self.p2p_dev_addr() |
| return res |
| |
| def get_addr(self, group=False): |
| dev_addr = self.own_addr() |
| if not group: |
| addr = self.get_status_field('address') |
| if addr: |
| dev_addr = addr |
| |
| return dev_addr |
| |
| def p2p_listen(self): |
| return self.global_request("P2P_LISTEN") |
| |
| def p2p_ext_listen(self, period, interval): |
| return self.global_request("P2P_EXT_LISTEN %d %d" % (period, interval)) |
| |
| def p2p_cancel_ext_listen(self): |
| return self.global_request("P2P_EXT_LISTEN") |
| |
| def p2p_find(self, social=False, progressive=False, dev_id=None, |
| dev_type=None, delay=None, freq=None): |
| cmd = "P2P_FIND" |
| if social: |
| cmd = cmd + " type=social" |
| elif progressive: |
| cmd = cmd + " type=progressive" |
| if dev_id: |
| cmd = cmd + " dev_id=" + dev_id |
| if dev_type: |
| cmd = cmd + " dev_type=" + dev_type |
| if delay: |
| cmd = cmd + " delay=" + str(delay) |
| if freq: |
| cmd = cmd + " freq=" + str(freq) |
| return self.global_request(cmd) |
| |
| def p2p_stop_find(self): |
| return self.global_request("P2P_STOP_FIND") |
| |
| def wps_read_pin(self): |
| self.pin = self.request("WPS_PIN get").rstrip("\n") |
| if "FAIL" in self.pin: |
| raise Exception("Could not generate PIN") |
| return self.pin |
| |
| def peer_known(self, peer, full=True): |
| res = self.global_request("P2P_PEER " + peer) |
| if peer.lower() not in res.lower(): |
| return False |
| if not full: |
| return True |
| return "[PROBE_REQ_ONLY]" not in res |
| |
| def discover_peer(self, peer, full=True, timeout=15, social=True, |
| force_find=False, freq=None): |
| logger.info(self.ifname + ": Trying to discover peer " + peer) |
| if not force_find and self.peer_known(peer, full): |
| return True |
| self.p2p_find(social, freq=freq) |
| count = 0 |
| while count < timeout * 4: |
| time.sleep(0.25) |
| count = count + 1 |
| if self.peer_known(peer, full): |
| return True |
| return False |
| |
| def get_peer(self, peer): |
| res = self.global_request("P2P_PEER " + peer) |
| if peer.lower() not in res.lower(): |
| raise Exception("Peer information not available") |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| if '=' in l: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| return vals |
| |
| def group_form_result(self, ev, expect_failure=False, go_neg_res=None): |
| if expect_failure: |
| if "P2P-GROUP-STARTED" in ev: |
| raise Exception("Group formation succeeded when expecting failure") |
| exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)' |
| s = re.split(exp, ev) |
| if len(s) < 3: |
| return None |
| res = {} |
| res['result'] = 'go-neg-failed' |
| res['status'] = int(s[2]) |
| return res |
| |
| if "P2P-GROUP-STARTED" not in ev: |
| raise Exception("No P2P-GROUP-STARTED event seen") |
| |
| exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*) ip_addr=([0-9.]*) ip_mask=([0-9.]*) go_ip_addr=([0-9.]*)' |
| s = re.split(exp, ev) |
| if len(s) < 11: |
| exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)' |
| s = re.split(exp, ev) |
| if len(s) < 8: |
| raise Exception("Could not parse P2P-GROUP-STARTED") |
| res = {} |
| res['result'] = 'success' |
| res['ifname'] = s[2] |
| self.group_ifname = s[2] |
| try: |
| if self.hostname is None: |
| self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, |
| self.group_ifname)) |
| else: |
| port = self.get_ctrl_iface_port(self.group_ifname) |
| self.gctrl_mon = wpaspy.Ctrl(self.hostname, port) |
| if self.monitor: |
| self.gctrl_mon.attach() |
| except: |
| logger.debug("Could not open monitor socket for group interface") |
| self.gctrl_mon = None |
| res['role'] = s[3] |
| res['ssid'] = s[4] |
| res['freq'] = s[5] |
| if "[PERSISTENT]" in ev: |
| res['persistent'] = True |
| else: |
| res['persistent'] = False |
| p = re.match(r'psk=([0-9a-f]*)', s[6]) |
| if p: |
| res['psk'] = p.group(1) |
| p = re.match(r'passphrase="(.*)"', s[6]) |
| if p: |
| res['passphrase'] = p.group(1) |
| res['go_dev_addr'] = s[7] |
| |
| if len(s) > 8 and len(s[8]) > 0 and "[PERSISTENT]" not in s[8]: |
| res['ip_addr'] = s[8] |
| if len(s) > 9: |
| res['ip_mask'] = s[9] |
| if len(s) > 10: |
| res['go_ip_addr'] = s[10] |
| |
| if go_neg_res: |
| exp = r'<.>(P2P-GO-NEG-SUCCESS) role=(GO|client) freq=([0-9]*)' |
| s = re.split(exp, go_neg_res) |
| if len(s) < 4: |
| raise Exception("Could not parse P2P-GO-NEG-SUCCESS") |
| res['go_neg_role'] = s[2] |
| res['go_neg_freq'] = s[3] |
| |
| return res |
| |
| def p2p_go_neg_auth(self, peer, pin, method, go_intent=None, |
| persistent=False, freq=None, freq2=None, |
| max_oper_chwidth=None, ht40=False, vht=False): |
| if not self.discover_peer(peer): |
| raise Exception("Peer " + peer + " not found") |
| self.dump_monitor() |
| if pin: |
| cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" |
| else: |
| cmd = "P2P_CONNECT " + peer + " " + method + " auth" |
| if go_intent: |
| cmd = cmd + ' go_intent=' + str(go_intent) |
| if freq: |
| cmd = cmd + ' freq=' + str(freq) |
| if freq2: |
| cmd = cmd + ' freq2=' + str(freq2) |
| if max_oper_chwidth: |
| cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth) |
| if ht40: |
| cmd = cmd + ' ht40' |
| if vht: |
| cmd = cmd + ' vht' |
| if persistent: |
| cmd = cmd + " persistent" |
| if "OK" in self.global_request(cmd): |
| return None |
| raise Exception("P2P_CONNECT (auth) failed") |
| |
| def p2p_go_neg_auth_result(self, timeout=None, expect_failure=False): |
| if timeout is None: |
| timeout = 1 if expect_failure else 5 |
| go_neg_res = None |
| ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", |
| "P2P-GO-NEG-FAILURE"], timeout) |
| if ev is None: |
| if expect_failure: |
| return None |
| raise Exception("Group formation timed out") |
| if "P2P-GO-NEG-SUCCESS" in ev: |
| go_neg_res = ev |
| ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) |
| if ev is None: |
| if expect_failure: |
| return None |
| raise Exception("Group formation timed out") |
| self.dump_monitor() |
| return self.group_form_result(ev, expect_failure, go_neg_res) |
| |
| def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, |
| expect_failure=False, persistent=False, |
| persistent_id=None, freq=None, provdisc=False, |
| wait_group=True, freq2=None, max_oper_chwidth=None, |
| ht40=False, vht=False): |
| if not self.discover_peer(peer): |
| raise Exception("Peer " + peer + " not found") |
| self.dump_monitor() |
| if pin: |
| cmd = "P2P_CONNECT " + peer + " " + pin + " " + method |
| else: |
| cmd = "P2P_CONNECT " + peer + " " + method |
| if go_intent is not None: |
| cmd = cmd + ' go_intent=' + str(go_intent) |
| if freq: |
| cmd = cmd + ' freq=' + str(freq) |
| if freq2: |
| cmd = cmd + ' freq2=' + str(freq2) |
| if max_oper_chwidth: |
| cmd = cmd + ' max_oper_chwidth=' + str(max_oper_chwidth) |
| if ht40: |
| cmd = cmd + ' ht40' |
| if vht: |
| cmd = cmd + ' vht' |
| if persistent: |
| cmd = cmd + " persistent" |
| elif persistent_id: |
| cmd = cmd + " persistent=" + persistent_id |
| if provdisc: |
| cmd = cmd + " provdisc" |
| if "OK" in self.global_request(cmd): |
| if timeout == 0: |
| return None |
| go_neg_res = None |
| ev = self.wait_global_event(["P2P-GO-NEG-SUCCESS", |
| "P2P-GO-NEG-FAILURE"], timeout) |
| if ev is None: |
| if expect_failure: |
| return None |
| raise Exception("Group formation timed out") |
| if "P2P-GO-NEG-SUCCESS" in ev: |
| if not wait_group: |
| return ev |
| go_neg_res = ev |
| ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout) |
| if ev is None: |
| if expect_failure: |
| return None |
| raise Exception("Group formation timed out") |
| self.dump_monitor() |
| return self.group_form_result(ev, expect_failure, go_neg_res) |
| raise Exception("P2P_CONNECT failed") |
| |
| def _wait_event(self, mon, pfx, events, timeout): |
| if not isinstance(events, list): |
| raise Exception("WpaSupplicant._wait_event() called with incorrect events argument type") |
| start = os.times()[4] |
| while True: |
| while mon.pending(): |
| ev = mon.recv() |
| logger.debug(self.dbg + pfx + ev) |
| for event in events: |
| if event in ev: |
| return ev |
| now = os.times()[4] |
| remaining = start + timeout - now |
| if remaining <= 0: |
| break |
| if not mon.pending(timeout=remaining): |
| break |
| return None |
| |
| def wait_event(self, events, timeout=10): |
| return self._wait_event(self.mon, ": ", events, timeout) |
| |
| def wait_global_event(self, events, timeout): |
| if self.global_iface is None: |
| return self.wait_event(events, timeout) |
| return self._wait_event(self.global_mon, "(global): ", |
| events, timeout) |
| |
| def wait_group_event(self, events, timeout=10): |
| if not isinstance(events, list): |
| raise Exception("WpaSupplicant.wait_group_event() called with incorrect events argument type") |
| if self.group_ifname and self.group_ifname != self.ifname: |
| if self.gctrl_mon is None: |
| return None |
| start = os.times()[4] |
| while True: |
| while self.gctrl_mon.pending(): |
| ev = self.gctrl_mon.recv() |
| logger.debug(self.group_dbg + "(group): " + ev) |
| for event in events: |
| if event in ev: |
| return ev |
| now = os.times()[4] |
| remaining = start + timeout - now |
| if remaining <= 0: |
| break |
| if not self.gctrl_mon.pending(timeout=remaining): |
| break |
| return None |
| |
| return self.wait_event(events, timeout) |
| |
| def wait_go_ending_session(self): |
| self.close_monitor_group() |
| timeout = 3 if self.hostname is None else 10 |
| ev = self.wait_global_event(["P2P-GROUP-REMOVED"], timeout=timeout) |
| if ev is None: |
| raise Exception("Group removal event timed out") |
| if "reason=GO_ENDING_SESSION" not in ev: |
| raise Exception("Unexpected group removal reason") |
| |
| def dump_monitor(self, mon=True, global_mon=True): |
| count_iface = 0 |
| count_global = 0 |
| while mon and self.monitor and self.mon.pending(): |
| ev = self.mon.recv() |
| logger.debug(self.dbg + ": " + ev) |
| count_iface += 1 |
| while global_mon and self.monitor and self.global_mon and self.global_mon.pending(): |
| ev = self.global_mon.recv() |
| logger.debug(self.global_dbg + self.ifname + "(global): " + ev) |
| count_global += 1 |
| return (count_iface, count_global) |
| |
| def remove_group(self, ifname=None): |
| self.close_monitor_group() |
| if ifname is None: |
| ifname = self.group_ifname if self.group_ifname else self.ifname |
| if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname): |
| raise Exception("Group could not be removed") |
| self.group_ifname = None |
| |
| def p2p_start_go(self, persistent=None, freq=None, no_event_clear=False): |
| self.dump_monitor() |
| cmd = "P2P_GROUP_ADD" |
| if persistent is None: |
| pass |
| elif persistent is True: |
| cmd = cmd + " persistent" |
| else: |
| cmd = cmd + " persistent=" + str(persistent) |
| if freq: |
| cmd = cmd + " freq=" + str(freq) |
| if "OK" in self.global_request(cmd): |
| ev = self.wait_global_event(["P2P-GROUP-STARTED"], timeout=5) |
| if ev is None: |
| raise Exception("GO start up timed out") |
| if not no_event_clear: |
| self.dump_monitor() |
| return self.group_form_result(ev) |
| raise Exception("P2P_GROUP_ADD failed") |
| |
| def p2p_go_authorize_client(self, pin): |
| cmd = "WPS_PIN any " + pin |
| if "FAIL" in self.group_request(cmd): |
| raise Exception("Failed to authorize client connection on GO") |
| return None |
| |
| def p2p_go_authorize_client_pbc(self): |
| cmd = "WPS_PBC" |
| if "FAIL" in self.group_request(cmd): |
| raise Exception("Failed to authorize client connection on GO") |
| return None |
| |
| def p2p_connect_group(self, go_addr, pin, timeout=0, social=False, |
| freq=None): |
| self.dump_monitor() |
| if not self.discover_peer(go_addr, social=social, freq=freq): |
| if social or not self.discover_peer(go_addr, social=social): |
| raise Exception("GO " + go_addr + " not found") |
| self.p2p_stop_find() |
| self.dump_monitor() |
| cmd = "P2P_CONNECT " + go_addr + " " + pin + " join" |
| if freq: |
| cmd += " freq=" + str(freq) |
| if "OK" in self.global_request(cmd): |
| if timeout == 0: |
| self.dump_monitor() |
| return None |
| ev = self.wait_global_event(["P2P-GROUP-STARTED", |
| "P2P-GROUP-FORMATION-FAILURE"], |
| timeout) |
| if ev is None: |
| raise Exception("Joining the group timed out") |
| if "P2P-GROUP-STARTED" not in ev: |
| raise Exception("Failed to join the group") |
| self.dump_monitor() |
| return self.group_form_result(ev) |
| raise Exception("P2P_CONNECT(join) failed") |
| |
| def tdls_setup(self, peer): |
| cmd = "TDLS_SETUP " + peer |
| if "FAIL" in self.group_request(cmd): |
| raise Exception("Failed to request TDLS setup") |
| return None |
| |
| def tdls_teardown(self, peer): |
| cmd = "TDLS_TEARDOWN " + peer |
| if "FAIL" in self.group_request(cmd): |
| raise Exception("Failed to request TDLS teardown") |
| return None |
| |
| def tdls_link_status(self, peer): |
| cmd = "TDLS_LINK_STATUS " + peer |
| ret = self.group_request(cmd) |
| if "FAIL" in ret: |
| raise Exception("Failed to request TDLS link status") |
| return ret |
| |
| def tspecs(self): |
| """Return (tsid, up) tuples representing current tspecs""" |
| res = self.request("WMM_AC_STATUS") |
| tspecs = re.findall(r"TSID=(\d+) UP=(\d+)", res) |
| tspecs = [tuple(map(int, tspec)) for tspec in tspecs] |
| |
| logger.debug("tspecs: " + str(tspecs)) |
| return tspecs |
| |
| def add_ts(self, tsid, up, direction="downlink", expect_failure=False, |
| extra=None): |
| params = { |
| "sba": 9000, |
| "nominal_msdu_size": 1500, |
| "min_phy_rate": 6000000, |
| "mean_data_rate": 1500, |
| } |
| cmd = "WMM_AC_ADDTS %s tsid=%d up=%d" % (direction, tsid, up) |
| for (key, value) in params.items(): |
| cmd += " %s=%d" % (key, value) |
| if extra: |
| cmd += " " + extra |
| |
| if self.request(cmd).strip() != "OK": |
| raise Exception("ADDTS failed (tsid=%d up=%d)" % (tsid, up)) |
| |
| if expect_failure: |
| ev = self.wait_event(["TSPEC-REQ-FAILED"], timeout=2) |
| if ev is None: |
| raise Exception("ADDTS failed (time out while waiting failure)") |
| if "tsid=%d" % (tsid) not in ev: |
| raise Exception("ADDTS failed (invalid tsid in TSPEC-REQ-FAILED") |
| return |
| |
| ev = self.wait_event(["TSPEC-ADDED"], timeout=1) |
| if ev is None: |
| raise Exception("ADDTS failed (time out)") |
| if "tsid=%d" % (tsid) not in ev: |
| raise Exception("ADDTS failed (invalid tsid in TSPEC-ADDED)") |
| |
| if (tsid, up) not in self.tspecs(): |
| raise Exception("ADDTS failed (tsid not in tspec list)") |
| |
| def del_ts(self, tsid): |
| if self.request("WMM_AC_DELTS %d" % (tsid)).strip() != "OK": |
| raise Exception("DELTS failed") |
| |
| ev = self.wait_event(["TSPEC-REMOVED"], timeout=1) |
| if ev is None: |
| raise Exception("DELTS failed (time out)") |
| if "tsid=%d" % (tsid) not in ev: |
| raise Exception("DELTS failed (invalid tsid in TSPEC-REMOVED)") |
| |
| tspecs = [(t, u) for (t, u) in self.tspecs() if t == tsid] |
| if tspecs: |
| raise Exception("DELTS failed (still in tspec list)") |
| |
| def connect(self, ssid=None, ssid2=None, **kwargs): |
| logger.info("Connect STA " + self.ifname + " to AP") |
| id = self.add_network() |
| if ssid: |
| self.set_network_quoted(id, "ssid", ssid) |
| elif ssid2: |
| self.set_network(id, "ssid", ssid2) |
| |
| quoted = ["psk", "identity", "anonymous_identity", "password", |
| "machine_identity", "machine_password", |
| "ca_cert", "client_cert", "private_key", |
| "private_key_passwd", "ca_cert2", "client_cert2", |
| "private_key2", "phase1", "phase2", "domain_suffix_match", |
| "altsubject_match", "subject_match", "pac_file", |
| "bgscan", "ht_mcs", "id_str", "openssl_ciphers", |
| "domain_match", "dpp_connector", "sae_password", |
| "sae_password_id", "check_cert_subject", |
| "machine_ca_cert", "machine_client_cert", |
| "machine_private_key", "machine_phase2", |
| "imsi_identity", "imsi_privacy_cert", "imsi_privacy_attr"] |
| for field in quoted: |
| if field in kwargs and kwargs[field]: |
| self.set_network_quoted(id, field, kwargs[field]) |
| |
| not_quoted = ["proto", "key_mgmt", "ieee80211w", "pairwise", |
| "group", "wep_key0", "wep_key1", "wep_key2", "wep_key3", |
| "wep_tx_keyidx", "scan_freq", "freq_list", "eap", |
| "eapol_flags", "fragment_size", "scan_ssid", "auth_alg", |
| "wpa_ptk_rekey", "disable_ht", "disable_vht", "bssid", |
| "disable_he", |
| "disable_max_amsdu", "ampdu_factor", "ampdu_density", |
| "disable_ht40", "disable_sgi", "disable_ldpc", |
| "ht40_intolerant", "update_identifier", "mac_addr", |
| "erp", "bg_scan_period", "bssid_ignore", |
| "bssid_accept", "mem_only_psk", "eap_workaround", |
| "engine", "fils_dh_group", "bssid_hint", |
| "dpp_csign", "dpp_csign_expiry", |
| "dpp_netaccesskey", "dpp_netaccesskey_expiry", "dpp_pfs", |
| "dpp_connector_privacy", |
| "group_mgmt", "owe_group", "owe_only", |
| "owe_ptk_workaround", |
| "transition_disable", "sae_pk", |
| "roaming_consortium_selection", "ocv", |
| "multi_ap_backhaul_sta", "rx_stbc", "tx_stbc", |
| "ft_eap_pmksa_caching", "beacon_prot", |
| "mac_value", |
| "wpa_deny_ptk0_rekey", |
| "enable_4addr_mode"] |
| for field in not_quoted: |
| if field in kwargs and kwargs[field]: |
| self.set_network(id, field, kwargs[field]) |
| |
| known_args = {"raw_psk", "password_hex", "peerkey", "okc", "ocsp", |
| "only_add_network", "wait_connect", "raw_identity"} |
| unknown = set(kwargs.keys()) |
| unknown -= set(quoted) |
| unknown -= set(not_quoted) |
| unknown -= known_args |
| if unknown: |
| raise Exception("Unknown WpaSupplicant::connect() arguments: " + str(unknown)) |
| |
| if "raw_identity" in kwargs and kwargs['raw_identity']: |
| self.set_network(id, "identity", kwargs['raw_identity']) |
| if "raw_psk" in kwargs and kwargs['raw_psk']: |
| self.set_network(id, "psk", kwargs['raw_psk']) |
| if "password_hex" in kwargs and kwargs['password_hex']: |
| self.set_network(id, "password", kwargs['password_hex']) |
| if "peerkey" in kwargs and kwargs['peerkey']: |
| self.set_network(id, "peerkey", "1") |
| if "okc" in kwargs and kwargs['okc']: |
| self.set_network(id, "proactive_key_caching", "1") |
| if "ocsp" in kwargs and kwargs['ocsp']: |
| self.set_network(id, "ocsp", str(kwargs['ocsp'])) |
| if "only_add_network" in kwargs and kwargs['only_add_network']: |
| return id |
| if "wait_connect" not in kwargs or kwargs['wait_connect']: |
| if "eap" in kwargs: |
| self.connect_network(id, timeout=20) |
| else: |
| self.connect_network(id, timeout=15) |
| else: |
| self.dump_monitor() |
| self.select_network(id) |
| return id |
| |
| def scan(self, type=None, freq=None, no_wait=False, only_new=False, |
| passive=False, timeout=15): |
| if not no_wait: |
| self.dump_monitor() |
| if type: |
| cmd = "SCAN TYPE=" + type |
| else: |
| cmd = "SCAN" |
| if freq: |
| cmd = cmd + " freq=" + str(freq) |
| if only_new: |
| cmd += " only_new=1" |
| if passive: |
| cmd += " passive=1" |
| if not no_wait: |
| self.dump_monitor() |
| res = self.request(cmd) |
| if "OK" not in res: |
| raise Exception("Failed to trigger scan: " + str(res)) |
| if no_wait: |
| return |
| ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS", |
| "CTRL-EVENT-SCAN-FAILED"], timeout) |
| if ev is None: |
| raise Exception("Scan timed out") |
| if "CTRL-EVENT-SCAN-FAILED" in ev: |
| raise Exception("Scan failed: " + ev) |
| |
| def scan_for_bss(self, bssid, freq=None, force_scan=False, only_new=False, |
| passive=False): |
| if not force_scan and self.get_bss(bssid) is not None: |
| return |
| for i in range(0, 10): |
| self.scan(freq=freq, type="ONLY", only_new=only_new, |
| passive=passive) |
| if self.get_bss(bssid) is not None: |
| return |
| raise Exception("Could not find BSS " + bssid + " in scan") |
| |
| def flush_scan_cache(self, freq=2417): |
| self.request("BSS_FLUSH 0") |
| self.scan(freq=freq, only_new=True) |
| res = self.request("SCAN_RESULTS") |
| if len(res.splitlines()) > 1: |
| logger.debug("Scan results remaining after first attempt to flush the results:\n" + res) |
| self.request("BSS_FLUSH 0") |
| self.scan(freq=2422, only_new=True) |
| res = self.request("SCAN_RESULTS") |
| if len(res.splitlines()) > 1: |
| logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res) |
| |
| def disconnect_and_stop_scan(self): |
| self.request("DISCONNECT") |
| res = self.request("ABORT_SCAN") |
| for i in range(2 if "OK" in res else 1): |
| self.wait_event(["CTRL-EVENT-DISCONNECTED", |
| "CTRL-EVENT-SCAN-RESULTS"], timeout=0.5) |
| self.dump_monitor() |
| |
| def roam(self, bssid, fail_test=False, assoc_reject_ok=False, |
| check_bssid=True): |
| self.dump_monitor() |
| if "OK" not in self.request("ROAM " + bssid): |
| raise Exception("ROAM failed") |
| if fail_test: |
| if assoc_reject_ok: |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED", |
| "CTRL-EVENT-DISCONNECTED", |
| "CTRL-EVENT-ASSOC-REJECT"], timeout=1) |
| else: |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED", |
| "CTRL-EVENT-DISCONNECTED"], timeout=1) |
| if ev and "CTRL-EVENT-DISCONNECTED" in ev: |
| self.dump_monitor() |
| return |
| if ev is not None and "CTRL-EVENT-ASSOC-REJECT" not in ev: |
| raise Exception("Unexpected connection") |
| self.dump_monitor() |
| return |
| if assoc_reject_ok: |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED", |
| "CTRL-EVENT-DISCONNECTED"], timeout=10) |
| else: |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED", |
| "CTRL-EVENT-DISCONNECTED", |
| "CTRL-EVENT-ASSOC-REJECT"], timeout=10) |
| if ev is None: |
| raise Exception("Roaming with the AP timed out") |
| if "CTRL-EVENT-ASSOC-REJECT" in ev: |
| raise Exception("Roaming association rejected") |
| if "CTRL-EVENT-DISCONNECTED" in ev: |
| raise Exception("Unexpected disconnection when waiting for roam to complete") |
| self.dump_monitor() |
| if check_bssid and self.get_status_field('bssid') != bssid: |
| raise Exception("Did not roam to correct BSSID") |
| |
| def roam_over_ds(self, bssid, fail_test=False, force=False): |
| self.dump_monitor() |
| cmd = "FT_DS " + bssid |
| if force: |
| cmd += " force" |
| if "OK" not in self.request(cmd): |
| raise Exception("FT_DS failed") |
| if fail_test: |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=1) |
| if ev is not None: |
| raise Exception("Unexpected connection") |
| self.dump_monitor() |
| return |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED", |
| "CTRL-EVENT-ASSOC-REJECT"], timeout=10) |
| if ev is None: |
| raise Exception("Roaming with the AP timed out") |
| if "CTRL-EVENT-ASSOC-REJECT" in ev: |
| raise Exception("Roaming association rejected") |
| self.dump_monitor() |
| |
| def wps_reg(self, bssid, pin, new_ssid=None, key_mgmt=None, cipher=None, |
| new_passphrase=None, no_wait=False): |
| self.dump_monitor() |
| if new_ssid: |
| self.request("WPS_REG " + bssid + " " + pin + " " + |
| binascii.hexlify(new_ssid.encode()).decode() + " " + |
| key_mgmt + " " + cipher + " " + |
| binascii.hexlify(new_passphrase.encode()).decode()) |
| if no_wait: |
| return |
| ev = self.wait_event(["WPS-SUCCESS"], timeout=15) |
| else: |
| self.request("WPS_REG " + bssid + " " + pin) |
| if no_wait: |
| return |
| ev = self.wait_event(["WPS-CRED-RECEIVED"], timeout=15) |
| if ev is None: |
| raise Exception("WPS cred timed out") |
| ev = self.wait_event(["WPS-FAIL"], timeout=15) |
| if ev is None: |
| raise Exception("WPS timed out") |
| self.wait_connected(timeout=15) |
| |
| def relog(self): |
| self.global_request("RELOG") |
| |
| def wait_completed(self, timeout=10): |
| for i in range(0, timeout * 2): |
| if self.get_status_field("wpa_state") == "COMPLETED": |
| return |
| time.sleep(0.5) |
| raise Exception("Timeout while waiting for COMPLETED state") |
| |
| def get_capability(self, field): |
| res = self.request("GET_CAPABILITY " + field) |
| if "FAIL" in res: |
| return None |
| return res.split(' ') |
| |
| def get_bss(self, bssid, ifname=None): |
| if not ifname or ifname == self.ifname: |
| res = self.request("BSS " + bssid) |
| elif ifname == self.group_ifname: |
| res = self.group_request("BSS " + bssid) |
| else: |
| return None |
| |
| if "FAIL" in res: |
| return None |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| if len(vals) == 0: |
| return None |
| return vals |
| |
| def get_pmksa(self, bssid): |
| res = self.request("PMKSA") |
| lines = res.splitlines() |
| for l in lines: |
| if bssid not in l: |
| continue |
| vals = dict() |
| try: |
| [index, aa, pmkid, expiration, opportunistic] = l.split(' ') |
| cache_id = None |
| except ValueError: |
| [index, aa, pmkid, expiration, opportunistic, cache_id] = l.split(' ') |
| vals['index'] = index |
| vals['pmkid'] = pmkid |
| vals['expiration'] = expiration |
| vals['opportunistic'] = opportunistic |
| if cache_id != None: |
| vals['cache_id'] = cache_id |
| return vals |
| return None |
| |
| def get_pmk(self, network_id): |
| bssid = self.get_status_field('bssid') |
| res = self.request("PMKSA_GET %d" % network_id) |
| for val in res.splitlines(): |
| if val.startswith(bssid): |
| return val.split(' ')[2] |
| return None |
| |
| def get_sta(self, addr, info=None, next=False): |
| cmd = "STA-NEXT " if next else "STA " |
| if addr is None: |
| res = self.request("STA-FIRST") |
| elif info: |
| res = self.request(cmd + addr + " " + info) |
| else: |
| res = self.request(cmd + addr) |
| lines = res.splitlines() |
| vals = dict() |
| first = True |
| for l in lines: |
| if first: |
| vals['addr'] = l |
| first = False |
| else: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| return vals |
| |
| def mgmt_rx(self, timeout=5): |
| ev = self.wait_event(["MGMT-RX"], timeout=timeout) |
| if ev is None: |
| return None |
| msg = {} |
| items = ev.split(' ') |
| field, val = items[1].split('=') |
| if field != "freq": |
| raise Exception("Unexpected MGMT-RX event format: " + ev) |
| msg['freq'] = val |
| |
| field, val = items[2].split('=') |
| if field != "datarate": |
| raise Exception("Unexpected MGMT-RX event format: " + ev) |
| msg['datarate'] = val |
| |
| field, val = items[3].split('=') |
| if field != "ssi_signal": |
| raise Exception("Unexpected MGMT-RX event format: " + ev) |
| msg['ssi_signal'] = val |
| |
| frame = binascii.unhexlify(items[4]) |
| msg['frame'] = frame |
| |
| hdr = struct.unpack('<HH6B6B6BH', frame[0:24]) |
| msg['fc'] = hdr[0] |
| msg['subtype'] = (hdr[0] >> 4) & 0xf |
| hdr = hdr[1:] |
| msg['duration'] = hdr[0] |
| hdr = hdr[1:] |
| msg['da'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] |
| hdr = hdr[6:] |
| msg['sa'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] |
| hdr = hdr[6:] |
| msg['bssid'] = "%02x:%02x:%02x:%02x:%02x:%02x" % hdr[0:6] |
| hdr = hdr[6:] |
| msg['seq_ctrl'] = hdr[0] |
| msg['payload'] = frame[24:] |
| |
| return msg |
| |
| def wait_connected(self, timeout=10, error="Connection timed out"): |
| ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout) |
| if ev is None: |
| raise Exception(error) |
| return ev |
| |
| def wait_disconnected(self, timeout=None, error="Disconnection timed out"): |
| if timeout is None: |
| timeout = 10 if self.hostname is None else 30 |
| ev = self.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=timeout) |
| if ev is None: |
| raise Exception(error) |
| return ev |
| |
| def get_group_ifname(self): |
| return self.group_ifname if self.group_ifname else self.ifname |
| |
| def get_config(self): |
| res = self.request("DUMP") |
| if res.startswith("FAIL"): |
| raise Exception("DUMP failed") |
| lines = res.splitlines() |
| vals = dict() |
| for l in lines: |
| [name, value] = l.split('=', 1) |
| vals[name] = value |
| return vals |
| |
| def asp_provision(self, peer, adv_id, adv_mac, session_id, session_mac, |
| method="1000", info="", status=None, cpt=None, role=None): |
| if status is None: |
| cmd = "P2P_ASP_PROVISION" |
| params = "info='%s' method=%s" % (info, method) |
| else: |
| cmd = "P2P_ASP_PROVISION_RESP" |
| params = "status=%d" % status |
| |
| if role is not None: |
| params += " role=" + role |
| if cpt is not None: |
| params += " cpt=" + cpt |
| |
| if "OK" not in self.global_request("%s %s adv_id=%s adv_mac=%s session=%d session_mac=%s %s" % |
| (cmd, peer, adv_id, adv_mac, session_id, session_mac, params)): |
| raise Exception("%s request failed" % cmd) |
| |
| def note(self, txt): |
| self.request("NOTE " + txt) |
| |
| def save_config(self): |
| if "OK" not in self.request("SAVE_CONFIG"): |
| raise Exception("Failed to save configuration file") |
| |
| def wait_regdom(self, country_ie=False): |
| for i in range(5): |
| ev = self.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) |
| if ev is None: |
| break |
| if country_ie: |
| if "init=COUNTRY_IE" in ev: |
| break |
| else: |
| break |
| |
| def dpp_qr_code(self, uri): |
| res = self.request("DPP_QR_CODE " + uri) |
| if "FAIL" in res: |
| raise Exception("Failed to parse QR Code URI") |
| return int(res) |
| |
| def dpp_nfc_uri(self, uri): |
| res = self.request("DPP_NFC_URI " + uri) |
| if "FAIL" in res: |
| raise Exception("Failed to parse NFC URI") |
| return int(res) |
| |
| def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None, |
| curve=None, key=None, supported_curves=None, |
| host=None): |
| cmd = "DPP_BOOTSTRAP_GEN type=" + type |
| if chan: |
| cmd += " chan=" + chan |
| if mac: |
| if mac is True: |
| mac = self.own_addr() |
| cmd += " mac=" + mac.replace(':', '') |
| if info: |
| cmd += " info=" + info |
| if curve: |
| cmd += " curve=" + curve |
| if key: |
| cmd += " key=" + key |
| if supported_curves: |
| cmd += " supported_curves=" + supported_curves |
| if host: |
| cmd += " host=" + host |
| res = self.request(cmd) |
| if "FAIL" in res: |
| raise Exception("Failed to generate bootstrapping info") |
| return int(res) |
| |
| def dpp_bootstrap_set(self, id, conf=None, configurator=None, ssid=None, |
| extra=None): |
| cmd = "DPP_BOOTSTRAP_SET %d" % id |
| if ssid: |
| cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode() |
| if extra: |
| cmd += " " + extra |
| if conf: |
| cmd += " conf=" + conf |
| if configurator is not None: |
| cmd += " configurator=%d" % configurator |
| if "OK" not in self.request(cmd): |
| raise Exception("Failed to set bootstrapping parameters") |
| |
| def dpp_listen(self, freq, netrole=None, qr=None, role=None): |
| cmd = "DPP_LISTEN " + str(freq) |
| if netrole: |
| cmd += " netrole=" + netrole |
| if qr: |
| cmd += " qr=" + qr |
| if role: |
| cmd += " role=" + role |
| if "OK" not in self.request(cmd): |
| raise Exception("Failed to start listen operation") |
| |
| def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None, |
| extra=None, own=None, role=None, neg_freq=None, |
| ssid=None, passphrase=None, expect_fail=False, |
| tcp_addr=None, tcp_port=None, conn_status=False, |
| ssid_charset=None, nfc_uri=None, netrole=None, |
| csrattrs=None): |
| cmd = "DPP_AUTH_INIT" |
| if peer is None: |
| if nfc_uri: |
| peer = self.dpp_nfc_uri(nfc_uri) |
| else: |
| peer = self.dpp_qr_code(uri) |
| cmd += " peer=%d" % peer |
| if own is not None: |
| cmd += " own=%d" % own |
| if role: |
| cmd += " role=" + role |
| if extra: |
| cmd += " " + extra |
| if conf: |
| cmd += " conf=" + conf |
| if configurator is not None: |
| cmd += " configurator=%d" % configurator |
| if neg_freq: |
| cmd += " neg_freq=%d" % neg_freq |
| if ssid: |
| cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode() |
| if ssid_charset: |
| cmd += " ssid_charset=%d" % ssid_charset |
| if passphrase: |
| cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode() |
| if tcp_addr: |
| cmd += " tcp_addr=" + tcp_addr |
| if tcp_port: |
| cmd += " tcp_port=" + tcp_port |
| if conn_status: |
| cmd += " conn_status=1" |
| if netrole: |
| cmd += " netrole=" + netrole |
| if csrattrs: |
| cmd += " csrattrs=" + csrattrs |
| res = self.request(cmd) |
| if expect_fail: |
| if "FAIL" not in res: |
| raise Exception("DPP authentication started unexpectedly") |
| return |
| if "OK" not in res: |
| raise Exception("Failed to initiate DPP Authentication") |
| return int(peer) |
| |
| def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None, |
| extra=None, use_id=None, allow_fail=False, ver=None, |
| tcp_addr=None, tcp_port=None): |
| if use_id is None: |
| id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve) |
| else: |
| id1 = use_id |
| cmd = "own=%d " % id1 |
| if identifier: |
| cmd += "identifier=%s " % identifier |
| cmd += "init=1 " |
| if ver is not None: |
| cmd += "ver=" + str(ver) + " " |
| if role: |
| cmd += "role=%s " % role |
| if tcp_addr: |
| cmd += "tcp_addr=" + tcp_addr + " " |
| if tcp_port: |
| cmd += "tcp_port=" + tcp_port + " " |
| if extra: |
| cmd += extra + " " |
| cmd += "code=%s" % code |
| res = self.request("DPP_PKEX_ADD " + cmd) |
| if allow_fail: |
| return id1 |
| if "FAIL" in res: |
| raise Exception("Failed to set PKEX data (initiator)") |
| return id1 |
| |
| def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None, |
| listen_role=None, use_id=None): |
| if use_id is None: |
| id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve) |
| else: |
| id0 = use_id |
| cmd = "own=%d " % id0 |
| if identifier: |
| cmd += "identifier=%s " % identifier |
| cmd += "code=%s" % code |
| res = self.request("DPP_PKEX_ADD " + cmd) |
| if "FAIL" in res: |
| raise Exception("Failed to set PKEX data (responder)") |
| self.dpp_listen(freq, role=listen_role) |
| return id0 |
| |
| def dpp_configurator_add(self, curve=None, key=None, |
| net_access_key_curve=None): |
| cmd = "DPP_CONFIGURATOR_ADD" |
| if curve: |
| cmd += " curve=" + curve |
| if net_access_key_curve: |
| cmd += " net_access_key_curve=" + net_access_key_curve |
| if key: |
| cmd += " key=" + key |
| res = self.request(cmd) |
| if "FAIL" in res: |
| raise Exception("Failed to add configurator") |
| return int(res) |
| |
| def dpp_configurator_set(self, conf_id, net_access_key_curve=None): |
| cmd = "DPP_CONFIGURATOR_SET %d" % conf_id |
| if net_access_key_curve: |
| cmd += " net_access_key_curve=" + net_access_key_curve |
| res = self.request(cmd) |
| if "FAIL" in res: |
| raise Exception("Failed to set configurator") |
| |
| def dpp_configurator_remove(self, conf_id): |
| res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id) |
| if "OK" not in res: |
| raise Exception("DPP_CONFIGURATOR_REMOVE failed") |
| |
| def get_ptksa(self, bssid, cipher): |
| res = self.request("PTKSA_CACHE_LIST") |
| lines = res.splitlines() |
| for l in lines: |
| if bssid not in l or cipher not in l: |
| continue |
| |
| vals = dict() |
| [index, addr, cipher, expiration, tk, kdk] = l.split(' ', 5) |
| vals['index'] = index |
| vals['addr'] = addr |
| vals['cipher'] = cipher |
| vals['expiration'] = expiration |
| vals['tk'] = tk |
| vals['kdk'] = kdk |
| return vals |
| return None |