2015-04-22 22:57:25 +00:00
import cProfile
2015-11-25 22:00:57 +00:00
import cStringIO
2016-03-02 21:00:30 +00:00
import distutils . version
2015-04-22 22:57:25 +00:00
import HydrusConstants as HC
import HydrusData
import HydrusExceptions
import HydrusGlobals
2016-04-14 01:54:29 +00:00
import HydrusPaths
2015-04-22 22:57:25 +00:00
import os
2016-04-14 01:54:29 +00:00
import psutil
2015-04-22 22:57:25 +00:00
import Queue
2016-01-06 21:17:20 +00:00
import random
2015-04-22 22:57:25 +00:00
import sqlite3
import sys
2016-04-14 01:54:29 +00:00
import tempfile
2016-03-16 22:19:14 +00:00
import threading
2015-04-22 22:57:25 +00:00
import traceback
import time
2016-04-20 20:42:21 +00:00
CONNECTION_REFRESH_TIME = 60 * 30
2016-04-14 01:54:29 +00:00
def CanVacuum ( db_path , stop_time = None ) :
2016-04-20 20:42:21 +00:00
try :
db = sqlite3 . connect ( db_path , isolation_level = None , detect_types = sqlite3 . PARSE_DECLTYPES )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
c = db . cursor ( )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
( page_size , ) = c . execute ( ' PRAGMA page_size; ' ) . fetchone ( )
( page_count , ) = c . execute ( ' PRAGMA page_count; ' ) . fetchone ( )
( freelist_count , ) = c . execute ( ' PRAGMA freelist_count; ' ) . fetchone ( )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
db_size = ( page_count - freelist_count ) * page_size
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
if stop_time is not None :
2016-04-14 01:54:29 +00:00
2016-05-11 18:16:39 +00:00
approx_vacuum_speed_mb_per_s = 1048576 * 1
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
approx_vacuum_duration = db_size / approx_vacuum_speed_mb_per_s
time_i_will_have_to_start = stop_time - approx_vacuum_duration
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
if HydrusData . TimeHasPassed ( time_i_will_have_to_start ) :
return False
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
temp_dir = tempfile . gettempdir ( )
( db_dir , db_filename ) = os . path . split ( db_path )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
temp_disk_usage = psutil . disk_usage ( temp_dir )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
a = HydrusPaths . GetDevice ( temp_dir )
b = HydrusPaths . GetDevice ( db_dir )
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
if HydrusPaths . GetDevice ( temp_dir ) == HydrusPaths . GetDevice ( db_dir ) :
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
if temp_disk_usage . free < db_size * 2.2 :
return False
else :
if temp_disk_usage . free < db_size * 1.1 :
return False
db_disk_usage = psutil . disk_usage ( db_dir )
if db_disk_usage . free < db_size * 1.1 :
return False
2016-04-14 01:54:29 +00:00
2016-04-20 20:42:21 +00:00
return True
except Exception as e :
HydrusData . Print ( ' Could not determine whether to vacuum or not: ' )
HydrusData . PrintException ( e )
return False
2016-04-14 01:54:29 +00:00
def SetupDBCreatePragma ( c , no_wal = False ) :
c . execute ( ' PRAGMA auto_vacuum = 0; ' ) # none
if HC . PLATFORM_WINDOWS :
c . execute ( ' PRAGMA page_size = 4096; ' )
if not no_wal :
c . execute ( ' PRAGMA journal_mode = WAL; ' )
c . execute ( ' PRAGMA synchronous = 1; ' )
def VacuumDB ( db_path ) :
db = sqlite3 . connect ( db_path , isolation_level = None , detect_types = sqlite3 . PARSE_DECLTYPES )
c = db . cursor ( )
( previous_journal_mode , ) = c . execute ( ' PRAGMA journal_mode; ' ) . fetchone ( )
fast_big_transaction_wal = not distutils . version . LooseVersion ( sqlite3 . sqlite_version ) < distutils . version . LooseVersion ( ' 3.11.0 ' )
if previous_journal_mode == ' wal ' and not fast_big_transaction_wal :
c . execute ( ' PRAGMA journal_mode = TRUNCATE; ' )
if HC . PLATFORM_WINDOWS :
ideal_page_size = 4096
else :
ideal_page_size = 1024
( page_size , ) = c . execute ( ' PRAGMA page_size; ' ) . fetchone ( )
if page_size != ideal_page_size :
c . execute ( ' PRAGMA journal_mode = TRUNCATE; ' )
c . execute ( ' PRAGMA page_size = ' + str ( ideal_page_size ) + ' ; ' )
c . execute ( ' VACUUM; ' )
if previous_journal_mode == ' wal ' :
c . execute ( ' PRAGMA journal_mode = WAL; ' )
2015-04-22 22:57:25 +00:00
class HydrusDB ( object ) :
READ_WRITE_ACTIONS = [ ]
2016-03-09 19:37:14 +00:00
UPDATE_WAIT = 2
2015-04-22 22:57:25 +00:00
2016-04-06 19:52:45 +00:00
def __init__ ( self , controller , db_dir , db_name , no_wal = False ) :
2015-08-26 21:18:39 +00:00
self . _controller = controller
2016-04-06 19:52:45 +00:00
self . _db_dir = db_dir
self . _db_name = db_name
2016-01-20 23:57:33 +00:00
self . _no_wal = no_wal
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
self . _connection_timestamp = 0
2016-04-06 19:52:45 +00:00
main_db_filename = db_name
if not main_db_filename . endswith ( ' .db ' ) :
main_db_filename + = ' .db '
self . _db_filenames = { }
self . _db_filenames [ ' main ' ] = main_db_filename
2016-04-20 20:42:21 +00:00
self . _InitExternalDatabases ( )
2016-04-06 19:52:45 +00:00
if distutils . version . LooseVersion ( sqlite3 . sqlite_version ) < distutils . version . LooseVersion ( ' 3.11.0 ' ) :
2016-03-02 21:00:30 +00:00
self . _fast_big_transaction_wal = False
else :
self . _fast_big_transaction_wal = True
2016-08-31 19:55:14 +00:00
self . _is_first_start = False
self . _is_db_updated = False
2015-04-22 22:57:25 +00:00
self . _local_shutdown = False
self . _loop_finished = False
2016-03-16 22:19:14 +00:00
self . _ready_to_serve_requests = False
self . _could_not_initialise = False
2015-04-22 22:57:25 +00:00
self . _jobs = Queue . PriorityQueue ( )
self . _pubsubs = [ ]
self . _currently_doing_job = False
2016-02-17 22:06:47 +00:00
self . _db = None
self . _c = None
2016-04-06 19:52:45 +00:00
if os . path . exists ( os . path . join ( self . _db_dir , self . _db_filenames [ ' main ' ] ) ) :
2015-04-22 22:57:25 +00:00
# open and close to clean up in case last session didn't close well
self . _InitDB ( )
self . _CloseDBCursor ( )
self . _InitDB ( )
( version , ) = self . _c . execute ( ' SELECT version FROM version; ' ) . fetchone ( )
2015-11-04 22:30:28 +00:00
if version < HC . SOFTWARE_VERSION - 50 : raise Exception ( ' Your current version of hydrus ' + str ( version ) + ' is too old for this version ' + str ( HC . SOFTWARE_VERSION ) + ' to update. Please try updating with version ' + str ( version + 45 ) + ' or earlier first. ' )
2015-04-22 22:57:25 +00:00
while version < HC . SOFTWARE_VERSION :
2016-03-09 19:37:14 +00:00
time . sleep ( self . UPDATE_WAIT )
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
try : self . _c . execute ( ' BEGIN IMMEDIATE; ' )
2015-04-22 22:57:25 +00:00
except Exception as e :
2015-11-04 22:30:28 +00:00
raise HydrusExceptions . DBAccessException ( HydrusData . ToUnicode ( e ) )
2015-04-22 22:57:25 +00:00
try :
self . _UpdateDB ( version )
2016-04-20 20:42:21 +00:00
self . _c . execute ( ' COMMIT; ' )
2015-04-22 22:57:25 +00:00
2016-08-31 19:55:14 +00:00
self . _is_db_updated = True
2015-04-22 22:57:25 +00:00
except :
2016-04-20 20:42:21 +00:00
e = Exception ( ' Updating the ' + self . _db_name + ' db to version ' + str ( version + 1 ) + ' caused this error: ' + os . linesep + traceback . format_exc ( ) )
try :
self . _c . execute ( ' ROLLBACK; ' )
except Exception as rollback_e :
HydrusData . Print ( ' When the update failed, attempting to rollback the database failed. ' )
HydrusData . PrintException ( rollback_e )
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
raise e
2015-04-22 22:57:25 +00:00
( version , ) = self . _c . execute ( ' SELECT version FROM version; ' ) . fetchone ( )
self . _CloseDBCursor ( )
2016-03-16 22:19:14 +00:00
threading . Thread ( target = self . MainLoop , name = ' Database Main Loop ' ) . start ( )
while not self . _ready_to_serve_requests :
time . sleep ( 0.1 )
if self . _could_not_initialise :
raise Exception ( ' Could not initialise the db! Error written to the log! ' )
2015-04-22 22:57:25 +00:00
2016-03-30 22:56:50 +00:00
def _AttachExternalDatabases ( self ) :
2016-04-20 20:42:21 +00:00
for ( name , filename ) in self . _db_filenames . items ( ) :
if name == ' main ' :
continue
db_path = os . path . join ( self . _db_dir , self . _db_filenames [ name ] )
if not os . path . exists ( db_path ) :
db = sqlite3 . connect ( db_path , isolation_level = None , detect_types = sqlite3 . PARSE_DECLTYPES )
c = db . cursor ( )
SetupDBCreatePragma ( c , no_wal = self . _no_wal )
del c
del db
2016-05-11 18:16:39 +00:00
self . _c . execute ( ' ATTACH ? AS ' + name + ' ; ' , ( db_path , ) )
2016-04-20 20:42:21 +00:00
2016-03-30 22:56:50 +00:00
2015-06-17 20:01:41 +00:00
def _CleanUpCaches ( self ) :
pass
2015-04-22 22:57:25 +00:00
def _CloseDBCursor ( self ) :
2016-02-17 22:06:47 +00:00
if self . _db is not None :
self . _c . close ( )
self . _db . close ( )
del self . _c
del self . _db
self . _db = None
self . _c = None
2015-04-22 22:57:25 +00:00
def _CreateDB ( self ) :
raise NotImplementedError ( )
def _GetRowCount ( self ) :
row_count = self . _c . rowcount
if row_count == - 1 : return 0
else : return row_count
def _InitCaches ( self ) :
2016-03-16 22:19:14 +00:00
pass
2015-04-22 22:57:25 +00:00
def _InitDB ( self ) :
2015-09-23 21:21:02 +00:00
create_db = False
2016-04-06 19:52:45 +00:00
db_path = os . path . join ( self . _db_dir , self . _db_filenames [ ' main ' ] )
if not os . path . exists ( db_path ) :
2016-01-13 22:08:19 +00:00
create_db = True
2015-04-22 22:57:25 +00:00
self . _InitDBCursor ( )
2015-09-23 21:21:02 +00:00
result = self . _c . execute ( ' SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?; ' , ( ' table ' , ' version ' ) ) . fetchone ( )
if result is None :
create_db = True
2015-04-22 22:57:25 +00:00
if create_db :
2016-08-31 19:55:14 +00:00
self . _is_first_start = True
2015-04-22 22:57:25 +00:00
self . _CreateDB ( )
def _InitDBCursor ( self ) :
2016-02-17 22:06:47 +00:00
self . _CloseDBCursor ( )
2016-04-06 19:52:45 +00:00
db_path = os . path . join ( self . _db_dir , self . _db_filenames [ ' main ' ] )
2016-01-20 23:57:33 +00:00
2016-04-06 19:52:45 +00:00
db_just_created = not os . path . exists ( db_path )
self . _db = sqlite3 . connect ( db_path , isolation_level = None , detect_types = sqlite3 . PARSE_DECLTYPES )
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
self . _connection_timestamp = HydrusData . GetNow ( )
2015-04-22 22:57:25 +00:00
self . _db . create_function ( ' hydrus_hamming ' , 2 , HydrusData . GetHammingDistance )
self . _c = self . _db . cursor ( )
2016-04-20 20:42:21 +00:00
self . _c . execute ( ' PRAGMA main.cache_size = -100000; ' )
2016-01-13 22:08:19 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' ATTACH " :memory: " AS mem; ' )
self . _AttachExternalDatabases ( )
db_names = [ name for ( index , name , path ) in self . _c . execute ( ' PRAGMA database_list; ' ) if name not in ( ' mem ' , ' temp ' ) ]
for db_name in db_names :
2016-01-20 23:57:33 +00:00
2016-04-20 20:42:21 +00:00
self . _c . execute ( ' PRAGMA ' + db_name + ' .cache_size = -100000; ' )
2016-04-14 01:54:29 +00:00
if self . _no_wal :
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' PRAGMA ' + db_name + ' .journal_mode = TRUNCATE; ' )
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' PRAGMA ' + db_name + ' .synchronous = 2; ' )
2016-02-17 22:06:47 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' SELECT * FROM ' + db_name + ' .sqlite_master; ' ) . fetchone ( )
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
else :
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' PRAGMA ' + db_name + ' .journal_mode = WAL; ' )
self . _c . execute ( ' PRAGMA ' + db_name + ' .synchronous = 1; ' )
try :
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
self . _c . execute ( ' SELECT * FROM ' + db_name + ' .sqlite_master; ' ) . fetchone ( )
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
except sqlite3 . OperationalError :
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
traceback . print_exc ( )
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
def create_no_wal_file ( ) :
HydrusGlobals . controller . CreateNoWALFile ( )
self . _no_wal = True
2016-01-20 23:57:33 +00:00
2016-04-14 01:54:29 +00:00
if db_just_created :
del self . _c
del self . _db
os . remove ( db_path )
create_no_wal_file ( )
self . _InitDBCursor ( )
else :
self . _c . execute ( ' PRAGMA ' + db_name + ' .journal_mode = TRUNCATE; ' )
self . _c . execute ( ' PRAGMA ' + db_name + ' .synchronous = 2; ' )
self . _c . execute ( ' SELECT * FROM ' + db_name + ' .sqlite_master; ' ) . fetchone ( )
create_no_wal_file ( )
2016-01-20 23:57:33 +00:00
2016-01-13 22:08:19 +00:00
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
def _InitExternalDatabases ( self ) :
pass
2015-04-22 22:57:25 +00:00
def _ManageDBError ( self , job , e ) :
raise NotImplementedError ( )
def _ProcessJob ( self , job ) :
job_type = job . GetType ( )
2016-03-30 22:56:50 +00:00
( action , args , kwargs ) = job . GetCallableTuple ( )
2015-04-22 22:57:25 +00:00
in_transaction = False
try :
2016-04-20 20:42:21 +00:00
if job_type in ( ' read_write ' , ' write ' ) :
self . _c . execute ( ' BEGIN IMMEDIATE; ' )
in_transaction = True
2015-04-22 22:57:25 +00:00
if job_type in ( ' read ' , ' read_write ' ) : result = self . _Read ( action , * args , * * kwargs )
2016-03-30 22:56:50 +00:00
elif job_type in ( ' write ' ) : result = self . _Write ( action , * args , * * kwargs )
2015-04-22 22:57:25 +00:00
2016-04-20 20:42:21 +00:00
if in_transaction :
self . _c . execute ( ' COMMIT; ' )
in_transaction = False
2015-04-22 22:57:25 +00:00
2016-03-30 22:56:50 +00:00
for ( topic , args , kwargs ) in self . _pubsubs :
self . _controller . pub ( topic , * args , * * kwargs )
2015-04-22 22:57:25 +00:00
2016-03-30 22:56:50 +00:00
if job . IsSynchronous ( ) :
job . PutResult ( result )
2015-04-22 22:57:25 +00:00
except Exception as e :
2016-04-20 20:42:21 +00:00
if in_transaction :
try :
self . _c . execute ( ' ROLLBACK; ' )
except Exception as rollback_e :
HydrusData . Print ( ' When the transaction failed, attempting to rollback the database failed. ' )
HydrusData . PrintException ( rollback_e )
2015-04-22 22:57:25 +00:00
self . _ManageDBError ( job , e )
def _Read ( self , action , * args , * * kwargs ) :
raise NotImplementedError ( )
def _ReportStatus ( self , text ) :
2015-11-18 22:44:07 +00:00
HydrusData . Print ( text )
2015-04-22 22:57:25 +00:00
def _UpdateDB ( self , version ) :
raise NotImplementedError ( )
def _Write ( self , action , * args , * * kwargs ) :
raise NotImplementedError ( )
2016-03-30 22:56:50 +00:00
def pub_after_commit ( self , topic , * args , * * kwargs ) :
self . _pubsubs . append ( ( topic , args , kwargs ) )
2015-04-22 22:57:25 +00:00
2015-06-03 21:05:13 +00:00
def CurrentlyDoingJob ( self ) :
return self . _currently_doing_job
2016-06-01 20:04:15 +00:00
def GetDBDir ( self ) :
return self . _db_dir
2016-08-31 19:55:14 +00:00
def IsDBUpdated ( self ) :
return self . _is_db_updated
def IsFirstStart ( self ) :
return self . _is_first_start
2016-03-30 22:56:50 +00:00
def LoopIsFinished ( self ) :
return self . _loop_finished
2015-04-22 22:57:25 +00:00
2016-04-06 19:52:45 +00:00
def JobsQueueEmpty ( self ) :
return self . _jobs . empty ( )
2015-04-22 22:57:25 +00:00
def MainLoop ( self ) :
2016-03-16 22:19:14 +00:00
try :
self . _InitDBCursor ( ) # have to reinitialise because the thread id has changed
self . _InitCaches ( )
2016-04-06 19:52:45 +00:00
except :
2016-03-16 22:19:14 +00:00
HydrusData . Print ( traceback . format_exc ( ) )
self . _could_not_initialise = True
2016-04-06 19:52:45 +00:00
return
2015-04-22 22:57:25 +00:00
2016-03-16 22:19:14 +00:00
self . _ready_to_serve_requests = True
2015-04-29 19:20:35 +00:00
2015-04-22 22:57:25 +00:00
error_count = 0
2015-11-04 22:30:28 +00:00
while not ( ( self . _local_shutdown or self . _controller . ModelIsShutdown ( ) ) and self . _jobs . empty ( ) ) :
2015-04-22 22:57:25 +00:00
try :
2016-03-30 22:56:50 +00:00
( priority , job ) = self . _jobs . get ( timeout = 0.5 )
2015-04-22 22:57:25 +00:00
self . _currently_doing_job = True
2015-08-26 21:18:39 +00:00
self . _controller . pub ( ' refresh_status ' )
2015-06-03 21:05:13 +00:00
2015-04-22 22:57:25 +00:00
self . _pubsubs = [ ]
try :
if HydrusGlobals . db_profile_mode :
2016-03-30 22:56:50 +00:00
HydrusData . ShowText ( ' Profiling ' + job . ToString ( ) )
2015-04-22 22:57:25 +00:00
2015-11-25 22:00:57 +00:00
HydrusData . Profile ( ' self._ProcessJob( job ) ' , globals ( ) , locals ( ) )
2015-04-22 22:57:25 +00:00
else :
self . _ProcessJob ( job )
error_count = 0
except :
error_count + = 1
if error_count > 5 : raise
self . _jobs . put ( ( priority , job ) ) # couldn't lock db; put job back on queue
time . sleep ( 5 )
self . _currently_doing_job = False
2015-08-26 21:18:39 +00:00
self . _controller . pub ( ' refresh_status ' )
2015-06-03 21:05:13 +00:00
2016-03-30 22:56:50 +00:00
except Queue . Empty :
pass # no jobs in the past little while; let's just check if we should shutdown
2015-06-17 20:01:41 +00:00
2016-04-20 20:42:21 +00:00
if HydrusData . TimeHasPassed ( self . _connection_timestamp + CONNECTION_REFRESH_TIME ) : # just to clear out the journal files
self . _InitDBCursor ( )
2015-06-17 20:01:41 +00:00
self . _CleanUpCaches ( )
2015-04-22 22:57:25 +00:00
self . _CloseDBCursor ( )
self . _loop_finished = True
def Read ( self , action , priority , * args , * * kwargs ) :
if action in self . READ_WRITE_ACTIONS : job_type = ' read_write '
else : job_type = ' read '
synchronous = True
2016-03-30 22:56:50 +00:00
job = HydrusData . JobDatabase ( job_type , synchronous , action , * args , * * kwargs )
2015-04-22 22:57:25 +00:00
2015-11-04 22:30:28 +00:00
if self . _controller . ModelIsShutdown ( ) :
raise HydrusExceptions . ShutdownException ( ' Application has shut down! ' )
2015-04-22 22:57:25 +00:00
self . _jobs . put ( ( priority + 1 , job ) ) # +1 so all writes of equal priority can clear out first
2016-03-16 22:19:14 +00:00
return job . GetResult ( )
def ReadyToServeRequests ( self ) :
return self . _ready_to_serve_requests
2015-04-22 22:57:25 +00:00
2016-03-30 22:56:50 +00:00
def Shutdown ( self ) :
self . _local_shutdown = True
2015-04-22 22:57:25 +00:00
2016-03-16 22:19:14 +00:00
def SimpleRead ( self , action , * args , * * kwargs ) :
return self . Read ( action , HC . HIGH_PRIORITY , * args , * * kwargs )
def SimpleWrite ( self , action , * args , * * kwargs ) :
return self . Write ( action , HC . HIGH_PRIORITY , False , * args , * * kwargs )
def SimpleWriteSynchronous ( self , action , * args , * * kwargs ) :
return self . Write ( action , HC . LOW_PRIORITY , True , * args , * * kwargs )
2015-04-22 22:57:25 +00:00
def Write ( self , action , priority , synchronous , * args , * * kwargs ) :
2016-03-30 22:56:50 +00:00
job_type = ' write '
2015-04-22 22:57:25 +00:00
2016-03-30 22:56:50 +00:00
job = HydrusData . JobDatabase ( job_type , synchronous , action , * args , * * kwargs )
2015-04-22 22:57:25 +00:00
2015-11-04 22:30:28 +00:00
if self . _controller . ModelIsShutdown ( ) :
raise HydrusExceptions . ShutdownException ( ' Application has shut down! ' )
2015-04-22 22:57:25 +00:00
self . _jobs . put ( ( priority , job ) )
if synchronous : return job . GetResult ( )