# encoding: utf-8 # Thomas Nagy, 2005-2008 (ita) # this replaces the core of Runner.py in waf with a varient that works # on systems with completely broken threading (such as Python 2.5.x on # AIX). For simplicity we enable this when JOBS=1, which is triggered # by the compatibility makefile used for the waf build. That also ensures # this code is tested, as it means it is used in the build farm, and by # anyone using 'make' to build Samba with waf "Execute the tasks" import sys, random, time, threading, traceback, os try: from Queue import Queue except ImportError: from queue import Queue import Build, Utils, Logs, Options from Logs import debug, error from Constants import * GAP = 15 run_old = threading.Thread.run def run(*args, **kwargs): try: run_old(*args, **kwargs) except (KeyboardInterrupt, SystemExit): raise except: sys.excepthook(*sys.exc_info()) threading.Thread.run = run class TaskConsumer(object): consumers = 1 def process(tsk): m = tsk.master if m.stop: m.out.put(tsk) return try: tsk.generator.bld.printout(tsk.display()) if tsk.__class__.stat: ret = tsk.__class__.stat(tsk) # actual call to task's run() function else: ret = tsk.call_run() except Exception, e: tsk.err_msg = Utils.ex_stack() tsk.hasrun = EXCEPTION # TODO cleanup m.error_handler(tsk) m.out.put(tsk) return if ret: tsk.err_code = ret tsk.hasrun = CRASHED else: try: tsk.post_run() except Utils.WafError: pass except Exception: tsk.err_msg = Utils.ex_stack() tsk.hasrun = EXCEPTION else: tsk.hasrun = SUCCESS if tsk.hasrun != SUCCESS: m.error_handler(tsk) m.out.put(tsk) class Parallel(object): """ keep the consumer threads busy, and avoid consuming cpu cycles when no more tasks can be added (end of the build, etc) """ def __init__(self, bld, j=2): # number of consumers self.numjobs = j self.manager = bld.task_manager self.manager.current_group = 0 self.total = self.manager.total() # tasks waiting to be processed - IMPORTANT self.outstanding = [] self.maxjobs = MAXJOBS # tasks that are awaiting for another task to complete self.frozen = [] # tasks returned by the consumers self.out = Queue(0) self.count = 0 # tasks not in the producer area self.processed = 1 # progress indicator self.stop = False # error condition to stop the build self.error = False # error flag def get_next(self): "override this method to schedule the tasks in a particular order" if not self.outstanding: return None return self.outstanding.pop(0) def postpone(self, tsk): "override this method to schedule the tasks in a particular order" # TODO consider using a deque instead if random.randint(0, 1): self.frozen.insert(0, tsk) else: self.frozen.append(tsk) def refill_task_list(self): "called to set the next group of tasks" while self.count > self.numjobs + GAP or self.count >= self.maxjobs: self.get_out() while not self.outstanding: if self.count: self.get_out() if self.frozen: self.outstanding += self.frozen self.frozen = [] elif not self.count: (jobs, tmp) = self.manager.get_next_set() if jobs is not None: self.maxjobs = jobs if tmp: self.outstanding += tmp break def get_out(self): "the tasks that are put to execute are all collected using get_out" ret = self.out.get() self.manager.add_finished(ret) if not self.stop and getattr(ret, 'more_tasks', None): self.outstanding += ret.more_tasks self.total += len(ret.more_tasks) self.count -= 1 def error_handler(self, tsk): "by default, errors make the build stop (not thread safe so be careful)" if not Options.options.keep: self.stop = True self.error = True def start(self): "execute the tasks" while not self.stop: self.refill_task_list() # consider the next task tsk = self.get_next() if not tsk: if self.count: # tasks may add new ones after they are run continue else: # no tasks to run, no tasks running, time to exit break if tsk.hasrun: # if the task is marked as "run", just skip it self.processed += 1 self.manager.add_finished(tsk) continue try: st = tsk.runnable_status() except Exception, e: self.processed += 1 if self.stop and not Options.options.keep: tsk.hasrun = SKIPPED self.manager.add_finished(tsk) continue self.error_handler(tsk) self.manager.add_finished(tsk) tsk.hasrun = EXCEPTION tsk.err_msg = Utils.ex_stack() continue if st == ASK_LATER: self.postpone(tsk) elif st == SKIP_ME: self.processed += 1 tsk.hasrun = SKIPPED self.manager.add_finished(tsk) else: # run me: put the task in ready queue tsk.position = (self.processed, self.total) self.count += 1 self.processed += 1 tsk.master = self process(tsk) # self.count represents the tasks that have been made available to the consumer threads # collect all the tasks after an error else the message may be incomplete while self.error and self.count: self.get_out() #print loop assert (self.count == 0 or self.stop) # enable nothreads import Runner Runner.process = process Runner.Parallel = Parallel