blob: 62f2d7a0fd44666ac2803b63268ded0fbd18a669 [file] [log] [blame]
xf.li6c8fc1e2023-08-12 00:11:09 -07001#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Project ___| | | | _ \| |
5# / __| | | | |_) | |
6# | (__| |_| | _ <| |___
7# \___|\___/|_| \_\_____|
8#
9# Copyright (C) 2017 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
10#
11# This software is licensed as described in the file COPYING, which
12# you should have received as part of this distribution. The terms
13# are also available at https://curl.se/docs/copyright.html.
14#
15# You may opt to use, copy, modify, merge, publish, distribute and/or sell
16# copies of the Software, and permit persons to whom the Software is
17# furnished to do so, under the terms of the COPYING file.
18#
19# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20# KIND, either express or implied.
21#
22# SPDX-License-Identifier: curl
23#
24"""Server for testing SMB"""
25
26from __future__ import (absolute_import, division, print_function,
27 unicode_literals)
28
29import argparse
30import logging
31import os
32import sys
33import tempfile
34
35# Import our curl test data helper
36from util import ClosingFileHandler, TestData
37
38if sys.version_info.major >= 3:
39 import configparser
40else:
41 import ConfigParser as configparser
42
43# impacket needs to be installed in the Python environment
44try:
45 import impacket
46except ImportError:
47 sys.stderr.write('Python package impacket needs to be installed!\n')
48 sys.stderr.write('Use pip or your package manager to install it.\n')
49 sys.exit(1)
50from impacket import smb as imp_smb
51from impacket import smbserver as imp_smbserver
52from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_NO_SUCH_FILE,
53 STATUS_SUCCESS)
54
55log = logging.getLogger(__name__)
56SERVER_MAGIC = "SERVER_MAGIC"
57TESTS_MAGIC = "TESTS_MAGIC"
58VERIFIED_REQ = "verifiedserver"
59VERIFIED_RSP = "WE ROOLZ: {pid}\n"
60
61
62def smbserver(options):
63 """Start up a TCP SMB server that serves forever
64
65 """
66 if options.pidfile:
67 pid = os.getpid()
68 # see tests/server/util.c function write_pidfile
69 if os.name == "nt":
70 pid += 65536
71 with open(options.pidfile, "w") as f:
72 f.write(str(pid))
73
74 # Here we write a mini config for the server
75 smb_config = configparser.ConfigParser()
76 smb_config.add_section("global")
77 smb_config.set("global", "server_name", "SERVICE")
78 smb_config.set("global", "server_os", "UNIX")
79 smb_config.set("global", "server_domain", "WORKGROUP")
80 smb_config.set("global", "log_file", "")
81 smb_config.set("global", "credentials_file", "")
82
83 # We need a share which allows us to test that the server is running
84 smb_config.add_section("SERVER")
85 smb_config.set("SERVER", "comment", "server function")
86 smb_config.set("SERVER", "read only", "yes")
87 smb_config.set("SERVER", "share type", "0")
88 smb_config.set("SERVER", "path", SERVER_MAGIC)
89
90 # Have a share for tests. These files will be autogenerated from the
91 # test input.
92 smb_config.add_section("TESTS")
93 smb_config.set("TESTS", "comment", "tests")
94 smb_config.set("TESTS", "read only", "yes")
95 smb_config.set("TESTS", "share type", "0")
96 smb_config.set("TESTS", "path", TESTS_MAGIC)
97
98 if not options.srcdir or not os.path.isdir(options.srcdir):
99 raise ScriptException("--srcdir is mandatory")
100
101 test_data_dir = os.path.join(options.srcdir, "data")
102
103 smb_server = TestSmbServer((options.host, options.port),
104 config_parser=smb_config,
105 test_data_directory=test_data_dir)
106 log.info("[SMB] setting up SMB server on port %s", options.port)
107 smb_server.processConfigFile()
108 smb_server.serve_forever()
109 return 0
110
111
112class TestSmbServer(imp_smbserver.SMBSERVER):
113 """
114 Test server for SMB which subclasses the impacket SMBSERVER and provides
115 test functionality.
116 """
117
118 def __init__(self,
119 address,
120 config_parser=None,
121 test_data_directory=None):
122 imp_smbserver.SMBSERVER.__init__(self,
123 address,
124 config_parser=config_parser)
125
126 # Set up a test data object so we can get test data later.
127 self.ctd = TestData(test_data_directory)
128
129 # Override smbComNtCreateAndX so we can pretend to have files which
130 # don't exist.
131 self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
132 self.create_and_x)
133
134 def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
135 """
136 Our version of smbComNtCreateAndX looks for special test files and
137 fools the rest of the framework into opening them as if they were
138 normal files.
139 """
140 conn_data = smb_server.getConnectionData(conn_id)
141
142 # Wrap processing in a try block which allows us to throw SmbException
143 # to control the flow.
144 try:
145 ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
146 smb_command["Parameters"])
147
148 path = self.get_share_path(conn_data,
149 ncax_parms["RootFid"],
150 recv_packet["Tid"])
151 log.info("[SMB] Requested share path: %s", path)
152
153 disposition = ncax_parms["Disposition"]
154 log.debug("[SMB] Requested disposition: %s", disposition)
155
156 # Currently we only support reading files.
157 if disposition != imp_smb.FILE_OPEN:
158 raise SmbException(STATUS_ACCESS_DENIED,
159 "Only support reading files")
160
161 # Check to see if the path we were given is actually a
162 # magic path which needs generating on the fly.
163 if path not in [SERVER_MAGIC, TESTS_MAGIC]:
164 # Pass the command onto the original handler.
165 return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
166 smb_server,
167 smb_command,
168 recv_packet)
169
170 flags2 = recv_packet["Flags2"]
171 ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
172 data=smb_command[
173 "Data"])
174 requested_file = imp_smbserver.decodeSMBString(
175 flags2,
176 ncax_data["FileName"])
177 log.debug("[SMB] User requested file '%s'", requested_file)
178
179 if path == SERVER_MAGIC:
180 fid, full_path = self.get_server_path(requested_file)
181 else:
182 assert (path == TESTS_MAGIC)
183 fid, full_path = self.get_test_path(requested_file)
184
185 resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
186 resp_data = ""
187
188 # Simple way to generate a fid
189 if len(conn_data["OpenedFiles"]) == 0:
190 fakefid = 1
191 else:
192 fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
193 resp_parms["Fid"] = fakefid
194 resp_parms["CreateAction"] = disposition
195
196 if os.path.isdir(path):
197 resp_parms[
198 "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
199 resp_parms["IsDirectory"] = 1
200 else:
201 resp_parms["IsDirectory"] = 0
202 resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
203
204 # Get this file's information
205 resp_info, error_code = imp_smbserver.queryPathInformation(
206 os.path.dirname(full_path), os.path.basename(full_path),
207 level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
208
209 if error_code != STATUS_SUCCESS:
210 raise SmbException(error_code, "Failed to query path info")
211
212 resp_parms["CreateTime"] = resp_info["CreationTime"]
213 resp_parms["LastAccessTime"] = resp_info[
214 "LastAccessTime"]
215 resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
216 resp_parms["LastChangeTime"] = resp_info[
217 "LastChangeTime"]
218 resp_parms["FileAttributes"] = resp_info[
219 "ExtFileAttributes"]
220 resp_parms["AllocationSize"] = resp_info[
221 "AllocationSize"]
222 resp_parms["EndOfFile"] = resp_info["EndOfFile"]
223
224 # Let's store the fid for the connection
225 # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
226 conn_data["OpenedFiles"][fakefid] = {}
227 conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
228 conn_data["OpenedFiles"][fakefid]["FileName"] = path
229 conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
230
231 except SmbException as s:
232 log.debug("[SMB] SmbException hit: %s", s)
233 error_code = s.error_code
234 resp_parms = ""
235 resp_data = ""
236
237 resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
238 resp_cmd["Parameters"] = resp_parms
239 resp_cmd["Data"] = resp_data
240 smb_server.setConnectionData(conn_id, conn_data)
241
242 return [resp_cmd], None, error_code
243
244 def get_share_path(self, conn_data, root_fid, tid):
245 conn_shares = conn_data["ConnectedShares"]
246
247 if tid in conn_shares:
248 if root_fid > 0:
249 # If we have a rootFid, the path is relative to that fid
250 path = conn_data["OpenedFiles"][root_fid]["FileName"]
251 log.debug("RootFid present %s!" % path)
252 else:
253 if "path" in conn_shares[tid]:
254 path = conn_shares[tid]["path"]
255 else:
256 raise SmbException(STATUS_ACCESS_DENIED,
257 "Connection share had no path")
258 else:
259 raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
260 "TID was invalid")
261
262 return path
263
264 def get_server_path(self, requested_filename):
265 log.debug("[SMB] Get server path '%s'", requested_filename)
266
267 if requested_filename not in [VERIFIED_REQ]:
268 raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
269
270 fid, filename = tempfile.mkstemp()
271 log.debug("[SMB] Created %s (%d) for storing '%s'",
272 filename, fid, requested_filename)
273
274 contents = ""
275
276 if requested_filename == VERIFIED_REQ:
277 log.debug("[SMB] Verifying server is alive")
278 pid = os.getpid()
279 # see tests/server/util.c function write_pidfile
280 if os.name == "nt":
281 pid += 65536
282 contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')
283
284 self.write_to_fid(fid, contents)
285 return fid, filename
286
287 def write_to_fid(self, fid, contents):
288 # Write the contents to file descriptor
289 os.write(fid, contents)
290 os.fsync(fid)
291
292 # Rewind the file to the beginning so a read gets us the contents
293 os.lseek(fid, 0, os.SEEK_SET)
294
295 def get_test_path(self, requested_filename):
296 log.info("[SMB] Get reply data from 'test%s'", requested_filename)
297
298 fid, filename = tempfile.mkstemp()
299 log.debug("[SMB] Created %s (%d) for storing test '%s'",
300 filename, fid, requested_filename)
301
302 try:
303 contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
304 self.write_to_fid(fid, contents)
305 return fid, filename
306
307 except Exception:
308 log.exception("Failed to make test file")
309 raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
310
311
312class SmbException(Exception):
313 def __init__(self, error_code, error_message):
314 super(SmbException, self).__init__(error_message)
315 self.error_code = error_code
316
317
318class ScriptRC(object):
319 """Enum for script return codes"""
320 SUCCESS = 0
321 FAILURE = 1
322 EXCEPTION = 2
323
324
325class ScriptException(Exception):
326 pass
327
328
329def get_options():
330 parser = argparse.ArgumentParser()
331
332 parser.add_argument("--port", action="store", default=9017,
333 type=int, help="port to listen on")
334 parser.add_argument("--host", action="store", default="127.0.0.1",
335 help="host to listen on")
336 parser.add_argument("--verbose", action="store", type=int, default=0,
337 help="verbose output")
338 parser.add_argument("--pidfile", action="store",
339 help="file name for the PID")
340 parser.add_argument("--logfile", action="store",
341 help="file name for the log")
342 parser.add_argument("--srcdir", action="store", help="test directory")
343 parser.add_argument("--id", action="store", help="server ID")
344 parser.add_argument("--ipv4", action="store_true", default=0,
345 help="IPv4 flag")
346
347 return parser.parse_args()
348
349
350def setup_logging(options):
351 """
352 Set up logging from the command line options
353 """
354 root_logger = logging.getLogger()
355 add_stdout = False
356
357 formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
358
359 # Write out to a logfile
360 if options.logfile:
361 handler = ClosingFileHandler(options.logfile)
362 handler.setFormatter(formatter)
363 handler.setLevel(logging.DEBUG)
364 root_logger.addHandler(handler)
365 else:
366 # The logfile wasn't specified. Add a stdout logger.
367 add_stdout = True
368
369 if options.verbose:
370 # Add a stdout logger as well in verbose mode
371 root_logger.setLevel(logging.DEBUG)
372 add_stdout = True
373 else:
374 root_logger.setLevel(logging.INFO)
375
376 if add_stdout:
377 stdout_handler = logging.StreamHandler(sys.stdout)
378 stdout_handler.setFormatter(formatter)
379 stdout_handler.setLevel(logging.DEBUG)
380 root_logger.addHandler(stdout_handler)
381
382
383if __name__ == '__main__':
384 # Get the options from the user.
385 options = get_options()
386
387 # Setup logging using the user options
388 setup_logging(options)
389
390 # Run main script.
391 try:
392 rc = smbserver(options)
393 except Exception as e:
394 log.exception(e)
395 rc = ScriptRC.EXCEPTION
396
397 if options.pidfile and os.path.isfile(options.pidfile):
398 os.unlink(options.pidfile)
399
400 log.info("[SMB] Returning %d", rc)
401 sys.exit(rc)