#!/usr/bin/env python
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007  Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import copy
import os
import sys
import signal
import stat
import fcntl
import errno
import logging
import re
import bb
from bb import msg, data, event
from bb import monitordisk
import subprocess
import pickle
from multiprocessing import Process

bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")

__find_md5__ = re.compile(r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])')

def fn_from_tid(tid):
    return tid.rsplit(":", 1)[0]

def taskname_from_tid(tid):
    return tid.rsplit(":", 1)[1]

def mc_from_tid(tid):
    if tid.startswith('multiconfig:'):
        return tid.split(':')[1]
    return ""

def split_tid(tid):
    (mc, fn, taskname, _) = split_tid_mcfn(tid)
    return (mc, fn, taskname)

def split_tid_mcfn(tid):
    if tid.startswith('multiconfig:'):
        elems = tid.split(':')
        mc = elems[1]
        fn = ":".join(elems[2:-1])
        taskname = elems[-1]
        mcfn = "multiconfig:" + mc + ":" + fn
    else:
        tid = tid.rsplit(":", 1)
        mc = ""
        fn = tid[0]
        taskname = tid[1]
        mcfn = fn

    return (mc, fn, taskname, mcfn)

def build_tid(mc, fn, taskname):
    if mc:
        return "multiconfig:" + mc + ":" + fn + ":" + taskname
    return fn + ":" + taskname
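
# Illustrative examples (added note, not in the original source): for a plain
# tid such as "/path/to/foo.bb:do_compile", fn_from_tid() returns
# "/path/to/foo.bb" and taskname_from_tid() returns "do_compile". For a
# multiconfig tid such as "multiconfig:mymc:/path/to/foo.bb:do_compile",
# split_tid_mcfn() returns
# ("mymc", "/path/to/foo.bb", "do_compile", "multiconfig:mymc:/path/to/foo.bb").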

class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        obj = self.__class__(self.total)
        obj.__dict__.update(self.__dict__)
        return obj

    def taskFailed(self):
        self.active = self.active - 1
        self.failed = self.failed + 1

    def taskCompleted(self):
        self.active = self.active - 1
        self.completed = self.completed + 1

    def taskSkipped(self):
        self.active = self.active + 1
        self.skipped = self.skipped + 1

    def taskActive(self):
        self.active = self.active + 1

# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
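
# Added summary (not in the original source) of the usual flow through these
# states, as driven by RunQueue._execute_runqueue() below:
#
#   runQueuePrepare -> runQueueSceneInit -> runQueueSceneRun
#       -> runQueueRunInit -> runQueueRunning -> runQueueComplete
#
# with runQueueFailed and runQueueCleanUp entered when tasks fail or the
# build is interrupted.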

class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = []
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            if tid in self.rq.runq_buildable:
                self.buildable.append(tid)

        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
        if not self.buildable:
            return None

        # Count the currently running tasks of each name, so that tasks which
        # have reached their maximum number of concurrent threads can be skipped
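        # (Added note: the per-task limit comes from the "number_threads"
        # varflag; e.g. setting do_fetch[number_threads] = "2" in a conf file
        # would cap concurrent do_fetch tasks at two.)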
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(self.buildable) == 1:
            tid = self.buildable[0]
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        best = None
        bestprio = None
        for tid in self.buildable:
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                continue
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        self.buildable.append(task)

    def describe_task(self, taskid):
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))

class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight;
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        weights = {}
        for tid in self.rqdata.runtaskentries:
            weight = self.rqdata.runtaskentries[tid].weight
            if weight not in weights:
                weights[weight] = []
            weights[weight].append(tid)

        self.prio_map = []
        for weight in sorted(weights):
            for w in weights[weight]:
                self.prio_map.append(w)

        self.prio_map.reverse()

class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronize by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
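        # Illustrative example (added note): merging ["do_fetch", "do_unpack",
        # "do_build"] with ["do_fetch", "do_configure", "do_build"] yields
        # ["do_fetch", "do_configure", "do_unpack", "do_build"]; common tasks
        # appear once and recipe-specific ones are spliced in at the point
        # where the lists diverge.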
        bb.debug(3, 'merged task list: %s' % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runnable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's tasks that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')

class RunTaskEntry(object):
    def __init__(self):
        self.depends = set()
        self.revdeps = set()
        self.hash = None
        self.task = None
        self.weight = 1

class RunQueueData:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or ""
        self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split()
        self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData)
        self.setscenewhitelist_checked = False
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()

    def reset(self):
        self.runtaskentries = {}

    def runq_depends_names(self, ids):
        import re
        ret = []
        for id in ids:
            nam = os.path.basename(id)
            nam = re.sub("_[^,]*,", ",", nam)
            ret.extend([nam])
        return ret

    def get_task_hash(self, tid):
        return self.runtaskentries[tid].hash

    def get_user_idstring(self, tid, task_name_suffix = ""):
        return tid + task_name_suffix

    def get_short_user_idstring(self, task, task_name_suffix = ""):
        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
        pn = self.dataCaches[mc].pkg_fn[taskfn]
        taskname = taskname_from_tid(task) + task_name_suffix
        return "%s:%s" % (pn, taskname)

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user-readable format.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        class TooManyLoops(Exception):
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain
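
        # Illustrative example (added note): for a chain ["c", "a", "b"] the
        # lowest entry "a" sits at index 1, so the rotation above yields
        # ["a", "b", "c"]; identical loops then compare equal regardless of
        # which task the walk happened to enter them at.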

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs

    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list, finding tasks that are not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runtaskentries)
        weight = {}
        deps_left = {}
        task_done = {}

        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break
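
        # Worked example (added note): if a depends on b and b depends on c,
        # then a is the endpoint (nothing depends on it) and starts at weight
        # 10; the loop then gives b a weight of 1 + 10 = 11 and c a weight of
        # 1 + 11 = 12, so tasks that many other tasks depend on end up
        # heaviest and are scheduled earliest.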

        # Circular dependency sanity check
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug(2, "Task %s is not buildable", tid)
                logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight

    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """

        runq_build = {}
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        found = False
        for mc in self.taskData:
            if len(taskData[mc].taskentries) > 0:
                found = True
                break
        if not found:
            # Nothing to do
            return 0

        self.init_progress_reporter.start()
        self.init_progress_reporter.next_stage()

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
                    continue
                depdata = taskData[mc].build_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

        def add_runtime_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
                    continue
                depdata = taskData[mc].run_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

        def add_mc_dependencies(mc, tid):
            mcdeps = taskData[mc].get_mcdepends()
            for dep in mcdeps:
                mcdependency = dep.split(':')
                pn = mcdependency[3]
                frommc = mcdependency[1]
                mcdep = mcdependency[2]
                deptask = mcdependency[4]
                if mc == frommc:
                    fn = taskData[mcdep].build_targets[pn][0]
                    newdep = '%s:%s' % (fn, deptask)
                    taskData[mc].taskentries[tid].tdepends.append(newdep)

        for mc in taskData:
            for tid in taskData[mc].taskentries:

                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                #runtid = build_tid(mc, fn, taskname)

                #logger.debug(2, "Processing %s,%s:%s", mc, fn, taskname)

                depends = set()
                task_deps = self.dataCaches[mc].task_deps[taskfn]

                self.runtaskentries[tid] = RunTaskEntry()

                if fn in taskData[mc].failed_fns:
                    continue

                # We add multiconfig dependencies before processing internal task deps (tdepends)
                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
                    add_mc_dependencies(mc, tid)

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                for t in taskData[mc].taskentries[tid].tdepends:
                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
                    depends.add(build_tid(depmc, depfn, deptaskname))

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskname in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskname].split()
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskname].split()
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData[mc].taskentries[tid].idepends
                for (depname, idependtask) in idepends:
                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and depname not in taskData[mc].failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData[mc].build_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
                irdepends = taskData[mc].taskentries[tid].irdepends
                for (depname, idependtask) in irdepends:
                    if depname in taskData[mc].run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        if not taskData[mc].run_targets[depname]:
                            continue
                        depdata = taskData[mc].run_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskname].split()
                    recursivetasks[tid] = tasknames
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
                    if taskname in tasknames:
                        recursivetasksselfref.add(tid)

                if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
                    recursiveitasks[tid] = []
                    for t in task_deps['recideptask'][taskname].split():
                        newdep = build_tid(mc, fn, t)
                        recursiveitasks[tid].append(newdep)

                self.runtaskentries[tid].depends = depends
                # Remove all self references
                self.runtaskentries[tid].depends.discard(tid)

        #self.dump_data()

        self.init_progress_reporter.next_stage()

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed

        # Generating/iterating recursive lists of dependencies is painful and potentially slow
        # Precompute recursive task dependencies here by:
        #     a) create a temp list of reverse dependencies (revdeps)
        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
        #     c) combine the total list of dependencies in cumulativedeps
        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)

        revdeps = {}
        deps = {}
        cumulativedeps = {}
        for tid in self.runtaskentries:
            deps[tid] = set(self.runtaskentries[tid].depends)
            revdeps[tid] = set()
            cumulativedeps[tid] = set()
        # Generate a temp list of reverse dependencies
        for tid in self.runtaskentries:
            for dep in self.runtaskentries[tid].depends:
                revdeps[dep].add(tid)
        # Find the dependency chain endpoints
        endpoints = set()
        for tid in self.runtaskentries:
            if len(deps[tid]) == 0:
                endpoints.add(tid)
        # Iterate the chains collating dependencies
        while endpoints:
            next = set()
            for tid in endpoints:
                for dep in revdeps[tid]:
                    cumulativedeps[dep].add(fn_from_tid(tid))
                    cumulativedeps[dep].update(cumulativedeps[tid])
                    if tid in deps[dep]:
                        deps[dep].remove(tid)
                    if len(deps[dep]) == 0:
                        next.add(dep)
            endpoints = next
        #for tid in deps:
        #    if len(deps[tid]) != 0:
        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))

        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
        # resolve these recursively until we aren't adding any further extra dependencies
        extradeps = True
        while extradeps:
            extradeps = 0
            for tid in recursivetasks:
                tasknames = recursivetasks[tid]

                totaldeps = set(self.runtaskentries[tid].depends)
                if tid in recursiveitasks:
                    totaldeps.update(recursiveitasks[tid])
                    for dep in recursiveitasks[tid]:
                        if dep not in self.runtaskentries:
                            continue
                        totaldeps.update(self.runtaskentries[dep].depends)

                deps = set()
                for dep in totaldeps:
                    if dep in cumulativedeps:
                        deps.update(cumulativedeps[dep])

                for t in deps:
                    for taskname in tasknames:
                        newtid = t + ":" + taskname
                        if newtid == tid:
                            continue
                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
                            extradeps += 1
                            self.runtaskentries[tid].depends.add(newtid)

                # Handle recursive tasks which depend upon other recursive tasks
                deps = set()
                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
                for newtid in deps:
                    for taskname in tasknames:
                        if not newtid.endswith(":" + taskname):
                            continue
                        if newtid in self.runtaskentries:
                            extradeps += 1
                            self.runtaskentries[tid].depends.add(newtid)

            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)

        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for tid in recursivetasksselfref:
            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)

        self.init_progress_reporter.next_stage()

        #self.dump_data()

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(tid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if tid in runq_build:
                return

            runq_build[tid] = 1

            depends = self.runtaskentries[tid].depends
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_tids = []
        for (mc, target, task, fn) in self.targets:

            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
                continue

            if target in taskData[mc].failed_deps:
                continue

            parents = False
            if task.endswith('-'):
                parents = True
                task = task[:-1]

            if fn in taskData[mc].failed_fns:
                continue

            # fn already has mc prefix
            tid = fn + ":" + task
            self.target_tids.append(tid)
            if tid not in taskData[mc].taskentries:
                import difflib
                tasks = []
                for x in taskData[mc].taskentries:
                    if x.startswith(fn + ":"):
                        tasks.append(taskname_from_tid(x))
                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
                if close_matches:
                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
                else:
                    extra = ""
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))

            # For tasks called "XXXX-", only run their dependencies
            if parents:
                for i in self.runtaskentries[tid].depends:
                    mark_active(i, 1)
            else:
                mark_active(tid, 1)

        self.init_progress_reporter.next_stage()

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        delcount = {}
        for tid in list(self.runtaskentries.keys()):
            if tid not in runq_build:
                delcount[tid] = self.runtaskentries[tid]
                del self.runtaskentries[tid]

        # Handle --runall
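        # (Added note: e.g. "bitbake --runall=fetch <target>" would, via the
        # loop below, pull every do_fetch task in the target's task graph back
        # into the runqueue and mark it active.)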
        if self.cooker.configuration.runall:
            # re-run the mark_active and then drop unused tasks from new list
            runq_build = {}

            for task in self.cooker.configuration.runall:
                runall_tids = set()
                for tid in list(self.runtaskentries):
                    wanttid = fn_from_tid(tid) + ":do_%s" % task
                    if wanttid in delcount:
                        self.runtaskentries[wanttid] = delcount[wanttid]
                    if wanttid in self.runtaskentries:
                        runall_tids.add(wanttid)

                for tid in list(runall_tids):
                    mark_active(tid, 1)

            for tid in list(self.runtaskentries.keys()):
                if tid not in runq_build:
                    delcount[tid] = self.runtaskentries[tid]
                    del self.runtaskentries[tid]

            if len(self.runtaskentries) == 0:
                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))

        self.init_progress_reporter.next_stage()

        # Handle runonly
        if self.cooker.configuration.runonly:
            # re-run the mark_active and then drop unused tasks from new list
            runq_build = {}

            for task in self.cooker.configuration.runonly:
                runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == "do_%s" % task }

                for tid in list(runonly_tids):
                    mark_active(tid, 1)

            for tid in list(self.runtaskentries.keys()):
                if tid not in runq_build:
                    delcount[tid] = self.runtaskentries[tid]
                    del self.runtaskentries[tid]

            if len(self.runtaskentries) == 0:
                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runtaskentries) == 0:
            if not taskData[''].abort:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))

        logger.verbose("Assign Weightings")

        self.init_progress_reporter.next_stage()

        # Generate a list of reverse dependencies to ease future calculations
        for tid in self.runtaskentries:
            for dep in self.runtaskentries[tid].depends:
                self.runtaskentries[dep].revdeps.add(tid)

        self.init_progress_reporter.next_stage()

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for tid in self.runtaskentries:
            revdeps = self.runtaskentries[tid].revdeps
            if len(revdeps) == 0:
                endpoints.append(tid)
            for dep in revdeps:
                if dep in self.runtaskentries[tid].depends:
                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        self.init_progress_reporter.next_stage()

        # Calculate task weights
        # Check for higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        self.init_progress_reporter.next_stage()

        # Sanity Check - Check for multiple tasks building the same provider
        for mc in self.dataCaches:
            prov_list = {}
            seen_fn = []
            for tid in self.runtaskentries:
                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                if taskfn in seen_fn:
                    continue
                if mc != tidmc:
                    continue
                seen_fn.append(taskfn)
                for prov in self.dataCaches[mc].fn_provides[taskfn]:
                    if prov not in prov_list:
                        prov_list[prov] = [taskfn]
                    elif taskfn not in prov_list[prov]:
                        prov_list[prov].append(taskfn)
            for prov in prov_list:
                if len(prov_list[prov]) < 2:
                    continue
                if prov in self.multi_provider_whitelist:
                    continue
                seen_pn = []
                # If two versions of the same PN are being built, it's fatal; we don't support it.
                for fn in prov_list[prov]:
                    pn = self.dataCaches[mc].pkg_fn[fn]
                    if pn not in seen_pn:
                        seen_pn.append(pn)
                    else:
                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
                msg = "Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))
                #
                # Construct a list of things which uniquely depend on each provider
                # since this may help the user figure out which dependency is triggering this warning
                #
                msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from."
                deplist = {}
                commondeps = None
                for provfn in prov_list[prov]:
                    deps = set()
                    for tid in self.runtaskentries:
                        fn = fn_from_tid(tid)
                        if fn != provfn:
                            continue
                        for dep in self.runtaskentries[tid].revdeps:
                            fn = fn_from_tid(dep)
                            if fn == provfn:
                                continue
                            deps.add(dep)
                    if not commondeps:
                        commondeps = set(deps)
                    else:
                        commondeps &= deps
                    deplist[provfn] = deps
                for provfn in deplist:
                    msg += "\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps))
                #
                # Construct a list of provides and runtime providers for each recipe
                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
                #
                msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful."
                provide_results = {}
                rprovide_results = {}
                commonprovs = None
                commonrprovs = None
                for provfn in prov_list[prov]:
                    provides = set(self.dataCaches[mc].fn_provides[provfn])
                    rprovides = set()
                    for rprovide in self.dataCaches[mc].rproviders:
                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
                            rprovides.add(rprovide)
                    for package in self.dataCaches[mc].packages:
                        if provfn in self.dataCaches[mc].packages[package]:
                            rprovides.add(package)
                    for package in self.dataCaches[mc].packages_dynamic:
                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
                            rprovides.add(package)
                    if not commonprovs:
                        commonprovs = set(provides)
                    else:
                        commonprovs &= provides
                    provide_results[provfn] = provides
                    if not commonrprovs:
                        commonrprovs = set(rprovides)
                    else:
                        commonrprovs &= rprovides
                    rprovide_results[provfn] = rprovides
                #msg += "\nCommon provides:\n  %s" % ("\n  ".join(commonprovs))
                #msg += "\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs))
                for provfn in prov_list[prov]:
                    msg += "\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs))
                    msg += "\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs))

                if self.warn_multi_bb:
                    logger.verbnote(msg)
                else:
                    logger.error(msg)

        self.init_progress_reporter.next_stage()

        # Create a whitelist usable by the stamp checks
        self.stampfnwhitelist = {}
        for mc in self.taskData:
            self.stampfnwhitelist[mc] = []
            for entry in self.stampwhitelist.split():
                if entry not in self.taskData[mc].build_targets:
                    continue
                fn = self.taskData[mc].build_targets[entry][0]
                self.stampfnwhitelist[mc].append(fn)

        self.init_progress_reporter.next_stage()

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene_tids = []
        if not self.cooker.configuration.nosetscene:
            for tid in self.runtaskentries:
                (mc, fn, taskname, _) = split_tid_mcfn(tid)
                setscenetid = tid + "_setscene"
                if setscenetid not in taskData[mc].taskentries:
                    continue
                self.runq_setscene_tids.append(tid)

        def invalidate_task(tid, error_nostamp):
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.dataCaches[mc].task_deps[taskfn]
            if fn + ":" + taskname not in taskData[mc].taskentries:
                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn)

        self.init_progress_reporter.next_stage()

        # Invalidate task if force mode active
        if self.cooker.configuration.force:
            for tid in self.target_tids:
                invalidate_task(tid, False)

        # Invalidate task if invalidate mode active
        if self.cooker.configuration.invalidate_stamp:
            for tid in self.target_tids:
                fn = fn_from_tid(tid)
                for st in self.cooker.configuration.invalidate_stamp.split(','):
                    if not st.startswith("do_"):
                        st = "do_%s" % st
                    invalidate_task(fn + ":" + st, True)

        self.init_progress_reporter.next_stage()

        # Create and print to the logs a virtual/xxxx -> PN (fn) table
        for mc in taskData:
            virtmap = taskData[mc].get_providermap(prefix="virtual/")
            virtpnmap = {}
            for v in virtmap:
                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
            if hasattr(bb.parse.siggen, "tasks_resolved"):
                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])

        self.init_progress_reporter.next_stage()

        # Iterate over the task list and call into the siggen code
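        # (Added note: the loop below handles tids in dependency order, so a
        # task's hash is only computed once all of its dependencies have been
        # hashed and can feed into bb.parse.siggen.get_taskhash().)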
        dealtwith = set()
        todeal = set(self.runtaskentries)
        while len(todeal) > 0:
            for tid in todeal.copy():
                if len(self.runtaskentries[tid].depends - dealtwith) == 0:
                    dealtwith.add(tid)
                    todeal.remove(tid)
                    procdep = []
                    for dep in self.runtaskentries[tid].depends:
                        procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
                    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                    self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.dataCaches[mc])
                    task = self.runtaskentries[tid].task

        bb.parse.siggen.writeout_file_checksum_cache()

        #self.dump_data()
        return len(self.runtaskentries)

    def dump_data(self):
        """
        Dump some debug information on the internal data structures
        """
        logger.debug(3, "run_tasks:")
        for tid in self.runtaskentries:
            logger.debug(3, " %s: %s   Deps %s RevDeps %s", tid,
                         self.runtaskentries[tid].weight,
                         self.runtaskentries[tid].depends,
                         self.runtaskentries[tid].revdeps)


class RunQueueWorker():
    def __init__(self, process, pipe):
        self.process = process
        self.pipe = pipe

class RunQueue:
    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):

        self.cooker = cooker
        self.cfgData = cfgData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)

        self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile"
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
        self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION2") or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None

        self.state = runQueuePrepare

        # For disk space monitor
        # Invoked at regular time intervals via the bitbake heartbeat event
        # while the build is running. We generate a unique name for the handler
        # here, just in case there is ever more than one RunQueue instance,
        # start the handler when reaching runQueueSceneRun, and stop it when
        # done with the build.
        self.dm = monitordisk.diskMonitor(cfgData)
        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
        self.dm_event_handler_registered = False
        self.rqexe = None
        self.worker = {}
        self.fakeworker = {}

    def _start_worker(self, mc, fakeroot = False, rqexec = None):
        logger.debug(1, "Starting bitbake-worker")
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        if fakeroot:
            magic = magic + "beef"
            mcdata = self.cooker.databuilder.mcdata[mc]
            fakerootcmd = mcdata.getVar("FAKEROOTCMD")
            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
            env = os.environ.copy()
            for key, value in (var.split('=') for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen([fakerootcmd, "bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
        else:
            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)

        runqhash = {}
        for tid in self.rqdata.runtaskentries:
            runqhash[tid] = self.rqdata.runtaskentries[tid].hash

        workerdata = {
            "taskdeps" : self.rqdata.dataCaches[mc].task_deps,
            "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv,
            "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs,
            "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv,
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "runq_hash" : runqhash,
            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME"),
            "date" : self.cfgData.getVar("DATE"),
            "time" : self.cfgData.getVar("TIME"),
        }
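
        # (Added note: the handshake below writes three pickled payloads to
        # the worker's stdin, each framed by XML-style tags (<cookerconfig>,
        # <extraconfigdata>, <workerdata>) which bitbake-worker parses off the
        # stream before it starts accepting tasks.)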

        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
        worker.stdin.flush()

        return RunQueueWorker(worker, workerpipe)

    def _teardown_worker(self, worker):
        if not worker:
            return
        logger.debug(1, "Teardown for bitbake-worker")
        try:
            worker.process.stdin.write(b"<quit></quit>")
            worker.process.stdin.flush()
            worker.process.stdin.close()
        except IOError:
            pass
        while worker.process.returncode is None:
            worker.pipe.read()
            worker.process.poll()
        while worker.pipe.read():
            continue
        worker.pipe.close()

    def start_worker(self):
        if self.worker:
            self.teardown_workers()
        self.teardown = False
        for mc in self.rqdata.dataCaches:
            self.worker[mc] = self._start_worker(mc)

    def start_fakeworker(self, rqexec, mc):
        if mc not in self.fakeworker:
            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)

    def teardown_workers(self):
        self.teardown = True
        for mc in self.worker:
            self._teardown_worker(self.worker[mc])
        self.worker = {}
        for mc in self.fakeworker:
            self._teardown_worker(self.fakeworker[mc])
        self.fakeworker = {}

    def read_workers(self):
        for mc in self.worker:
            self.worker[mc].pipe.read()
        for mc in self.fakeworker:
            self.fakeworker[mc].pipe.read()

    def active_fds(self):
        fds = []
        for mc in self.worker:
            fds.append(self.worker[mc].pipe.input)
        for mc in self.fakeworker:
            fds.append(self.fakeworker[mc].pipe.input)
        return fds

    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
        def get_timestamp(f):
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
        if taskname is None:
            taskname = tn

        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.rqdata.stampfnwhitelist[mc]
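
        # (Added summary: with BB_STAMP_POLICY = "perfile" only stamps within
        # the same recipe are compared; any other value compares stamps across
        # the full dependency tree, and "whitelist" additionally exempts
        # recipes listed in BB_STAMP_WHITELIST from the cross-recipe check.)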

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runtaskentries[tid].depends:
            if iscurrent:
                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                if t3 and not t2:
                    continue
                if t3 and t3 > t2:
                    continue
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    if not t2:
                        logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                        break
                    if t1 < t2:
                        logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                        break
                if recurse and iscurrent:
                    if dep in cache:
                        iscurrent = cache[dep]
                        if not iscurrent:
                            logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                    else:
                        iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                        cache[dep] = iscurrent
        if recurse:
            cache[tid] = iscurrent
        return iscurrent

    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)
        """

        retval = True

        if self.state is runQueuePrepare:
            self.rqexe = RunQueueExecuteDummy(self)
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                self.rqdata.init_progress_reporter.next_stage()

                # we are ready to run, emit dependency info to any UI or class which
                # needs it
                depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
                self.rqdata.init_progress_reporter.next_stage()
                bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

        if self.state is runQueueSceneInit:
            if not self.dm_event_handler_registered:
                res = bb.event.register(self.dm_event_handler_name,
                                        lambda x: self.dm.check(self) if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp] else False,
                                        ('bb.event.HeartbeatEvent',))
                self.dm_event_handler_registered = True

            dump = self.cooker.configuration.dump_signatures
            if dump:
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete
            else:
                self.rqdata.init_progress_reporter.next_stage()
                self.start_worker()
                self.rqdata.init_progress_reporter.next_stage()
                self.rqexe = RunQueueExecuteScenequeue(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        if self.state is runQueueRunInit:
            if self.cooker.configuration.setsceneonly:
                self.state = runQueueComplete
            else:
                # Just in case we didn't setscene
                self.rqdata.init_progress_reporter.finish()
                logger.info("Executing RunQueue Tasks")
                self.rqexe = RunQueueExecuteTasks(self)
                self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            self.teardown_workers()
            if self.rqexe.stats.failed:
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            else:
                # Let's avoid the word "failed" if nothing actually did
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval

    def execute_runqueue(self):
        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
        try:
            return self._execute_runqueue()
        except bb.runqueue.TaskFailure:
            raise
        except SystemExit:
            raise
        except bb.BBHandledException:
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise
        except Exception as err:
            logger.exception("An uncaught exception occurred in runqueue")
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise

    def finish_runqueue(self, now = False):
        if not self.rqexe:
            self.state = runQueueComplete
            return

        if now:
            self.rqexe.finish_now()
        else:
            self.rqexe.finish()

    def rq_dump_sigfn(self, fn, options):
        bb_cache = bb.cache.NoCache(self.cooker.databuilder)
        the_data = bb_cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn))
        siggen = bb.parse.siggen
        dataCaches = self.rqdata.dataCaches
        siggen.dump_sigfn(fn, dataCaches, options)

    def dump_signatures(self, options):
        fns = set()
        bb.note("Reparsing files to collect dependency data")

        for tid in self.rqdata.runtaskentries:
            fn = fn_from_tid(tid)
            fns.add(fn)

        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
        # We cannot use the real multiprocessing.Pool easily due to some local data
        # that can't be pickled. This is a cheap multi-process solution.
        launched = []
        while fns:
            if len(launched) < max_process:
                p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options))
                p.start()
                launched.append(p)
            for q in launched:
                # The finished processes are joined when calling is_alive()
                if not q.is_alive():
                    launched.remove(q)
        for p in launched:
            p.join()

        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)

        return

    def print_diffscenetasks(self):

        valid = []
        sq_hash = []
        sq_hashfn = []
        sq_fn = []
        sq_taskname = []
        sq_task = []
        noexec = []
        stamppresent = []
        valid_new = set()

        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(tid)
                continue

            sq_fn.append(fn)
            sq_hashfn.append(self.rqdata.dataCaches[mc].hashfn[taskfn])
            sq_hash.append(self.rqdata.runtaskentries[tid].hash)
            sq_taskname.append(taskname)
            sq_task.append(tid)
        locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
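        # (Added note: BB_HASHCHECK_FUNCTION names a Python function that is
        # evaluated with the lists built above; in OpenEmbedded this is
        # typically sstate_checkhashes. It returns the indices of the tasks
        # whose cached/setscene data appears to be available.)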
        try:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=True)"
            valid = bb.utils.better_eval(call, locs)
        # Handle version with no siginfo parameter
        except TypeError:
            call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            valid = bb.utils.better_eval(call, locs)
        for v in valid:
            valid_new.add(sq_task[v])

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for tid in noexec:
            if tid not in self.rqdata.runq_setscene_tids:
                continue
            for dep in self.rqdata.runtaskentries[tid].depends:
                hasnoexecparents = True
                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        invalidtasks = set()
        for tid in self.rqdata.runtaskentries:
            if tid not in valid_new and tid not in noexec:
                invalidtasks.add(tid)

        found = set()
        processed = set()
        for tid in invalidtasks:
            toprocess = set([tid])
            while toprocess:
                next = set()
                for t in toprocess:
                    for dep in self.rqdata.runtaskentries[t].depends:
                        if dep in invalidtasks:
                            found.add(tid)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                if tid in found:
                    toprocess = set()

        tasklist = []
        for tid in invalidtasks.difference(found):
            tasklist.append(tid)

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks.difference(found)
1638
    def write_diffscenetasks(self, invalidtasks):

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('    ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for tid in invalidtasks:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            h = self.rqdata.runtaskentries[tid].hash
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
            match = None
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a sigdata file we're supposed to have written out (hash: %s)" % h)
            matches = {k : v for k, v in iter(matches.items()) if h not in k}
            if matches:
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_md5__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))

class RunQueueExecute:

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
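        # BB_SCHEDULER selects a RunQueueScheduler subclass by its "name"
        # attribute, e.g. "basic"; "speed" is the fallback used here.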

        self.runq_buildable = set()
        self.runq_running = set()
        self.runq_complete = set()

        self.build_stamps = {}
        self.build_stamps2 = []
        self.failed_tids = []

        self.stampcache = {}

        for mc in rq.worker:
            rq.worker[mc].pipe.setrunqueueexec(self)
        for mc in rq.fakeworker:
            rq.fakeworker[mc].pipe.setrunqueueexec(self)

        if self.number_tasks <= 0:
            bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)

    def runqueue_process_waitpid(self, task, status):

        # self.build_stamps[task] may not exist when using a shared work directory.
        if task in self.build_stamps:
            self.build_stamps2.remove(self.build_stamps[task])
            del self.build_stamps[task]

        if status != 0:
            self.task_fail(task, status)
        else:
            self.task_complete(task)
        return True

    def finish_now(self):
        for mc in self.rq.worker:
            try:
                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
                self.rq.worker[mc].process.stdin.flush()
            except IOError:
                # worker must have died?
                pass
        for mc in self.rq.fakeworker:
            try:
                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
                self.rq.fakeworker[mc].process.stdin.flush()
            except IOError:
                # worker must have died?
                pass

        if len(self.failed_tids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def finish(self):
        self.rq.state = runQueueCleanUp

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_tids) != 0:
            self.rq.state = runQueueFailed
            return True

        self.rq.state = runQueueComplete
        return True

    def check_dependencies(self, task, taskdeps, setscene=False):
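        # taskdata maps each involved tid to [pn, taskname, fn]; the hook named
        # in self.rq.depvalidate (supplied by the metadata) is evaluated as
        # depvalidate(task, taskdata, notneeded, d) and its verdict is returned
        # to the caller unchanged.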
        if not self.rq.depvalidate:
            return False

        taskdata = {}
        taskdeps.add(task)
        for dep in taskdeps:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            taskdata[dep] = [pn, taskname, fn]
        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
        valid = bb.utils.better_eval(call, locs)
        return valid

    def can_start_task(self):
        can_start = self.stats.active < self.number_tasks
        return can_start

class RunQueueExecuteDummy(RunQueueExecute):
    def __init__(self, rq):
        self.rq = rq
        self.stats = RunQueueStats(0)

    def finish(self):
        self.rq.state = runQueueComplete
        return

class RunQueueExecuteTasks(RunQueueExecute):
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runtaskentries))

        self.stampcache = {}

        initial_covered = self.rq.scenequeue_covered.copy()

        # Mark initial buildable tasks
        for tid in self.rqdata.runtaskentries:
            if len(self.rqdata.runtaskentries[tid].depends) == 0:
                self.runq_buildable.add(tid)
            if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered):
                self.rq.scenequeue_covered.add(tid)

        found = True
        while found:
            found = False
            for tid in self.rqdata.runtaskentries:
                if tid in self.rq.scenequeue_covered:
                    continue
                logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps)))

                if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered):
                    if tid in self.rq.scenequeue_notcovered:
                        continue
                    found = True
                    self.rq.scenequeue_covered.add(tid)

        logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))

        # Allow the metadata to elect for setscene tasks to run anyway
        covered_remove = set()
        if self.rq.setsceneverify:
            invalidtasks = []
            tasknames = {}
            fns = {}
            for tid in self.rqdata.runtaskentries:
                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
                fns[tid] = taskfn
                tasknames[tid] = taskname
                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    continue
                if self.rq.check_stamp_task(tid, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s', tid)
                    continue
                if self.rq.check_stamp_task(tid, taskname, recurse=True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s', tid)
                    continue
                invalidtasks.append(tid)

            call = self.rq.setsceneverify + "(covered, tasknames, fns, d, invalidtasks=invalidtasks)"
            locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : tasknames, "fns" : fns, "d" : self.cooker.data, "invalidtasks" : invalidtasks }
            covered_remove = bb.utils.better_eval(call, locs)

        def removecoveredtask(tid):
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskname = taskname + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
            self.rq.scenequeue_covered.remove(tid)

        toremove = covered_remove | self.rq.scenequeue_notcovered
        for task in toremove:
            logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
        while toremove:
            covered_remove = []
            for task in toremove:
                if task in self.rq.scenequeue_covered:
                    removecoveredtask(task)
                for deptask in self.rqdata.runtaskentries[task].depends:
                    if deptask not in self.rq.scenequeue_covered:
                        continue
                    if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
                        continue
                    logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
                    covered_remove.append(deptask)
            toremove = covered_remove

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)


        for mc in self.rqdata.dataCaches:
            target_pairs = []
            for tid in self.rqdata.target_tids:
                (tidmc, fn, taskname, _) = split_tid_mcfn(tid)
                if tidmc == mc:
                    target_pairs.append((fn, taskname))

            event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData)

        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
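        # Candidate schedulers are every RunQueueScheduler subclass visible in
        # this module's globals, plus any dotted names listed in BB_SCHEDULERS,
        # e.g. BB_SCHEDULERS = "mymodule.MyScheduler" (hypothetical module).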
        schedulers = set(obj for obj in globals().values()
                         if type(obj) is type and
                            issubclass(obj, RunQueueScheduler))

        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
        if user_schedulers:
            for sched in user_schedulers.split():
                if "." not in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers

    def setbuildable(self, task):
        self.runq_buildable.add(task)
        self.sched.newbuildable(task)

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete.add(task)
        for revdep in self.rqdata.runtaskentries[task].revdeps:
            if revdep in self.runq_running:
                continue
            if revdep in self.runq_buildable:
                continue
            alldeps = True
            for dep in self.rqdata.runtaskentries[revdep].depends:
                if dep not in self.runq_complete:
                    alldeps = False
                    break
            if alldeps:
                self.setbuildable(revdep)
                logger.debug(1, "Marking task %s as buildable", revdep)

    def task_complete(self, task):
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        self.failed_tids.append(task)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        if self.rqdata.taskData[''].abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task, reason):
        self.runq_running.add(task)
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskSkipped()
        self.stats.taskCompleted()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        """

        if self.rqdata.setscenewhitelist is not None and not self.rqdata.setscenewhitelist_checked:
            self.rqdata.setscenewhitelist_checked = True

            # Check tasks that are going to run against the whitelist
            def check_norun_task(tid, showerror=False):
                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                # Ignore covered tasks
                if tid in self.rq.scenequeue_covered:
                    return False
                # Ignore stamped tasks
                if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
                    return False
                # Ignore noexec tasks
                taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    return False

                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
                if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
                    if showerror:
                        if tid in self.rqdata.runq_setscene_tids:
                            logger.error('Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname))
                        else:
                            logger.error('Task %s.%s attempted to execute unexpectedly' % (pn, taskname))
                    return True
                return False
            # Look to see if any tasks that we think shouldn't run are going to run
            unexpected = False
            for tid in self.rqdata.runtaskentries:
                if check_norun_task(tid):
                    unexpected = True
                    break
            if unexpected:
                # Run through the tasks in the rough order they'd have executed and print errors
                # (since the order can be useful - usually missing sstate for the last few tasks
                # is the cause of the problem)
                task = self.sched.next()
                while task is not None:
                    check_norun_task(task, showerror=True)
                    self.task_skip(task, 'Setscene enforcement check')
                    task = self.sched.next()

                self.rq.state = runQueueCleanUp
                return True

        self.rq.read_workers()

        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)

            if task in self.rq.scenequeue_covered:
                logger.debug(2, "Setscene covered task %s", task)
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, "Stamp current task %s", task)

                self.task_skip(task, "existing")
                return True

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running.add(task)
                self.stats.taskActive()
                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                    bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdepdata = self.build_taskdepdata(task)

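            # The worker is sent a pickled tuple framed in <runtask> tags:
            # (taskfn, tid, taskname, <setscene flag>, appends, taskdepdata,
            # setscene_enforce); the flag is False here and True in the
            # scenequeue variant of this code.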
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                if mc not in self.rq.fakeworker:
                    try:
                        self.rq.start_fakeworker(self, mc)
                    except OSError as exc:
                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
                        self.rq.state = runQueueFailed
                        self.stats.taskFailed()
                        return True
                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, False, self.cooker.collection.get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running.add(task)
            self.stats.taskActive()
            if self.can_start_task():
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_tids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in self.rqdata.runtaskentries:
            if task not in self.runq_buildable:
                logger.error("Task %s never buildable!", task)
            if task not in self.runq_running:
                logger.error("Task %s never ran!", task)
            if task not in self.runq_complete:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete

        return True

    def filtermcdeps(self, task, deps):
        ret = set()
        mainmc = mc_from_tid(task)
        for dep in deps:
            mc = mc_from_tid(dep)
            if mc != mainmc:
                continue
            ret.add(dep)
        return ret

    # We filter out multiconfig dependencies from the taskdepdata we pass to
    # the tasks, as most code can't handle them
    def build_taskdepdata(self, task):
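        # Returns { tid : [pn, taskname, fn, deps, provides, taskhash] } for
        # this task and everything it depends on (same multiconfig only); this
        # is the data each task can later read back as BB_TASKDEPDATA.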
        taskdepdata = {}
        next = self.rqdata.runtaskentries[task].depends
        next.add(task)
        next = self.filtermcdeps(task, next)
        while next:
            additional = []
            for revdep in next:
                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
                deps = self.rqdata.runtaskentries[revdep].depends
                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
                taskhash = self.rqdata.runtaskentries[revdep].hash
                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash]
                deps = self.filtermcdeps(task, deps)
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            next = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata

class RunQueueExecuteScenequeue(RunQueueExecute):
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.scenequeue_covered = set()
        self.scenequeue_notcovered = set()
        self.scenequeue_notneeded = set()

        # If we don't have any setscene functions, skip this step
        if len(self.rqdata.runq_setscene_tids) == 0:
            rq.scenequeue_covered = set()
            rq.scenequeue_notcovered = set()
            rq.state = runQueueRunInit
            return

        self.stats = RunQueueStats(len(self.rqdata.runq_setscene_tids))

        sq_revdeps = {}
        sq_revdeps_new = {}
        sq_revdeps_squash = {}
        self.sq_harddeps = {}
        self.stamps = {}

        # We need to construct a dependency graph for the setscene functions. Intermediate
        # dependencies between the setscene tasks only complicate the code. This code
        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
        # only containing the setscene functions.
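        # For example (hypothetical recipe "a"), a chain
        #   a:do_fetch -> a:do_unpack -> a:do_populate_sysroot
        # where only do_populate_sysroot has a setscene variant collapses to
        # the single node a:do_populate_sysroot in the new graph.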

        self.rqdata.init_progress_reporter.next_stage()

        # First process the chains up to the first setscene task.
        endpoints = {}
        for tid in self.rqdata.runtaskentries:
            sq_revdeps[tid] = copy.copy(self.rqdata.runtaskentries[tid].revdeps)
            sq_revdeps_new[tid] = set()
            if (len(sq_revdeps[tid]) == 0) and tid not in self.rqdata.runq_setscene_tids:
                #bb.warn("Added endpoint %s" % (tid))
                endpoints[tid] = set()

        self.rqdata.init_progress_reporter.next_stage()

        # Secondly process the chains between setscene tasks.
        for tid in self.rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint 2 %s" % (tid))
            for dep in self.rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

        self.rqdata.init_progress_reporter.next_stage()

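        # process_endpoints() walks from each endpoint towards its dependencies,
        # accumulating the setscene tasks seen so far in sq_revdeps_new; when it
        # reaches a setscene task the accumulated set is parked there and the
        # walk stops (chains below setscene tasks were seeded as separate
        # endpoints above), which is what collapses the graph between setscene
        # tasks.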
        def process_endpoints(endpoints):
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set()
                if task:
                    tasks |= task
                if sq_revdeps_new[point]:
                    tasks |= sq_revdeps_new[point]
                sq_revdeps_new[point] = set()
                if point in self.rqdata.runq_setscene_tids:
                    sq_revdeps_new[point] = tasks
                    tasks = set()
                    continue
                for dep in self.rqdata.runtaskentries[point].depends:
                    if point in sq_revdeps[dep]:
                        sq_revdeps[dep].remove(point)
                    if tasks:
                        sq_revdeps_new[dep] |= tasks
                    if len(sq_revdeps[dep]) == 0 and dep not in self.rqdata.runq_setscene_tids:
                        newendpoints[dep] = task
            if len(newendpoints) != 0:
                process_endpoints(newendpoints)

        process_endpoints(endpoints)

        self.rqdata.init_progress_reporter.next_stage()

        # Build a list of setscene tasks which are "unskippable"
        # These are direct endpoints referenced by the build
        endpoints2 = {}
        sq_revdeps2 = {}
        sq_revdeps_new2 = {}
        def process_endpoints2(endpoints):
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set([point])
                if task:
                    tasks |= task
                if sq_revdeps_new2[point]:
                    tasks |= sq_revdeps_new2[point]
                sq_revdeps_new2[point] = set()
                if point in self.rqdata.runq_setscene_tids:
                    sq_revdeps_new2[point] = tasks
                for dep in self.rqdata.runtaskentries[point].depends:
                    if point in sq_revdeps2[dep]:
                        sq_revdeps2[dep].remove(point)
                    if tasks:
                        sq_revdeps_new2[dep] |= tasks
                    if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene_tids:
                        newendpoints[dep] = tasks
            if len(newendpoints) != 0:
                process_endpoints2(newendpoints)
        for tid in self.rqdata.runtaskentries:
            sq_revdeps2[tid] = copy.copy(self.rqdata.runtaskentries[tid].revdeps)
            sq_revdeps_new2[tid] = set()
            if (len(sq_revdeps2[tid]) == 0) and tid not in self.rqdata.runq_setscene_tids:
                endpoints2[tid] = set()
        process_endpoints2(endpoints2)
        self.unskippable = []
        for tid in self.rqdata.runq_setscene_tids:
            if sq_revdeps_new2[tid]:
                self.unskippable.append(tid)

        self.rqdata.init_progress_reporter.next_stage(len(self.rqdata.runtaskentries))

        for taskcounter, tid in enumerate(self.rqdata.runtaskentries):
            if tid in self.rqdata.runq_setscene_tids:
                deps = set()
                for dep in sq_revdeps_new[tid]:
                    deps.add(dep)
                sq_revdeps_squash[tid] = deps
            elif len(sq_revdeps_new[tid]) != 0:
                bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
            self.rqdata.init_progress_reporter.update(taskcounter)

        self.rqdata.init_progress_reporter.next_stage()

        # Resolve setscene inter-task dependencies
        # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
        # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
        for tid in self.rqdata.runq_setscene_tids:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            realtid = tid + "_setscene"
            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
            self.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
            for (depname, idependtask) in idepends:

                if depname not in self.rqdata.taskData[mc].build_targets:
                    continue

                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
                if depfn is None:
                    continue
                deptid = depfn + ":" + idependtask.replace("_setscene", "")
                if deptid not in self.rqdata.runtaskentries:
                    bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

                if deptid not in self.sq_harddeps:
                    self.sq_harddeps[deptid] = set()
                self.sq_harddeps[deptid].add(tid)

                sq_revdeps_squash[tid].add(deptid)
                # Have to zero this to avoid circular dependencies
                sq_revdeps_squash[deptid] = set()

        self.rqdata.init_progress_reporter.next_stage()

        for task in self.sq_harddeps:
            for dep in self.sq_harddeps[task]:
                sq_revdeps_squash[dep].add(task)

        self.rqdata.init_progress_reporter.next_stage()

        #for tid in sq_revdeps_squash:
        #    for dep in sq_revdeps_squash[tid]:
        #        data = data + "\n   %s" % dep
        #    bb.warn("Task %s_setscene: is %s " % (tid, data))

        self.sq_deps = {}
        self.sq_revdeps = sq_revdeps_squash
        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)

        for tid in self.sq_revdeps:
            self.sq_deps[tid] = set()
        for tid in self.sq_revdeps:
            for dep in self.sq_revdeps[tid]:
                self.sq_deps[dep].add(tid)

        self.rqdata.init_progress_reporter.next_stage()

        for tid in self.sq_revdeps:
            if len(self.sq_revdeps[tid]) == 0:
                self.runq_buildable.add(tid)

        self.rqdata.init_progress_reporter.finish()

        self.outrightfail = []
        if self.rq.hashvalidate:
            sq_hash = []
            sq_hashfn = []
            sq_fn = []
            sq_taskname = []
            sq_task = []
            noexec = []
            stamppresent = []
            for tid in self.sq_revdeps:
                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)

                taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    noexec.append(tid)
                    self.task_skip(tid)
                    bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn)
                    continue

                if self.rq.check_stamp_task(tid, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s', tid)
                    stamppresent.append(tid)
                    self.task_skip(tid)
                    continue

                if self.rq.check_stamp_task(tid, taskname, recurse=True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s', tid)
                    stamppresent.append(tid)
                    self.task_skip(tid)
                    continue

                sq_fn.append(fn)
                sq_hashfn.append(self.rqdata.dataCaches[mc].hashfn[taskfn])
                sq_hash.append(self.rqdata.runtaskentries[tid].hash)
                sq_taskname.append(taskname)
                sq_task.append(tid)

            self.cooker.data.setVar("BB_SETSCENE_STAMPCURRENT_COUNT", len(stamppresent))

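            # The metadata hook named in self.rq.hashvalidate is evaluated with
            # parallel lists describing each candidate task and returns the
            # indices of those whose results can be fetched, e.g. from a shared
            # state cache.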
            call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
            valid = bb.utils.better_eval(call, locs)

            self.cooker.data.delVar("BB_SETSCENE_STAMPCURRENT_COUNT")

            valid_new = stamppresent
            for v in valid:
                valid_new.append(sq_task[v])

            for tid in self.sq_revdeps:
                if tid not in valid_new and tid not in noexec:
                    logger.debug(2, 'No package found, so skipping setscene task %s', tid)
                    self.outrightfail.append(tid)

        logger.info('Executing SetScene Tasks')

        self.rq.state = runQueueSceneRun

    def scenequeue_updatecounters(self, task, fail=False):
        for dep in self.sq_deps[task]:
            if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
                logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                self.scenequeue_updatecounters(dep, fail)
                continue
            if task not in self.sq_revdeps2[dep]:
                # May already have been removed by the fail case above
                continue
            self.sq_revdeps2[dep].remove(task)
            if len(self.sq_revdeps2[dep]) == 0:
                self.runq_buildable.add(dep)

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """

        logger.debug(1, 'Found task %s which could be accelerated', task)
        self.scenequeue_covered.add(task)
        self.scenequeue_updatecounters(task)

    def check_taskfail(self, task):
        if self.rqdata.setscenewhitelist is not None:
            realtask = task.split('_setscene')[0]
            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
                self.rq.state = runQueueCleanUp

    def task_complete(self, task):
        self.stats.taskCompleted()
        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, result):
        self.stats.taskFailed()
        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task, True)
        self.check_taskfail(task)

    def task_failoutright(self, task):
        self.runq_running.add(task)
        self.runq_buildable.add(task)
        self.stats.taskSkipped()
        self.stats.taskCompleted()
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task, True)

    def task_skip(self, task):
        self.runq_running.add(task)
        self.runq_buildable.add(task)
        self.task_completeoutright(task)
        self.stats.taskSkipped()
        self.stats.taskCompleted()

    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        """

        self.rq.read_workers()

        task = None
        if self.can_start_task():
            # Find the next setscene to run
            for nexttask in self.rqdata.runq_setscene_tids:
                if nexttask in self.runq_buildable and nexttask not in self.runq_running and self.stamps[nexttask] not in self.build_stamps.values():
                    if nexttask in self.unskippable:
                        logger.debug(2, "Setscene task %s is unskippable" % nexttask)
                    if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
                        fn = fn_from_tid(nexttask)
                        foundtarget = False

                        if nexttask in self.rqdata.target_tids:
                            foundtarget = True
                        if not foundtarget:
                            logger.debug(2, "Skipping setscene for task %s" % nexttask)
                            self.task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            return True
                    if nexttask in self.outrightfail:
                        self.task_failoutright(nexttask)
                        return True
                    task = nexttask
                    break
        if task is not None:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
            taskname = taskname + "_setscene"
            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse=True, cache=self.stampcache):
                logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task)
                self.task_failoutright(task)
                return True

            if self.cooker.configuration.force:
                if task in self.rqdata.target_tids:
                    self.task_failoutright(task)
                    return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task)
                self.task_skip(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            taskdepdata = self.build_taskdepdata(task)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if mc not in self.rq.fakeworker:
                    self.rq.start_fakeworker(self, mc)
                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running.add(task)
            self.stats.taskActive()
            if self.can_start_task():
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        #for tid in self.sq_revdeps:
        #    if tid not in self.runq_running:
        #        buildable = tid in self.runq_buildable
        #        revdeps = self.sq_revdeps[tid]
        #        bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps)))

        self.rq.scenequeue_covered = self.scenequeue_covered
        self.rq.scenequeue_notcovered = self.scenequeue_notcovered

        logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered)))

        self.rq.state = runQueueRunInit

        completeevent = sceneQueueComplete(self.stats, self.rq)
        bb.event.fire(completeevent, self.cfgData)

        return True

    def runqueue_process_waitpid(self, task, status):
        RunQueueExecute.runqueue_process_waitpid(self, task, status)


    def build_taskdepdata(self, task):
        def getsetscenedeps(tid):
            deps = set()
            (mc, fn, taskname, _) = split_tid_mcfn(tid)
            realtid = tid + "_setscene"
            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
            for (depname, idependtask) in idepends:
                if depname not in self.rqdata.taskData[mc].build_targets:
                    continue

                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
                if depfn is None:
                    continue
                deptid = depfn + ":" + idependtask.replace("_setscene", "")
                deps.add(deptid)
            return deps

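        # Same shape as RunQueueExecuteTasks.build_taskdepdata(), but here the
        # dependency edges come from the setscene tasks' explicit idepends.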
        taskdepdata = {}
        next = getsetscenedeps(task)
        next.add(task)
        while next:
            additional = []
            for revdep in next:
                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
                deps = getsetscenedeps(revdep)
                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
                taskhash = self.rqdata.runtaskentries[revdep].hash
                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash]
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            next = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata

class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        self.args = x


class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit
    """

    def __init__(self, remain):
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)

class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class
    """
    def __init__(self, task, stats, rq):
        self.taskid = task
        self.taskstring = task
        self.taskname = taskname_from_tid(task)
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        self.stats = stats.copy()
        bb.event.Event.__init__(self)

class sceneQueueEvent(runQueueEvent):
    """
    Base sceneQueue event class
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        self.taskstring = task + "_setscene"
        self.taskname = taskname_from_tid(task) + "_setscene"
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)

class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec

class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying a setscene task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec

class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)

class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying a setscene task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)

class sceneQueueComplete(sceneQueueEvent):
    """
    Event when all the sceneQueue tasks are complete
    """
    def __init__(self, stats, rq):
        self.stats = stats.copy()
        bb.event.Event.__init__(self)

class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """

class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying a setscene task completed
    """

class runQueueTaskSkipped(runQueueEvent):
    """
    Event notifying a task was skipped
    """
    def __init__(self, task, stats, rq, reason):
        runQueueEvent.__init__(self, task, stats, rq)
        self.reason = reason

class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
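    # The stream carries two kinds of pickled payloads, framed as
    # <event>...</event> and <exitcode>...</exitcode>; read() below scans for
    # the closing tags and unpickles whatever sits between them.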
    def __init__(self, pipein, pipeout, d, rq, rqexec):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec

    def setrunqueueexec(self, rqexec):
        self.rqexec = rqexec

    def read(self):
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and len(self.queue):
            found = False
            index = self.queue.find(b"</event>")
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    event = pickle.loads(self.queue[7:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                found = True
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            index = self.queue.find(b"</exitcode>")
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    task, status = pickle.loads(self.queue[10:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[10:index]))
                self.rqexec.runqueue_process_waitpid(task, status)
                found = True
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()

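# With BB_SETSCENE_ENFORCE = "1", BB_SETSCENE_ENFORCE_WHITELIST holds
# fnmatch-style "pn:taskname" patterns naming the tasks allowed to really run;
# a "%" recipe part is expanded per command-line target, so an entry like
# "%:do_build" becomes "<target>:do_build" for each target given.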
def get_setscene_enforce_whitelist(d):
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    whitelist = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split()
    outlist = []
    for item in whitelist[:]:
        if item.startswith('%:'):
            for target in sys.argv[1:]:
                if not target.startswith('-'):
                    outlist.append(target.split(':')[0] + ':' + item.split(':')[1])
        else:
            outlist.append(item)
    return outlist

def check_setscene_enforce_whitelist(pn, taskname, whitelist):
    import fnmatch
    if whitelist is not None:
        item = '%s:%s' % (pn, taskname)
        for whitelist_item in whitelist:
            if fnmatch.fnmatch(item, whitelist_item):
                return True
        return False
    return True