hydrus/include/TestHydrusServer.py

802 lines
28 KiB
Python

from . import ClientLocalServer
from . import ClientMedia
from . import ClientRatings
from . import ClientServices
import hashlib
import http.client
from . import HydrusConstants as HC
from . import HydrusEncryption
from . import HydrusNetwork
from . import HydrusPaths
from . import HydrusServer
from . import HydrusServerResources
import os
import random
from . import ServerFiles
from . import ServerServer
import ssl
from . import TestConstants
import time
import unittest
from twisted.internet import reactor
#from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol
from twisted.internet.defer import deferredGenerator, waitForDeferred
import twisted.internet.ssl
from . import HydrusData
from . import HydrusGlobals as HG
with open( os.path.join( HC.STATIC_DIR, 'hydrus.png' ), 'rb' ) as f:
EXAMPLE_FILE = f.read()
with open( os.path.join( HC.STATIC_DIR, 'hydrus_small.png' ), 'rb' ) as f:
EXAMPLE_THUMBNAIL = f.read()
class TestServer( unittest.TestCase ):
@classmethod
def setUpClass( cls ):
cls._access_key = HydrusData.GenerateKey()
services = []
cls._serverside_file_service = HydrusNetwork.GenerateService( HydrusData.GenerateKey(), HC.FILE_REPOSITORY, 'file repo', HC.DEFAULT_SERVICE_PORT + 1 )
cls._serverside_tag_service = HydrusNetwork.GenerateService( HydrusData.GenerateKey(), HC.TAG_REPOSITORY, 'tag repo', HC.DEFAULT_SERVICE_PORT )
cls._serverside_admin_service = HydrusNetwork.GenerateService( HydrusData.GenerateKey(), HC.SERVER_ADMIN, 'server admin', HC.DEFAULT_SERVER_ADMIN_PORT )
cls._clientside_file_service = ClientServices.GenerateService( HydrusData.GenerateKey(), HC.FILE_REPOSITORY, 'file repo' )
cls._clientside_tag_service = ClientServices.GenerateService( HydrusData.GenerateKey(), HC.TAG_REPOSITORY, 'tag repo' )
cls._clientside_admin_service = ClientServices.GenerateService( HydrusData.GenerateKey(), HC.SERVER_ADMIN, 'server admin' )
cls._clientside_file_service.SetCredentials( HydrusNetwork.Credentials( '127.0.0.1', HC.DEFAULT_SERVICE_PORT + 1, cls._access_key ) )
cls._clientside_tag_service.SetCredentials( HydrusNetwork.Credentials( '127.0.0.1', HC.DEFAULT_SERVICE_PORT, cls._access_key ) )
cls._clientside_admin_service.SetCredentials( HydrusNetwork.Credentials( '127.0.0.1', HC.DEFAULT_SERVER_ADMIN_PORT, cls._access_key ) )
cls._local_booru = ClientServices.GenerateService( HydrusData.GenerateKey(), HC.LOCAL_BOORU, 'local booru' )
services_manager = HG.test_controller.services_manager
services_manager._keys_to_services[ cls._clientside_file_service.GetServiceKey() ] = cls._clientside_file_service
services_manager._keys_to_services[ cls._clientside_tag_service.GetServiceKey() ] = cls._clientside_tag_service
services_manager._keys_to_services[ cls._clientside_admin_service.GetServiceKey() ] = cls._clientside_admin_service
permissions = [ HC.GET_DATA, HC.POST_DATA, HC.POST_PETITIONS, HC.RESOLVE_PETITIONS, HC.MANAGE_USERS, HC.GENERAL_ADMIN, HC.EDIT_SERVICES ]
account_key = HydrusData.GenerateKey()
account_type = HydrusNetwork.AccountType.GenerateAdminAccountType( HC.SERVER_ADMIN )
created = HydrusData.GetNow() - 100000
expires = None
cls._account = HydrusNetwork.Account( account_key, account_type, created, expires )
cls._file_hash = HydrusData.GenerateKey()
def TWISTEDSetup():
cls._ssl_cert_path = os.path.join( TestConstants.DB_DIR, 'server.crt' )
cls._ssl_key_path = os.path.join( TestConstants.DB_DIR, 'server.key' )
# if db test ran, this is still hanging around and read-only, so don't bother to fail overwriting
if not os.path.exists( cls._ssl_cert_path ):
HydrusEncryption.GenerateOpenSSLCertAndKeyFile( cls._ssl_cert_path, cls._ssl_key_path )
context_factory = twisted.internet.ssl.DefaultOpenSSLContextFactory( cls._ssl_key_path, cls._ssl_cert_path )
reactor.listenSSL( HC.DEFAULT_SERVER_ADMIN_PORT, ServerServer.HydrusServiceAdmin( cls._serverside_admin_service ), context_factory )
reactor.listenSSL( HC.DEFAULT_SERVICE_PORT + 1, ServerServer.HydrusServiceRepositoryFile( cls._serverside_file_service ), context_factory )
reactor.listenSSL( HC.DEFAULT_SERVICE_PORT, ServerServer.HydrusServiceRepositoryTag( cls._serverside_tag_service ), context_factory )
reactor.listenTCP( HC.DEFAULT_LOCAL_BOORU_PORT, ClientLocalServer.HydrusServiceBooru( cls._local_booru ) )
reactor.callFromThread( TWISTEDSetup )
time.sleep( 1 )
def _test_basics( self, host, port, https = True ):
if https:
context = ssl.SSLContext( ssl.PROTOCOL_SSLv23 )
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
connection = http.client.HTTPSConnection( host, port, timeout = 10, context = context )
else:
connection = http.client.HTTPConnection( host, port, timeout = 10 )
#
connection.request( 'GET', '/' )
response = connection.getresponse()
data = response.read()
self.assertEqual( response.status, 200 )
#
with open( os.path.join( HC.STATIC_DIR, 'hydrus.ico' ), 'rb' ) as f:
favicon = f.read()
connection.request( 'GET', '/favicon.ico' )
response = connection.getresponse()
data = response.read()
self.assertEqual( data, favicon )
def _test_file_repo( self, service ):
# file
path = ServerFiles.GetExpectedFilePath( self._file_hash )
HydrusPaths.MakeSureDirectoryExists( os.path.dirname( path ) )
with open( path, 'wb' ) as f:
f.write( EXAMPLE_FILE )
response = service.Request( HC.GET, 'file', { 'hash' : self._file_hash } )
self.assertEqual( response, EXAMPLE_FILE )
try: os.remove( path )
except: pass
path = os.path.join( HC.STATIC_DIR, 'hydrus.png' )
with open( path, 'rb' ) as f:
file_bytes = f.read()
service.Request( HC.POST, 'file', { 'file' : file_bytes } )
written = HG.test_controller.GetWrite( 'file' )
[ ( args, kwargs ) ] = written
( written_service_key, written_account, written_file_dict ) = args
self.assertEqual( written_file_dict[ 'hash' ], b'\xadm5\x99\xa6\xc4\x89\xa5u\xeb\x19\xc0&\xfa\xce\x97\xa9\xcdey\xe7G(\xb0\xce\x94\xa6\x01\xd22\xf3\xc3' )
self.assertEqual( written_file_dict[ 'ip' ], '127.0.0.1' )
self.assertEqual( written_file_dict[ 'height' ], 200 )
self.assertEqual( written_file_dict[ 'width' ], 200 )
self.assertEqual( written_file_dict[ 'mime' ], 2 )
self.assertEqual( written_file_dict[ 'size' ], 5270 )
# ip
( ip, timestamp ) = ( '94.45.87.123', HydrusData.GetNow() - 100000 )
HG.test_controller.SetRead( 'ip', ( ip, timestamp ) )
response = service.Request( HC.GET, 'ip', { 'hash' : self._file_hash } )
self.assertEqual( response[ 'ip' ], ip )
self.assertEqual( response[ 'timestamp' ], timestamp )
# thumbnail
path = ServerFiles.GetExpectedThumbnailPath( self._file_hash )
HydrusPaths.MakeSureDirectoryExists( os.path.dirname( path ) )
with open( path, 'wb' ) as f:
f.write( EXAMPLE_THUMBNAIL )
response = service.Request( HC.GET, 'thumbnail', { 'hash' : self._file_hash } )
self.assertEqual( response, EXAMPLE_THUMBNAIL )
try: os.remove( path )
except: pass
def _test_local_booru( self, host, port ):
#
connection = http.client.HTTPConnection( host, port, timeout = 10 )
#
with open( os.path.join( HC.STATIC_DIR, 'local_booru_style.css' ), 'rb' ) as f:
css = f.read()
connection.request( 'GET', '/style.css' )
response = connection.getresponse()
data = response.read()
self.assertEqual( data, css )
#
share_key = HydrusData.GenerateKey()
hashes = [ HydrusData.GenerateKey() for i in range( 5 ) ]
client_files_default = os.path.join( TestConstants.DB_DIR, 'client_files' )
hash_encoded = hashes[0].hex()
prefix = hash_encoded[:2]
file_path = os.path.join( client_files_default, 'f' + prefix, hash_encoded + '.jpg' )
thumbnail_path = os.path.join( client_files_default, 't' + prefix, hash_encoded + '.thumbnail' )
with open( file_path, 'wb' ) as f:
f.write( EXAMPLE_FILE )
with open( thumbnail_path, 'wb' ) as f:
f.write( EXAMPLE_THUMBNAIL )
local_booru_manager = HG.client_controller.local_booru_manager
#
self._test_local_booru_requests( connection, share_key, hashes[0], 404 )
#
info = {}
info[ 'name' ] = 'name'
info[ 'text' ] = 'text'
info[ 'timeout' ] = 0
info[ 'hashes' ] = hashes
file_info_manager = ClientMedia.FileInfoManager( 1, hashes[0], 500, HC.IMAGE_JPEG, 640, 480 )
file_viewing_stats_manager = ClientMedia.FileViewingStatsManager.STATICGenerateEmptyManager()
media_results = [ ClientMedia.MediaResult( file_info_manager, ClientMedia.TagsManager( {} ), ClientMedia.LocationsManager( set(), set(), set(), set() ), ClientRatings.RatingsManager( {} ), file_viewing_stats_manager ) for hash in hashes ]
HG.test_controller.SetRead( 'local_booru_share_keys', [ share_key ] )
HG.test_controller.SetRead( 'local_booru_share', info )
HG.test_controller.SetRead( 'media_results', media_results )
local_booru_manager.RefreshShares()
#
self._test_local_booru_requests( connection, share_key, hashes[0], 403 )
#
info[ 'timeout' ] = None
HG.test_controller.SetRead( 'local_booru_share', info )
local_booru_manager.RefreshShares()
#
self._test_local_booru_requests( connection, share_key, hashes[0], 200 )
#
HG.test_controller.SetRead( 'local_booru_share_keys', [] )
local_booru_manager.RefreshShares()
#
self._test_local_booru_requests( connection, share_key, hashes[0], 404 )
def _test_local_booru_requests( self, connection, share_key, hash, expected_result ):
requests = []
requests.append( '/gallery?share_key=' + share_key.hex() )
requests.append( '/page?share_key=' + share_key.hex() + '&hash=' + hash.hex() )
requests.append( '/file?share_key=' + share_key.hex() + '&hash=' + hash.hex() )
requests.append( '/thumbnail?share_key=' + share_key.hex() + '&hash=' + hash.hex() )
for request in requests:
connection.request( 'GET', request )
response = connection.getresponse()
data = response.read()
self.assertEqual( response.status, expected_result )
def _test_repo( self, service ):
service_key = service.GetServiceKey()
# num_petitions
num_petitions = [ [ HC.CONTENT_TYPE_MAPPINGS, HC.CONTENT_STATUS_PETITIONED, 23 ], [ HC.CONTENT_TYPE_TAG_PARENTS, HC.CONTENT_STATUS_PENDING, 0 ] ]
HG.test_controller.SetRead( 'num_petitions', num_petitions )
response = service.Request( HC.GET, 'num_petitions' )
self.assertEqual( response[ 'num_petitions' ], num_petitions )
# petition
action = HC.CONTENT_UPDATE_PETITION
petitioner_account = HydrusNetwork.Account.GenerateUnknownAccount()
reason = 'it sucks'
contents = [ HydrusNetwork.Content( HC.CONTENT_TYPE_FILES, [ HydrusData.GenerateKey() for i in range( 10 ) ] ) ]
petition = HydrusNetwork.Petition( action, petitioner_account, reason, contents )
HG.test_controller.SetRead( 'petition', petition )
response = service.Request( HC.GET, 'petition', { 'content_type' : HC.CONTENT_TYPE_FILES, 'status' : HC.CONTENT_UPDATE_PETITION } )
self.assertEqual( response[ 'petition' ].GetSerialisableTuple(), petition.GetSerialisableTuple() )
# definitions
definitions_update = HydrusNetwork.DefinitionsUpdate()
for i in range( 100, 200 ):
definitions_update.AddRow( ( HC.DEFINITIONS_TYPE_TAGS, i, 'series:test ' + str( i ) ) )
definitions_update.AddRow( ( HC.DEFINITIONS_TYPE_HASHES, i + 500, HydrusData.GenerateKey() ) )
definitions_update_network_bytes = definitions_update.DumpToNetworkBytes()
definitions_update_hash = hashlib.sha256( definitions_update_network_bytes ).digest()
path = ServerFiles.GetExpectedFilePath( definitions_update_hash )
HydrusPaths.MakeSureDirectoryExists( path )
with open( path, 'wb' ) as f:
f.write( definitions_update_network_bytes )
response = service.Request( HC.GET, 'update', { 'update_hash' : definitions_update_hash } )
try: os.remove( path )
except: pass
self.assertEqual( response, definitions_update_network_bytes )
# content
rows = [ ( random.randint( 100, 1000 ), [ random.randint( 100, 1000 ) for i in range( 50 ) ] ) for j in range( 20 ) ]
content_update = HydrusNetwork.ContentUpdate()
for row in rows:
content_update.AddRow( ( HC.CONTENT_TYPE_MAPPINGS, HC.CONTENT_UPDATE_ADD, row ) )
content_update_network_bytes = content_update.DumpToNetworkBytes()
content_update_hash = hashlib.sha256( content_update_network_bytes ).digest()
path = ServerFiles.GetExpectedFilePath( content_update_hash )
with open( path, 'wb' ) as f:
f.write( content_update_network_bytes )
response = service.Request( HC.GET, 'update', { 'update_hash' : content_update_hash } )
try: os.remove( path )
except: pass
self.assertEqual( response, content_update_network_bytes )
# metadata
metadata = HydrusNetwork.Metadata()
metadata.AppendUpdate( [ definitions_update_hash, content_update_hash ], HydrusData.GetNow() - 101000, HydrusData.GetNow() - 1000, HydrusData.GetNow() + 100000 )
service._metadata = metadata
response = service.Request( HC.GET, 'metadata_slice', { 'since' : 0 } )
self.assertEqual( response[ 'metadata_slice' ].GetSerialisableTuple(), metadata.GetSerialisableTuple() )
# post content
raise NotImplementedError()
'''
update = HydrusData.ClientToServerContentUpdatePackage( {}, hash_ids_to_hashes )
service.Request( HC.POST, 'content_update_package', { 'update' : update } )
written = HG.test_controller.GetWrite( 'update' )
[ ( args, kwargs ) ] = written
( written_service_key, written_account, written_update ) = args
self.assertEqual( update.GetHashes(), written_update.GetHashes() )
'''
def _test_restricted( self, service ):
# access_key
registration_key = HydrusData.GenerateKey()
HG.test_controller.SetRead( 'access_key', self._access_key )
response = service.Request( HC.GET, 'access_key', { 'registration_key' : registration_key } )
self.assertEqual( response[ 'access_key' ], self._access_key )
# set up session
last_error = 0
account = self._account
HG.test_controller.SetRead( 'service', service )
HG.test_controller.SetRead( 'account_key_from_access_key', HydrusData.GenerateKey() )
HG.test_controller.SetRead( 'account', self._account )
# account
response = service.Request( HC.GET, 'account' )
self.assertEqual( repr( response[ 'account' ] ), repr( self._account ) )
# account_info
account_info = { 'message' : 'hello' }
HG.test_controller.SetRead( 'account_info', account_info )
HG.test_controller.SetRead( 'account_key_from_identifier', HydrusData.GenerateKey() )
response = service.Request( HC.GET, 'account_info', { 'subject_account_key' : HydrusData.GenerateKey() } )
self.assertEqual( response[ 'account_info' ], account_info )
# this not working for now, screw it m8
#response = service.Request( HC.GET, 'account_info', { 'subject_hash' : HydrusData.GenerateKey() } )
#self.assertEqual( response[ 'account_info' ], account_info )
#response = service.Request( HC.GET, 'account_info', { 'subject_hash' : HydrusData.GenerateKey(), 'subject_tag' : 'hello' } )
#self.assertEqual( response[ 'account_info' ], account_info )
# account_types
account_types = [ HydrusNetwork.AccountType.GenerateAdminAccountType( service.GetServiceType() ) ]
HG.test_controller.SetRead( 'account_types', account_types )
response = service.Request( HC.GET, 'account_types' )
self.assertEqual( response[ 'account_types' ][0].GetAccountTypeKey(), account_types[0].GetAccountTypeKey() )
service.Request( HC.POST, 'account_types', { 'account_types' : account_types, 'deletee_account_type_keys_to_new_account_type_keys' : {} } )
written = HG.test_controller.GetWrite( 'account_types' )
[ ( args, kwargs ) ] = written
( written_service_key, written_account, written_account_types, written_deletee_account_type_keys_to_new_account_type_keys ) = args
self.assertEqual( written_account_types[0].GetAccountTypeKey(), account_types[0].GetAccountTypeKey() )
self.assertEqual( written_deletee_account_type_keys_to_new_account_type_keys, {} )
# registration_keys
registration_key = HydrusData.GenerateKey()
HG.test_controller.SetRead( 'registration_keys', [ registration_key ] )
response = service.Request( HC.GET, 'registration_keys', { 'num' : 1, 'account_type_key' : os.urandom( 32 ), 'expires' : HydrusData.GetNow() + 1200 } )
self.assertEqual( response[ 'registration_keys' ], [ registration_key ] )
def _test_server_admin( self, service ):
# init
access_key = HydrusData.GenerateKey()
HG.test_controller.SetRead( 'access_key', access_key )
response = service.Request( HC.GET, 'access_key', { 'registration_key' : b'init' } )
self.assertEqual( response[ 'access_key' ], access_key )
#
## backup
response = service.Request( HC.POST, 'backup' )
#
# add some new services info
def _test_tag_repo( self, service ):
pass
def test_repository_file( self ):
host = '127.0.0.1'
port = HC.DEFAULT_SERVICE_PORT
self._test_basics( host, port )
self._test_restricted( self._clientside_file_service )
# broke since service rewrite
#self._test_repo( self._clientside_file_service )
#self._test_file_repo( self._clientside_file_service )
def test_repository_tag( self ):
host = '127.0.0.1'
port = HC.DEFAULT_SERVICE_PORT + 1
self._test_basics( host, port )
self._test_restricted( self._clientside_tag_service )
# broke since service rewrite
#self._test_repo( self._clientside_tag_service )
#self._test_tag_repo( self._clientside_tag_service )
def test_server_admin( self ):
host = '127.0.0.1'
port = HC.DEFAULT_SERVER_ADMIN_PORT
self._test_basics( host, port )
self._test_restricted( self._clientside_admin_service )
self._test_server_admin( self._clientside_admin_service )
def test_local_booru( self ):
host = '127.0.0.1'
port = HC.DEFAULT_LOCAL_BOORU_PORT
self._test_basics( host, port, https = False )
self._test_local_booru( host, port )
'''
class TestAMP( unittest.TestCase ):
@classmethod
def setUpClass( cls ):
cls._alice = HydrusData.GenerateKey()
cls._bob = HydrusData.GenerateKey()
cls._server_port = HC.DEFAULT_SERVICE_PORT + 10
cls._service_key = HydrusData.GenerateKey()
def TWISTEDSetup():
cls._factory = HydrusServer.MessagingServiceFactory( cls._service_key )
reactor.listenTCP( cls._server_port, cls._factory )
reactor.callFromThread( TWISTEDSetup )
time.sleep( 1 )
def _get_deferred_result( self, deferred ):
def err( failure ):
failure.trap( Exception )
return failure.type( failure.value )
deferred.addErrback( err )
before = time.time()
while not deferred.called:
time.sleep( 0.1 )
if time.time() - before > 10: raise Exception( 'Trying to get deferred timed out!' )
result = deferred.result
if issubclass( type( result ), Exception ): raise result
return result
def _get_client_protocol( self ):
point = TCP4ClientEndpoint( reactor, '127.0.0.1', self._server_port )
deferred = connectProtocol( point, HydrusServerAMP.MessagingClientProtocol() )
protocol = self._get_deferred_result( deferred )
return protocol
def _make_persistent_connection( self, protocol, access_key, name ):
identifier = hashlib.sha256( access_key ).digest()
HC.app.SetRead( 'im_identifier', identifier )
permissions = [ HC.GET_DATA, HC.POST_DATA, HC.POST_PETITIONS, HC.RESOLVE_PETITIONS, HC.MANAGE_USERS, HC.GENERAL_ADMIN, HC.EDIT_SERVICES ]
account_key = HydrusData.GenerateKey()
account_type = HC.AccountType( 'account', permissions, ( None, None ) )
created = HC.GetNow() - 100000
expires = None
used_bytes = 0
used_requests = 0
account = HC.Account( account_key, account_type, created, expires, used_bytes, used_requests )
HC.app.SetRead( 'account_key_from_access_key', HydrusData.GenerateKey() )
HC.app.SetRead( 'account', account )
deferred = protocol.callRemote( HydrusServerAMP.IMSessionKey, access_key = access_key, name = name )
result = self._get_deferred_result( deferred )
session_key = result[ 'session_key' ]
deferred = protocol.callRemote( HydrusServerAMP.IMLoginPersistent, network_version = HC.NETWORK_VERSION, session_key = session_key )
result = self._get_deferred_result( deferred )
self.assertEqual( result, {} )
def _make_temporary_connection( self, protocol, identifier, name ):
deferred = protocol.callRemote( HydrusServerAMP.IMLoginTemporary, network_version = HC.NETWORK_VERSION, identifier = identifier, name = name )
result = self._get_deferred_result( deferred )
self.assertEqual( result, {} )
def test_connections( self ):
persistent_protocol = self._get_client_protocol()
persistent_access_key = HydrusData.GenerateKey()
persistent_identifier = hashlib.sha256( persistent_access_key ).digest()
persistent_name = 'persistent'
self._make_persistent_connection( persistent_protocol, persistent_access_key, persistent_name )
self.assertIn( persistent_identifier, self._factory._persistent_connections )
self.assertIn( persistent_name, self._factory._persistent_connections[ persistent_identifier ] )
temp_protocol_1 = self._get_client_protocol()
temp_protocol_2 = self._get_client_protocol()
temp_name_1 = 'temp_1'
temp_identifier = HydrusData.GenerateKey()
temp_name_2 = 'temp_2'
self._make_temporary_connection( temp_protocol_1, temp_identifier, temp_name_1 )
self._make_temporary_connection( temp_protocol_2, temp_identifier, temp_name_2 )
self.assertIn( temp_identifier, self._factory._temporary_connections )
self.assertIn( temp_name_1, self._factory._temporary_connections[ temp_identifier ] )
self.assertIn( temp_name_2, self._factory._temporary_connections[ temp_identifier ] )
def test_status( self ):
# some of this is UDP, so get that working!
# add two bobs
# ask for status of the bobs
# test that we get both, online
# now disconnect a bob
# ask for bob status
# test that we only have one bob
# now disconnect other bob
# repeat for nothing
pass
def test_message( self ):
persistent_protocol = self._get_client_protocol()
persistent_access_key = HydrusData.GenerateKey()
persistent_identifier = hashlib.sha256( persistent_access_key ).digest()
persistent_name = 'persistent'
self._make_persistent_connection( persistent_protocol, persistent_access_key, persistent_name )
temp_protocol = self._get_client_protocol()
temp_identifier = HydrusData.GenerateKey()
temp_name = 'temp'
self._make_temporary_connection( temp_protocol, temp_identifier, temp_name )
#
HC.pubsub.ClearPubSubs()
message = 'hello temp'
deferred = persistent_protocol.callRemote( HydrusServerAMP.IMMessageServer, identifier_to = temp_identifier, name_to = temp_name, message = message )
result = self._get_deferred_result( deferred )
self.assertEqual( result, {} )
result = HC.pubsub.GetPubSubs( 'im_message_received' )
[ ( args, kwargs ) ] = result
self.assertEqual( args, ( persistent_identifier, persistent_name, temp_identifier, temp_name, message ) )
#
HC.pubsub.ClearPubSubs()
message = 'hello persistent'
deferred = temp_protocol.callRemote( HydrusServerAMP.IMMessageServer, identifier_to = persistent_identifier, name_to = persistent_name, message = message )
result = self._get_deferred_result( deferred )
self.assertEqual( result, {} )
result = HC.pubsub.GetPubSubs( 'im_message_received' )
[ ( args, kwargs ) ] = result
self.assertEqual( args, ( temp_identifier, temp_name, persistent_identifier, persistent_name, message ) )
'''