hydrus/include/HydrusDB.py

344 lines
9.8 KiB
Python
Raw Normal View History

2015-04-22 22:57:25 +00:00
import cProfile
2015-11-25 22:00:57 +00:00
import cStringIO
2015-04-22 22:57:25 +00:00
import HydrusConstants as HC
import HydrusData
import HydrusExceptions
import HydrusGlobals
import os
2015-11-25 22:00:57 +00:00
import pstats
2015-04-22 22:57:25 +00:00
import Queue
2016-01-06 21:17:20 +00:00
import random
2015-04-22 22:57:25 +00:00
import sqlite3
import sys
import traceback
import time
class HydrusDB( object ):
DB_NAME = 'hydrus'
READ_WRITE_ACTIONS = []
WRITE_SPECIAL_ACTIONS = []
2015-08-26 21:18:39 +00:00
def __init__( self, controller ):
self._controller = controller
2015-04-22 22:57:25 +00:00
self._local_shutdown = False
self._loop_finished = False
2015-11-04 22:30:28 +00:00
self._db_path = os.path.join( HC.DB_DIR, self.DB_NAME + '.db' )
2015-04-22 22:57:25 +00:00
self._jobs = Queue.PriorityQueue()
self._pubsubs = []
self._currently_doing_job = False
if os.path.exists( self._db_path ):
# open and close to clean up in case last session didn't close well
self._InitDB()
self._CloseDBCursor()
self._InitDB()
( version, ) = self._c.execute( 'SELECT version FROM version;' ).fetchone()
2015-11-04 22:30:28 +00:00
if version < HC.SOFTWARE_VERSION - 50: raise Exception( 'Your current version of hydrus ' + str( version ) + ' is too old for this version ' + str( HC.SOFTWARE_VERSION ) + ' to update. Please try updating with version ' + str( version + 45 ) + ' or earlier first.' )
2015-04-22 22:57:25 +00:00
while version < HC.SOFTWARE_VERSION:
time.sleep( 2 )
try: self._c.execute( 'BEGIN IMMEDIATE' )
except Exception as e:
2015-11-04 22:30:28 +00:00
raise HydrusExceptions.DBAccessException( HydrusData.ToUnicode( e ) )
2015-04-22 22:57:25 +00:00
try:
self._UpdateDB( version )
self._c.execute( 'COMMIT' )
except:
self._c.execute( 'ROLLBACK' )
2015-11-04 22:30:28 +00:00
raise Exception( 'Updating the ' + self.DB_NAME + ' db to version ' + str( version + 1 ) + ' caused this error:' + os.linesep + traceback.format_exc() )
2015-04-22 22:57:25 +00:00
( version, ) = self._c.execute( 'SELECT version FROM version;' ).fetchone()
self._CloseDBCursor()
2015-06-17 20:01:41 +00:00
def _CleanUpCaches( self ):
pass
2015-04-22 22:57:25 +00:00
def _CloseDBCursor( self ):
self._c.close()
self._db.close()
del self._db
del self._c
def _CreateDB( self ):
raise NotImplementedError()
def _GetRowCount( self ):
row_count = self._c.rowcount
if row_count == -1: return 0
else: return row_count
def _GetSiteId( self, name ):
result = self._c.execute( 'SELECT site_id FROM imageboard_sites WHERE name = ?;', ( name, ) ).fetchone()
if result is None:
self._c.execute( 'INSERT INTO imageboard_sites ( name ) VALUES ( ? );', ( name, ) )
site_id = self._c.lastrowid
else: ( site_id, ) = result
return site_id
def _InitCaches( self ):
raise NotImplementedError()
def _InitDB( self ):
2015-09-23 21:21:02 +00:00
create_db = False
2016-01-13 22:08:19 +00:00
if not os.path.exists( self._db_path ):
create_db = True
2015-04-22 22:57:25 +00:00
self._InitDBCursor()
2015-09-23 21:21:02 +00:00
result = self._c.execute( 'SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?;', ( 'table', 'version' ) ).fetchone()
if result is None:
create_db = True
2015-04-22 22:57:25 +00:00
if create_db:
self._CreateDB()
def _InitDBCursor( self ):
self._db = sqlite3.connect( self._db_path, isolation_level = None, detect_types = sqlite3.PARSE_DECLTYPES )
self._db.create_function( 'hydrus_hamming', 2, HydrusData.GetHammingDistance )
self._c = self._db.cursor()
2015-11-11 21:20:41 +00:00
self._c.execute( 'DROP TABLE IF EXISTS ratings_aggregates;' )
2016-01-06 21:17:20 +00:00
self._c.execute( 'PRAGMA cache_size = -50000;' )
self._c.execute( 'PRAGMA journal_mode = WAL;' )
2016-01-13 22:08:19 +00:00
self._c.execute( 'PRAGMA synchronous = 1;' )
try:
self._c.execute( 'SELECT * FROM sqlite_master;' ).fetchone()
except sqlite3.OperationalError:
self._c.execute( 'PRAGMA journal_mode = TRUNCATE;' )
self._c.execute( 'SELECT * FROM sqlite_master;' ).fetchone()
2015-04-22 22:57:25 +00:00
def _ManageDBError( self, job, e ):
raise NotImplementedError()
def _ProcessJob( self, job ):
job_type = job.GetType()
action = job.GetAction()
args = job.GetArgs()
kwargs = job.GetKWArgs()
in_transaction = False
try:
if job_type == 'read': self._c.execute( 'BEGIN DEFERRED' )
elif job_type in ( 'read_write', 'write' ): self._c.execute( 'BEGIN IMMEDIATE' )
if job_type != 'write_special': in_transaction = True
if job_type in ( 'read', 'read_write' ): result = self._Read( action, *args, **kwargs )
elif job_type in ( 'write', 'write_special' ): result = self._Write( action, *args, **kwargs )
if job_type != 'write_special': self._c.execute( 'COMMIT' )
2015-08-26 21:18:39 +00:00
for ( topic, args, kwargs ) in self._pubsubs: self._controller.pub( topic, *args, **kwargs )
2015-04-22 22:57:25 +00:00
if job.IsSynchronous(): job.PutResult( result )
except Exception as e:
if in_transaction: self._c.execute( 'ROLLBACK' )
self._ManageDBError( job, e )
def _Read( self, action, *args, **kwargs ):
raise NotImplementedError()
def _ReportStatus( self, text ):
2015-11-18 22:44:07 +00:00
HydrusData.Print( text )
2015-04-22 22:57:25 +00:00
def _UpdateDB( self, version ):
raise NotImplementedError()
def _Write( self, action, *args, **kwargs ):
raise NotImplementedError()
def pub_after_commit( self, topic, *args, **kwargs ): self._pubsubs.append( ( topic, args, kwargs ) )
2015-06-03 21:05:13 +00:00
def CurrentlyDoingJob( self ):
return self._currently_doing_job
2015-04-22 22:57:25 +00:00
def LoopIsFinished( self ): return self._loop_finished
def MainLoop( self ):
self._InitDBCursor() # have to reinitialise because the thread id has changed
2015-04-29 19:20:35 +00:00
self._InitCaches()
2015-04-22 22:57:25 +00:00
error_count = 0
2015-11-04 22:30:28 +00:00
while not ( ( self._local_shutdown or self._controller.ModelIsShutdown() ) and self._jobs.empty() ):
2015-04-22 22:57:25 +00:00
try:
2015-09-02 23:16:09 +00:00
( priority, job ) = self._jobs.get( timeout = 0.1 )
2015-04-22 22:57:25 +00:00
self._currently_doing_job = True
2015-08-26 21:18:39 +00:00
self._controller.pub( 'refresh_status' )
2015-06-03 21:05:13 +00:00
2015-04-22 22:57:25 +00:00
self._pubsubs = []
try:
if HydrusGlobals.db_profile_mode:
HydrusData.ShowText( 'Profiling ' + job.GetType() + ' ' + job.GetAction() )
2015-11-25 22:00:57 +00:00
HydrusData.Profile( 'self._ProcessJob( job )', globals(), locals() )
2015-04-22 22:57:25 +00:00
else:
self._ProcessJob( job )
error_count = 0
except:
error_count += 1
if error_count > 5: raise
self._jobs.put( ( priority, job ) ) # couldn't lock db; put job back on queue
time.sleep( 5 )
self._currently_doing_job = False
2015-08-26 21:18:39 +00:00
self._controller.pub( 'refresh_status' )
2015-06-03 21:05:13 +00:00
2015-04-22 22:57:25 +00:00
except Queue.Empty: pass # no jobs this second; let's see if we should shutdown
2015-06-17 20:01:41 +00:00
self._CleanUpCaches()
2015-04-22 22:57:25 +00:00
self._CloseDBCursor()
self._loop_finished = True
def Read( self, action, priority, *args, **kwargs ):
if action in self.READ_WRITE_ACTIONS: job_type = 'read_write'
else: job_type = 'read'
synchronous = True
job = HydrusData.JobDatabase( action, job_type, synchronous, *args, **kwargs )
2015-11-04 22:30:28 +00:00
if self._controller.ModelIsShutdown():
raise HydrusExceptions.ShutdownException( 'Application has shut down!' )
2015-04-22 22:57:25 +00:00
self._jobs.put( ( priority + 1, job ) ) # +1 so all writes of equal priority can clear out first
if synchronous: return job.GetResult()
def Shutdown( self ): self._local_shutdown = True
def Write( self, action, priority, synchronous, *args, **kwargs ):
if action in self.WRITE_SPECIAL_ACTIONS: job_type = 'write_special'
else: job_type = 'write'
job = HydrusData.JobDatabase( action, job_type, synchronous, *args, **kwargs )
2015-11-04 22:30:28 +00:00
if self._controller.ModelIsShutdown():
raise HydrusExceptions.ShutdownException( 'Application has shut down!' )
2015-04-22 22:57:25 +00:00
self._jobs.put( ( priority, job ) )
if synchronous: return job.GetResult()