blob: e925054b7f99b6aafd6c7d449f2aaec6b295aadf [file] [log] [blame]
rjw1f884582022-01-06 17:20:42 +08001#!/usr/bin/env python3
2
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12import signal
13import pickle
14import traceback
15import queue
16from multiprocessing import Lock
17from threading import Thread
18
# BitBake requires a UTF-8 filesystem encoding; Python fixes the filesystem
# encoding at interpreter startup, so it cannot be corrected from here.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
# The cooker passes a magic "decafbad..." token as argv[1] when spawning us.
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The extended "decafbadbad" token asks the worker to profile itself and
# its forked task children.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        # Prefer the C implementation of the profiler when available.
        import cProfile as profile
    except:
        import profile
34
# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
except Exception:
    # Best effort only: the worker can still run with buffered output.
    # Catch Exception rather than a bare except so SystemExit and
    # KeyboardInterrupt are not swallowed here.
    pass
47
# Shared BitBake logger; events logged through it are forwarded to the
# cooker via the LogHandler registered below.
logger = logging.getLogger("BitBake")

# The worker talks back to the cooker over its own stdout.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

# Debugging aid, normally disabled (if 0).
if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
68
# Outbound events are queued here and drained by the writer thread.
worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle *event*, wrap it in <event> markers and queue it for the cooker."""
    worker_fire_prepickled(b"<event>" + pickle.dumps(event) + b"</event>")

def worker_fire_prepickled(event):
    """Hand an already-serialised event payload to the writer thread."""
    worker_queue.put(event)
79
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Set by the main thread at shutdown; the writer thread exits once this is
# True and all pending data has been flushed.
worker_thread_exit = False

def worker_flush(worker_queue):
    """
    Writer-thread main loop: drain worker_queue and write the bytes to the
    (non-blocking) worker_pipe, retrying partial/EAGAIN writes.  Runs until
    worker_thread_exit is set and all buffered data has been sent.
    """
    # Locally buffered bytes not yet written to the pipe.
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for up to 1s waiting for new data so the exit flag is
            # re-checked regularly.
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the pipe to become writable.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                # os.write may write only part of the buffer; keep the rest.
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full, retry. EPIPE: cooker went away; keep
                # looping so the exit condition below can terminate us.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
111
def worker_child_fire(event, d):
    """
    Event-firing hook installed in forked task children.

    Pickles *event* and writes it to the pipe back to the worker parent,
    serialised by worker_pipe_lock (multiple processes may share the pipe).
    On IOError the parent is assumed gone, so terminate the process group
    via sigterm_handler and re-raise.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # Use the lock as a context manager so it is always released even if
        # the write raises; the previous acquire()/release() pair leaked the
        # lock on IOError.
        with worker_pipe_lock:
            worker_pipe.write(data)
    except IOError:
        sigterm_handler(None, None)
        raise
124
# Route all bb.event firing through the queue-backed writer thread.
bb.event.worker_fire = worker_fire
126
# Optional command-debug log; enable by opening a file here.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log and flush, if logging is enabled."""
    if lf is None:
        return
    lf.write(msg)
    lf.flush()
133
def sigterm_handler(signum, frame):
    """
    SIGTERM/SIGHUP handler: reset SIGTERM to its default disposition, then
    forward SIGTERM to the whole process group (killpg(0, ...)) so every
    task child dies too, and exit this process.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
138
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """
    Fork a child process to execute a single task (fn:taskname).

    In the parent, restores any environment changed for fakeroot and returns
    (pid, pipein, pipeout).  The forked child never returns: it runs the task
    (optionally under the profiler) and terminates via os._exit() with the
    task's exit status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname],8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        # Export the fakeroot environment, remembering the previous values
        # in envbackup so the parent can restore them after the fork.
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork() so buffered output is not duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            try:
                # Build the task datastore: base multiconfig data plus the
                # recipe's full parse, with worker-supplied metadata applied.
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the fakeroot environment on top of the exports.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            # Profile the child and always dump/process stats, even on error.
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
            os._exit(ret)
    else:
        # Parent: undo the environment changes made before the fork.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
294
295class runQueueWorkerPipe():
296 """
297 Abstraction for a pipe between a worker thread and the worker server
298 """
299 def __init__(self, pipein, pipeout):
300 self.input = pipein
301 if pipeout:
302 pipeout.close()
303 bb.utils.nonblockingfd(self.input)
304 self.queue = b""
305
306 def read(self):
307 start = len(self.queue)
308 try:
309 self.queue = self.queue + (self.input.read(102400) or b"")
310 except (OSError, IOError) as e:
311 if e.errno != errno.EAGAIN:
312 raise
313
314 end = len(self.queue)
315 index = self.queue.find(b"</event>")
316 while index != -1:
317 worker_fire_prepickled(self.queue[:index+8])
318 self.queue = self.queue[index+8:]
319 index = self.queue.find(b"</event>")
320 return (end > start)
321
322 def close(self):
323 while self.read():
324 continue
325 if len(self.queue) > 0:
326 print("Warning, worker child left partial message: %s" % self.queue)
327 self.input.close()
328
# Set to True by a clean "quit" command so the top-level exception handler
# does not log a traceback for the resulting SystemExit.
normalexit = False
330
class BitbakeWorker(object):
    """
    Main worker server: reads "<item>...</item>" framed commands from the
    cooker on stdin, forks task children via fork_off_task(), relays their
    events back and reaps their exit statuses.
    """
    def __init__(self, din):
        # din: readable binary stream connected to the cooker (our stdin).
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""          # unparsed command bytes from the cooker
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}      # child pid -> task id
        self.build_pipes = {}     # child pid -> runQueueWorkerPipe

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The magic argv token contains "beef" when we're the fakeroot worker.
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running task children, then re-deliver SIGTERM to
        ourselves with the default handler so we actually die."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the cooker command stream and all child event
        pipes with select(), dispatch commands and reap finished children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue


    def handle_item(self, item, func):
        """If the queue starts with <item>, pass each complete payload up to
        </item> to func and strip it from the queue."""
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base metadata."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        # Extra variables to inject into every task datastore.
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Apply per-build worker settings: log levels and PR service host."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Liveness check from the cooker; reply via the event/log stream."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Clean shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child for the requested task and track its pid and pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection,
        otherwise collect one process's exit code, close its information
        pipe and report the (task, status) pair to the cooker.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM every remaining task child (whole process groups), wait
        for them, and drain their event pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Children are process-group leaders (os.setsid in
                    # fork_off_task), so signal the group -k.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
478
# Top-level driver: run the worker server (optionally under the profiler)
# against the cooker command stream on stdin.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            # Always write out the profile data, even if serve() raised.
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # Suppress the traceback for a clean "quit"-initiated SystemExit.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
496
# Tell the writer thread to flush any remaining event data and wait for it,
# so nothing queued for the cooker is lost on exit.
worker_thread_exit = True
worker_thread.join()

# Fix spelling of the final debug-log message ("exitting" -> "exiting").
workerlog_write("exiting")
sys.exit(0)