hydrus/include/HydrusThreading.py

689 lines
17 KiB
Python
Raw Normal View History

2018-02-14 21:47:18 +00:00
import bisect
2018-02-07 23:40:33 +00:00
import collections
2015-04-29 19:20:35 +00:00
import HydrusExceptions
2014-05-21 21:37:35 +00:00
import Queue
2018-05-23 21:05:06 +00:00
import random
2014-05-21 21:37:35 +00:00
import threading
import time
import traceback
2015-03-25 22:04:19 +00:00
import HydrusData
2017-05-10 21:33:58 +00:00
import HydrusGlobals as HG
2015-09-16 18:11:00 +00:00
import os
2014-05-21 21:37:35 +00:00
2018-04-11 22:30:40 +00:00
NEXT_THREAD_CLEAROUT = 0
2015-09-16 18:11:00 +00:00
THREADS_TO_THREAD_INFO = {}
THREAD_INFO_LOCK = threading.Lock()
2018-04-11 22:30:40 +00:00
def ClearOutDeadThreads():
with THREAD_INFO_LOCK:
all_threads = list( THREADS_TO_THREAD_INFO.keys() )
for thread in all_threads:
if not thread.is_alive():
del THREADS_TO_THREAD_INFO[ thread ]
2015-09-16 18:11:00 +00:00
def GetThreadInfo( thread = None ):
2018-04-11 22:30:40 +00:00
global NEXT_THREAD_CLEAROUT
if HydrusData.TimeHasPassed( NEXT_THREAD_CLEAROUT ):
ClearOutDeadThreads()
NEXT_THREAD_CLEAROUT = HydrusData.GetNow() + 600
2015-09-16 18:11:00 +00:00
if thread is None:
thread = threading.current_thread()
with THREAD_INFO_LOCK:
if thread not in THREADS_TO_THREAD_INFO:
thread_info = {}
thread_info[ 'shutting_down' ] = False
THREADS_TO_THREAD_INFO[ thread ] = thread_info
return THREADS_TO_THREAD_INFO[ thread ]
def IsThreadShuttingDown():
2018-02-28 22:30:36 +00:00
me = threading.current_thread()
if isinstance( me, DAEMON ):
2015-09-16 18:11:00 +00:00
2018-02-28 22:30:36 +00:00
if HG.view_shutdown:
return True
else:
if HG.model_shutdown:
return True
2015-09-16 18:11:00 +00:00
thread_info = GetThreadInfo()
return thread_info[ 'shutting_down' ]
def ShutdownThread( thread ):
thread_info = GetThreadInfo( thread )
thread_info[ 'shutting_down' ] = True
2014-05-21 21:37:35 +00:00
class DAEMON( threading.Thread ):
2018-02-14 21:47:18 +00:00
def __init__( self, controller, name ):
2014-05-21 21:37:35 +00:00
threading.Thread.__init__( self, name = name )
2015-08-26 21:18:39 +00:00
self._controller = controller
2014-05-21 21:37:35 +00:00
self._name = name
self._event = threading.Event()
2015-08-26 21:18:39 +00:00
self._controller.sub( self, 'wake', 'wake_daemons' )
2015-09-16 18:11:00 +00:00
self._controller.sub( self, 'shutdown', 'shutdown' )
2017-12-13 22:33:07 +00:00
def _DoPreCall( self ):
if HG.daemon_report_mode:
HydrusData.ShowText( self._name + ' doing a job.' )
2015-09-16 18:11:00 +00:00
def shutdown( self ):
ShutdownThread( self )
self.wake()
2014-05-21 21:37:35 +00:00
2015-08-26 21:18:39 +00:00
def wake( self ):
self._event.set()
2014-05-21 21:37:35 +00:00
class DAEMONWorker( DAEMON ):
2016-12-14 21:19:07 +00:00
def __init__( self, controller, name, callable, topics = None, period = 3600, init_wait = 3, pre_call_wait = 0 ):
2016-02-17 22:06:47 +00:00
2017-08-09 21:33:51 +00:00
if topics is None:
topics = []
2016-02-17 22:06:47 +00:00
DAEMON.__init__( self, controller, name )
self._callable = callable
self._topics = topics
self._period = period
2016-11-30 20:24:17 +00:00
self._init_wait = init_wait
2016-12-14 21:19:07 +00:00
self._pre_call_wait = pre_call_wait
2016-02-17 22:06:47 +00:00
2017-08-09 21:33:51 +00:00
for topic in topics:
self._controller.sub( self, 'set', topic )
2016-02-17 22:06:47 +00:00
self.start()
2016-12-14 21:19:07 +00:00
def _CanStart( self, time_started_waiting ):
2016-02-17 22:06:47 +00:00
2016-12-14 21:19:07 +00:00
return self._PreCallWaitIsDone( time_started_waiting ) and self._ControllerIsOKWithIt()
2016-02-17 22:06:47 +00:00
2016-12-14 21:19:07 +00:00
def _ControllerIsOKWithIt( self ):
2015-03-25 22:04:19 +00:00
2016-12-14 21:19:07 +00:00
return True
2014-05-21 21:37:35 +00:00
2016-12-14 21:19:07 +00:00
def _PreCallWaitIsDone( self, time_started_waiting ):
2014-05-21 21:37:35 +00:00
2016-12-14 21:19:07 +00:00
# just shave a bit off so things that don't have any wait won't somehow have to wait a single accidentaly cycle
time_to_start = ( float( time_started_waiting ) - 0.1 ) + self._pre_call_wait
2014-05-21 21:37:35 +00:00
2016-12-14 21:19:07 +00:00
return HydrusData.TimeHasPassed( time_to_start )
2014-05-21 21:37:35 +00:00
def run( self ):
self._event.wait( self._init_wait )
while True:
2016-12-14 21:19:07 +00:00
if IsThreadShuttingDown():
return
2014-07-23 21:21:37 +00:00
2016-12-14 21:19:07 +00:00
time_started_waiting = HydrusData.GetNow()
2015-06-17 20:01:41 +00:00
2016-12-14 21:19:07 +00:00
while not self._CanStart( time_started_waiting ):
2014-10-29 21:39:01 +00:00
2015-09-16 18:11:00 +00:00
time.sleep( 1 )
2014-10-29 21:39:01 +00:00
2016-12-14 21:19:07 +00:00
if IsThreadShuttingDown():
return
2014-10-29 21:39:01 +00:00
2017-12-13 22:33:07 +00:00
self._DoPreCall()
2015-04-29 19:20:35 +00:00
try:
2015-11-04 22:30:28 +00:00
self._callable( self._controller )
2015-04-29 19:20:35 +00:00
except HydrusExceptions.ShutdownException:
return
2014-07-23 21:21:37 +00:00
except Exception as e:
2015-03-25 22:04:19 +00:00
HydrusData.ShowText( 'Daemon ' + self._name + ' encountered an exception:' )
2014-09-24 21:50:07 +00:00
2015-03-25 22:04:19 +00:00
HydrusData.ShowException( e )
2014-07-23 21:21:37 +00:00
2014-05-21 21:37:35 +00:00
2015-09-16 18:11:00 +00:00
if IsThreadShuttingDown(): return
2014-05-21 21:37:35 +00:00
self._event.wait( self._period )
self._event.clear()
def set( self, *args, **kwargs ): self._event.set()
2016-12-14 21:19:07 +00:00
# Big stuff like DB maintenance that we don't want to run while other important stuff is going on, like user interaction or vidya on another process
class DAEMONBackgroundWorker( DAEMONWorker ):
def _ControllerIsOKWithIt( self ):
return self._controller.GoodTimeToDoBackgroundWork()
# Big stuff that we want to run when the user sees, but not at the expense of something else, like laggy session load
class DAEMONForegroundWorker( DAEMONWorker ):
def _ControllerIsOKWithIt( self ):
return self._controller.GoodTimeToDoForegroundWork()
2018-02-14 21:47:18 +00:00
class THREADCallToThread( DAEMON ):
def __init__( self, controller, name ):
DAEMON.__init__( self, controller, name )
self._queue = Queue.Queue()
self._currently_working = True # start off true so new threads aren't used twice by two quick successive calls
def CurrentlyWorking( self ):
return self._currently_working
def put( self, callable, *args, **kwargs ):
self._currently_working = True
self._queue.put( ( callable, args, kwargs ) )
self._event.set()
def run( self ):
while True:
try:
while self._queue.empty():
2018-02-28 22:30:36 +00:00
if IsThreadShuttingDown():
2018-02-14 21:47:18 +00:00
return
self._event.wait( 1200 )
self._event.clear()
self._DoPreCall()
( callable, args, kwargs ) = self._queue.get()
callable( *args, **kwargs )
del callable
except HydrusExceptions.ShutdownException:
return
except Exception as e:
HydrusData.Print( traceback.format_exc() )
HydrusData.ShowException( e )
finally:
self._currently_working = False
time.sleep( 0.00001 )
class JobScheduler( threading.Thread ):
2018-02-07 23:40:33 +00:00
def __init__( self, controller ):
2018-02-14 21:47:18 +00:00
threading.Thread.__init__( self, name = 'Job Scheduler' )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._controller = controller
2018-02-07 23:40:33 +00:00
self._waiting = []
self._waiting_lock = threading.Lock()
2018-02-14 21:47:18 +00:00
self._new_job_arrived = threading.Event()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._cancel_filter_needed = threading.Event()
2018-02-07 23:40:33 +00:00
self._sort_needed = threading.Event()
2018-02-14 21:47:18 +00:00
self._controller.sub( self, 'shutdown', 'shutdown' )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def _FilterCancelled( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
with self._waiting_lock:
self._waiting = [ job for job in self._waiting if not job.IsCancelled() ]
def _GetLoopWaitTime( self ):
2018-02-07 23:40:33 +00:00
with self._waiting_lock:
2018-02-14 21:47:18 +00:00
if len( self._waiting ) == 0:
return 0.2
next_job = self._waiting[0]
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
time_delta_until_due = next_job.GetTimeDeltaUntilDue()
return min( 1.0, time_delta_until_due )
2018-02-07 23:40:33 +00:00
def _NoWorkToStart( self ):
with self._waiting_lock:
if len( self._waiting ) == 0:
return True
next_job = self._waiting[0]
2018-02-14 21:47:18 +00:00
if next_job.IsDue():
2018-02-07 23:40:33 +00:00
return False
else:
return True
def _SortWaiting( self ):
# sort the waiting jobs in ascending order of expected work time
2018-02-14 21:47:18 +00:00
with self._waiting_lock: # this uses __lt__ to sort
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._waiting.sort()
2018-02-07 23:40:33 +00:00
def _StartWork( self ):
2018-05-23 21:05:06 +00:00
jobs_started = 0
2018-02-07 23:40:33 +00:00
while True:
with self._waiting_lock:
if len( self._waiting ) == 0:
break
2018-05-23 21:05:06 +00:00
if jobs_started >= 10: # try to avoid spikes
break
2018-02-07 23:40:33 +00:00
next_job = self._waiting[0]
2018-02-14 21:47:18 +00:00
if next_job.IsDue():
2018-02-07 23:40:33 +00:00
next_job = self._waiting.pop( 0 )
2018-02-14 21:47:18 +00:00
next_job.StartWork()
2018-02-07 23:40:33 +00:00
2018-05-23 21:05:06 +00:00
jobs_started += 1
2018-02-07 23:40:33 +00:00
else:
break # all the rest in the queue are not due
2018-02-14 21:47:18 +00:00
def AddJob( self, job ):
with self._waiting_lock:
bisect.insort( self._waiting, job )
self._new_job_arrived.set()
2018-02-28 22:30:36 +00:00
def ClearOutDead( self ):
with self._waiting_lock:
self._waiting = [ job for job in self._waiting if not job.IsDead() ]
2018-02-21 21:59:37 +00:00
def GetPrettyJobSummary( self ):
with self._waiting_lock:
num_jobs = len( self._waiting )
job_lines = [ repr( job ) for job in self._waiting ]
2018-07-04 20:48:28 +00:00
lines = [ HydrusData.ToHumanInt( num_jobs ) + ' jobs:' ] + job_lines
2018-02-21 21:59:37 +00:00
text = os.linesep.join( lines )
return text
2018-02-14 21:47:18 +00:00
def JobCancelled( self ):
self._cancel_filter_needed.set()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def shutdown( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
ShutdownThread( self )
2018-02-07 23:40:33 +00:00
def WorkTimesHaveChanged( self ):
self._sort_needed.set()
def run( self ):
while True:
try:
while self._NoWorkToStart():
2018-02-28 22:30:36 +00:00
if IsThreadShuttingDown():
2018-02-07 23:40:33 +00:00
return
#
2018-02-14 21:47:18 +00:00
if self._cancel_filter_needed.is_set():
self._FilterCancelled()
self._cancel_filter_needed.clear()
2018-02-07 23:40:33 +00:00
if self._sort_needed.is_set():
self._SortWaiting()
self._sort_needed.clear()
2018-02-14 21:47:18 +00:00
continue # if some work is now due, let's do it!
#
wait_time = self._GetLoopWaitTime()
self._new_job_arrived.wait( wait_time )
self._new_job_arrived.clear()
2018-02-07 23:40:33 +00:00
self._StartWork()
except HydrusExceptions.ShutdownException:
return
except Exception as e:
HydrusData.Print( traceback.format_exc() )
HydrusData.ShowException( e )
time.sleep( 0.00001 )
2018-02-14 21:47:18 +00:00
class SchedulableJob( object ):
2018-02-07 23:40:33 +00:00
2018-05-16 20:09:50 +00:00
def __init__( self, controller, scheduler, initial_delay, work_callable ):
2018-02-07 23:40:33 +00:00
self._controller = controller
2018-02-14 21:47:18 +00:00
self._scheduler = scheduler
2018-02-07 23:40:33 +00:00
self._work_callable = work_callable
2018-02-14 21:47:18 +00:00
self._next_work_time = HydrusData.GetNowFloat() + initial_delay
2018-02-07 23:40:33 +00:00
self._work_lock = threading.Lock()
self._currently_working = threading.Event()
2018-02-14 21:47:18 +00:00
self._is_cancelled = threading.Event()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def __lt__( self, other ): # for the scheduler to do bisect.insort noice
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
return self._next_work_time < other._next_work_time
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def __repr__( self ):
2018-02-07 23:40:33 +00:00
2018-07-04 20:48:28 +00:00
return repr( self.__class__ ) + ': ' + repr( self._work_callable ) + ' next in ' + HydrusData.TimeDeltaToPrettyTimeDelta( self._next_work_time - HydrusData.GetNowFloat() )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def _BootWorker( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._controller.CallToThread( self.Work )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def Cancel( self ):
self._is_cancelled.set()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._scheduler.JobCancelled()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def CurrentlyWorking( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
return self._currently_working.is_set()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def GetTimeDeltaUntilDue( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
return HydrusData.GetTimeDeltaUntilTimeFloat( self._next_work_time )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def IsCancelled( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
return self._is_cancelled.is_set()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
2018-02-28 22:30:36 +00:00
def IsDead( self ):
return False
2018-02-14 21:47:18 +00:00
def IsDue( self ):
return HydrusData.TimeHasPassedFloat( self._next_work_time )
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
def StartWork( self ):
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
if self._is_cancelled.is_set():
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
return
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
self._currently_working.set()
self._BootWorker()
2018-02-07 23:40:33 +00:00
2018-05-23 21:05:06 +00:00
def Wake( self, next_work_time = None ):
2018-05-16 20:09:50 +00:00
2018-05-23 21:05:06 +00:00
if next_work_time is None:
next_work_time = HydrusData.GetNowFloat()
self._next_work_time = next_work_time
2018-05-16 20:09:50 +00:00
self._scheduler.WorkTimesHaveChanged()
2018-02-07 23:40:33 +00:00
def Work( self ):
2018-02-14 21:47:18 +00:00
try:
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
with self._work_lock:
2018-02-07 23:40:33 +00:00
self._work_callable()
2018-02-14 21:47:18 +00:00
finally:
self._currently_working.clear()
2018-02-07 23:40:33 +00:00
2018-02-14 21:47:18 +00:00
class RepeatingJob( SchedulableJob ):
2014-05-21 21:37:35 +00:00
2018-05-16 20:09:50 +00:00
def __init__( self, controller, scheduler, initial_delay, period, work_callable ):
2014-05-21 21:37:35 +00:00
2018-05-16 20:09:50 +00:00
SchedulableJob.__init__( self, controller, scheduler, initial_delay, work_callable )
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
self._period = period
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
self._stop_repeating = threading.Event()
2016-07-20 19:57:10 +00:00
2018-02-14 21:47:18 +00:00
def Cancel( self ):
2016-07-20 19:57:10 +00:00
2018-02-14 21:47:18 +00:00
SchedulableJob.Cancel( self )
self._stop_repeating.set()
2016-07-20 19:57:10 +00:00
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
def Delay( self, delay ):
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
self._next_work_time = HydrusData.GetNowFloat() + delay
2017-08-09 21:33:51 +00:00
2018-02-14 21:47:18 +00:00
self._scheduler.WorkTimesHaveChanged()
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
def IsFinishedWorking( self ):
return self._stop_repeating.is_set()
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
def SetPeriod( self, period ):
2014-05-21 21:37:35 +00:00
2018-05-23 21:05:06 +00:00
if period > 10.0:
period += random.random() # smooth out future spikes if ten of these all fire at the same time
2018-02-14 21:47:18 +00:00
self._period = period
def StartWork( self ):
if self._stop_repeating.is_set():
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
return
2014-05-21 21:37:35 +00:00
2018-02-14 21:47:18 +00:00
SchedulableJob.StartWork( self )
def Work( self ):
SchedulableJob.Work( self )
if not self._stop_repeating.is_set():
self._next_work_time = HydrusData.GetNowFloat() + self._period
self._scheduler.AddJob( self )
2014-05-28 21:03:24 +00:00
2014-05-21 21:37:35 +00:00
2016-12-14 21:19:07 +00:00