hydrus/hydrus/client/ClientMigration.py

766 lines
21 KiB
Python

import os
from hydrus.core import HydrusConstants as HC
from hydrus.core import HydrusData
from hydrus.core import HydrusLists
from hydrus.core import HydrusTagArchive
from hydrus.core import HydrusTime
from hydrus.client import ClientThreading
from hydrus.client import ClientLocation
from hydrus.client.metadata import ClientContentUpdates
pair_types_to_content_types = {}
pair_types_to_content_types[ HydrusTagArchive.TAG_PAIR_TYPE_PARENTS ] = HC.CONTENT_TYPE_TAG_PARENTS
pair_types_to_content_types[ HydrusTagArchive.TAG_PAIR_TYPE_SIBLINGS ] = HC.CONTENT_TYPE_TAG_SIBLINGS
content_types_to_pair_types = {}
content_types_to_pair_types[ HC.CONTENT_TYPE_TAG_PARENTS ] = HydrusTagArchive.TAG_PAIR_TYPE_PARENTS
content_types_to_pair_types[ HC.CONTENT_TYPE_TAG_SIBLINGS ] = HydrusTagArchive.TAG_PAIR_TYPE_SIBLINGS
def GetBasicSpeedStatement( num_done, time_started_precise ):
if num_done == 0:
rows_s = 0
else:
time_taken = HydrusTime.GetNowPrecise() - time_started_precise
rows_s = int( num_done / time_taken )
return '{} rows/s'.format( rows_s )
class MigrationDestination( object ):
def __init__( self, controller, name ):
self._controller = controller
self._name = name
def GetName( self ):
return self._name
def CleanUp( self ):
pass
def DoSomeWork( self, source ):
raise NotImplementedError()
def Prepare( self ):
pass
class MigrationDestinationHTA( MigrationDestination ):
def __init__( self, controller, path, desired_hash_type ):
name = os.path.basename( path )
MigrationDestination.__init__( self, controller, name )
self._path = path
self._desired_hash_type = desired_hash_type
self._time_started = 0
self._hta = None
def CleanUp( self ):
self._hta.CommitBigJob()
if HydrusTime.TimeHasPassed( self._time_started + 120 ):
self._hta.Optimise()
self._hta.Close()
self._hta = None
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
num_done = 0
data = source.GetSomeData()
for ( hash, tags ) in data:
self._hta.AddMappings( hash, tags )
num_done += len( tags )
return GetBasicSpeedStatement( num_done, time_started_precise )
def Prepare( self ):
self._time_started = HydrusTime.GetNow()
self._hta = HydrusTagArchive.HydrusTagArchive( self._path )
hta_hash_type = HydrusTagArchive.hash_str_to_type_lookup[ self._desired_hash_type ]
self._hta.SetHashType( hta_hash_type )
self._hta.BeginBigJob()
class MigrationDestinationHTPA( MigrationDestination ):
def __init__( self, controller, path, content_type ):
name = os.path.basename( path )
MigrationDestination.__init__( self, controller, name )
self._path = path
self._content_type = content_type
self._time_started = 0
self._htpa = None
def CleanUp( self ):
self._htpa.CommitBigJob()
if HydrusTime.TimeHasPassed( self._time_started + 120 ):
self._htpa.Optimise()
self._htpa.Close()
self._htpa = None
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
data = source.GetSomeData()
self._htpa.AddPairs( data )
num_done = len( data )
return GetBasicSpeedStatement( num_done, time_started_precise )
def Prepare( self ):
self._time_started = HydrusTime.GetNow()
self._htpa = HydrusTagArchive.HydrusTagPairArchive( self._path )
pair_type = content_types_to_pair_types[ self._content_type ]
self._htpa.SetPairType( pair_type )
self._htpa.BeginBigJob()
class MigrationDestinationList( MigrationDestination ):
def __init__( self, controller ):
name = 'simple list destination'
MigrationDestination.__init__( self, controller, name )
self._data_received = []
self._time_started = 0
def DoSomeWork( self, source ):
raise NotImplementedError()
def GetDataReceived( self ):
return self._data_received
class MigrationDestinationListMappings( MigrationDestinationList ):
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
num_done = 0
data = source.GetSomeData()
for ( hash, tags ) in data:
self._data_received.append( ( hash, tags ) )
num_done += len( tags )
return GetBasicSpeedStatement( num_done, time_started_precise )
class MigrationDestinationListPairs( MigrationDestinationList ):
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
data = source.GetSomeData()
self._data_received.extend( data )
num_done = len( data )
return GetBasicSpeedStatement( num_done, time_started_precise )
class MigrationDestinationTagService( MigrationDestination ):
def __init__( self, controller, tag_service_key, content_action ):
name = controller.services_manager.GetName( tag_service_key )
MigrationDestination.__init__( self, controller, name )
self._tag_service_key = tag_service_key
service = self._controller.services_manager.GetService( tag_service_key )
self._tag_service_type = service.GetServiceType()
self._content_action = content_action
def DoSomeWork( self, source ):
raise NotImplementedError()
class MigrationDestinationTagServiceMappings( MigrationDestinationTagService ):
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
data = source.GetSomeData()
content_updates = []
pairs = []
for ( hash, tags ) in data:
pairs.extend( ( ( tag, hash ) for tag in tags ) )
num_done = len( pairs )
tags_to_hashes = HydrusData.BuildKeyToListDict( pairs )
if self._content_action == HC.CONTENT_UPDATE_PETITION:
reason = 'Mass Migration Job'
else:
reason = None
for ( tag, hashes ) in tags_to_hashes.items():
content_updates.append( ClientContentUpdates.ContentUpdate( HC.CONTENT_TYPE_MAPPINGS, self._content_action, ( tag, hashes ), reason = reason ) )
content_update_package = ClientContentUpdates.ContentUpdatePackage.STATICCreateFromContentUpdates( self._tag_service_key, content_updates )
self._controller.WriteSynchronous( 'content_updates', content_update_package )
return GetBasicSpeedStatement( num_done, time_started_precise )
class MigrationDestinationTagServicePairs( MigrationDestinationTagService ):
def __init__( self, controller, tag_service_key, content_action, content_type ):
MigrationDestinationTagService.__init__( self, controller, tag_service_key, content_action )
self._content_type = content_type
def DoSomeWork( self, source ):
time_started_precise = HydrusTime.GetNowPrecise()
data = source.GetSomeData()
content_updates = []
if self._content_action in ( HC.CONTENT_UPDATE_PETITION, HC.CONTENT_UPDATE_PEND ):
reason = 'Mass Migration Job'
else:
reason = None
content_updates = [ ClientContentUpdates.ContentUpdate( self._content_type, self._content_action, tag_pair, reason = reason ) for tag_pair in data ]
content_update_package = ClientContentUpdates.ContentUpdatePackage.STATICCreateFromContentUpdates( self._tag_service_key, content_updates )
self._controller.WriteSynchronous( 'content_updates', content_update_package )
num_done = len( data )
return GetBasicSpeedStatement( num_done, time_started_precise )
class MigrationJob( object ):
def __init__( self, controller, title, source, destination ):
self._controller = controller
self._title = title
self._source = source
self._destination = destination
def Run( self ):
job_status = ClientThreading.JobStatus( pausable = True, cancellable = True )
job_status.SetStatusTitle( self._title )
self._controller.pub( 'message', job_status )
job_status.SetStatusText( 'preparing source' )
self._source.Prepare()
job_status.SetStatusText( 'preparing destination' )
self._destination.Prepare()
job_status.SetStatusText( 'beginning work' )
try:
while self._source.StillWorkToDo():
progress_statement = self._destination.DoSomeWork( self._source )
job_status.SetStatusText( progress_statement )
job_status.WaitIfNeeded()
if job_status.IsCancelled():
break
finally:
job_status.SetStatusText( 'done, cleaning up source' )
self._source.CleanUp()
job_status.SetStatusText( 'done, cleaning up destination' )
self._destination.CleanUp()
job_status.SetStatusText( 'done!' )
job_status.FinishAndDismiss( 3 )
class MigrationSource( object ):
def __init__( self, controller, name ):
self._controller = controller
self._name = name
self._work_to_do = True
def GetName( self ):
return self._name
def GetSomeData( self ):
raise NotImplementedError()
def CleanUp( self ):
pass
def Prepare( self ):
pass
def StillWorkToDo( self ):
return self._work_to_do
class MigrationSourceHTA( MigrationSource ):
def __init__( self, controller, path, location_context: ClientLocation.LocationContext, desired_hash_type, hashes, tag_filter ):
name = os.path.basename( path )
MigrationSource.__init__( self, controller, name )
self._path = path
self._location_context = location_context
self._desired_hash_type = desired_hash_type
self._hashes = hashes
self._tag_filter = tag_filter
self._hta = None
self._source_hash_type = None
self._iterator = None
def _ConvertHashes( self, source_hash_type, desired_hash_type, data ):
if source_hash_type != desired_hash_type:
fixed_data = []
for ( hash, tags ) in data:
source_to_desired = self._controller.Read( 'file_hashes', ( hash, ), source_hash_type, desired_hash_type )
if len( source_to_desired ) == 0:
continue
desired_hash = list( source_to_desired.values() )[0]
fixed_data.append( ( desired_hash, tags ) )
data = fixed_data
return data
def _FilterSHA256Hashes( self, data ):
if self._hashes is not None:
data = [ ( hash, tags ) for ( hash, tags ) in data if hash in self._hashes ]
if not self._location_context.IsAllKnownFiles():
filtered_data = []
all_hashes = [ hash for ( hash, tags ) in data ]
hashes_to_media_results = { media_result.GetHash() : media_result for media_result in self._controller.Read( 'media_results', all_hashes ) }
for ( hash, tags ) in data:
if hash in hashes_to_media_results:
media_result = hashes_to_media_results[ hash ]
if not media_result.GetLocationsManager().IsInLocationContext( self._location_context ):
continue
filtered_data.append( ( hash, tags ) )
data = filtered_data
return data
def _SHA256FilteringNeeded( self ):
return self._hashes is not None or not self._location_context.IsAllKnownFiles()
def CleanUp( self ):
self._hta.CommitBigJob()
self._hta.Close()
self._hta = None
self._iterator = None
def GetSomeData( self ):
data = HydrusLists.PullNFromIterator( self._iterator, 256 )
if len( data ) == 0:
self._work_to_do = False
return data
if not self._tag_filter.AllowsEverything():
filtered_data = []
for ( hash, tags ) in data:
tags = self._tag_filter.Filter( tags )
if len( tags ) > 0:
filtered_data.append( ( hash, tags ) )
data = filtered_data
if self._SHA256FilteringNeeded():
if self._source_hash_type == 'sha256':
data = self._FilterSHA256Hashes( data )
data = self._ConvertHashes( self._source_hash_type, self._desired_hash_type, data )
elif self._desired_hash_type == 'sha256':
data = self._ConvertHashes( self._source_hash_type, self._desired_hash_type, data )
data = self._FilterSHA256Hashes( data )
else:
data = self._ConvertHashes( self._source_hash_type, 'sha256', data )
data = self._FilterSHA256Hashes( data )
data = self._ConvertHashes( 'sha256', self._desired_hash_type, data )
else:
data = self._ConvertHashes( self._source_hash_type, self._desired_hash_type, data )
return data
def Prepare( self ):
self._hta = HydrusTagArchive.HydrusTagArchive( self._path )
self._hta.BeginBigJob()
self._source_hash_type = HydrusTagArchive.hash_type_to_str_lookup[ self._hta.GetHashType() ]
self._iterator = self._hta.IterateMappings()
class MigrationSourceHTPA( MigrationSource ):
def __init__( self, controller, path, left_tag_filter, right_tag_filter ):
name = os.path.basename( path )
MigrationSource.__init__( self, controller, name )
self._path = path
self._left_tag_filter = left_tag_filter
self._right_tag_filter = right_tag_filter
self._htpa = None
self._iterator = None
def CleanUp( self ):
self._htpa.CommitBigJob()
self._htpa.Close()
self._htpa = None
self._iterator = None
def GetSomeData( self ):
data = HydrusLists.PullNFromIterator( self._iterator, 256 )
if len( data ) == 0:
self._work_to_do = False
return data
if not ( self._left_tag_filter.AllowsEverything() and self._right_tag_filter.AllowsEverything() ):
data = [ ( left_tag, right_tag ) for ( left_tag, right_tag ) in data if self._left_tag_filter.TagOK( left_tag ) and self._right_tag_filter.TagOK( right_tag ) ]
return data
def Prepare( self ):
self._htpa = HydrusTagArchive.HydrusTagPairArchive( self._path )
self._htpa.BeginBigJob()
self._iterator = self._htpa.IteratePairs()
class MigrationSourceList( MigrationSource ):
def __init__( self, controller, data ):
name = 'simple list source'
MigrationSource.__init__( self, controller, name )
self._data = data
self._iterator = None
def GetSomeData( self ):
some_data = HydrusLists.PullNFromIterator( self._iterator, 5 )
if len( some_data ) == 0:
self._work_to_do = False
return some_data
def Prepare( self ):
self._iterator = iter( self._data )
class MigrationSourceTagServiceMappings( MigrationSource ):
def __init__( self, controller, tag_service_key, location_context, desired_hash_type, hashes, tag_filter, content_statuses ):
name = controller.services_manager.GetName( tag_service_key )
MigrationSource.__init__( self, controller, name )
self._location_context = location_context
self._tag_service_key = tag_service_key
self._desired_hash_type = desired_hash_type
self._hashes = hashes
self._tag_filter = tag_filter
self._content_statuses = content_statuses
self._database_temp_job_name = 'migrate_{}'.format( os.urandom( 16 ).hex() )
def CleanUp( self ):
self._controller.WriteSynchronous( 'migration_clear_job', self._database_temp_job_name )
def GetSomeData( self ):
data = self._controller.Read( 'migration_get_mappings', self._database_temp_job_name, self._location_context, self._tag_service_key, self._desired_hash_type, self._tag_filter, self._content_statuses )
if len( data ) == 0:
self._work_to_do = False
return data
def Prepare( self ):
# later can spread this out into bunch of small jobs, a start and a continue, based on tag filter subsets
self._controller.WriteSynchronous( 'migration_start_mappings_job', self._database_temp_job_name, self._location_context, self._tag_service_key, self._hashes, self._content_statuses )
class MigrationSourceTagServicePairs( MigrationSource ):
def __init__( self, controller, tag_service_key, content_type, left_tag_filter, right_tag_filter, content_statuses ):
name = controller.services_manager.GetName( tag_service_key )
MigrationSource.__init__( self, controller, name )
self._tag_service_key = tag_service_key
self._content_type = content_type
self._left_tag_filter = left_tag_filter
self._right_tag_filter = right_tag_filter
self._content_statuses = content_statuses
self._database_temp_job_name = 'migrate_{}'.format( os.urandom( 16 ).hex() )
def CleanUp( self ):
self._controller.WriteSynchronous( 'migration_clear_job', self._database_temp_job_name )
def GetSomeData( self ):
data = self._controller.Read( 'migration_get_pairs', self._database_temp_job_name, self._left_tag_filter, self._right_tag_filter )
if len( data ) == 0:
self._work_to_do = False
return data
def Prepare( self ):
self._controller.WriteSynchronous( 'migration_start_pairs_job', self._database_temp_job_name, self._tag_service_key, self._content_type, self._content_statuses )