hydrus/test.py

369 lines
11 KiB
Python

#!/usr/bin/env python2
import locale
# Best effort: switch to the user's default locale. Any failure is deliberately ignored so the
# test run still starts with whatever locale is already in effect.
try: locale.setlocale( locale.LC_ALL, '' )
except: pass
from include import HydrusConstants as HC
from include import ClientConstants as CC
from include import HydrusGlobals
from include import ClientDefaults
from include import ClientNetworking
from include import HydrusPubSub
from include import HydrusSessions
from include import HydrusTags
from include import HydrusThreading
from include import TestClientConstants
from include import TestClientDaemons
from include import TestClientDownloading
from include import TestConstants
from include import TestDialogs
from include import TestDB
from include import TestFunctions
from include import TestClientImageHandling
from include import TestHydrusNATPunch
from include import TestHydrusServer
from include import TestHydrusSessions
from include import TestHydrusTags
import collections
import os
import random
import shutil
import sys
import tempfile
import threading
import time
import unittest
import wx
from twisted.internet import reactor
from include import ClientCaches
from include import ClientData
from include import HydrusData
from include import HydrusPaths
# Name of the single test suite to run (compared against short names such as 'db' or 'tags' in
# Controller.Run). None means run every suite. Replaced by the first command-line argument when
# this file is executed as a script.
only_run = None
class Controller( object ):
    """Stand-in controller that the unit test modules run against.
    
    On construction it registers itself in HydrusGlobals as the controller, client
    controller, server controller and test controller. It works out of a fresh temporary
    directory, answers Read calls from a dict of canned values and records every Write call
    so that tests can fetch the recorded calls back with GetWrite.
    """
    
    def __init__( self ):
        """Create the temporary db directory, the canned reads and the managers."""
        
        self._db_dir = tempfile.mkdtemp()
        
        TestConstants.DB_DIR = self._db_dir
        
        # These two are only computed as paths here; nothing in this method creates them.
        self._server_files_dir = os.path.join( self._db_dir, 'server_files' )
        self._updates_dir = os.path.join( self._db_dir, 'test_updates' )
        
        client_files_default = os.path.join( self._db_dir, 'client_files' )
        
        HydrusPaths.MakeSureDirectoryExists( client_files_default )
        
        # Every controller role is filled by this one object.
        HydrusGlobals.controller = self
        HydrusGlobals.client_controller = self
        HydrusGlobals.server_controller = self
        HydrusGlobals.test_controller = self
        
        self._pubsub = HydrusPubSub.HydrusPubSub( self )
        
        self._new_options = ClientData.ClientOptions( self._db_dir )
        
        # Replace HydrusData.ShowText with a function that does nothing for the whole run.
        def show_text( text ): pass
        
        HydrusData.ShowText = show_text
        
        self._http = ClientNetworking.HTTPConnectionManager()
        
        self._call_to_threads = []
        
        # Canned answers returned by Read, keyed by read name.
        self._reads = {}
        
        self._reads[ 'hydrus_sessions' ] = []
        self._reads[ 'local_booru_share_keys' ] = []
        self._reads[ 'messaging_sessions' ] = []
        self._reads[ 'tag_censorship' ] = []
        self._reads[ 'options' ] = ClientDefaults.GetClientDefaultOptions()
        
        services = []
        
        services.append( ClientData.GenerateService( CC.LOCAL_BOORU_SERVICE_KEY, HC.LOCAL_BOORU, CC.LOCAL_BOORU_SERVICE_KEY, { 'max_monthly_data' : None, 'used_monthly_data' : 0 } ) )
        services.append( ClientData.GenerateService( CC.LOCAL_FILE_SERVICE_KEY, HC.LOCAL_FILE, CC.LOCAL_FILE_SERVICE_KEY, {} ) )
        services.append( ClientData.GenerateService( CC.LOCAL_TAG_SERVICE_KEY, HC.LOCAL_TAG, CC.LOCAL_TAG_SERVICE_KEY, {} ) )
        
        self._reads[ 'services' ] = services
        
        # Every 'f', 't' and 'r' hex-prefix location points at the single client_files directory.
        client_files_locations = {}
        
        for prefix in HydrusData.IterateHexPrefixes():
            
            for c in ( 'f', 't', 'r' ):
                
                client_files_locations[ c + prefix ] = client_files_default
                
            
        
        self._reads[ 'client_files_locations' ] = client_files_locations
        
        self._reads[ 'sessions' ] = []
        self._reads[ 'tag_parents' ] = {}
        self._reads[ 'tag_siblings' ] = {}
        self._reads[ 'web_sessions' ] = {}
        
        HC.options = ClientDefaults.GetClientDefaultOptions()
        
        # Recorded Write / WriteSynchronous calls, keyed by write name.
        self._writes = collections.defaultdict( list )
        
        # The managers are built only after the canned reads exist.
        # NOTE(review): presumably they query this controller while being constructed - keep this order.
        self._managers = {}
        
        self._services_manager = ClientCaches.ServicesManager( self )
        self._client_files_manager = ClientCaches.ClientFilesManager( self )
        self._client_session_manager = ClientCaches.HydrusSessionManager( self )
        
        self._managers[ 'tag_censorship' ] = ClientCaches.TagCensorshipManager( self )
        self._managers[ 'tag_siblings' ] = ClientCaches.TagSiblingsManager( self )
        self._managers[ 'tag_parents' ] = ClientCaches.TagParentsManager( self )
        self._managers[ 'undo' ] = ClientCaches.UndoManager( self )
        self._managers[ 'web_sessions' ] = TestConstants.FakeWebSessionManager()
        
        self._server_session_manager = HydrusSessions.HydrusSessionManagerServer()
        
        self._managers[ 'local_booru' ] = ClientCaches.LocalBooruCache( self )
        
        self._cookies = {}
        
    
    def _GetCallToThread( self ):
        """Return a worker thread that is not currently working, starting a new one if needed.
        
        Raises Exception when no worker is free and more than 100 workers already exist.
        """
        
        for call_to_thread in self._call_to_threads:
            
            if not call_to_thread.CurrentlyWorking():
                
                return call_to_thread
                
            
        
        if len( self._call_to_threads ) > 100:
            
            raise Exception( 'Too many call to threads!' )
            
        
        call_to_thread = HydrusThreading.THREADCallToThread( self )
        
        self._call_to_threads.append( call_to_thread )
        
        call_to_thread.start()
        
        return call_to_thread
        
    
    def pub( self, topic, *args, **kwargs ):
        """Accept a publish request and do nothing with it."""
        
        pass
        
    
    def pubimmediate( self, topic, *args, **kwargs ):
        """Forward an immediate publish to the pubsub object."""
        
        self._pubsub.pubimmediate( topic, *args, **kwargs )
        
    
    def sub( self, object, method_name, topic ):
        """Forward a subscription to the pubsub object."""
        
        self._pubsub.sub( object, method_name, topic )
        
    
    def CallToThread( self, callable, *args, **kwargs ):
        """Hand callable and its arguments to a free worker thread."""
        
        call_to_thread = self._GetCallToThread()
        
        call_to_thread.put( callable, *args, **kwargs )
        
    
    def DoHTTP( self, *args, **kwargs ):
        """Pass the request through to the current HTTP object and return its result."""
        
        return self._http.Request( *args, **kwargs )
        
    
    def GetClientFilesManager( self ):
        
        return self._client_files_manager
        
    
    def GetClientSessionManager( self ):
        
        return self._client_session_manager
        
    
    def GetFilesDir( self ):
        """Return the server files path inside the temporary db directory."""
        
        return self._server_files_dir
        
    
    def GetHTTP( self ):
        
        return self._http
        
    
    def GetNewOptions( self ):
        
        return self._new_options
        
    
    def GetOptions( self ):
        """Return the module-level HC.options rather than a per-controller copy."""
        
        return HC.options
        
    
    def GetManager( self, manager_type ):
        """Return the manager stored under manager_type; an unknown type raises KeyError."""
        
        return self._managers[ manager_type ]
        
    
    def GetServicesManager( self ):
        
        return self._services_manager
        
    
    def GetServerSessionManager( self ):
        
        return self._server_session_manager
        
    
    def GetUpdatesDir( self ):
        """Return the updates path inside the temporary db directory."""
        
        return self._updates_dir
        
    
    def GetWrite( self, name ):
        """Return the list of ( args, kwargs ) pairs recorded for name and forget them.
        
        A name with nothing recorded gives an empty list.
        """
        
        write = self._writes[ name ]
        
        del self._writes[ name ]
        
        return write
        
    
    def IsFirstStart( self ):
        
        return True
        
    
    def ModelIsShutdown( self ):
        
        return HydrusGlobals.model_shutdown
        
    
    def Read( self, name, *args, **kwargs ):
        """Return the canned value stored under name; the extra arguments are ignored."""
        
        return self._reads[ name ]
        
    
    def ResetIdleTimer( self ):
        
        pass
        
    
    def Run( self ):
        """Load the selected test modules, run them with a text runner and return the result.
        
        Every suite runs when the module-level only_run is None; otherwise only the suite
        whose short name equals only_run is loaded. If only_run matches no suite, a message
        listing the valid names is printed and the (empty) run still goes ahead.
        """
        
        # Short command-line name -> module holding the tests, in the order they are run.
        named_modules = [
            ( 'cc', TestClientConstants ),
            ( 'daemons', TestClientDaemons ),
            ( 'dialogs', TestDialogs ),
            ( 'db', TestDB ),
            ( 'downloading', TestClientDownloading ),
            ( 'functions', TestFunctions ),
            ( 'image', TestClientImageHandling ),
            ( 'nat', TestHydrusNATPunch ),
            ( 'server', TestHydrusServer ),
            ( 'sessions', TestHydrusSessions ),
            ( 'tags', TestHydrusTags )
        ]
        
        run_all = only_run is None
        
        loader = unittest.TestLoader()
        
        suites = [ loader.loadTestsFromModule( module ) for ( name, module ) in named_modules if run_all or only_run == name ]
        
        if len( suites ) == 0:
            
            # Otherwise a mistyped suite name runs an empty suite with no hint as to why.
            valid_names = ', '.join( [ name for ( name, module ) in named_modules ] )
            
            print( 'No test suite is named "' + str( only_run ) + '". Valid names are: ' + valid_names )
            
        
        suite = unittest.TestSuite( suites )
        
        runner = unittest.TextTestRunner( verbosity = 1 )
        
        return runner.run( suite )
        
    
    def SetHTTP( self, http ):
        """Swap in a different HTTP object for GetHTTP and DoHTTP to use."""
        
        self._http = http
        
    
    def SetRead( self, name, value ):
        """Set the canned value that Read returns for name."""
        
        self._reads[ name ] = value
        
    
    def SetWebCookies( self, name, value ):
        
        self._cookies[ name ] = value
        
    
    def TidyUp( self ):
        """Delete the temporary db directory and everything inside it."""
        
        shutil.rmtree( self._db_dir )
        
    
    def ViewIsShutdown( self ):
        
        return HydrusGlobals.view_shutdown
        
    
    def Write( self, name, *args, **kwargs ):
        """Record the call under name for later retrieval with GetWrite."""
        
        self._writes[ name ].append( ( args, kwargs ) )
        
    
    def WriteSynchronous( self, name, *args, **kwargs ):
        """Record the call like Write and fake a result for 'import_file'.
        
        For 'import_file' the single positional argument is a path. The file is read, and
        content equal to 'blarg' raises Exception; any other content returns
        ( CC.STATUS_SUCCESSFUL, <the hex string '0123456789abcdef' decoded> ).
        Every other name returns None.
        """
        
        self._writes[ name ].append( ( args, kwargs ) )
        
        if name == 'import_file':
            
            ( path, ) = args
            
            with open( path, 'rb' ) as f:
                
                file_bytes = f.read()
                
            
            if file_bytes == 'blarg':
                
                raise Exception( 'File failed to import for some reason!' )
                
            
            return ( CC.STATUS_SUCCESSFUL, '0123456789abcdef'.decode( 'hex' ) )
            
        
    
if __name__ == '__main__':
    
    import traceback
    
    # An optional first command-line argument names the single suite to run.
    args = sys.argv[1:]
    
    only_run = args[0] if args else None
    
    try:
        
        # The Twisted reactor gets its own thread because the wx main loop runs on this one.
        # Signal handlers are switched off for the reactor since it is not on the main thread.
        reactor_thread = threading.Thread( target = reactor.run, kwargs = { 'installSignalHandlers' : 0 } )
        
        reactor_thread.start()
        
        app = wx.App()
        
        controller = Controller()
        
        try:
            
            win = wx.Frame( None )
            
            # Queue the test run, and then the frame's destruction, for the wx event loop.
            wx.CallAfter( controller.Run )
            wx.CallAfter( win.Destroy )
            
            app.MainLoop()
            
        except:
            
            HydrusData.DebugPrint( traceback.format_exc() )
            
        finally:
            
            # Raise each shutdown flag in turn, publishing 'wake_daemons' after each one,
            # and only then remove the temporary directory.
            HydrusGlobals.view_shutdown = True
            
            controller.pubimmediate( 'wake_daemons' )
            
            HydrusGlobals.model_shutdown = True
            
            controller.pubimmediate( 'wake_daemons' )
            
            controller.TidyUp()
            
        
    except:
        
        HydrusData.DebugPrint( traceback.format_exc() )
        
    finally:
        
        # Ask the reactor to stop from its own thread.
        reactor.callFromThread( reactor.stop )
        
        print( 'This was version ' + str( HC.SOFTWARE_VERSION ) )
        
        # Wait for Enter so the output stays visible.
        raw_input()
        
    