blob: 80a7875ad98053f8569fed94b60cbe6cad90c5c1 [file] [log] [blame]
rjw1f884582022-01-06 17:20:42 +08001#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License version 2 as
8# published by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19"""
20 This module implements a multiprocessing.Process based server for bitbake.
21"""
22
23import bb
24import bb.event
25import logging
26import multiprocessing
27import threading
28import array
29import os
30import sys
31import time
32import select
33import socket
34import subprocess
35import errno
36import re
37import datetime
38import bb.server.xmlrpcserver
39from bb import daemonize
40from multiprocessing import queues
41
# Module-wide logger; looked up under the 'BitBake' name so every user of
# that name shares the same logger object.
logger = logging.getLogger('BitBake')
43
class ProcessTimeout(SystemExit):
    """Raised when no reply arrives from the bitbake server in time.

    Derives from SystemExit, so it is not caught by ``except Exception``.
    """
46
class ProcessServer(multiprocessing.Process):
    """multiprocessing.Process based bitbake server.

    Besides the constructor arguments, run() reads three attributes that
    must be assigned on the instance before the process is started:
    ``cooker``, ``server_timeout`` and ``xmlrpcinterface``
    (BitBakeServer._startServer sets them).
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, sock, sockname):
        """Store the lock file object, listening socket and socket path.

        lock     -- open file object for the bitbake lock file
        sock     -- listening socket new clients connect to
        sockname -- filesystem path of that socket (unlinked on exit)
        """
        multiprocessing.Process.__init__(self)
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.haveui = False
        self.lastui = False
        self.xmlrpc = False

        self._idlefuns = {}

        self.bitbake_lock = lock
        self.sock = sock
        self.sockname = sockname

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert hasattr(function, '__call__')
        self._idlefuns[function] = data

    def run(self):
        """Process entry point: configure the server, then run main().

        Reads BB_HEARTBEAT_EVENT and BB_SERVER_TIMEOUT, records the pid
        (and XMLRPC address, if any) in the lock file and runs main(),
        under the profiler when the cooker configuration asks for it.
        Returns whatever main() returns.
        """
        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
        if heartbeat_event:
            try:
                self.heartbeat_seconds = float(heartbeat_event)
            except (TypeError, ValueError):
                bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

        self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
        try:
            if self.timeout:
                self.timeout = float(self.timeout)
        except (TypeError, ValueError):
            bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
            # Really ignore it: a non-numeric value left in place would make
            # the "lastui + timeout" arithmetic in main() raise TypeError.
            self.timeout = None

        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            # Best effort only; the server still runs without the pid record.
            print("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = prof.runcall(self.main)

            prof.dump_stats(self.profile_filename)
            bb.utils.process_profilelog(self.profile_filename)
            print("Raw profiling information saved to %s and processed statistics to %s"
                  % (self.profile_filename, self.profile_processed_filename))

        else:
            ret = self.main()

        return ret

    def main(self):
        """Serve clients until asked to quit, then shut down and release the lock.

        One client at a time is attached; further connections are queued.
        On exit the socket file is removed, the cooker is shut down and the
        lock file is removed once an exclusive lock on it can be taken.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        print("Entering server connection loop")

        def disconnect_client(self, fds):
            # Drop the current client, then either promote a queued
            # connection or, with no timeout configured, ask the loop to quit.
            print("Disconnecting Client")
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            ready = select.select(fds,[],[],0)[0]
            if newconnections:
                print("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif self.timeout is None and not ready:
                print("No timeout, exiting.")
                self.quit = True

        while not self.quit:
            if self.sock in ready:
                # Drain every pending connection on the listening socket
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        print("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        print("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    print("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    print("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            # A timeout of -1.0 disables the idle exit below
            if not self.timeout == -1.0 and not self.haveui and self.lastui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                print("Server timeout, exiting.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    print("Running command %s" % command)
                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
                except Exception as e:
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            ready = self.idle_commands(.1, fds)

        print("Exiting")
        # Remove the socket file so we don't get any more connections to avoid races
        os.unlink(self.sockname)
        self.sock.close()

        try:
            self.cooker.shutdown(True)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except Exception as e:
            # Shutdown is best effort; record the problem in the server log
            # and carry on so the lock file still gets released below.
            print("Ignoring error during cooker shutdown: %s" % str(e))

        self.cooker.post_serve()

        # Finally release the lockfile but warn about other processes holding it open
        lock = self.bitbake_lock
        lockfile = lock.name
        lock.close()
        lock = None

        while not lock:
            with bb.utils.timeout(3):
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
                if lock:
                    # We hold the lock so we can remove the file (hide stale pid data)
                    bb.utils.remove(lockfile)
                    bb.utils.unlockfile(lock)
                    return

                procs = self._lock_holders(lockfile)
                msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
                if procs:
                    msg += ":\n%s" % procs
                print(msg)

    @staticmethod
    def _lock_holders(lockfile):
        """Return text from lsof (or fuser) about lockfile, or None.

        fuser is only tried when lsof is not installed. None means neither
        tool could be executed. A tool that runs but exits non-zero still
        contributes whatever it printed instead of raising
        CalledProcessError out of the shutdown path.
        """
        for cmd in (["lsof", '-w', lockfile], ["fuser", '-v', lockfile]):
            try:
                output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                output = e.output
            except OSError as e:
                # Some systems may not have the tool available
                if e.errno != errno.ENOENT:
                    raise
                continue
            if not output:
                return ""
            return output.decode("utf-8", errors="replace")
        return None

    def idle_commands(self, delay, fds=None):
        """Run the registered idle functions, fire heartbeats, then select().

        delay -- maximum number of seconds to sleep in select()
        fds   -- objects to wait on; idle functions may contribute more

        Each idle function is called as function(self, data, False):
          False -> the function is unregistered and select() does not sleep
          True  -> select() does not sleep
          float -> upper bound for the sleep
          other -> treated as a list of extra fds to wait on

        Returns the list of readable objects reported by select().
        """
        nextsleep = delay
        if not fds:
            fds = []

        # Iterate over a copy: entries may be deleted while looping
        for function, data in list(self._idlefuns.items()):
            try:
                retval = function(self, data, False)
                if retval is False:
                    del self._idlefuns[function]
                    nextsleep = None
                elif retval is True:
                    nextsleep = None
                elif isinstance(retval, float) and nextsleep:
                    if (retval < nextsleep):
                        nextsleep = retval
                elif nextsleep is None:
                    continue
                else:
                    fds = fds + retval
            except SystemExit:
                raise
            except Exception as exc:
                if not isinstance(exc, bb.BBHandledException):
                    logger.exception('Running idle function')
                del self._idlefuns[function]
                self.quit = True

        # Create new heartbeat event?
        now = time.time()
        if now >= self.next_heartbeat:
            # We might have missed heartbeats. Just trigger once in
            # that case and continue after the usual delay.
            self.next_heartbeat += self.heartbeat_seconds
            if self.next_heartbeat <= now:
                self.next_heartbeat = now + self.heartbeat_seconds
            heartbeat = bb.event.HeartbeatEvent(now)
            bb.event.fire(heartbeat, self.cooker.data)
        if nextsleep and now + nextsleep > self.next_heartbeat:
            # Shorten timeout so that we wake up in time for
            # the heartbeat.
            nextsleep = self.next_heartbeat - now

        if nextsleep is not None:
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                return select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                return []
        else:
            return select.select(fds,[],[],0)[0]
334
335
class ServerCommunicator():
    """Client-side helper that sends commands and collects the replies.

    connection -- object with send(), used to transmit commands
    recv       -- object with poll()/get(), used to read the replies
    """

    def __init__(self, connection, recv):
        self.recv = recv
        self.connection = connection

    def runCommand(self, command):
        """Send command and return the reply.

        Raises ProcessTimeout when nothing arrives within 30 seconds.
        """
        self.connection.send(command)
        if self.recv.poll(30):
            return self.recv.get()
        raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server")

    def updateFeatureSet(self, featureset):
        """Issue "setFeatures"; log and raise BaseException on error."""
        reply = self.runCommand(["setFeatures", featureset])
        _, error = reply
        if not error:
            return
        logger.error("Unable to set the cooker to the correct featureset: %s" % error)
        raise BaseException(error)

    def getEventHandle(self):
        """Issue "getUIHandlerNum" and return the handle from the reply.

        Logs and raises BaseException when the reply carries an error.
        """
        handle, error = self.runCommand(["getUIHandlerNum"])
        if not error:
            return handle

        logger.error("Unable to get UI Handler Number: %s" % error)
        raise BaseException(error)

    def terminateServer(self):
        """Send 'terminateServer' without waiting for any reply."""
        self.connection.send(['terminateServer'])
363 return
364
class BitBakeProcessServerConnection(object):
    """Bundle of everything a client holds for one server connection."""

    def __init__(self, ui_channel, recv, eq, sock):
        # Keep a reference to sock so that it is not garbage collected
        # while this connection is alive
        self.socket_connection = sock
        self.events = eq
        self.connection = ServerCommunicator(ui_channel, recv)

    def terminate(self):
        """Close the socket, then the command channel, then the reply channel."""
        communicator = self.connection
        self.socket_connection.close()
        communicator.connection.close()
        communicator.recv.close()
376 return
377
class BitBakeServer(object):
    """Start a daemonised bitbake server and wait until it reports ready."""

    start_log_format = '--- Starting bitbake server pid %s at %s ---'
    start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'

    def __init__(self, lock, sockname, configuration, featureset):
        """Bind the control socket, daemonise the server and await readiness.

        Raises SystemExit(1), after reporting the relevant part of the
        server log, when the server does not signal readiness.
        """
        self.configuration = configuration
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        self.readypipe, self.readypipein = os.pipe()

        # Create server control socket
        if os.path.exists(sockname):
            os.unlink(sockname)

        # Place the log in the builddirectory alongside the lock file
        lockdir = os.path.dirname(self.bitbake_lock.name)
        logfile = os.path.join(lockdir, "bitbake-cookerdaemon.log")

        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        original_dir = os.getcwd()
        try:
            os.chdir(os.path.dirname(sockname))
            self.sock.bind(os.path.basename(sockname))
        finally:
            os.chdir(original_dir)
        self.sock.listen(1)

        os.set_inheritable(self.sock.fileno(), True)
        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        # This process no longer needs these; close its copies
        self.sock.close()
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        status = ready.poll(5)
        if not status:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            status = ready.poll(90)
        if status:
            try:
                status = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                status = None

        if status and status[0] == "r":
            ready.close()
            return

        ready.close()
        bb.error("Unable to start bitbake server (%s)" % str(status))
        self._report_server_log(logfile, startdatetime)
        raise SystemExit(1)

    def _report_server_log(self, logfile, startdatetime):
        """Report via bb.error() what the server log says about this start.

        Lines from the first start banner dated at or after startdatetime
        onwards are preferred; failing that, the last 60 lines of the log.
        """
        if not os.path.exists(logfile):
            bb.error("%s doesn't exist" % logfile)
            return

        banner_re = re.compile(self.start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
        in_session = False
        session_lines = []
        recent_lines = []
        with open(logfile, "r") as f:
            for line in f:
                if in_session:
                    session_lines.append(line)
                else:
                    recent_lines.append(line)
                    found = banner_re.match(line.rstrip())
                    if found:
                        stamp = datetime.datetime.strptime(found.group(2), self.start_log_datetime_format)
                        if stamp >= startdatetime:
                            in_session = True
                            session_lines.append(line)
                # Keep at most the newest 60 pre-session lines
                if len(recent_lines) > 60:
                    recent_lines = recent_lines[-60:]

        if session_lines:
            if len(session_lines) > 60:
                bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(session_lines[-60:])))
            else:
                bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(session_lines)))
        elif recent_lines:
            bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(recent_lines)))

    def _startServer(self):
        """Daemon entry point: build the cooker, signal readiness, start serving."""
        banner_time = datetime.datetime.now().strftime(self.start_log_datetime_format)
        print(self.start_log_format % (os.getpid(), banner_time))
        sys.stdout.flush()

        server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
        self.configuration.setServerRegIdleCallback(server.register_idle_function)
        os.close(self.readypipe)
        ready_writer = ConnectionWriter(self.readypipein)
        self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
        # "r" is what __init__ waits for on the other end of the pipe
        ready_writer.send("r")
        ready_writer.close()
        server.cooker = self.cooker
        server.server_timeout = self.configuration.server_timeout
        server.xmlrpcinterface = self.configuration.xmlrpcinterface
        print("Started bitbake server pid %d" % os.getpid())
        sys.stdout.flush()

        server.start()
478 server.start()
479
def connectProcessServer(sockname, featureset):
    """Connect to the bitbake server listening on sockname.

    Creates three pipes (events, commands, command replies), passes the
    server's ends over the socket and then sends the requested featureset.

    Returns a BitBakeProcessServerConnection. On any failure, including
    SystemExit, everything opened so far is closed and the exception is
    re-raised.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    eq = command_chan_recv = command_chan = None
    # Pipe ends destined for the server that are still open in this
    # process. Entries are removed as they are closed, so the error path
    # can never close a descriptor twice (the number may have been reused).
    remote_fds = []

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            # Single attempt: any error, EWOULDBLOCK included, propagates
            # to the caller.
            sock.connect(os.path.basename(sockname))
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        remote_fds.append(writefd)
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        remote_fds.append(readfd1)
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write commands results to
        readfd2, writefd2 = os.pipe()
        remote_fds.append(writefd2)
        command_chan_recv = ConnectionReader(readfd2)

        # Order matters: events writer, command reader, reply writer
        sendfds(sock, list(remote_fds))

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use
        while remote_fds:
            os.close(remote_fds.pop())

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit):
        if command_chan_recv is not None:
            command_chan_recv.close()
        if command_chan is not None:
            command_chan.close()
        for fd in remote_fds:
            try:
                os.close(fd)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection
540 return server_connection
541
def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fd_array = array.array('i', fds)
    # One payload byte carrying the descriptor count modulo 256
    count_byte = bytes([len(fd_array) % 256])
    rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)
    sock.sendmsg([count_byte], [rights])
546 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
547
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of descriptors to make room for. Returns the
    received descriptors as a list of ints.

    Raises EOFError when neither data nor ancillary data arrives,
    AssertionError when the count byte disagrees with the number of
    descriptors received, and RuntimeError for any other malformed message.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Raised explicitly rather than via "assert" so the check is
            # still performed when Python runs with -O.
            if len(a) % 256 != msg[0]:
                raise AssertionError(
                    "Len is %d but msg[0] is %r" % (len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        # Fall through to the generic error below
        pass
    raise RuntimeError('Invalid data received')
569 raise RuntimeError('Invalid data received')
570
class BBUIEventQueue:
    """Queue of events filled by a background thread reading from readfd."""

    def __init__(self, readfd):
        """Wrap readfd in a ConnectionReader and start the reader thread."""
        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        # Set while the queue holds at least one event
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        self.t = threading.Thread(target=self.startCallbackHandler)
        self.t.daemon = True
        self.t.start()

    def getEvent(self):
        """Pop and return the oldest queued event, or None if there is none."""
        with self.eventQueueLock:
            if not self.eventQueue:
                return None

            item = self.eventQueue.pop(0)

            if not self.eventQueue:
                self.eventQueueNotify.clear()

            return item

    def waitEvent(self, delay):
        """Wait up to delay seconds for an event, then behave like getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append event to the queue and wake up any waiter."""
        with self.eventQueueLock:
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle event (a bytes object) and queue the result."""
        # Imported here because the module itself does not import pickle.
        import pickle
        # NOTE(review): pickle.loads must only ever see trusted data;
        # confirm every caller passes data originating from the server.
        self.queue_event(pickle.loads(event))

    def startCallbackHandler(self):
        """Thread body: queue events from the reader until it reports EOF."""
        bb.utils.set_process_name("UIEventQueue")
        while True:
            try:
                self.reader.wait()
                event = self.reader.get()
                self.queue_event(event)
            except EOFError:
                # Easiest way to exit is to close the file descriptor to cause an exit
                break
        self.reader.close()
623 self.reader.close()
624
class ConnectionReader(object):
    """Receive pickled objects from a file descriptor.

    The descriptor is wrapped in a read-only multiprocessing Connection.
    """

    def __init__(self, fd):
        self.rlock = multiprocessing.Lock()
        self.reader = multiprocessing.connection.Connection(fd, writable=False)

    def fileno(self):
        """Return the descriptor number of the underlying connection."""
        return self.reader.fileno()

    def poll(self, timeout=None):
        """Report whether data is available, waiting up to timeout seconds."""
        return self.reader.poll(timeout)

    def wait(self, timeout=None):
        """Wait on the connection via multiprocessing.connection.wait()."""
        watched = [self.reader]
        return multiprocessing.connection.wait(watched, timeout)

    def get(self):
        """Read one message under the read lock and return it unpickled."""
        with self.rlock:
            payload = self.reader.recv_bytes()
        return multiprocessing.reduction.ForkingPickler.loads(payload)

    def close(self):
        """Close the underlying connection."""
        return self.reader.close()
646 return self.reader.close()
647
648
class ConnectionWriter(object):
    """Send pickled objects to a file descriptor.

    The descriptor is wrapped in a write-only multiprocessing Connection.
    """

    def __init__(self, fd):
        self.wlock = multiprocessing.Lock()
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        # Why bb.event needs this I have no idea
        self.event = self

    def fileno(self):
        """Return the descriptor number of the underlying connection."""
        return self.writer.fileno()

    def send(self, obj):
        """Pickle obj, then write it as one message under the write lock."""
        payload = multiprocessing.reduction.ForkingPickler.dumps(obj)
        with self.wlock:
            self.writer.send_bytes(payload)

    def close(self):
        """Close the underlying connection."""
        return self.writer.close()
666 return self.writer.close()