hydrus/hydrus/core/HydrusDB.py

1174 lines
34 KiB
Python
Raw Normal View History

2020-12-02 22:04:38 +00:00
import collections
2016-03-02 21:00:30 +00:00
import distutils.version
2020-07-29 20:52:44 +00:00
import os
import queue
import sqlite3
import traceback
import time
2020-04-22 21:00:35 +00:00
from hydrus.core import HydrusConstants as HC
from hydrus.core import HydrusData
from hydrus.core import HydrusEncryption
2020-04-22 21:00:35 +00:00
from hydrus.core import HydrusExceptions
from hydrus.core import HydrusGlobals as HG
from hydrus.core import HydrusPaths
2015-04-22 22:57:25 +00:00
2020-04-08 21:10:11 +00:00
def CheckCanVacuum( db_path, stop_time = None ):
    """Check whether the database file at db_path can be vacuumed right now.

    Opens a short-lived connection to db_path and hands its cursor to
    CheckCanVacuumCursor, which performs the actual checks. Any exception
    raised there propagates to the caller.

    db_path -- filesystem path of the SQLite database file
    stop_time -- optional deadline, forwarded unchanged to CheckCanVacuumCursor
    """
    
    db = sqlite3.connect( db_path, isolation_level = None, detect_types = sqlite3.PARSE_DECLTYPES )
    
    try:
        
        c = db.cursor()
        
        CheckCanVacuumCursor( db_path, c, stop_time = stop_time )
        
    finally:
        
        # always release the connection, including when the check raises
        db.close()
def CheckCanVacuumCursor( db_path, c, stop_time = None ):
    """Raise if a vacuum of the database behind cursor c should not be started.

    The size of the vacuumed database is estimated as the bytes held by
    non-freelist pages plus 20%. If stop_time is given, the vacuum duration is
    estimated from that size at 1048576 bytes per second, and an Exception is
    raised when HydrusData.TimeHasPassed reports that the latest possible
    start moment is already gone. Finally the directory holding db_path and
    the size estimate are passed to HydrusPaths.CheckHasSpaceForDBTransaction.

    db_path -- filesystem path of the database file the cursor is connected to
    c -- open sqlite3 cursor on that database
    stop_time -- optional deadline; None skips the time check
    """
    
    def single_value( statement ):
        
        ( value, ) = c.execute( statement ).fetchone()
        
        return value
        
    
    page_size = single_value( 'PRAGMA page_size;' )
    page_count = single_value( 'PRAGMA page_count;' )
    freelist_count = single_value( 'PRAGMA freelist_count;' )
    
    # freelist pages hold no data, so they are not counted
    live_bytes = ( page_count - freelist_count ) * page_size
    
    vacuum_estimate = int( live_bytes * 1.2 )
    
    if stop_time is not None:
        
        bytes_per_second = 1048576 * 1
        
        seconds_needed = vacuum_estimate // bytes_per_second
        
        latest_start = stop_time - seconds_needed
        
        if HydrusData.TimeHasPassed( latest_start ):
            
            raise Exception( 'I believe you need about ' + HydrusData.TimeDeltaToPrettyTimeDelta( seconds_needed ) + ' to vacuum, but there is not enough time allotted.' )
            
        
    
    db_dir = os.path.split( db_path )[0]
    
    HydrusPaths.CheckHasSpaceForDBTransaction( db_dir, vacuum_estimate )
2016-04-14 01:54:29 +00:00
2020-11-25 22:22:47 +00:00
def ReadFromCancellableCursor( cursor, largest_group_size, cancelled_hook = None ):
    """Fetch the rows of an already-executed cursor, optionally cancellably.

    Without a cancelled_hook this is a plain fetchall(). With one, rows are
    pulled with fetchmany() in batches that start at size 1 and double for as
    long as the batch size is still below largest_group_size. The hook is
    called after every batch, and a truthy return stops reading.

    cursor -- cursor whose pending result set should be read
    largest_group_size -- the batch size stops doubling once it reaches this
    cancelled_hook -- optional zero-argument callable polled between batches

    Returns the list of rows collected so far, including the batch that was
    read just before a cancellation was noticed.
    """
    
    if cancelled_hook is None:
        
        return cursor.fetchall()
        
    
    rows = []
    batch_size = 1
    
    while True:
        
        batch = cursor.fetchmany( batch_size )
        
        if len( batch ) == 0:
            
            break
            
        
        rows.extend( batch )
        
        if cancelled_hook():
            
            break
            
        
        if batch_size < largest_group_size:
            
            batch_size *= 2
            
        
    
    return rows
2018-11-28 22:31:04 +00:00
def ReadLargeIdQueryInSeparateChunks( cursor, select_statement, chunk_size ):
    """Yield the ids produced by select_statement in lists of at most chunk_size.

    The whole result of select_statement is first copied into a randomly named
    temporary table, and the chunks are then read back from that table. The
    temporary table is dropped once the generator has been fully consumed.

    cursor -- sqlite3 cursor used for every statement issued here
    select_statement -- a complete single-column SELECT, ending in a semicolon
    chunk_size -- maximum number of ids per yielded list

    Yields lists of ids. Nothing is yielded when the statement returns no rows.
    """
    
    table_name = 'tempbigread' + os.urandom( 32 ).hex()
    
    cursor.execute( 'CREATE TEMPORARY TABLE ' + table_name + ' ( job_id INTEGER PRIMARY KEY AUTOINCREMENT, temp_id INTEGER );' )
    
    cursor.execute( 'INSERT INTO ' + table_name + ' ( temp_id ) ' + select_statement ) # given statement should end in semicolon, so we are good
    
    num_to_do = cursor.rowcount
    
    if num_to_do is None or num_to_do == -1:
        
        num_to_do = 0
        
    
    # job_id is assigned by SQLite starting at 1, so the rows occupy job_ids 1..num_to_do inclusive
    # starting the window at 0 would make the first chunk one short and lose the final row on exact multiples
    first_job_id = 1
    
    while first_job_id <= num_to_do:
        
        last_job_id = first_job_id + chunk_size - 1
        
        chunk = [ temp_id for ( temp_id, ) in cursor.execute( 'SELECT temp_id FROM ' + table_name + ' WHERE job_id BETWEEN ? AND ?;', ( first_job_id, last_job_id ) ) ]
        
        yield chunk
        
        first_job_id += chunk_size
        
    
    cursor.execute( 'DROP TABLE ' + table_name + ';' )
2016-04-14 01:54:29 +00:00
def VacuumDB( db_path ):
    """Vacuum the SQLite database file at db_path on its own connection.

    Before the VACUUM the journal mode is switched to TRUNCATE when either the
    configured mode is WAL on a SQLite library older than 3.11.0, or the page
    size has to be changed to the platform's preferred value. auto_vacuum is
    turned off, the VACUUM is run, and the journal mode configured in
    HG.db_journal_mode is applied again afterwards.

    db_path -- filesystem path of the database file to vacuum
    """
    
    db = sqlite3.connect( db_path, isolation_level = None, detect_types = sqlite3.PARSE_DECLTYPES )
    
    try:
        
        c = db.cursor()
        
        # compare the version as a tuple of ints; distutils' LooseVersion is gone from the stdlib in python 3.12
        fast_big_transaction_wal = sqlite3.sqlite_version_info >= ( 3, 11, 0 )
        
        if HG.db_journal_mode == 'WAL' and not fast_big_transaction_wal:
            
            c.execute( 'PRAGMA journal_mode = TRUNCATE;' )
            
        
        if HC.PLATFORM_WINDOWS:
            
            ideal_page_size = 4096
            
        else:
            
            ideal_page_size = 1024
            
        
        ( page_size, ) = c.execute( 'PRAGMA page_size;' ).fetchone()
        
        if page_size != ideal_page_size:
            
            # the new page size is requested here and the following VACUUM rebuilds the file
            c.execute( 'PRAGMA journal_mode = TRUNCATE;' )
            c.execute( 'PRAGMA page_size = ' + str( ideal_page_size ) + ';' )
            
        
        c.execute( 'PRAGMA auto_vacuum = 0;' ) # none
        
        c.execute( 'VACUUM;' )
        
        c.execute( 'PRAGMA journal_mode = {};'.format( HG.db_journal_mode ) )
        
    finally:
        
        # always release the connection, including when a statement raises
        db.close()
2016-04-14 01:54:29 +00:00
2015-04-22 22:57:25 +00:00
class HydrusDB( object ):
    """Base class for a SQLite database that is served by a single job loop.

    Callers queue work through Read and Write. MainLoop, which __init__ hands
    to the controller via CallToThreadLongRunning, takes jobs off the queue and
    runs them through _ProcessJob inside a long-lived transaction.

    Subclasses must implement _CreateDB, _ManageDBError, _Read, _UpdateDB and
    _Write, and may override the hooks that are no-ops here (_CleanUpCaches,
    _InitCaches, _InitExternalDatabases, _RepairDB, _ReportOverupdatedDB,
    _ReportUnderupdatedDB, publish_status_update).
    """
    
    # actions submitted through Read that are given the 'read_write' job type
    READ_WRITE_ACTIONS = []
    
    # seconds slept before each update step in __init__
    UPDATE_WAIT = 2
    
    # seconds after which a transaction that contains writes is committed
    TRANSACTION_COMMIT_TIME = 30
    
    def __init__( self, controller, db_dir, db_name ):
        """Open (creating if needed) the database, update it, and start MainLoop.

        controller -- object providing pub() and CallToThreadLongRunning()
        db_dir -- directory that holds the database files
        db_name -- base name of the main database file; '.db' is appended if missing

        Raises Exception when db_dir has less than 500MB free, when the stored
        version is more than 50 behind HC.SOFTWARE_VERSION, when an update step
        fails, or when MainLoop reports that it could not initialise.
        Raises HydrusExceptions.DBAccessException when the update transaction
        cannot be started.
        """
        
        if HydrusPaths.GetFreeSpace( db_dir ) < 500 * 1048576:
            
            raise Exception( 'Sorry, it looks like the db partition has less than 500MB, please free up some space.' )
            
        
        self._controller = controller
        self._db_dir = db_dir
        self._db_name = db_name
        
        # constructing the cache registers it as the shared instance used by _CloseDBCursor and friends
        TemporaryIntegerTableNameCache()
        
        self._transaction_started = 0
        self._in_transaction = False
        self._transaction_contains_writes = False
        
        self._ssl_cert_filename = '{}.crt'.format( self._db_name )
        self._ssl_key_filename = '{}.key'.format( self._db_name )
        
        self._ssl_cert_path = os.path.join( self._db_dir, self._ssl_cert_filename )
        self._ssl_key_path = os.path.join( self._db_dir, self._ssl_key_filename )
        
        self._last_mem_refresh_time = HydrusData.GetNow()
        self._last_wal_checkpoint_time = HydrusData.GetNow()
        
        main_db_filename = db_name
        
        if not main_db_filename.endswith( '.db' ):
            
            main_db_filename += '.db'
            
        
        # schema name -> filename; _InitExternalDatabases is the hook for adding more entries
        self._db_filenames = {}
        
        self._db_filenames[ 'main' ] = main_db_filename
        
        self._durable_temp_db_filename = db_name + '.temp.db'
        
        self._InitExternalDatabases()
        
        self._is_first_start = False
        self._is_db_updated = False
        self._local_shutdown = False
        self._pause_and_disconnect = False
        self._loop_finished = False
        self._ready_to_serve_requests = False
        self._could_not_initialise = False
        
        self._jobs = queue.Queue()
        self._pubsubs = []
        
        self._currently_doing_job = False
        self._current_status = ''
        self._current_job_name = ''
        
        self._db = None
        self._c = None
        
        if os.path.exists( os.path.join( self._db_dir, self._db_filenames[ 'main' ] ) ):
            
            # open and close to clean up in case last session didn't close well
            
            self._InitDB()
            self._CloseDBCursor()
            
        
        self._InitDB()
        
        ( version, ) = self._c.execute( 'SELECT version FROM version;' ).fetchone()
        
        if version > HC.SOFTWARE_VERSION:
            
            self._ReportOverupdatedDB( version )
            
        
        if version < ( HC.SOFTWARE_VERSION - 15 ):
            
            self._ReportUnderupdatedDB( version )
            
        
        if version < HC.SOFTWARE_VERSION - 50:
            
            raise Exception( 'Your current database version of hydrus ' + str( version ) + ' is too old for this software version ' + str( HC.SOFTWARE_VERSION ) + ' to update. Please try updating with version ' + str( version + 45 ) + ' or earlier first.' )
            
        
        self._RepairDB()
        
        # one update step per loop; the version is re-read from the db after each step
        while version < HC.SOFTWARE_VERSION:
            
            time.sleep( self.UPDATE_WAIT )
            
            try:
                
                self._BeginImmediate()
                
            except Exception as e:
                
                raise HydrusExceptions.DBAccessException( str( e ) )
                
            
            try:
                
                self._UpdateDB( version )
                
                self._Commit()
                
                self._is_db_updated = True
                
            except:
                
                # capture the traceback text now, before the rollback attempt can replace it
                e = Exception( 'Updating the ' + self._db_name + ' db to version ' + str( version + 1 ) + ' caused this error:' + os.linesep + traceback.format_exc() )
                
                try:
                    
                    self._Rollback()
                    
                except Exception as rollback_e:
                    
                    HydrusData.Print( 'When the update failed, attempting to rollback the database failed.' )
                    
                    HydrusData.PrintException( rollback_e )
                    
                
                raise e
                
            
            ( version, ) = self._c.execute( 'SELECT version FROM version;' ).fetchone()
            
        
        # MainLoop opens its own connection, so this one is released first
        self._CloseDBCursor()
        
        self._controller.CallToThreadLongRunning( self.MainLoop )
        
        while not self._ready_to_serve_requests:
            
            time.sleep( 0.1 )
            
            if self._could_not_initialise:
                
                raise Exception( 'Could not initialise the db! Error written to the log!' )
                
            
        
    
    def _AnalyzeTempTable( self, temp_table_name ):
        """Run ANALYZE on temp_table_name and then on mem.sqlite_master."""
        
        # this is useful to do after populating a temp table so the query planner can decide which index to use in a big join that uses it
        
        self._c.execute( 'ANALYZE {};'.format( temp_table_name ) )
        self._c.execute( 'ANALYZE mem.sqlite_master;' ) # this reloads the current stats into the query planner, may no longer be needed
        
    
    def _AttachExternalDatabases( self ):
        """ATTACH every non-main file in _db_filenames, then the durable_temp file."""
        
        for ( name, filename ) in self._db_filenames.items():
            
            if name == 'main':
                
                continue
                
            
            db_path = os.path.join( self._db_dir, filename )
            
            self._c.execute( 'ATTACH ? AS ' + name + ';', ( db_path, ) )
            
        
        db_path = os.path.join( self._db_dir, self._durable_temp_db_filename )
        
        self._c.execute( 'ATTACH ? AS durable_temp;', ( db_path, ) )
        
    
    def _BeginImmediate( self ):
        """Start a transaction plus savepoint, unless one is already open."""
        
        if not self._in_transaction:
            
            self._c.execute( 'BEGIN IMMEDIATE;' )
            self._c.execute( 'SAVEPOINT hydrus_savepoint;' )
            
            self._transaction_started = HydrusData.GetNow()
            self._in_transaction = True
            
        
    
    def _CleanAfterJobWork( self ):
        """Discard the pubsubs queued by the job that has just finished."""
        
        self._pubsubs = []
        
    
    def _CleanUpCaches( self ):
        """Hook called by MainLoop just before the final disconnect. No-op here."""
        
        pass
        
    
    def _CloseDBCursor( self ):
        """Commit any open transaction and close the cursor and connection."""
        
        TemporaryIntegerTableNameCache.instance().Clear()
        
        if self._db is not None:
            
            if self._in_transaction:
                
                self._Commit()
                
            
            self._c.close()
            self._db.close()
            
            self._db = None
            self._c = None
            
        
    
    def _Commit( self ):
        """COMMIT the open transaction and run the periodic maintenance steps.

        After the commit, a passive WAL checkpoint is run if the journal mode is
        WAL and 1800 seconds have passed since the last one, and the 'mem'
        in-memory database is detached and re-attached if 600 seconds have
        passed since it was last refreshed. If no transaction is open, a
        message is printed instead.
        """
        
        if not self._in_transaction:
            
            HydrusData.Print( 'Received a call to commit, but was not in a transaction!' )
            
            return
            
        
        self._c.execute( 'COMMIT;' )
        
        self._in_transaction = False
        
        if HG.db_journal_mode == 'WAL' and HydrusData.TimeHasPassed( self._last_wal_checkpoint_time + 1800 ):
            
            self._c.execute( 'PRAGMA wal_checkpoint(PASSIVE);' )
            
            self._last_wal_checkpoint_time = HydrusData.GetNow()
            
        
        if HydrusData.TimeHasPassed( self._last_mem_refresh_time + 600 ):
            
            self._c.execute( 'DETACH mem;' )
            self._c.execute( 'ATTACH ":memory:" AS mem;' )
            
            # the tables the cache handed out lived in the old mem database
            TemporaryIntegerTableNameCache.instance().Clear()
            
            self._last_mem_refresh_time = HydrusData.GetNow()
            
        
    
    def _CreateDB( self ):
        """Create the schema of a brand new database. Must be implemented by subclasses."""
        
        raise NotImplementedError()
        
    
    def _CreateIndex( self, table_name, columns, unique = False ):
        """Create an index over columns on table_name if it does not exist yet.

        table_name -- table name, optionally prefixed with 'schema.'
        columns -- sequence of column names, in index order
        unique -- when True a UNIQUE index is created

        The index is named '<table_name>_<columns joined by _>_index'. Any schema
        prefix stays on the index name and is removed from the table name in
        the ON clause.
        """
        
        if '.' in table_name:
            
            table_name_simple = table_name.split( '.' )[1]
            
        else:
            
            table_name_simple = table_name
            
        
        index_name = table_name + '_' + '_'.join( columns ) + '_index'
        
        if unique:
            
            create_phrase = 'CREATE UNIQUE INDEX IF NOT EXISTS '
            
        else:
            
            create_phrase = 'CREATE INDEX IF NOT EXISTS '
            
        
        on_phrase = ' ON ' + table_name_simple + ' (' + ', '.join( columns ) + ');'
        
        statement = create_phrase + index_name + on_phrase
        
        self._c.execute( statement )
        
    
    def _DisplayCatastrophicError( self, text ):
        """Send text, prefixed with a standard header, to HydrusData.DebugPrint."""
        
        message = 'The db encountered a serious error! This is going to be written to the log as well, but here it is for a screenshot:'
        message += os.linesep * 2
        message += text
        
        HydrusData.DebugPrint( message )
        
    
    def _DoAfterJobWork( self ):
        """Publish, through the controller, every pubsub queued by pub_after_job."""
        
        for ( topic, args, kwargs ) in self._pubsubs:
            
            self._controller.pub( topic, *args, **kwargs )
            
        
    
    def _ExecuteManySelectSingleParam( self, query, single_param_iterator ):
        """Like _ExecuteManySelect, for queries that take exactly one parameter."""
        
        select_args_iterator = ( ( param, ) for param in single_param_iterator )
        
        return self._ExecuteManySelect( query, select_args_iterator )
        
    
    def _ExecuteManySelect( self, query, select_args_iterator ):
        """Run query once per argument tuple and yield every resulting row."""
        
        # back in python 2, we did batches of 256 hash_ids/whatever at a time in big "hash_id IN (?,?,?,?,...)" predicates.
        # this was useful to get over some 100,000 x fetchall() call overhead, but it would sometimes throw the SQLite query planner off and do non-optimal queries
        # (basically, the "hash_id in (256)" would weight the hash_id index request x 256 vs another when comparing the sqlite_stat1 tables, which could lead to WEWLAD for some indices with low median very-high mean skewed distribution
        # python 3 is better about call overhead, so we'll go back to what is pure
        # cursor.executemany SELECT when
        
        for select_args in select_args_iterator:
            
            for result in self._c.execute( query, select_args ):
                
                yield result
                
            
        
    
    def _GenerateDBJob( self, job_type, synchronous, action, *args, **kwargs ):
        """Build the HydrusData.JobDatabase object that is put on the job queue."""
        
        return HydrusData.JobDatabase( job_type, synchronous, action, *args, **kwargs )
        
    
    def _GetPossibleAdditionalDBFilenames( self ):
        """Return the ssl cert and key filenames that may sit next to the db files."""
        
        return [ self._ssl_cert_filename, self._ssl_key_filename ]
        
    
    def _GetRowCount( self ):
        """Return the cursor's rowcount, with -1 reported as 0."""
        
        row_count = self._c.rowcount
        
        return 0 if row_count == -1 else row_count
        
    
    def _InitCaches( self ):
        """Hook called by MainLoop once its connection is open. No-op here."""
        
        pass
        
    
    def _InitDB( self ):
        """Connect to the database and create its schema if it is new.

        Raises HydrusExceptions.DBAccessException when the main file is missing
        but one or more of the external files exist.
        """
        
        create_db = False
        
        db_path = os.path.join( self._db_dir, self._db_filenames[ 'main' ] )
        
        if not os.path.exists( db_path ):
            
            create_db = True
            
            external_db_paths = [ os.path.join( self._db_dir, self._db_filenames[ db_name ] ) for db_name in self._db_filenames if db_name != 'main' ]
            
            existing_external_db_paths = [ external_db_path for external_db_path in external_db_paths if os.path.exists( external_db_path ) ]
            
            if len( existing_external_db_paths ) > 0:
                
                message = 'Although the external files, "{}" do exist, the main database file, "{}", does not! This makes for an invalid database, and the program will now quit. Please contact hydrus_dev if you do not know how this happened or need help recovering from hard drive failure.'
                
                message = message.format( ', '.join( existing_external_db_paths ), db_path )
                
                raise HydrusExceptions.DBAccessException( message )
                
            
        
        self._InitDBCursor()
        
        # a file without a 'version' table is treated as a new database as well
        result = self._c.execute( 'SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?;', ( 'table', 'version' ) ).fetchone()
        
        if result is None:
            
            create_db = True
            
        
        if create_db:
            
            self._is_first_start = True
            
            self._CreateDB()
            
            self._Commit()
            
            self._BeginImmediate()
            
        
    
    def _InitDBCursor( self ):
        """(Re)open the connection, attach all databases, apply pragmas, begin a transaction.

        Raises HydrusExceptions.DBAccessException when connecting or attaching
        fails, when sqlite_master of any attached database cannot be read, or
        when the transaction cannot be started.
        """
        
        self._CloseDBCursor()
        
        db_path = os.path.join( self._db_dir, self._db_filenames[ 'main' ] )
        
        try:
            
            self._db = sqlite3.connect( db_path, isolation_level = None, detect_types = sqlite3.PARSE_DECLTYPES )
            
            self._c = self._db.cursor()
            
            if HG.no_db_temp_files:
                
                self._c.execute( 'PRAGMA temp_store = 2;' ) # use memory for temp store exclusively
                
            
            self._AttachExternalDatabases()
            
            self._c.execute( 'ATTACH ":memory:" AS mem;' )
            
        except Exception as e:
            
            raise HydrusExceptions.DBAccessException( 'Could not connect to database! This could be an issue related to WAL and network storage, or something else. If it is not obvious to you, please let hydrus dev know. Error follows:' + os.linesep * 2 + str( e ) )
            
        
        # 'mem' has just been attached fresh, so restart its refresh timer and forget old table names
        self._last_mem_refresh_time = HydrusData.GetNow()
        
        TemporaryIntegerTableNameCache.instance().Clear()
        
        # durable_temp is not excluded here
        db_names = [ name for ( index, name, path ) in self._c.execute( 'PRAGMA database_list;' ) if name not in ( 'mem', 'temp' ) ]
        
        for db_name in db_names:
            
            # MB -> KB
            cache_size = HG.db_cache_size * 1024
            
            # the leading minus is part of the statement, so the value passed to SQLite is negative
            self._c.execute( 'PRAGMA {}.cache_size = -{};'.format( db_name, cache_size ) )
            
            self._c.execute( 'PRAGMA {}.journal_mode = {};'.format( db_name, HG.db_journal_mode ) )
            
            if HG.db_journal_mode in ( 'PERSIST', 'WAL' ):
                
                self._c.execute( 'PRAGMA {}.journal_size_limit = {};'.format( db_name, 1024 ** 3 ) ) # 1GB for now
                
            
            self._c.execute( 'PRAGMA {}.synchronous = {};'.format( db_name, HG.db_synchronous ) )
            
            try:
                
                # a cheap read to prove the file can actually be accessed with these settings
                self._c.execute( 'SELECT * FROM {}.sqlite_master;'.format( db_name ) ).fetchone()
                
            except sqlite3.OperationalError as e:
                
                message = 'The database seemed valid, but hydrus failed to read basic data from it. You may need to run the program in a different journal mode using --db_journal_mode. Full error information:'
                
                message += os.linesep * 2
                message += str( e )
                
                HydrusData.DebugPrint( message )
                
                raise HydrusExceptions.DBAccessException( message )
                
            
        
        try:
            
            self._BeginImmediate()
            
        except Exception as e:
            
            raise HydrusExceptions.DBAccessException( str( e ) )
            
        
    
    def _InitExternalDatabases( self ):
        """Hook called by __init__ right after _db_filenames is created. No-op here."""
        
        pass
        
    
    def _ManageDBError( self, job, e ):
        """Handle exception e raised while running job. Must be implemented by subclasses."""
        
        raise NotImplementedError()
        
    
    def _ProcessJob( self, job ):
        """Run one queued job inside the current transaction.

        'read' and 'read_write' jobs go to _Read, 'write' jobs go to _Write.
        Afterwards the transaction is either committed and restarted (when it
        contains writes and is older than TRANSACTION_COMMIT_TIME) or the
        savepoint is moved forward. Queued pubsubs are then published and the
        result is delivered to synchronous jobs.

        Any Exception is passed to _ManageDBError and followed by a rollback to
        the savepoint. If that rollback fails as well, the connection is
        re-opened.
        """
        
        job_type = job.GetType()
        
        ( action, args, kwargs ) = job.GetCallableTuple()
        
        try:
            
            if job_type in ( 'read_write', 'write' ):
                
                self._current_status = 'db write locked'
                
                self._transaction_contains_writes = True
                
            else:
                
                self._current_status = 'db read locked'
                
            
            self.publish_status_update()
            
            if job_type in ( 'read', 'read_write' ):
                
                result = self._Read( action, *args, **kwargs )
                
            elif job_type == 'write':
                
                # an equality test; "in ( 'write' )" is a substring test on a plain string
                result = self._Write( action, *args, **kwargs )
                
            
            if self._transaction_contains_writes and HydrusData.TimeHasPassed( self._transaction_started + self.TRANSACTION_COMMIT_TIME ):
                
                self._current_status = 'db committing'
                
                self.publish_status_update()
                
                self._Commit()
                
                self._BeginImmediate()
                
                self._transaction_contains_writes = False
                
            else:
                
                self._Save()
                
            
            self._DoAfterJobWork()
            
            if job.IsSynchronous():
                
                job.PutResult( result )
                
            
        except Exception as e:
            
            self._ManageDBError( job, e )
            
            try:
                
                self._Rollback()
                
            except Exception as rollback_e:
                
                HydrusData.Print( 'When the transaction failed, attempting to rollback the database failed. Please restart the client as soon as is convenient.' )
                
                # clear the flag first so that _CloseDBCursor does not try to commit
                self._in_transaction = False
                
                self._CloseDBCursor()
                
                self._InitDBCursor()
                
                HydrusData.PrintException( rollback_e )
                
            
        finally:
            
            self._CleanAfterJobWork()
            
            self._current_status = ''
            
            self.publish_status_update()
            
        
    
    def _Read( self, action, *args, **kwargs ):
        """Perform the read named by action. Must be implemented by subclasses."""
        
        raise NotImplementedError()
        
    
    def _RepairDB( self ):
        """Hook called by __init__ after the version checks and before updating. No-op here."""
        
        pass
        
    
    def _ReportOverupdatedDB( self, version ):
        """Hook called when the db version is above HC.SOFTWARE_VERSION. No-op here."""
        
        pass
        
    
    def _ReportUnderupdatedDB( self, version ):
        """Hook called when the db version is more than 15 below HC.SOFTWARE_VERSION. No-op here."""
        
        pass
        
    
    def _ReportStatus( self, text ):
        """Print text through HydrusData.Print."""
        
        HydrusData.Print( text )
        
    
    def _Rollback( self ):
        """Roll back to the savepoint, or print a message if no transaction is open."""
        
        if self._in_transaction:
            
            self._c.execute( 'ROLLBACK TO hydrus_savepoint;' )
            
        else:
            
            HydrusData.Print( 'Received a call to rollback, but was not in a transaction!' )
            
        
    
    def _Save( self ):
        """Release the savepoint and immediately set a new one."""
        
        self._c.execute( 'RELEASE hydrus_savepoint;' )
        
        self._c.execute( 'SAVEPOINT hydrus_savepoint;' )
        
    
    def _ShrinkMemory( self ):
        """Execute PRAGMA shrink_memory on the connection."""
        
        self._c.execute( 'PRAGMA shrink_memory;' )
        
    
    def _STI( self, iterable_cursor ):
        """Return an iterator over the single values of one-column rows."""
        
        # strip singleton tuples to an iterator
        
        return ( item for ( item, ) in iterable_cursor )
        
    
    def _STL( self, iterable_cursor ):
        """Return a list of the single values of one-column rows."""
        
        # strip singleton tuples to a list
        
        return [ item for ( item, ) in iterable_cursor ]
        
    
    def _STS( self, iterable_cursor ):
        """Return a set of the single values of one-column rows."""
        
        # strip singleton tuples to a set
        
        return { item for ( item, ) in iterable_cursor }
        
    
    def _TableHasAtLeastRowCount( self, name, row_count ):
        """Return True when table name holds at least row_count rows.

        At most row_count rows are fetched, so the whole table is not read.
        """
        
        cursor = self._c.execute( 'SELECT 1 FROM {};'.format( name ) )
        
        for i in range( row_count ):
            
            r = cursor.fetchone()
            
            if r is None:
                
                return False
                
            
        
        return True
        
    
    def _TableIsEmpty( self, name ):
        """Return True when table name holds no rows."""
        
        # execute() hands back the cursor itself, never None, so the first row has to be fetched before testing
        result = self._c.execute( 'SELECT 1 FROM {};'.format( name ) ).fetchone()
        
        return result is None
        
    
    def _UpdateDB( self, version ):
        """Update the database from version to the next one. Must be implemented by subclasses."""
        
        raise NotImplementedError()
        
    
    def _Write( self, action, *args, **kwargs ):
        """Perform the write named by action. Must be implemented by subclasses."""
        
        raise NotImplementedError()
        
    
    def pub_after_job( self, topic, *args, **kwargs ):
        """Queue a pubsub to be published once the current job has succeeded.

        A topic without any arguments is only queued once per job.
        """
        
        if len( args ) == 0 and len( kwargs ) == 0:
            
            if ( topic, args, kwargs ) in self._pubsubs:
                
                return
                
            
        
        self._pubsubs.append( ( topic, args, kwargs ) )
        
    
    def publish_status_update( self ):
        """Hook called whenever the status or current job name changes. No-op here."""
        
        pass
        
    
    def CurrentlyDoingJob( self ):
        """Return True while MainLoop is busy with a job."""
        
        return self._currently_doing_job
        
    
    def GetApproxTotalFileSize( self ):
        """Return the summed size in bytes of every file listed in _db_filenames."""
        
        return sum( os.path.getsize( os.path.join( self._db_dir, filename ) ) for filename in self._db_filenames.values() )
        
    
    def GetSSLPaths( self ):
        """Return ( cert_path, key_path ), generating both files if neither exists.

        Raises Exception when exactly one of the two files exists.
        """
        
        # create ssl keys
        
        cert_here = os.path.exists( self._ssl_cert_path )
        key_here = os.path.exists( self._ssl_key_path )
        
        if cert_here ^ key_here:
            
            raise Exception( 'While creating the server database, only one of the paths "{}" and "{}" existed. You can create a db with these files already in place, but please either delete the existing file (to have hydrus generate its own pair) or find the other in the pair (to use your own).'.format( self._ssl_cert_path, self._ssl_key_path ) )
            
        elif not ( cert_here or key_here ):
            
            HydrusData.Print( 'Generating new cert/key files.' )
            
            HydrusEncryption.GenerateOpenSSLCertAndKeyFile( self._ssl_cert_path, self._ssl_key_path )
            
        
        return ( self._ssl_cert_path, self._ssl_key_path )
        
    
    def GetStatus( self ):
        """Return ( current status text, current job name )."""
        
        return ( self._current_status, self._current_job_name )
        
    
    def IsDBUpdated( self ):
        """Return True when __init__ ran at least one update step."""
        
        return self._is_db_updated
        
    
    def IsFirstStart( self ):
        """Return True when _InitDB had to create the database."""
        
        return self._is_first_start
        
    
    def LoopIsFinished( self ):
        """Return True once MainLoop has shut down and disconnected."""
        
        return self._loop_finished
        
    
    def JobsQueueEmpty( self ):
        """Return True when no jobs are waiting on the queue."""
        
        return self._jobs.empty()
        
    
    def MainLoop( self ):
        """Serve queued jobs until shutdown is requested and the queue is empty.

        The connection is opened here rather than reused from __init__ (see the
        comment on the first call). If that fails, the error is reported,
        _could_not_initialise is set and the method returns. On exit the
        connection is closed and the durable temp database file is deleted.
        """
        
        try:
            
            self._InitDBCursor() # have to reinitialise because the thread id has changed
            
            self._InitCaches()
            
        except:
            
            self._DisplayCatastrophicError( traceback.format_exc() )
            
            self._could_not_initialise = True
            
            return
            
        
        self._ready_to_serve_requests = True
        
        error_count = 0
        
        while not ( ( self._local_shutdown or HG.model_shutdown ) and self._jobs.empty() ):
            
            try:
                
                job = self._jobs.get( timeout = 1 )
                
                self._currently_doing_job = True
                self._current_job_name = job.ToString()
                
                self.publish_status_update()
                
                try:
                    
                    if HG.db_report_mode:
                        
                        summary = 'Running ' + job.ToString()
                        
                        HydrusData.ShowText( summary )
                        
                    
                    if HG.db_profile_mode:
                        
                        summary = 'Profiling ' + job.ToString()
                        
                        # the code string refers to the local names 'self' and 'job', which are passed in via locals()
                        HydrusData.Profile( summary, 'self._ProcessJob( job )', globals(), locals(), show_summary = True )
                        
                    else:
                        
                        self._ProcessJob( job )
                        
                    
                    error_count = 0
                    
                except:
                    
                    error_count += 1
                    
                    # give up after more than five consecutive failures
                    if error_count > 5:
                        
                        raise
                        
                    
                    self._jobs.put( job ) # couldn't lock db; put job back on queue
                    
                    time.sleep( 5 )
                    
                
                self._currently_doing_job = False
                self._current_job_name = ''
                
                self.publish_status_update()
                
            except queue.Empty:
                
                # idle: use the gap to commit a write transaction that has been open long enough
                if self._transaction_contains_writes and HydrusData.TimeHasPassed( self._transaction_started + self.TRANSACTION_COMMIT_TIME ):
                    
                    self._Commit()
                    
                    self._BeginImmediate()
                    
                    self._transaction_contains_writes = False
                    
                
            
            if self._pause_and_disconnect:
                
                self._CloseDBCursor()
                
                while self._pause_and_disconnect:
                    
                    if self._local_shutdown or HG.model_shutdown:
                        
                        break
                        
                    
                    time.sleep( 1 )
                    
                
                self._InitDBCursor()
                
            
        
        self._CleanUpCaches()
        
        self._CloseDBCursor()
        
        temp_path = os.path.join( self._db_dir, self._durable_temp_db_filename )
        
        HydrusPaths.DeletePath( temp_path )
        
        self._loop_finished = True
        
    
    def PauseAndDisconnect( self, pause_and_disconnect ):
        """Ask MainLoop to close its connection (True) or to reconnect (False)."""
        
        self._pause_and_disconnect = pause_and_disconnect
        
    
    def Read( self, action, *args, **kwargs ):
        """Queue a synchronous read job and return its result.

        Raises HydrusExceptions.ShutdownException when HG.model_shutdown is set.
        """
        
        if action in self.READ_WRITE_ACTIONS:
            
            job_type = 'read_write'
            
        else:
            
            job_type = 'read'
            
        
        synchronous = True
        
        job = self._GenerateDBJob( job_type, synchronous, action, *args, **kwargs )
        
        if HG.model_shutdown:
            
            raise HydrusExceptions.ShutdownException( 'Application has shut down!' )
            
        
        self._jobs.put( job )
        
        return job.GetResult()
        
    
    def ReadyToServeRequests( self ):
        """Return True once MainLoop has connected and initialised its caches."""
        
        return self._ready_to_serve_requests
        
    
    def Shutdown( self ):
        """Ask MainLoop to stop once the job queue is empty."""
        
        self._local_shutdown = True
        
    
    def Write( self, action, synchronous, *args, **kwargs ):
        """Queue a write job; return its result when synchronous, otherwise None.

        Raises HydrusExceptions.ShutdownException when HG.model_shutdown is set.
        """
        
        job_type = 'write'
        
        job = self._GenerateDBJob( job_type, synchronous, action, *args, **kwargs )
        
        if HG.model_shutdown:
            
            raise HydrusExceptions.ShutdownException( 'Application has shut down!' )
            
        
        self._jobs.put( job )
        
        if synchronous:
            
            return job.GetResult()
            
        
    
2016-09-28 18:48:01 +00:00
2020-12-02 22:04:38 +00:00
class TemporaryIntegerTableNameCache( object ):
    """Hands out reusable 'mem.temp_int_<column>_<n>' table names.

    Constructing an object registers it as the shared instance returned by
    instance(). Released names are kept per column name and handed out again
    before a new name is generated.
    """
    
    # the most recently constructed cache, or None before the first construction
    my_instance = None
    
    def __init__( self ):
        
        TemporaryIntegerTableNameCache.my_instance = self
        
        self.Clear()
        
    
    @staticmethod
    def instance() -> 'TemporaryIntegerTableNameCache':
        """Return the shared cache, raising Exception if none has been constructed."""
        
        shared = TemporaryIntegerTableNameCache.my_instance
        
        if shared is None:
            
            raise Exception( 'TemporaryIntegerTableNameCache is not yet initialised!' )
            
        
        return shared
        
    
    def Clear( self ):
        """Forget every released name and restart the numbering for all columns."""
        
        # column name -> released table names waiting to be reused
        self._column_names_to_table_names = collections.defaultdict( collections.deque )
        
        # column name -> how many names have been generated so far
        self._column_names_counter = collections.Counter()
        
    
    def GetName( self, column_name ):
        """Return ( initialised, table_name ) for column_name.

        initialised is True when the name is a released one being reused, and
        False when the name has been newly generated.
        """
        
        released_names = self._column_names_to_table_names[ column_name ]
        
        if len( released_names ) > 0:
            
            # the most recently released name is reused first
            return ( True, released_names.pop() )
            
        
        next_index = self._column_names_counter[ column_name ]
        
        self._column_names_counter[ column_name ] = next_index + 1
        
        return ( False, 'mem.temp_int_{}_{}'.format( column_name, next_index ) )
        
    
    def ReleaseName( self, column_name, table_name ):
        """Give table_name back so a later GetName( column_name ) can reuse it."""
        
        self._column_names_to_table_names[ column_name ].append( table_name )
        
    
2016-09-28 18:48:01 +00:00
class TemporaryIntegerTable( object ):
    """Context manager that loads integers into a one-column table for the duration of a with block.

    The table name is taken from the shared TemporaryIntegerTableNameCache when
    the object is constructed. Entering creates the table if the name is new,
    inserts the integers and returns the table name. Exiting empties the table
    and gives the name back to the cache.
    """
    
    def __init__( self, cursor, integer_iterable, column_name ):
        
        # the column is a PRIMARY KEY, so duplicates are removed before inserting
        unique_integers = integer_iterable if isinstance( integer_iterable, set ) else set( integer_iterable )
        
        self._cursor = cursor
        self._integer_iterable = unique_integers
        self._column_name = column_name
        
        name_cache = TemporaryIntegerTableNameCache.instance()
        
        ( self._initialised, self._table_name ) = name_cache.GetName( self._column_name )
        
    
    def __enter__( self ):
        
        if not self._initialised:
            
            create_statement = 'CREATE TABLE {} ( {} INTEGER PRIMARY KEY );'.format( self._table_name, self._column_name )
            
            self._cursor.execute( create_statement )
            
        
        insert_statement = 'INSERT INTO {} ( {} ) VALUES ( ? );'.format( self._table_name, self._column_name )
        
        rows = ( ( i, ) for i in self._integer_iterable )
        
        self._cursor.executemany( insert_statement, rows )
        
        return self._table_name
        
    
    def __exit__( self, exc_type, exc_val, exc_tb ):
        
        # the table itself is kept and only emptied, so the released name can be reused without a new CREATE
        self._cursor.execute( 'DELETE FROM {};'.format( self._table_name ) )
        
        TemporaryIntegerTableNameCache.instance().ReleaseName( self._column_name, self._table_name )
        
        # exceptions raised inside the with block are never suppressed
        return False
        
    
2017-01-25 22:56:55 +00:00