00001 import time
00002 import thread
00003 import threading
00004 import traceback
00005
00006
00007
00008 class SyncCall:
00009
00010 def __init__(self):
00011 self.requested = False
00012 self.returned = False
00013 self.func = None
00014 self.args = None
00015 self.result = None
00016 self.main_thread_id = thread.get_ident()
00017
00018 def __call__( self, func, args=() ):
00019
00020 if thread.get_ident()==self.main_thread_id:
00021 return func(*args)
00022 else:
00023 self.func = func
00024 self.args = args
00025 self.result = None
00026 self.returned = False
00027 self.requested = True
00028
00029 while not self.returned:
00030 time.sleep(0.01)
00031
00032 self.requested = False
00033 self.returned = False
00034
00035 return self.result
00036
00037 def check(self):
00038 assert( thread.get_ident()==self.main_thread_id )
00039 if self.requested:
00040 self.requested = False
00041 self.result = self.func( *self.args )
00042 self.returned = True
00043
00044
00045
00046 JOB_STATUS_WAITING = 0
00047 JOB_STATUS_RUNNING = 1
00048 JOB_STATUS_FINISHED = 2
00049
00050
00051
00052
00053
00054
00055
00056
00057 class JobItem:
00058
00059
00060
00061
00062
00063 def __init__( self, subthread_func=None, finished_func=None ):
00064 self.status = JOB_STATUS_WAITING
00065 self.cancel_requested = False
00066 self.subthread_func = subthread_func
00067 self.finished_func = finished_func
00068
00069
00070 def cancel(self):
00071 self.cancel_requested = True
00072
00073
00074
00075
00076
00077 def isCanceled(self):
00078 return self.cancel_requested
00079
00080 job_queue_list = []
00081 job_queue = None
00082
00083
00084
00085
00086
00087
00088
00089 class JobQueue(threading.Thread):
00090
00091 @staticmethod
00092 def checkAll():
00093 for job_queue in job_queue_list:
00094 job_queue.check()
00095
00096 @staticmethod
00097 def joinAll():
00098 for job_queue in job_queue_list:
00099 job_queue.join()
00100
00101 @staticmethod
00102 def cancelAll():
00103 for job_queue in job_queue_list:
00104 job_queue.cancel()
00105
00106 @staticmethod
00107 def createDefaultQueue():
00108 global job_queue
00109 job_queue = JobQueue()
00110
00111
00112 def __init__(self):
00113 threading.Thread.__init__(self)
00114 self.sema = threading.Semaphore(0)
00115 self.items = []
00116 self.cancel_requested = False
00117 job_queue_list.append(self)
00118 self.start()
00119
00120
00121 def destroy(self):
00122 job_queue_list.remove(self)
00123
00124 def run(self):
00125
00126 while True:
00127
00128 self.sema.acquire()
00129
00130 if self.cancel_requested : break
00131
00132 item = self.items[0]
00133
00134 try:
00135 if item.subthread_func : item.subthread_func( item )
00136 except:
00137 traceback.print_exc()
00138
00139 item.status = JOB_STATUS_FINISHED
00140
00141
00142
00143
00144
00145
00146 def enqueue( self, item ):
00147 self.items.append(item)
00148
00149 def check(self):
00150
00151 while True:
00152
00153 if len(self.items)==0 : return
00154
00155 item = self.items[0]
00156
00157 if item.status == JOB_STATUS_WAITING:
00158
00159 if item.isCanceled():
00160 item.status = JOB_STATUS_FINISHED
00161 else:
00162 item.status = JOB_STATUS_RUNNING
00163 self.sema.release()
00164 return
00165
00166 elif item.status == JOB_STATUS_RUNNING:
00167 return
00168
00169 elif item.status == JOB_STATUS_FINISHED:
00170 try:
00171 if item.finished_func : item.finished_func( item )
00172 except:
00173 traceback.print_exc()
00174
00175 if len(self.items):
00176 del self.items[0]
00177
00178 else:
00179 assert(0)
00180
00181
00182 def numItems(self):
00183 return len(self.items)
00184
00185
00186 def cancel(self):
00187 for item in self.items:
00188 item.cancel()
00189
00190
00191 def join(self):
00192
00193 while True:
00194 if len(self.items)==0 : break
00195 self.check()
00196 time.sleep(0.1)
00197
00198 self.cancel_requested = True
00199 self.sema.release()
00200
00201 threading.Thread.join(self)
00202
00203
00204
00205