hydrus/include/HydrusThreading.py

245 lines
6.0 KiB
Python

import HydrusExceptions
import Queue
import random
import threading
import time
import traceback
import wx
import HydrusData
import HydrusGlobals
class DAEMON( threading.Thread ):
def __init__( self, name, period = 1200 ):
threading.Thread.__init__( self, name = name )
self._name = name
self._event = threading.Event()
HydrusGlobals.pubsub.sub( self, 'shutdown', 'shutdown' )
def shutdown( self ): self._event.set()
class DAEMONQueue( DAEMON ):
def __init__( self, name, callable, queue_topic, period = 10 ):
DAEMON.__init__( self, name )
self._callable = callable
self._queue = Queue.Queue()
self._queue_topic = queue_topic
self._period = period
HydrusGlobals.pubsub.sub( self, 'put', queue_topic )
self.start()
def put( self, data ): self._queue.put( data )
def run( self ):
time.sleep( 3 )
while True:
while self._queue.empty():
if HydrusGlobals.shutdown: return
self._event.wait( self._period )
self._event.clear()
while wx.GetApp().JustWokeFromSleep():
if HydrusGlobals.shutdown: return
time.sleep( 10 )
items = []
while not self._queue.empty(): items.append( self._queue.get() )
try:
self._callable( items )
except HydrusExceptions.ShutdownException:
return
except Exception as e:
HydrusData.ShowException( e )
class DAEMONWorker( DAEMON ):
def __init__( self, name, callable, topics = None, period = 1200, init_wait = 3, pre_callable_wait = 3 ):
if topics is None: topics = []
DAEMON.__init__( self, name )
self._callable = callable
self._topics = topics
self._period = period
self._init_wait = init_wait
self._pre_callable_wait = pre_callable_wait
for topic in topics: HydrusGlobals.pubsub.sub( self, 'set', topic )
self.start()
def run( self ):
self._event.wait( self._init_wait )
while True:
if HydrusGlobals.shutdown: return
time.sleep( self._pre_callable_wait )
if HydrusGlobals.shutdown: return
while wx.GetApp().JustWokeFromSleep():
if HydrusGlobals.shutdown: return
time.sleep( 10 )
try:
self._callable()
except HydrusExceptions.ShutdownException:
return
except Exception as e:
HydrusData.ShowText( 'Daemon ' + self._name + ' encountered an exception:' )
HydrusData.ShowException( e )
if HydrusGlobals.shutdown: return
self._event.wait( self._period )
self._event.clear()
def set( self, *args, **kwargs ): self._event.set()
class DAEMONCallToThread( DAEMON ):
def __init__( self ):
DAEMON.__init__( self, 'CallToThread' )
self._queue = Queue.Queue()
self.start()
def put( self, callable, *args, **kwargs ):
self._queue.put( ( callable, args, kwargs ) )
self._event.set()
def run( self ):
while True:
while self._queue.empty():
if HydrusGlobals.shutdown: return
self._event.wait( 1200 )
self._event.clear()
try:
( callable, args, kwargs ) = self._queue.get()
callable( *args, **kwargs )
del callable
except HydrusExceptions.ShutdownException:
return
except Exception as e:
HydrusData.ShowException( e )
time.sleep( 0.00001 )
call_to_threads = [ DAEMONCallToThread() for i in range( 10 ) ]
def CallToThread( callable, *args, **kwargs ):
call_to_thread = random.choice( call_to_threads )
while call_to_thread == threading.current_thread: call_to_thread = random.choice( call_to_threads )
call_to_thread.put( callable, *args, **kwargs )
def CallBlockingToWx( callable, *args, **kwargs ):
def wx_code( job_key ):
try:
result = callable( *args, **kwargs )
job_key.SetVariable( 'result', result )
except Exception as e:
print( 'CallBlockingToWx just caught this error:' )
print( traceback.format_exc() )
job_key.SetVariable( 'error', e )
finally: job_key.Finish()
job_key = HydrusData.JobKey()
job_key.Begin()
wx.CallAfter( wx_code, job_key )
while not job_key.IsDone():
if HydrusGlobals.shutdown: return
time.sleep( 0.05 )
if job_key.HasVariable( 'result' ): return job_key.GetVariable( 'result' )
else: raise job_key.GetVariable( 'error' )