[Feature][ZXW-88]merge P50 version

Only Configure: No
Affected branch: master
Affected module: unknown
Is it affected on both ZXIC and MTK: only ZXIC
Self-test: Yes
Doc Update: No

Change-Id: I34667719d9e0e7e29e8e4368848601cde0a48408
diff --git a/ap/lib/libcurl/curl-7.86.0/tests/smbserver.py b/ap/lib/libcurl/curl-7.86.0/tests/smbserver.py
new file mode 100755
index 0000000..62f2d7a
--- /dev/null
+++ b/ap/lib/libcurl/curl-7.86.0/tests/smbserver.py
@@ -0,0 +1,401 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+#  Project                     ___| | | |  _ \| |
+#                             / __| | | | |_) | |
+#                            | (__| |_| |  _ <| |___
+#                             \___|\___/|_| \_\_____|
+#
+# Copyright (C) 2017 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+"""Server for testing SMB"""
+
+from __future__ import (absolute_import, division, print_function,
+                        unicode_literals)
+
+import argparse
+import logging
+import os
+import sys
+import tempfile
+
+# Import our curl test data helper
+from util import ClosingFileHandler, TestData
+
+if sys.version_info.major >= 3:
+    import configparser
+else:
+    import ConfigParser as configparser
+
+# impacket needs to be installed in the Python environment
+try:
+    import impacket
+except ImportError:
+    sys.stderr.write('Python package impacket needs to be installed!\n')
+    sys.stderr.write('Use pip or your package manager to install it.\n')
+    sys.exit(1)
+from impacket import smb as imp_smb
+from impacket import smbserver as imp_smbserver
+from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_NO_SUCH_FILE,
+                                STATUS_SUCCESS)
+
+log = logging.getLogger(__name__)
+SERVER_MAGIC = "SERVER_MAGIC"
+TESTS_MAGIC = "TESTS_MAGIC"
+VERIFIED_REQ = "verifiedserver"
+VERIFIED_RSP = "WE ROOLZ: {pid}\n"
+
+
+def smbserver(options):
+    """Start up a TCP SMB server that serves forever
+
+    """
+    if options.pidfile:
+        pid = os.getpid()
+        # see tests/server/util.c function write_pidfile
+        if os.name == "nt":
+            pid += 65536
+        with open(options.pidfile, "w") as f:
+            f.write(str(pid))
+
+    # Here we write a mini config for the server
+    smb_config = configparser.ConfigParser()
+    smb_config.add_section("global")
+    smb_config.set("global", "server_name", "SERVICE")
+    smb_config.set("global", "server_os", "UNIX")
+    smb_config.set("global", "server_domain", "WORKGROUP")
+    smb_config.set("global", "log_file", "")
+    smb_config.set("global", "credentials_file", "")
+
+    # We need a share which allows us to test that the server is running
+    smb_config.add_section("SERVER")
+    smb_config.set("SERVER", "comment", "server function")
+    smb_config.set("SERVER", "read only", "yes")
+    smb_config.set("SERVER", "share type", "0")
+    smb_config.set("SERVER", "path", SERVER_MAGIC)
+
+    # Have a share for tests.  These files will be autogenerated from the
+    # test input.
+    smb_config.add_section("TESTS")
+    smb_config.set("TESTS", "comment", "tests")
+    smb_config.set("TESTS", "read only", "yes")
+    smb_config.set("TESTS", "share type", "0")
+    smb_config.set("TESTS", "path", TESTS_MAGIC)
+
+    if not options.srcdir or not os.path.isdir(options.srcdir):
+        raise ScriptException("--srcdir is mandatory")
+
+    test_data_dir = os.path.join(options.srcdir, "data")
+
+    smb_server = TestSmbServer((options.host, options.port),
+                               config_parser=smb_config,
+                               test_data_directory=test_data_dir)
+    log.info("[SMB] setting up SMB server on port %s", options.port)
+    smb_server.processConfigFile()
+    smb_server.serve_forever()
+    return 0
+
+
+class TestSmbServer(imp_smbserver.SMBSERVER):
+    """
+    Test server for SMB which subclasses the impacket SMBSERVER and provides
+    test functionality.
+    """
+
+    def __init__(self,
+                 address,
+                 config_parser=None,
+                 test_data_directory=None):
+        imp_smbserver.SMBSERVER.__init__(self,
+                                         address,
+                                         config_parser=config_parser)
+
+        # Set up a test data object so we can get test data later.
+        self.ctd = TestData(test_data_directory)
+
+        # Override smbComNtCreateAndX so we can pretend to have files which
+        # don't exist.
+        self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
+                            self.create_and_x)
+
+    def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
+        """
+        Our version of smbComNtCreateAndX looks for special test files and
+        fools the rest of the framework into opening them as if they were
+        normal files.
+        """
+        conn_data = smb_server.getConnectionData(conn_id)
+
+        # Wrap processing in a try block which allows us to throw SmbException
+        # to control the flow.
+        try:
+            ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
+                smb_command["Parameters"])
+
+            path = self.get_share_path(conn_data,
+                                       ncax_parms["RootFid"],
+                                       recv_packet["Tid"])
+            log.info("[SMB] Requested share path: %s", path)
+
+            disposition = ncax_parms["Disposition"]
+            log.debug("[SMB] Requested disposition: %s", disposition)
+
+            # Currently we only support reading files.
+            if disposition != imp_smb.FILE_OPEN:
+                raise SmbException(STATUS_ACCESS_DENIED,
+                                   "Only support reading files")
+
+            # Check to see if the path we were given is actually a
+            # magic path which needs generating on the fly.
+            if path not in [SERVER_MAGIC, TESTS_MAGIC]:
+                # Pass the command onto the original handler.
+                return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
+                                                                    smb_server,
+                                                                    smb_command,
+                                                                    recv_packet)
+
+            flags2 = recv_packet["Flags2"]
+            ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
+                                                     data=smb_command[
+                                                         "Data"])
+            requested_file = imp_smbserver.decodeSMBString(
+                flags2,
+                ncax_data["FileName"])
+            log.debug("[SMB] User requested file '%s'", requested_file)
+
+            if path == SERVER_MAGIC:
+                fid, full_path = self.get_server_path(requested_file)
+            else:
+                assert (path == TESTS_MAGIC)
+                fid, full_path = self.get_test_path(requested_file)
+
+            resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
+            resp_data = ""
+
+            # Simple way to generate a fid
+            if len(conn_data["OpenedFiles"]) == 0:
+                fakefid = 1
+            else:
+                fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
+            resp_parms["Fid"] = fakefid
+            resp_parms["CreateAction"] = disposition
+
+            if os.path.isdir(path):
+                resp_parms[
+                    "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
+                resp_parms["IsDirectory"] = 1
+            else:
+                resp_parms["IsDirectory"] = 0
+                resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
+
+            # Get this file's information
+            resp_info, error_code = imp_smbserver.queryPathInformation(
+                os.path.dirname(full_path), os.path.basename(full_path),
+                level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
+
+            if error_code != STATUS_SUCCESS:
+                raise SmbException(error_code, "Failed to query path info")
+
+            resp_parms["CreateTime"] = resp_info["CreationTime"]
+            resp_parms["LastAccessTime"] = resp_info[
+                "LastAccessTime"]
+            resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
+            resp_parms["LastChangeTime"] = resp_info[
+                "LastChangeTime"]
+            resp_parms["FileAttributes"] = resp_info[
+                "ExtFileAttributes"]
+            resp_parms["AllocationSize"] = resp_info[
+                "AllocationSize"]
+            resp_parms["EndOfFile"] = resp_info["EndOfFile"]
+
+            # Let's store the fid for the connection
+            # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
+            conn_data["OpenedFiles"][fakefid] = {}
+            conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
+            conn_data["OpenedFiles"][fakefid]["FileName"] = path
+            conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
+
+        except SmbException as s:
+            log.debug("[SMB] SmbException hit: %s", s)
+            error_code = s.error_code
+            resp_parms = ""
+            resp_data = ""
+
+        resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
+        resp_cmd["Parameters"] = resp_parms
+        resp_cmd["Data"] = resp_data
+        smb_server.setConnectionData(conn_id, conn_data)
+
+        return [resp_cmd], None, error_code
+
+    def get_share_path(self, conn_data, root_fid, tid):
+        conn_shares = conn_data["ConnectedShares"]
+
+        if tid in conn_shares:
+            if root_fid > 0:
+                # If we have a rootFid, the path is relative to that fid
+                path = conn_data["OpenedFiles"][root_fid]["FileName"]
+                log.debug("RootFid present %s!" % path)
+            else:
+                if "path" in conn_shares[tid]:
+                    path = conn_shares[tid]["path"]
+                else:
+                    raise SmbException(STATUS_ACCESS_DENIED,
+                                       "Connection share had no path")
+        else:
+            raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
+                               "TID was invalid")
+
+        return path
+
+    def get_server_path(self, requested_filename):
+        log.debug("[SMB] Get server path '%s'", requested_filename)
+
+        if requested_filename not in [VERIFIED_REQ]:
+            raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
+
+        fid, filename = tempfile.mkstemp()
+        log.debug("[SMB] Created %s (%d) for storing '%s'",
+                  filename, fid, requested_filename)
+
+        contents = ""
+
+        if requested_filename == VERIFIED_REQ:
+            log.debug("[SMB] Verifying server is alive")
+            pid = os.getpid()
+            # see tests/server/util.c function write_pidfile
+            if os.name == "nt":
+                pid += 65536
+            contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')
+
+        self.write_to_fid(fid, contents)
+        return fid, filename
+
+    def write_to_fid(self, fid, contents):
+        # Write the contents to file descriptor
+        os.write(fid, contents)
+        os.fsync(fid)
+
+        # Rewind the file to the beginning so a read gets us the contents
+        os.lseek(fid, 0, os.SEEK_SET)
+
+    def get_test_path(self, requested_filename):
+        log.info("[SMB] Get reply data from 'test%s'", requested_filename)
+
+        fid, filename = tempfile.mkstemp()
+        log.debug("[SMB] Created %s (%d) for storing test '%s'",
+                  filename, fid, requested_filename)
+
+        try:
+            contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
+            self.write_to_fid(fid, contents)
+            return fid, filename
+
+        except Exception:
+            log.exception("Failed to make test file")
+            raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
+
+
+class SmbException(Exception):
+    def __init__(self, error_code, error_message):
+        super(SmbException, self).__init__(error_message)
+        self.error_code = error_code
+
+
+class ScriptRC(object):
+    """Enum for script return codes"""
+    SUCCESS = 0
+    FAILURE = 1
+    EXCEPTION = 2
+
+
+class ScriptException(Exception):
+    pass
+
+
+def get_options():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("--port", action="store", default=9017,
+                      type=int, help="port to listen on")
+    parser.add_argument("--host", action="store", default="127.0.0.1",
+                      help="host to listen on")
+    parser.add_argument("--verbose", action="store", type=int, default=0,
+                        help="verbose output")
+    parser.add_argument("--pidfile", action="store",
+                        help="file name for the PID")
+    parser.add_argument("--logfile", action="store",
+                        help="file name for the log")
+    parser.add_argument("--srcdir", action="store", help="test directory")
+    parser.add_argument("--id", action="store", help="server ID")
+    parser.add_argument("--ipv4", action="store_true", default=0,
+                        help="IPv4 flag")
+
+    return parser.parse_args()
+
+
+def setup_logging(options):
+    """
+    Set up logging from the command line options
+    """
+    root_logger = logging.getLogger()
+    add_stdout = False
+
+    formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
+
+    # Write out to a logfile
+    if options.logfile:
+        handler = ClosingFileHandler(options.logfile)
+        handler.setFormatter(formatter)
+        handler.setLevel(logging.DEBUG)
+        root_logger.addHandler(handler)
+    else:
+        # The logfile wasn't specified. Add a stdout logger.
+        add_stdout = True
+
+    if options.verbose:
+        # Add a stdout logger as well in verbose mode
+        root_logger.setLevel(logging.DEBUG)
+        add_stdout = True
+    else:
+        root_logger.setLevel(logging.INFO)
+
+    if add_stdout:
+        stdout_handler = logging.StreamHandler(sys.stdout)
+        stdout_handler.setFormatter(formatter)
+        stdout_handler.setLevel(logging.DEBUG)
+        root_logger.addHandler(stdout_handler)
+
+
+if __name__ == '__main__':
+    # Get the options from the user.
+    options = get_options()
+
+    # Setup logging using the user options
+    setup_logging(options)
+
+    # Run main script.
+    try:
+        rc = smbserver(options)
+    except Exception as e:
+        log.exception(e)
+        rc = ScriptRC.EXCEPTION
+
+    if options.pidfile and os.path.isfile(options.pidfile):
+        os.unlink(options.pidfile)
+
+    log.info("[SMB] Returning %d", rc)
+    sys.exit(rc)