[Feature][T106]ZXW P56U09 code

Only Configure: Yes
Affected branch: master
Affected module: unknow
Is it affected on both ZXIC and MTK: only ZXIC
Self-test: No
Doc Update: No

Change-Id: I3cbd8b420271eb20c2b40ebe5c78f83059cd42f3
diff --git a/ap/lib/libcurl/curl-7.86.0/tests/negtelnetserver.py b/ap/lib/libcurl/curl-7.86.0/tests/negtelnetserver.py
new file mode 100755
index 0000000..2b748af
--- /dev/null
+++ b/ap/lib/libcurl/curl-7.86.0/tests/negtelnetserver.py
@@ -0,0 +1,373 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+#  Project                     ___| | | |  _ \| |
+#                             / __| | | | |_) | |
+#                            | (__| |_| |  _ <| |___
+#                             \___|\___/|_| \_\_____|
+#
+# Copyright (C) 2017 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+""" A telnet server which negotiates"""
+
+from __future__ import (absolute_import, division, print_function,
+                        unicode_literals)
+
+import argparse
+import logging
+import os
+import sys
+
+from util import ClosingFileHandler
+
+if sys.version_info.major >= 3:
+    import socketserver
+else:
+    import SocketServer as socketserver
+
+log = logging.getLogger(__name__)
+HOST = "localhost"
+IDENT = "NTEL"
+
+
+# The strings that indicate the test framework is checking our aliveness
+VERIFIED_REQ = "verifiedserver"
+VERIFIED_RSP = "WE ROOLZ: {pid}"
+
+
+def telnetserver(options):
+    """
+    Starts up a TCP server with a telnet handler and serves DICT requests
+    forever.
+    """
+    if options.pidfile:
+        pid = os.getpid()
+        # see tests/server/util.c function write_pidfile
+        if os.name == "nt":
+            pid += 65536
+        with open(options.pidfile, "w") as f:
+            f.write(str(pid))
+
+    local_bind = (HOST, options.port)
+    log.info("Listening on %s", local_bind)
+
+    # Need to set the allow_reuse on the class, not on the instance.
+    socketserver.TCPServer.allow_reuse_address = True
+    server = socketserver.TCPServer(local_bind, NegotiatingTelnetHandler)
+    server.serve_forever()
+
+    return ScriptRC.SUCCESS
+
+
+class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
+    """Handler class for Telnet connections.
+
+    """
+    def handle(self):
+        """
+        Negotiates options before reading data.
+        """
+        neg = Negotiator(self.request)
+
+        try:
+            # Send some initial negotiations.
+            neg.send_do("NEW_ENVIRON")
+            neg.send_will("NEW_ENVIRON")
+            neg.send_dont("NAWS")
+            neg.send_wont("NAWS")
+
+            # Get the data passed through the negotiator
+            data = neg.recv(1024)
+            log.debug("Incoming data: %r", data)
+
+            if VERIFIED_REQ.encode('utf-8') in data:
+                log.debug("Received verification request from test framework")
+                pid = os.getpid()
+                # see tests/server/util.c function write_pidfile
+                if os.name == "nt":
+                    pid += 65536
+                response = VERIFIED_RSP.format(pid=pid)
+                response_data = response.encode('utf-8')
+            else:
+                log.debug("Received normal request - echoing back")
+                response_data = data.decode('utf-8').strip().encode('utf-8')
+
+            if response_data:
+                log.debug("Sending %r", response_data)
+                self.request.sendall(response_data)
+
+        except IOError:
+            log.exception("IOError hit during request")
+
+
+class Negotiator(object):
+    NO_NEG = 0
+    START_NEG = 1
+    WILL = 2
+    WONT = 3
+    DO = 4
+    DONT = 5
+
+    def __init__(self, tcp):
+        self.tcp = tcp
+        self.state = self.NO_NEG
+
+    def recv(self, bytes):
+        """
+        Read bytes from TCP, handling negotiation sequences
+
+        :param bytes: Number of bytes to read
+        :return: a buffer of bytes
+        """
+        buffer = bytearray()
+
+        # If we keep receiving negotiation sequences, we won't fill the buffer.
+        # Keep looping while we can, and until we have something to give back
+        # to the caller.
+        while len(buffer) == 0:
+            data = self.tcp.recv(bytes)
+            if not data:
+                # TCP failed to give us any data. Break out.
+                break
+
+            for byte_int in bytearray(data):
+                if self.state == self.NO_NEG:
+                    self.no_neg(byte_int, buffer)
+                elif self.state == self.START_NEG:
+                    self.start_neg(byte_int)
+                elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]:
+                    self.handle_option(byte_int)
+                else:
+                    # Received an unexpected byte. Stop negotiations
+                    log.error("Unexpected byte %s in state %s",
+                              byte_int,
+                              self.state)
+                    self.state = self.NO_NEG
+
+        return buffer
+
+    def no_neg(self, byte_int, buffer):
+        # Not negotiating anything thus far. Check to see if we
+        # should.
+        if byte_int == NegTokens.IAC:
+            # Start negotiation
+            log.debug("Starting negotiation (IAC)")
+            self.state = self.START_NEG
+        else:
+            # Just append the incoming byte to the buffer
+            buffer.append(byte_int)
+
+    def start_neg(self, byte_int):
+        # In a negotiation.
+        log.debug("In negotiation (%s)",
+                  NegTokens.from_val(byte_int))
+
+        if byte_int == NegTokens.WILL:
+            # Client is confirming they are willing to do an option
+            log.debug("Client is willing")
+            self.state = self.WILL
+        elif byte_int == NegTokens.WONT:
+            # Client is confirming they are unwilling to do an
+            # option
+            log.debug("Client is unwilling")
+            self.state = self.WONT
+        elif byte_int == NegTokens.DO:
+            # Client is indicating they can do an option
+            log.debug("Client can do")
+            self.state = self.DO
+        elif byte_int == NegTokens.DONT:
+            # Client is indicating they can't do an option
+            log.debug("Client can't do")
+            self.state = self.DONT
+        else:
+            # Received an unexpected byte. Stop negotiations
+            log.error("Unexpected byte %s in state %s",
+                      byte_int,
+                      self.state)
+            self.state = self.NO_NEG
+
+    def handle_option(self, byte_int):
+        if byte_int in [NegOptions.BINARY,
+                        NegOptions.CHARSET,
+                        NegOptions.SUPPRESS_GO_AHEAD,
+                        NegOptions.NAWS,
+                        NegOptions.NEW_ENVIRON]:
+            log.debug("Option: %s", NegOptions.from_val(byte_int))
+
+            # No further negotiation of this option needed. Reset the state.
+            self.state = self.NO_NEG
+
+        else:
+            # Received an unexpected byte. Stop negotiations
+            log.error("Unexpected byte %s in state %s",
+                      byte_int,
+                      self.state)
+            self.state = self.NO_NEG
+
+    def send_message(self, message_ints):
+        self.tcp.sendall(bytearray(message_ints))
+
+    def send_iac(self, arr):
+        message = [NegTokens.IAC]
+        message.extend(arr)
+        self.send_message(message)
+
+    def send_do(self, option_str):
+        log.debug("Sending DO %s", option_str)
+        self.send_iac([NegTokens.DO, NegOptions.to_val(option_str)])
+
+    def send_dont(self, option_str):
+        log.debug("Sending DONT %s", option_str)
+        self.send_iac([NegTokens.DONT, NegOptions.to_val(option_str)])
+
+    def send_will(self, option_str):
+        log.debug("Sending WILL %s", option_str)
+        self.send_iac([NegTokens.WILL, NegOptions.to_val(option_str)])
+
+    def send_wont(self, option_str):
+        log.debug("Sending WONT %s", option_str)
+        self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)])
+
+
+class NegBase(object):
+    @classmethod
+    def to_val(cls, name):
+        return getattr(cls, name)
+
+    @classmethod
+    def from_val(cls, val):
+        for k in cls.__dict__.keys():
+            if getattr(cls, k) == val:
+                return k
+
+        return "<unknown>"
+
+
+class NegTokens(NegBase):
+    # The start of a negotiation sequence
+    IAC = 255
+    # Confirm willingness to negotiate
+    WILL = 251
+    # Confirm unwillingness to negotiate
+    WONT = 252
+    # Indicate willingness to negotiate
+    DO = 253
+    # Indicate unwillingness to negotiate
+    DONT = 254
+
+    # The start of sub-negotiation options.
+    SB = 250
+    # The end of sub-negotiation options.
+    SE = 240
+
+
+class NegOptions(NegBase):
+    # Binary Transmission
+    BINARY = 0
+    # Suppress Go Ahead
+    SUPPRESS_GO_AHEAD = 3
+    # NAWS - width and height of client
+    NAWS = 31
+    # NEW-ENVIRON - environment variables on client
+    NEW_ENVIRON = 39
+    # Charset option
+    CHARSET = 42
+
+
+def get_options():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("--port", action="store", default=9019,
+                        type=int, help="port to listen on")
+    parser.add_argument("--verbose", action="store", type=int, default=0,
+                        help="verbose output")
+    parser.add_argument("--pidfile", action="store",
+                        help="file name for the PID")
+    parser.add_argument("--logfile", action="store",
+                        help="file name for the log")
+    parser.add_argument("--srcdir", action="store", help="test directory")
+    parser.add_argument("--id", action="store", help="server ID")
+    parser.add_argument("--ipv4", action="store_true", default=0,
+                        help="IPv4 flag")
+
+    return parser.parse_args()
+
+
+def setup_logging(options):
+    """
+    Set up logging from the command line options
+    """
+    root_logger = logging.getLogger()
+    add_stdout = False
+
+    formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s "
+                                  "[{ident}] %(message)s"
+                                  .format(ident=IDENT))
+
+    # Write out to a logfile
+    if options.logfile:
+        handler = ClosingFileHandler(options.logfile)
+        handler.setFormatter(formatter)
+        handler.setLevel(logging.DEBUG)
+        root_logger.addHandler(handler)
+    else:
+        # The logfile wasn't specified. Add a stdout logger.
+        add_stdout = True
+
+    if options.verbose:
+        # Add a stdout logger as well in verbose mode
+        root_logger.setLevel(logging.DEBUG)
+        add_stdout = True
+    else:
+        root_logger.setLevel(logging.INFO)
+
+    if add_stdout:
+        stdout_handler = logging.StreamHandler(sys.stdout)
+        stdout_handler.setFormatter(formatter)
+        stdout_handler.setLevel(logging.DEBUG)
+        root_logger.addHandler(stdout_handler)
+
+
+class ScriptRC(object):
+    """Enum for script return codes"""
+    SUCCESS = 0
+    FAILURE = 1
+    EXCEPTION = 2
+
+
+class ScriptException(Exception):
+    pass
+
+
+if __name__ == '__main__':
+    # Get the options from the user.
+    options = get_options()
+
+    # Setup logging using the user options
+    setup_logging(options)
+
+    # Run main script.
+    try:
+        rc = telnetserver(options)
+    except Exception as e:
+        log.exception(e)
+        rc = ScriptRC.EXCEPTION
+
+    if options.pidfile and os.path.isfile(options.pidfile):
+        os.unlink(options.pidfile)
+
+    log.info("Returning %d", rc)
+    sys.exit(rc)