[Feature] [A1 only] change wpa for wifi
Change-Id: I2e04f2d866ea6b415926ee2ecc9f2bf678e82f39
diff --git a/src/lynq/packages/thirdpart/lynq-wg870/tests/hwsim/hostapd.py b/src/lynq/packages/thirdpart/lynq-wg870/tests/hwsim/hostapd.py
index 4430d80..5717575 100755
--- a/src/lynq/packages/thirdpart/lynq-wg870/tests/hwsim/hostapd.py
+++ b/src/lynq/packages/thirdpart/lynq-wg870/tests/hwsim/hostapd.py
@@ -5,10 +5,12 @@
# See README for more details.
import os
+import re
import time
import logging
import binascii
import struct
+import tempfile
import wpaspy
import remotehost
import utils
@@ -135,6 +137,9 @@
self.ctrl.terminate()
self.ctrl = None
+ def send_file(self, src, dst):
+     """Copy the file src to the path dst through this object's host.
+
+     Both arguments are forwarded unchanged to self.host.send_file();
+     nothing is returned.
+     """
+     host = self.host
+     host.send_file(src, dst)
+
class Hostapd:
def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
self.hostname = hostname
@@ -179,6 +184,9 @@
self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
return self.bssid
+ def get_addr(self, group=False):
+     """Return this AP's own address.
+
+     The group argument is accepted but not used; the result is always
+     whatever self.own_addr() returns.
+     """
+     addr = self.own_addr()
+     return addr
+
def request(self, cmd):
logger.debug(self.dbg + ": CTRL: " + cmd)
return self.ctrl.request(cmd)
@@ -188,6 +196,9 @@
def set(self, field, value):
+ """Set the parameter field to value with a "SET" control request.
+
+ Raises utils.HwsimSkip when the request fails for a wpa_pairwise or
+ rsn_pairwise value that contains "TKIP", and a generic Exception for
+ any other failed request.
+ """
if "OK" not in self.request("SET " + field + " " + value):
+ # A rejected TKIP pairwise cipher is reported as a skip ("Cipher TKIP
+ # not supported") instead of a hard failure.
+ if "TKIP" in value and (field == "wpa_pairwise" or \
+ field == "rsn_pairwise"):
+ raise utils.HwsimSkip("Cipher TKIP not supported")
raise Exception("Failed to set hostapd parameter " + field)
def set_defaults(self):
@@ -271,6 +282,18 @@
if addr and addr not in ev:
raise Exception("Unexpected STA address in connection event: " + ev)
+ def wait_ptkinitdone(self, addr, timeout=2):
+     """Poll GET_STA data for addr until the PTK state reads "11".
+
+     The station entry is fetched with self.get_sta(addr) roughly every
+     0.1 seconds until its hostapdWPAPTKState field equals "11" (the
+     value this helper treats as PTKINITDONE) or the timeout budget,
+     given in seconds, is used up.
+
+     Raises Exception if the field is missing from the station entry or
+     if the state is not reached in time.
+     """
+     remaining = timeout
+     while remaining > 0:
+         sta_info = self.get_sta(addr)
+         if 'hostapdWPAPTKState' not in sta_info:
+             raise Exception("GET_STA did not return hostapdWPAPTKState")
+         if sta_info['hostapdWPAPTKState'] == "11":
+             return
+         # Not there yet: wait one polling interval and charge it to
+         # the remaining budget.
+         time.sleep(0.1)
+         remaining -= 0.1
+     raise Exception("Timeout while waiting for PTKINITDONE")
+
def get_status(self):
res = self.request("STATUS")
lines = res.splitlines()
@@ -509,6 +532,9 @@
def note(self, txt):
+ """Send txt as a "NOTE" control request; the reply is discarded."""
self.request("NOTE " + txt)
+ def send_file(self, src, dst):
+     """Copy the file src to the path dst through this object's host.
+
+     Both arguments are forwarded unchanged to self.host.send_file();
+     nothing is returned.
+     """
+     host = self.host
+     host.send_file(src, dst)
+
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
global_ctrl_override=None, driver=False):
if isinstance(apdev, dict):
@@ -536,7 +562,7 @@
raise Exception("Could not ping hostapd")
hapd.set_defaults()
fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
- "wpa",
+ "wpa", "wpa_deny_ptk0_rekey",
"wpa_pairwise", "rsn_pairwise", "auth_server_addr",
"acct_server_addr", "osu_server_uri"]
for field in fields:
@@ -572,6 +598,8 @@
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
+ confname = cfg_file(apdev, confname, ifname)
+ hapd_global.send_file(confname, confname)
hapd_global.add_bss(phy, confname, ignore_error)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
@@ -590,6 +618,8 @@
hostname = None
port = 8878
hapd_global = HostapdGlobal(apdev)
+ confname = cfg_file(apdev, confname, ifname)
+ hapd_global.send_file(confname, confname)
hapd_global.add_iface(ifname, confname)
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = Hostapd(ifname, hostname=hostname, port=port)
@@ -619,14 +649,17 @@
hapd_global = HostapdGlobal(apdev)
hapd_global.terminate()
-def wpa2_params(ssid=None, passphrase=None):
+def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
+ ieee80211w=None):
+ """Build a parameter dict with wpa=2 and rsn_pairwise=CCMP.
+
+ wpa_key_mgmt is stored as given (default "WPA-PSK"). ssid and
+ passphrase are added only when truthy; ieee80211w is added whenever it
+ is not None, so a value such as "0" is kept.
+ """
params = {"wpa": "2",
- "wpa_key_mgmt": "WPA-PSK",
+ "wpa_key_mgmt": wpa_key_mgmt,
"rsn_pairwise": "CCMP"}
if ssid:
params["ssid"] = ssid
if passphrase:
params["wpa_passphrase"] = passphrase
+ if ieee80211w is not None:
+ params["ieee80211w"] = ieee80211w
return params
def wpa_params(ssid=None, passphrase=None):
@@ -729,3 +762,78 @@
def cmd_execute(apdev, cmd, shell=False):
+ """Run cmd through HostapdGlobal(apdev).cmd_execute() and return its result."""
hapd_global = HostapdGlobal(apdev)
return hapd_global.cmd_execute(cmd, shell=shell)
+
+def send_file(apdev, src, dst):
+    """Copy the file src to dst using a HostapdGlobal built from apdev.
+
+    Returns whatever HostapdGlobal.send_file() returns.
+    """
+    return HostapdGlobal(apdev).send_file(src, dst)
+
+def acl_file(dev, apdev, conf):
+    """Write a temporary MAC ACL file for one of the known ACL names.
+
+    For conf equal to 'hostapd.macaddr', 'hostapd.accept' or
+    'hostapd.accept2', a file named /tmp/<conf>-XXXXXX is created from
+    the "address" status field of the first one, two or three entries
+    of dev, and its path is returned. Any other conf is returned
+    unchanged and the filesystem is not touched.
+
+    apdev is accepted but not used.
+    """
+    # Collect the content first: the temporary file is only created for
+    # a recognised name, and a failing device query leaves no stray file.
+    if conf == 'hostapd.macaddr':
+        entries = [dev[0].get_status_field("address"),
+                   "02:00:00:00:00:12",
+                   "02:00:00:00:00:34",
+                   "-02:00:00:00:00:12",
+                   "-02:00:00:00:00:34",
+                   "01:01:01:01:01:01",
+                   "03:01:01:01:01:03"]
+    elif conf == 'hostapd.accept':
+        mac0 = dev[0].get_status_field("address")
+        mac1 = dev[1].get_status_field("address")
+        entries = [mac0 + " 1", mac1 + " 2"]
+    elif conf == 'hostapd.accept2':
+        mac0 = dev[0].get_status_field("address")
+        mac1 = dev[1].get_status_field("address")
+        mac2 = dev[2].get_status_field("address")
+        entries = [mac0 + " 1", mac1 + " 2", mac2 + " 3"]
+    else:
+        return conf
+
+    fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
+    # Close explicitly so the content is flushed before the path is
+    # handed back to the caller.
+    with os.fdopen(fd, 'w') as f:
+        f.writelines(entry + '\n' for entry in entries)
+    return filename
+
+def bssid_inc(apdev, inc=1):
+    """Return apdev['bssid'] with its last octet increased by inc.
+
+    inc may be anything int() accepts. The first five octets are copied
+    as they are; the last one is written as two lowercase hex digits
+    and wraps modulo 256, so the result stays a well-formed address
+    even when the sum leaves the 0..255 range (e.g. ff + 1 -> 00,
+    00 - 1 -> ff).
+    """
+    parts = apdev['bssid'].split(':')
+    last = (int(parts[5], 16) + int(inc)) % 256
+    return ':'.join(parts[:5] + ['%02x' % last])
+
+def cfg_file(apdev, conf, ifname=None):
+    """Generate a temporary configuration file for "bss-*" names.
+
+    When conf matches ^bss-.+ a file /tmp/<conf>-XXXXXX is written and
+    its path returned; any other conf is returned unchanged.
+
+    The index is the digits found in the last '-'-separated component
+    of conf. If ifname is not given it defaults to apdev['ifname'],
+    with "-<index>" appended unless the index is "1". The BSSID is
+    apdev['bssid'] for 'bss-2-dup.conf', otherwise
+    bssid_inc(apdev, index - 1).
+
+    Raises ValueError if a name other than 'bss-2-dup.conf' has no
+    digits in its last component.
+    """
+    if not re.search(r'^bss-.+', conf):
+        return conf
+
+    # NOTE(review): for 'bss-2-dup.conf' the last component is
+    # 'dup.conf', so idx is empty and the file gets "ssid=bss-" --
+    # confirm this is intended.
+    idx = ''.join(filter(str.isdigit, conf.split('-')[-1]))
+    if ifname is None:
+        ifname = apdev['ifname']
+        if idx != '1':
+            ifname = ifname + '-' + idx
+
+    if conf == 'bss-2-dup.conf':
+        bssid = apdev['bssid']
+    else:
+        bssid = bssid_inc(apdev, int(idx) - 1)
+
+    lines = ["driver=nl80211\n",
+             "ctrl_interface=/var/run/hostapd\n",
+             "hw_mode=g\n",
+             "channel=1\n",
+             "ieee80211n=1\n"]
+    if conf.startswith('bss-ht40-'):
+        lines.append("ht_capab=[HT40+]\n")
+    lines.append("interface=%s\n" % ifname)
+    lines.append("ssid=bss-%s\n" % idx)
+    lines.append("bssid=%s\n" % bssid)
+
+    # Everything that can fail has been computed above, so an error
+    # does not leave a stray temporary file behind.
+    # put cfg file in /tmp directory
+    fd, fname = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
+    # Close explicitly so the content is on disk before the path is
+    # returned and used.
+    with os.fdopen(fd, 'w') as f:
+        f.writelines(lines)
+    return fname