blob: 6a99728c4fcbb3653c7f91f28ba94e71d3888443 [file] [log] [blame]
rjw1f884582022-01-06 17:20:42 +08001import os,sys,logging
2import signal, time
3from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
4import threading
5import queue
6import socket
7import io
8import sqlite3
9import bb.server.xmlrpcclient
10import prserv
11import prserv.db
12import errno
13import select
14
# Logger shared by everything in this module.
logger = logging.getLogger("BitBake.PRserv")

# Refuse to run on interpreters older than Python 2.6 final.
if sys.hexversion < 0x020600F0:
    print("Sorry, python 2.6 or later is required.")
    raise SystemExit(1)
20
class Handler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler that prints a traceback when a call fails."""

    def _dispatch(self, method, params):
        """Look up *method* in the server's ``funcs`` table and call it.

        Any exception (including an unknown method name) is printed with
        its traceback and then re-raised unchanged.
        """
        try:
            return self.server.funcs[method](*params)
        except BaseException:
            import traceback
            traceback.print_exc()
            raise
30
# Pidfile path template; the two slots are filled with (host/ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# The PRServSingleton created by auto_start(), or None when none is active.
singleton = None
33
34
class PRServer(SimpleXMLRPCServer):
    """XML-RPC server exposing the PR database.

    The listening socket is bound in the constructor; the database is
    opened and requests are served only after start() has forked a child
    process that runs work_forever(). Accepted connections are queued by
    the main loop and processed by a single handler thread.
    """

    def __init__(self, dbfile, logfile, interface, daemon=True):
        """Bind the server socket and register the RPC functions.

        dbfile    -- path handed to prserv.db.PRData in work_forever()
        logfile   -- file that stdout/stderr are redirected to in the child
        interface -- (host, port) tuple to bind to
        daemon    -- True: start() double-forks; False: start() forks once

        Raises PRServiceConfigError if the socket cannot be bound.
        """
        try:
            SimpleXMLRPCServer.__init__(self, interface,
                                        logRequests=False, allow_none=True)
        except socket.error:
            ip = socket.gethostbyname(interface[0])
            port = interface[1]
            msg = "PR Server unable to bind to %s:%s\n" % (ip, port)
            sys.stderr.write(msg)
            raise PRServiceConfigError

        self.dbfile = dbfile
        self.daemon = daemon
        self.logfile = logfile
        self.working_thread = None
        self.host, self.port = self.socket.getsockname()
        self.pidfile = PIDPREFIX % (self.host, self.port)

        # Set for real in work_forever(); defined here so that the signal
        # handlers and ping() cannot hit an AttributeError if they run
        # before the database has been opened.
        self.db = None
        self.table = None
        self.quitflag = False

        self.register_function(self.getPR, "getPR")
        self.register_function(self.quit, "quit")
        self.register_function(self.ping, "ping")
        self.register_function(self.export, "export")
        self.register_function(self.dump_db, "dump_db")
        self.register_function(self.importone, "importone")
        self.register_introspection_functions()

        # Pipe used by quit() to wake the select() in work_forever().
        self.quitpipein, self.quitpipeout = os.pipe()

        self.requestqueue = queue.Queue()
        self.handlerthread = threading.Thread(target=self.process_request_thread)
        self.handlerthread.daemon = False

    def process_request_thread(self):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        Runs until quitflag is set, taking (request, client_address)
        pairs off requestqueue. A (None, None) entry only wakes the loop.
        """
        iter_count = 1
        # 60 iterations between syncs or sync if dirty every ~30 seconds
        iterations_between_sync = 60

        bb.utils.set_process_name("PRServ Handler")

        while not self.quitflag:
            try:
                (request, client_address) = self.requestqueue.get(True, 30)
            except queue.Empty:
                self.table.sync_if_dirty()
                continue
            if request is None:
                continue
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                iter_count = (iter_count + 1) % iterations_between_sync
                if iter_count == 0:
                    self.table.sync_if_dirty()
            except:
                # Deliberately broad: nothing raised while serving one
                # request may terminate the only handler thread.
                self.handle_error(request, client_address)
                self.shutdown_request(request)
                self.table.sync()
            # NOTE(review): indentation was lost in the reviewed source;
            # this call is kept inside the loop, which is the safer reading
            # (an extra sync_if_dirty per request at worst) - confirm.
            self.table.sync_if_dirty()

    def sigint_handler(self, signum, stack):
        """SIGINT: flush the table (if opened) but keep running."""
        if self.table:
            self.table.sync()

    def sigterm_handler(self, signum, stack):
        """SIGTERM: flush the table, request shutdown, wake the handler."""
        if self.table:
            self.table.sync()
        self.quit()
        self.requestqueue.put((None, None))

    def process_request(self, request, client_address):
        """Queue the accepted connection for the handler thread."""
        self.requestqueue.put((request, client_address))

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """RPC: forward to table.export(); log and return None on sqlite3 errors."""
        try:
            return self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def dump_db(self):
        """
        Returns a script (string) that reconstructs the state of the
        entire database at the time this function is called. The script
        language is defined by the backing database engine, which is a
        function of server configuration.
        Returns None if the database engine does not support dumping to
        script or if some other error is encountered in processing.
        """
        buff = io.StringIO()
        try:
            self.table.sync()
            self.table.dump_db(buff)
            return buff.getvalue()
        except Exception as exc:
            logger.error(str(exc))
            return None
        finally:
            buff.close()

    def importone(self, version, pkgarch, checksum, value):
        """RPC: forward to table.importone() and return its result."""
        return self.table.importone(version, pkgarch, checksum, value)

    def ping(self):
        """RPC: return True while the server has not been asked to quit."""
        return not self.quitflag

    def getinfo(self):
        """Return the (host, port) the socket is actually bound to."""
        return (self.host, self.port)

    def getPR(self, version, pkgarch, checksum):
        """RPC: forward to table.getValue().

        Returns None (after logging) on prserv.NotFoundError or on any
        sqlite3 error.
        """
        try:
            return self.table.getValue(version, pkgarch, checksum)
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)", version, checksum)
            return None
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def quit(self):
        """RPC / signal path: ask the server to stop.

        Sets quitflag and wakes the select() in work_forever() through the
        quit pipe. Safe to call more than once (e.g. an XML-RPC quit
        followed by SIGTERM): the write end is written and closed only on
        the first call.
        """
        self.quitflag = True
        wake_fd = self.quitpipeout
        if wake_fd is None:
            return
        # Drop the reference first so a re-entrant call sees it as closed.
        self.quitpipeout = None
        try:
            os.write(wake_fd, b"q")
        finally:
            os.close(wake_fd)
        return

    def work_forever(self,):
        """Open the database and serve requests until quit() is called."""
        self.quitflag = False
        # This timeout applies to the poll in TCPServer, we need the select
        # below to wake on our quit pipe closing. We only ever call into handle_request
        # if there is data there.
        self.timeout = 0.01

        bb.utils.set_process_name("PRServ")

        # DB connection must be created after all forks
        self.db = prserv.db.PRData(self.dbfile)
        self.table = self.db["PRMAIN"]

        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
                     (self.dbfile, self.host, self.port, str(os.getpid())))

        self.handlerthread.start()
        while not self.quitflag:
            ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
            if self.quitflag:
                break
            if self.fileno() in ready[0]:
                self.handle_request()
        self.handlerthread.join()
        self.db.disconnect()
        logger.info("PRServer: stopping...")
        self.server_close()
        os.close(self.quitpipein)
        return

    def start(self):
        """Fork the serving process and record the returned pid in self.pid.

        Only the parent returns from here; the child ends in os._exit().
        """
        if self.daemon:
            pid = self.daemonize()
        else:
            pid = self.fork()
        self.pid = pid

        # Ensure both the parent sees this and the child from the work_forever log entry above
        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
                     (self.dbfile, self.host, self.port, str(pid)))

    def delpid(self):
        """Remove the pidfile; it is not an error if it is already gone."""
        try:
            os.remove(self.pidfile)
        except FileNotFoundError:
            # stop_daemon() may already have removed it.
            pass

    def daemonize(self):
        """
        See Advanced Programming in the UNIX, Sec 13.3

        Returns, in the original process, the pid of the first fork's
        child after waiting for it to exit. The serving process never
        returns from this method.
        """
        try:
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
                # parent return instead of exit to give control
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        os.setsid()
        # fork again to make sure the daemon is not session leader,
        # which prevents it from acquiring controlling terminal
        try:
            pid = os.fork()
            if pid > 0:  # parent
                os._exit(0)
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        self.cleanup_handles()
        os._exit(0)

    def fork(self):
        """Fork once; the parent gets the child's pid, the child serves."""
        try:
            pid = os.fork()
            if pid > 0:
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        bb.utils.signal_on_parent_exit("SIGTERM")
        self.cleanup_handles()
        os._exit(0)

    def cleanup_handles(self):
        """Child-side setup, then serve.

        Installs the signal handlers, changes directory to "/", redirects
        stdin to /dev/null and stdout/stderr to the logfile, resets the
        logging handlers, writes the pidfile, runs work_forever() and
        finally removes the pidfile.
        """
        signal.signal(signal.SIGINT, self.sigint_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        os.chdir("/")

        sys.stdout.flush()
        sys.stderr.flush()

        # We could be called from a python thread with io.StringIO as
        # stdout/stderr or it could be 'real' unix fd forking where we need
        # to physically close the fds to prevent the program launching us from
        # potentially hanging on a pipe. Handle both cases.
        si = open('/dev/null', 'r')
        try:
            os.dup2(si.fileno(), sys.stdin.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stdin = si
        so = open(self.logfile, 'a+')
        try:
            os.dup2(so.fileno(), sys.stdout.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stdout = so
        try:
            os.dup2(so.fileno(), sys.stderr.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stderr = so

        # Clear out all log handlers prior to the fork() to avoid calling
        # event handlers not part of the PRserver
        for logger_iter in list(logging.Logger.manager.loggerDict):
            logging.getLogger(logger_iter).handlers = []

        # Ensure logging makes it to the logfile
        streamhandler = logging.StreamHandler()
        streamhandler.setLevel(logging.DEBUG)
        formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
        streamhandler.setFormatter(formatter)
        logger.addHandler(streamhandler)

        # write pidfile
        pid = str(os.getpid())
        with open(self.pidfile, 'w') as pf:
            pf.write("%s\n" % pid)

        # Remove the pidfile even when serving fails, so a stale file does
        # not make start_daemon() believe a daemon is still running.
        try:
            self.work_forever()
        finally:
            self.delpid()
297
class PRServSingleton(object):
    """Holder for a single, non-daemon PRServer started on demand."""

    def __init__(self, dbfile, logfile, interface):
        """Remember the server parameters; nothing is started yet."""
        self.dbfile, self.logfile, self.interface = dbfile, logfile, interface
        # Filled in by start() with the address the server is bound to.
        self.host = self.port = None

    def start(self):
        """Create and start the PRServer, then record its (host, port)."""
        server = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
        self.prserv = server
        server.start()
        self.host, self.port = server.getinfo()

    def getinfo(self):
        """Return (host, port); both are None until start() has run."""
        return (self.host, self.port)
313
class PRServerConnection(object):
    """Client-side proxy for a PRServer reachable over XML-RPC."""

    def __init__(self, host, port):
        """Connect to host:port.

        When (host, port) is the "local special" address, the address of
        the module-level singleton server is used instead.
        """
        if is_local_special(host, port):
            host, port = singleton.getinfo()
        self.host, self.port = host, port
        created = bb.server.xmlrpcclient._create_server(self.host, self.port)
        self.connection, self.transport = created

    def terminate(self):
        """Ask the server to quit; failures are written to stderr only."""
        try:
            logger.info("Terminating PRServer...")
            self.connection.quit()
        except Exception as exc:
            sys.stderr.write("%s\n" % (str(exc),))

    def getPR(self, version, pkgarch, checksum):
        """Remote getPR call."""
        return self.connection.getPR(version, pkgarch, checksum)

    def ping(self):
        """Remote ping call."""
        return self.connection.ping()

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """Remote export call."""
        return self.connection.export(version, pkgarch, checksum, colinfo)

    def dump_db(self):
        """Remote dump_db call."""
        return self.connection.dump_db()

    def importone(self, version, pkgarch, checksum, value):
        """Remote importone call."""
        return self.connection.importone(version, pkgarch, checksum, value)

    def getinfo(self):
        """Return the (host, port) this connection targets."""
        return self.host, self.port
346
def start_daemon(dbfile, host, port, logfile):
    """Start a daemonized PRServer unless its pidfile says one is running.

    Returns 1 (after a message on stderr) when the pidfile for the
    resolved ip/port already holds a pid, otherwise starts the server and
    returns 0. A note is written to stdout when the server ended up on a
    different port than requested.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        # 'with' closes the handle even when the pid cannot be parsed.
        with open(pidfile, 'r') as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip, port))
    server.start()

    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
    # the one the server actually is listening, so at least warn the user about it
    _, rport = server.getinfo()
    if port != rport:
        sys.stdout.write("Server is listening at port %s instead of %s\n"
                         % (rport, port))
    return 0
372
def stop_daemon(host, port):
    """Stop the PRServer daemon recorded in the pidfile for host:port.

    Returns 1 (after a message on stderr) when no pid can be read from
    the pidfile, otherwise 0. The server is first asked to quit over
    XML-RPC; if it is still alive after about 5 seconds it is sent
    SIGTERM. The pidfile is removed afterwards if it still exists.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        # 'with' closes the handle even when the pid cannot be parsed.
        with open(pidfile, 'r') as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = [os.path.splitext(os.path.basename(path))[0].split('_')[-1]
                 for path in glob.glob(PIDPREFIX % (ip, '*'))]
        portstr = ""
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        PRServerConnection(ip, port).terminate()
    except Exception:
        logger.critical("Stop PRService %s:%d failed" % (host, port))

    wait_timeout = 0
    print("Waiting for pr-server to exit.")
    while is_running(pid) and wait_timeout < 50:
        time.sleep(0.1)
        wait_timeout += 1

    if is_running(pid):
        print("Sending SIGTERM to pr-server.")
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # The process exited between the check and the signal; that is
            # the outcome we wanted, so carry on and clean up the pidfile.
            pass
        else:
            time.sleep(0.1)

    # The server removes its own pidfile on a clean exit, so it may
    # legitimately be gone already.
    try:
        os.remove(pidfile)
    except FileNotFoundError:
        pass

    return 0
427
def is_running(pid):
    """Return False only when signalling *pid* fails with ESRCH.

    Signal 0 performs the existence check without delivering a signal.
    Any other OSError still counts as "running".
    """
    try:
        os.kill(pid, 0)
    except OSError as exc:
        return exc.errno != errno.ESRCH
    return True
435
def is_local_special(host, port):
    """Return True when host is "localhost" (any case, surrounding
    whitespace ignored) and port is falsy (0, None, empty)."""
    names_localhost = host.strip().upper() == 'localhost'.upper()
    return names_localhost and not port
441
class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured or cannot be reached."""
444
def auto_start(d):
    """Start (if needed) and verify the PR service named by PRSERV_HOST.

    d -- datastore; only d.getVar() is used here.

    Returns None when PRSERV_HOST is unset or empty, otherwise the
    "host:port" string of the server that answered a ping.

    Raises PRServiceConfigError when PRSERV_HOST is not "<hostname>:<port>"
    with an integer port, when a local server is requested but neither
    PERSISTENT_DIR nor CACHE is set, or when the server cannot be reached.
    """
    global singleton

    # Shutdown any existing PR Server
    auto_shutdown()

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        return None

    usage = '\n'.join(['PRSERV_HOST: incorrect format',
                       'Usage: PRSERV_HOST = "<hostname>:<port>"'])
    if len(host_params) != 2:
        logger.critical(usage)
        raise PRServiceConfigError

    # A non-numeric port is a format error too; report it the same way
    # rather than letting a bare ValueError escape.
    try:
        configured_port = int(host_params[1])
    except ValueError:
        logger.critical(usage)
        raise PRServiceConfigError

    if is_local_special(host_params[0], configured_port) and not singleton:
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        bb.utils.mkdirhier(cachedir)
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost", 0))
        singleton.start()
    if singleton:
        host, port = singleton.getinfo()
    else:
        host = host_params[0]
        port = configured_port

    try:
        connection = PRServerConnection(host, port)
        connection.ping()
        realhost, realport = connection.getinfo()
        return str(realhost) + ":" + str(realport)

    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError
486
def auto_shutdown():
    """Stop the singleton PR server, if one is active, and reap its process.

    Does nothing when no singleton exists. Afterwards the module-level
    singleton is reset to None.
    """
    global singleton
    if not singleton:
        return

    host, port = singleton.getinfo()
    try:
        PRServerConnection(host, port).terminate()
    except Exception:
        # Narrowed from a bare except so that KeyboardInterrupt and
        # SystemExit are not swallowed during shutdown.
        logger.critical("Stop PRService %s:%d failed" % (host, port))

    try:
        os.waitpid(singleton.prserv.pid, 0)
    except ChildProcessError:
        # The child has already been reaped.
        pass
    singleton = None
501
def ping(host, port):
    """Connect to the PR server at host:port and return its ping() result."""
    return PRServerConnection(host, port).ping()