hydrus/include/HydrusController.py

595 lines
15 KiB
Python
Raw Normal View History

2015-04-01 20:44:54 +00:00
import collections
import gc
import HydrusConstants as HC
2015-08-26 21:18:39 +00:00
import HydrusDaemons
2015-04-01 20:44:54 +00:00
import HydrusData
2015-04-22 22:57:25 +00:00
import HydrusDB
2015-04-01 20:44:54 +00:00
import HydrusExceptions
2017-05-10 21:33:58 +00:00
import HydrusGlobals as HG
2017-09-20 19:47:31 +00:00
import HydrusPaths
2015-04-01 20:44:54 +00:00
import HydrusPubSub
2015-08-26 21:18:39 +00:00
import HydrusThreading
2016-03-09 19:37:14 +00:00
import os
2015-08-26 21:18:39 +00:00
import random
2015-04-01 20:44:54 +00:00
import sys
import threading
import time
import traceback
2015-09-02 23:16:09 +00:00
class HydrusController( object ):
    """Base application controller.

    Owns the database handle, the pubsub dispatcher, the job scheduler, the
    daemon list and two pools of call-to-thread workers. Several hooks
    (_InitDB, MaintainDB, ReportDataUsed, ...) are stubs here, presumably
    overridden by concrete controllers -- confirm against subclasses.
    """

    def __init__( self, db_dir, no_daemons, no_wal ):
        """Set up controller state and start the pubsub processing loop.

        db_dir -- directory used for the 'no-wal' marker and profile logs.
        no_daemons -- when true, InitView does not register any daemons.
        no_wal -- requested no-WAL flag; forced to True if the 'no-wal'
                  marker file already exists in db_dir.
        """

        # registers this instance as the process-wide controller
        HG.controller = self

        self._name = 'hydrus'

        self.db_dir = db_dir
        self._no_daemons = no_daemons
        self._no_wal = no_wal

        self._no_wal_path = os.path.join( self.db_dir, 'no-wal' )

        if os.path.exists( self._no_wal_path ):

            self._no_wal = True

        self.db = None

        self._model_shutdown = False
        self._view_shutdown = False

        self._pubsub = HydrusPubSub.HydrusPubSub( self )

        self._daemons = []
        self._caches = {}
        self._managers = {}

        self._job_scheduler = None

        self._call_to_threads = []
        self._long_running_call_to_threads = []

        # guards both thread pools above
        self._call_to_thread_lock = threading.Lock()

        # unknown timestamp names read as 0
        self._timestamps = collections.defaultdict( lambda: 0 )

        self._timestamps[ 'boot' ] = HydrusData.GetNow()

        self._just_woke_from_sleep = False
        self._system_busy = False

        # must come last: it needs the pools, the lock and the pubsub object
        self.CallToThreadLongRunning( self.DAEMONPubSub )

    def _GetCallToThread( self ):
        """Return a worker from the regular pool, creating one if needed.

        An idle worker is preferred. If all are busy, a new worker is started
        while the pool holds fewer than 10, or whenever the caller is itself
        one of the pool's threads; otherwise a random busy worker is returned.
        """

        with self._call_to_thread_lock:

            for call_to_thread in self._call_to_threads:

                if not call_to_thread.CurrentlyWorking():

                    return call_to_thread

            # all the threads in the pool are currently busy

            # NOTE(review): the pool-thread exemption presumably stops a worker
            # from queueing work behind itself -- confirm
            calling_from_the_thread_pool = threading.current_thread() in self._call_to_threads

            if calling_from_the_thread_pool or len( self._call_to_threads ) < 10:

                call_to_thread = HydrusThreading.THREADCallToThread( self, 'CallToThread' )

                self._call_to_threads.append( call_to_thread )

                call_to_thread.start()

            else:

                call_to_thread = random.choice( self._call_to_threads )

            return call_to_thread

    def _GetCallToThreadLongRunning( self ):
        """Return an idle worker from the long-running pool, or start a new one.

        Unlike the regular pool there is no size cap: a busy worker is never
        shared.
        """

        with self._call_to_thread_lock:

            for call_to_thread in self._long_running_call_to_threads:

                if not call_to_thread.CurrentlyWorking():

                    return call_to_thread

            call_to_thread = HydrusThreading.THREADCallToThread( self, 'CallToThreadLongRunning' )

            self._long_running_call_to_threads.append( call_to_thread )

            call_to_thread.start()

            return call_to_thread

    def _InitDB( self ):
        """Create and return the database object. Must be overridden."""

        raise NotImplementedError()

    def _MaintainCallToThreads( self ):
        """Shut down and drop every pool worker that is not currently working."""

        # we don't really want to hang on to threads that are done as event.wait() has a bit of idle cpu
        # so, any that are in the pools that aren't doing anything can be killed and sent to garbage

        def keep_working_threads( threads ):

            # builds a real list eagerly: filter() is lazy on python 3, which
            # would skip the shutdown calls and break append/len on the pool
            still_working = []

            for t in threads:

                if t.CurrentlyWorking():

                    still_working.append( t )

                else:

                    t.shutdown()

            return still_working

        with self._call_to_thread_lock:

            self._call_to_threads = keep_working_threads( self._call_to_threads )

            self._long_running_call_to_threads = keep_working_threads( self._long_running_call_to_threads )

    def _Read( self, action, *args, **kwargs ):
        """Run a db read at HC.HIGH_PRIORITY and return its result."""

        result = self.db.Read( action, HC.HIGH_PRIORITY, *args, **kwargs )

        return result

    def _ReportCallToThread( self, callable, args, kwargs ):
        """Show the callable, plus any non-empty args/kwargs, via HydrusData.ShowText."""

        what_to_report = [ callable ]

        if len( args ) > 0:

            what_to_report.append( args )

        if len( kwargs ) > 0:

            what_to_report.append( kwargs )

        HydrusData.ShowText( tuple( what_to_report ) )

    def _ReportShutdownDaemonsStatus( self ):
        """Hook called on each poll while daemons are still alive. No-op here."""

        pass

    def _ShutdownDaemons( self ):
        """Ask every daemon to shut down, block until none is alive, then clear the list."""

        for daemon in self._daemons:

            daemon.shutdown()

        while any( daemon.is_alive() for daemon in self._daemons ):

            self._ReportShutdownDaemonsStatus()

            time.sleep( 0.1 )

        self._daemons = []

    def _Write( self, action, priority, synchronous, *args, **kwargs ):
        """Forward a write to the db with the given priority and synchronous flag."""

        result = self.db.Write( action, priority, synchronous, *args, **kwargs )

        return result

    def pub( self, topic, *args, **kwargs ):
        """Publish to topic; once the model is shut down, publish immediately instead."""

        if self._model_shutdown:

            self._pubsub.pubimmediate( topic, *args, **kwargs )

        else:

            self._pubsub.pub( topic, *args, **kwargs )

    def pubimmediate( self, topic, *args, **kwargs ):
        """Publish to topic through the pubsub object's immediate path."""

        self._pubsub.pubimmediate( topic, *args, **kwargs )

    def sub( self, object, method_name, topic ):
        """Subscribe object's method_name to topic."""

        self._pubsub.sub( object, method_name, topic )

    def CallLater( self, delay, func, *args, **kwargs ):
        """Schedule func( *args, **kwargs ) with the job scheduler and return the job.

        Requires the job scheduler created by InitModel; before InitModel or
        after ShutdownModel it is None and this call fails.
        """

        call = HydrusData.Call( func, *args, **kwargs )

        job = HydrusThreading.SchedulableJob( self, self._job_scheduler, call, initial_delay = delay )

        self._job_scheduler.AddJob( job )

        return job

    def CallToThread( self, callable, *args, **kwargs ):
        """Queue callable( *args, **kwargs ) on a worker from the regular pool."""

        if HG.callto_report_mode:

            self._ReportCallToThread( callable, args, kwargs )

        call_to_thread = self._GetCallToThread()

        call_to_thread.put( callable, *args, **kwargs )

    def CallToThreadLongRunning( self, callable, *args, **kwargs ):
        """Queue callable( *args, **kwargs ) on a worker from the long-running pool."""

        if HG.callto_report_mode:

            self._ReportCallToThread( callable, args, kwargs )

        call_to_thread = self._GetCallToThreadLongRunning()

        call_to_thread.put( callable, *args, **kwargs )

    def ClearCaches( self ):
        """Call Clear() on every registered cache."""

        for cache in self._caches.values():

            cache.Clear()

    def CreateNoWALFile( self ):
        """Write the 'no-wal' marker file that __init__ checks for."""

        with open( self._no_wal_path, 'wb' ) as f:

            # bytes literal: the file is opened in binary mode, and a text
            # string cannot be written to it on python 3
            f.write( b'This file was created because the database failed to set WAL journalling. It will not reattempt WAL as long as this file exists.' )

    def CurrentlyIdle( self ):
        """Return True; the base controller always counts as idle."""

        return True

    def DBCurrentlyDoingJob( self ):
        """Return whether the db is doing a job; False when no db exists yet."""

        if self.db is None:

            return False

        else:

            return self.db.CurrentlyDoingJob()

    def GetBootTime( self ):
        """Return the 'boot' timestamp recorded in __init__."""

        return self._timestamps[ 'boot' ]

    def GetDBDir( self ):
        """Return the db directory given at construction."""

        return self.db_dir

    def GetDBStatus( self ):
        """Return the db's status. Requires the db to have been initialised."""

        return self.db.GetStatus()

    def GetCache( self, name ):
        """Return the cache registered under name (KeyError if absent)."""

        return self._caches[ name ]

    def GetManager( self, name ):
        """Return the manager registered under name (KeyError if absent)."""

        return self._managers[ name ]

    def GoodTimeToDoBackgroundWork( self ):
        """Return True when idle, not freshly woken from sleep and not system-busy.

        JustWokeFromSleep runs SleepCheck, so this call can update timestamps.
        """

        return self.CurrentlyIdle() and not ( self.JustWokeFromSleep() or self.SystemBusy() )

    def GoodTimeToDoForegroundWork( self ):
        """Return True; the base controller never defers foreground work."""

        return True

    def JustWokeFromSleep( self ):
        """Run SleepCheck, then return the resulting just-woke flag."""

        self.SleepCheck()

        return self._just_woke_from_sleep

    def InitModel( self ):
        """Fetch the temp dir, start the job scheduler and create the db."""

        self.temp_dir = HydrusPaths.GetTempDir()

        self._job_scheduler = HydrusThreading.JobScheduler( self )

        self._job_scheduler.start()

        self.db = self._InitDB()

    def InitView( self ):
        """Register the maintenance daemons unless daemons are disabled."""

        if not self._no_daemons:

            self._daemons.append( HydrusThreading.DAEMONWorker( self, 'SleepCheck', HydrusDaemons.DAEMONSleepCheck, period = 120 ) )
            self._daemons.append( HydrusThreading.DAEMONWorker( self, 'MaintainMemoryFast', HydrusDaemons.DAEMONMaintainMemoryFast, period = 60 ) )
            self._daemons.append( HydrusThreading.DAEMONWorker( self, 'MaintainMemorySlow', HydrusDaemons.DAEMONMaintainMemorySlow, period = 300 ) )

            self._daemons.append( HydrusThreading.DAEMONBackgroundWorker( self, 'MaintainDB', HydrusDaemons.DAEMONMaintainDB, period = 300, init_wait = 60 ) )

    def IsFirstStart( self ):
        """Return the db's first-start flag; False when no db exists yet."""

        if self.db is None:

            return False

        else:

            return self.db.IsFirstStart()

    def MaintainDB( self, stop_time = None ):
        """Database maintenance hook. No-op here."""

        pass

    def MaintainMemorySlow( self ):
        """Flush stdio, run a gc pass, clean old temp paths and prune idle workers."""

        sys.stdout.flush()
        sys.stderr.flush()

        gc.collect()

        HydrusPaths.CleanUpOldTempPaths()

        self._MaintainCallToThreads()

    def ModelIsShutdown( self ):
        """Return True once ShutdownModel has been called."""

        return self._model_shutdown

    def PrintProfile( self, summary, profile_text ):
        """Append a timestamped summary and profile text to this boot's profile log.

        The log lives in db_dir and is named after the controller name and
        the boot time, so all profiles from one run share a file.
        """

        boot_pretty_timestamp = time.strftime( '%Y-%m-%d %H-%M-%S', time.localtime( self._timestamps[ 'boot' ] ) )

        profile_log_filename = self._name + ' profile - ' + boot_pretty_timestamp + '.log'

        profile_log_path = os.path.join( self.db_dir, profile_log_filename )

        with open( profile_log_path, 'a' ) as f:

            prefix = time.strftime( '%Y/%m/%d %H:%M:%S: ' )

            f.write( prefix + summary )

            f.write( os.linesep * 2 )

            f.write( profile_text )

    def ProcessPubSub( self ):
        """Ask the pubsub object to process its pending work."""

        self._pubsub.Process()

    def Read( self, action, *args, **kwargs ):
        """Public entry point for db reads; delegates to _Read."""

        return self._Read( action, *args, **kwargs )

    def ReportDataUsed( self, num_bytes ):
        """Bandwidth accounting hook. No-op here."""

        pass

    def ReportRequestUsed( self ):
        """Request accounting hook. No-op here."""

        pass

    def ShutdownModel( self ):
        """Flag model shutdown, wait for the db loop, stop the scheduler, delete temp dir."""

        self._model_shutdown = True
        HG.model_shutdown = True

        if self.db is not None:

            while not self.db.LoopIsFinished():

                time.sleep( 0.1 )

        if self._job_scheduler is not None:

            self._job_scheduler.shutdown()

            self._job_scheduler = None

        # temp_dir only exists if InitModel ran
        if hasattr( self, 'temp_dir' ):

            HydrusPaths.DeletePath( self.temp_dir )

    def ShutdownView( self ):
        """Flag view shutdown and stop all daemons."""

        self._view_shutdown = True
        HG.view_shutdown = True

        self._ShutdownDaemons()

    def ShutdownFromServer( self ):
        """Always raises: the base controller cannot be shut down from the server."""

        raise Exception( 'This hydrus application cannot be shut down from the server!' )

    def SleepCheck( self ):
        """Update the just-woke flag from the gap since the previous check.

        A gap of more than 600 seconds between checks sets the flag and opens
        a 180 second window ('now_awake') during which the flag is left alone.
        The very first check (no previous timestamp) clears the flag.
        """

        if HydrusData.TimeHasPassed( self._timestamps[ 'now_awake' ] ):

            last_sleep_check = self._timestamps[ 'last_sleep_check' ]

            if last_sleep_check == 0:

                self._just_woke_from_sleep = False

            else:

                if HydrusData.TimeHasPassed( last_sleep_check + 600 ):

                    self._just_woke_from_sleep = True

                    self._timestamps[ 'now_awake' ] = HydrusData.GetNow() + 180

                else:

                    self._just_woke_from_sleep = False

        self._timestamps[ 'last_sleep_check' ] = HydrusData.GetNow()

    def SystemBusy( self ):
        """Return the system-busy flag."""

        return self._system_busy

    def ViewIsShutdown( self ):
        """Return True once ShutdownView has been called."""

        return self._view_shutdown

    def WaitUntilDBEmpty( self ):
        """Poll until the db has no queued or running job.

        Raises HydrusExceptions.ShutdownException if the model shuts down
        while waiting.
        """

        while True:

            if self._model_shutdown:

                raise HydrusExceptions.ShutdownException( 'Application shutting down!' )

            elif self.db.JobsQueueEmpty() and not self.db.CurrentlyDoingJob():

                return

            else:

                time.sleep( 0.00001 )

    def WaitUntilModelFree( self ):
        """Wait for the pubsub queue to drain, then for the db to go quiet."""

        self.WaitUntilPubSubsEmpty()

        self.WaitUntilDBEmpty()

    def WaitUntilPubSubsEmpty( self ):
        """Poll until the pubsub object has no pending or in-progress work.

        Raises HydrusExceptions.ShutdownException if the model shuts down
        while waiting.
        """

        while True:

            if self._model_shutdown:

                raise HydrusExceptions.ShutdownException( 'Application shutting down!' )

            elif not self._pubsub.WorkToDo() and not self._pubsub.DoingWork():

                return

            else:

                time.sleep( 0.00001 )

    def Write( self, action, *args, **kwargs ):
        """Submit a non-synchronous db write at HC.HIGH_PRIORITY."""

        return self._Write( action, HC.HIGH_PRIORITY, False, *args, **kwargs )

    def WriteInterruptable( self, action, *args, **kwargs ):
        """Submit a synchronous db write at HC.INTERRUPTABLE_PRIORITY."""

        return self._Write( action, HC.INTERRUPTABLE_PRIORITY, True, *args, **kwargs )

    def WriteSynchronous( self, action, *args, **kwargs ):
        """Submit a synchronous db write at HC.LOW_PRIORITY."""

        return self._Write( action, HC.LOW_PRIORITY, True, *args, **kwargs )

    def DAEMONPubSub( self ):
        """Process pubsub work in a loop until HG.model_shutdown is set.

        Exceptions from processing are shown and the loop carries on; when
        there is no work the loop blocks in the pubsub object's WaitOnPub.
        """

        while not HG.model_shutdown:

            if self._pubsub.WorkToDo():

                try:

                    self.ProcessPubSub()

                except Exception as e:

                    HydrusData.ShowException( e, do_wait = True )

            else:

                self._pubsub.WaitOnPub()