| # |
| # tshark module - refactored from test_scan.py |
| # |
| # Copyright (c) 2014, Qualcomm Atheros, Inc. |
| # Copyright (c) 2015, Intel Mobile Communications GmbH |
| # |
| # This software may be distributed under the terms of the BSD license. |
| # See README for more details. |
| |
| import time |
| import subprocess |
| import logging |
| logger = logging.getLogger() |
| |
| from utils import * |
| |
| class UnknownFieldsException(Exception): |
| def __init__(self, fields): |
| Exception.__init__(self, "unknown tshark fields %s" % ','.join(fields)) |
| self.fields = fields |
| |
| _tshark_filter_arg = '-Y' |
| |
| def _run_tshark(filename, filter, display=None, wait=True): |
| global _tshark_filter_arg |
| |
| if wait: |
| # wait a bit to make it more likely for wlantest sniffer to have |
| # captured and written the results into a file that we can process here |
| time.sleep(0.1) |
| |
| try: |
| arg = ["tshark", "-r", filename, |
| _tshark_filter_arg, filter] |
| if display: |
| arg.append('-Tfields') |
| for d in display: |
| arg.append('-e') |
| arg.append(d) |
| else: |
| arg.append('-V') |
| cmd = subprocess.Popen(arg, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| except Exception as e: |
| logger.info("Could run run tshark check: " + str(e)) |
| if "No such file or directory: 'tshark'" in str(e): |
| raise HwsimSkip("No tshark available") |
| cmd = None |
| return None |
| |
| output = cmd.communicate() |
| out = output[0].decode(errors='ignore') |
| out1 = output[1].decode() |
| res = cmd.wait() |
| if res == 1: |
| errmsg = "Some fields aren't valid" |
| if errmsg in out1: |
| errors = out1.split('\n') |
| fields = [] |
| collect = False |
| for f in errors: |
| if collect: |
| f = f.strip() |
| if f: |
| fields.append(f) |
| continue |
| if errmsg in f: |
| collect = True |
| continue |
| raise UnknownFieldsException(fields) |
| # remember this for efficiency |
| _tshark_filter_arg = '-R' |
| arg[3] = '-R' |
| cmd = subprocess.Popen(arg, stdout=subprocess.PIPE, |
| stderr=open('/dev/null', 'w')) |
| out = cmd.communicate()[0].decode() |
| cmd.wait() |
| if res == 2: |
| if "tshark: Neither" in out1 and "are field or protocol names" in out1: |
| errors = out1.split('\n') |
| fields = [] |
| for f in errors: |
| if f.startswith("tshark: Neither "): |
| f = f.split(' ')[2].strip('"') |
| if f: |
| fields.append(f) |
| continue |
| raise UnknownFieldsException(fields) |
| |
| return out |
| |
| def run_tshark(filename, filter, display=None, wait=True): |
| if display is None: display = [] |
| try: |
| return _run_tshark(filename, filter.replace('wlan_mgt', 'wlan'), |
| [x.replace('wlan_mgt', 'wlan') for x in display], |
| wait) |
| except UnknownFieldsException as e: |
| all_wlan_mgt = True |
| for f in e.fields: |
| if not f.startswith('wlan_mgt.'): |
| all_wlan_mgt = False |
| break |
| if not all_wlan_mgt: |
| raise |
| return _run_tshark(filename, filter, display, wait) |
| |
| def run_tshark_json(filename, filter): |
| arg = ["tshark", "-r", filename, |
| _tshark_filter_arg, filter] |
| arg.append('-Tjson') |
| arg.append('-x') |
| try: |
| cmd = subprocess.Popen(arg, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| except Exception as e: |
| logger.info("Could run run tshark: " + str(e)) |
| if "No such file or directory: 'tshark'" in str(e): |
| raise HwsimSkip("No tshark available") |
| return None |
| output = cmd.communicate() |
| out = output[0].decode() |
| res = cmd.wait() |
| return out |