Version 449 (Experimental)

This commit is contained in:
Hydrus Network Developer 2021-08-04 16:59:55 -05:00
parent 212088ba4b
commit eee739898a
12 changed files with 1339 additions and 552 deletions

View File

@ -8,6 +8,23 @@
<div class="content">
<h3 id="changelog"><a href="#changelog">changelog</a></h3>
<ul>
<li><h3 id="version_449"><a href="#version_449">version 449</a></h3></li>
<ul>
<li>this is an experimental release! please do not use this unless you are an advanced user who has a good backup, syncs with a repository (e.g. the PTR), and would like to help me out. if this is you, I don't need you to do anything too special, just please use the client and repo as normal, downloading and uploading, and let me know if anything messes up during normal operation</li>
<li>repository processing split:</li>
<li>tl;dr: nothing has changed, you don't have to do anything. with luck, your PTR service is going to fix some bad siblings and parents over the next couple of days</li>
<li>repositories now track what they have processed on a per-content basis. this gives some extra maintenance tools to, for instance, quickly reset and reprocess your ~150k tag siblings on the PTR without having to clear and reprocess all 1.3 billion mappings too</li>
<li>in review services, you now see processing progress for definition updates and for each of a repository's content types independently (just files for a file repo; mappings, siblings, and parents for a tag repo). most of the time they will all be the same, but each can be paused separately. it is now possible (though not yet super efficient, since definitions still run 100%) to sync with the PTR and only grab siblings and parents by simply pausing mappings in review services (a small sketch of this idea follows this version's list)</li>
<li>I have also split the 'network' and 'processing' sync progress gauges and their buttons into separate boxes for clarity</li>
<li>the 'fill in content gaps' maintenance job now lets you choose which content types to do it for</li>
<li>also, a new 'reset content' maintenance job lets you choose to delete and reprocess by content type. the nuclear 'complete' reset is now really just for critical situations where definitions or database tables are irrevocably messed up</li>
<li>all users will have their siblings and parents processing reset this week. they should blitz back in within seconds during your next processing cycle, and with luck we'll wipe out some years-old legacy bugs and hopefully discover some info about the remaining ones. most importantly, we can re-trigger this reprocess in just a few seconds to quickly test future fixes</li>
<li>a variety of tests such as 'service is mostly caught up' are now careful just to test for the currently unpaused content types</li>
<li>if you try to commit some content that is currently processing-paused, the client now says 'hey, sorry, this is paused, I won't upload that stuff right now' but still uploads everything else that isn't paused. this is a 'service is caught up' issue</li>
<li>tag display sync, which does background work to make sure siblings and parents appear as they should, will now not run for a service if any of the services it relies on for siblings or parents is not process synced. when this happens, it is also shown on the tag display sync review panel. this stops big changes like the new sibling/parent reset from causing display sync to do a whole bunch of work before the service is ready and happy with what it has. with luck it will also smooth out new users' first PTR sync too</li>
<li>clients now process the sub-updates of a repository update step in the order they were generated on the server, which _may_ fix some non-deterministic update bugs we are trying to pin down</li>
<li>all update processing tracking is overhauled. all related code and database access techniques have been brushed up and should use less CPU and fail more gracefully</li>
</ul>
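
To make the per-content-type idea above concrete, here is a minimal sketch of the pause bookkeeping, written against stand-in constants rather than the real HydrusConstants values (the actual logic lives in ServiceRepository, in the ClientServices diff below):

# a minimal sketch of per-content-type processing pause tracking--names are illustrative
CONTENT_TYPE_DEFINITIONS = 0
CONTENT_TYPE_MAPPINGS = 1
CONTENT_TYPE_TAG_PARENTS = 2
CONTENT_TYPE_TAG_SIBLINGS = 3

class RepositorySyncSketch:
    
    def __init__( self, content_types ):
        
        # one pause flag per content type, everything unpaused by default
        self._content_types_paused = { content_type : False for content_type in content_types }
        
    
    def PausePlay( self, content_type ):
        
        self._content_types_paused[ content_type ] = not self._content_types_paused[ content_type ]
        
    
    def GetContentTypesToProcess( self ):
        
        # definitions are always processed, since all other content depends on them
        content_types = { ct for ( ct, paused ) in self._content_types_paused.items() if not paused }
        
        content_types.add( CONTENT_TYPE_DEFINITIONS )
        
        return content_types
        
    

# e.g. grabbing only siblings and parents from the PTR by pausing mappings:
sketch = RepositorySyncSketch( ( CONTENT_TYPE_MAPPINGS, CONTENT_TYPE_TAG_PARENTS, CONTENT_TYPE_TAG_SIBLINGS ) )
sketch.PausePlay( CONTENT_TYPE_MAPPINGS )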
<li><h3 id="version_448"><a href="#version_448">version 448</a></h3></li>
<ul>
<li>client api:</li>

View File

@ -58,6 +58,10 @@ def GenerateDefaultServiceDictionary( service_type ):
dictionary[ 'update_downloading_paused' ] = False
dictionary[ 'update_processing_paused' ] = False
content_types = tuple( HC.REPOSITORY_CONTENT_TYPES[ service_type ] )
dictionary[ 'update_processing_content_types_paused' ] = [ [ content_type, False ] for content_type in content_types ]
if service_type == HC.IPFS:
@ -1444,6 +1448,14 @@ class ServiceRepository( ServiceRestricted ):
ServiceRestricted._DealWithFundamentalNetworkError( self )
def _GetContentTypesWeAreProcessing( self ):
content_types = { content_type for ( content_type, paused ) in self._update_processing_content_types_paused.items() if not paused }
content_types.add( HC.CONTENT_TYPE_DEFINITIONS )
return content_types
def _GetSerialisableDictionary( self ):
dictionary = ServiceRestricted._GetSerialisableDictionary( self )
@ -1452,6 +1464,7 @@ class ServiceRepository( ServiceRestricted ):
dictionary[ 'do_a_full_metadata_resync' ] = self._do_a_full_metadata_resync
dictionary[ 'update_downloading_paused' ] = self._update_downloading_paused
dictionary[ 'update_processing_paused' ] = self._update_processing_paused
dictionary[ 'update_processing_content_types_paused' ] = list( self._update_processing_content_types_paused.items() )
return dictionary
@ -1492,6 +1505,15 @@ class ServiceRepository( ServiceRestricted ):
self._update_downloading_paused = dictionary[ 'update_downloading_paused' ]
self._update_processing_paused = dictionary[ 'update_processing_paused' ]
if 'update_processing_content_types_paused' not in dictionary:
content_types = tuple( HC.REPOSITORY_CONTENT_TYPES[ self._service_type ] )
dictionary[ 'update_processing_content_types_paused' ] = [ [ content_type, False ] for content_type in content_types ]
self._update_processing_content_types_paused = dict( dictionary[ 'update_processing_content_types_paused' ] )
def _LogFinalRowSpeed( self, precise_timestamp, total_rows, row_name ):
@ -1792,17 +1814,24 @@ class ServiceRepository( ServiceRestricted ):
job_key.SetStatusTitle( title )
( this_is_first_definitions_work, definition_hashes, this_is_first_content_work, content_hashes ) = HG.client_controller.Read( 'repository_update_hashes_to_process', self._service_key )
content_types_to_process = self._GetContentTypesWeAreProcessing()
if len( definition_hashes ) == 0 and len( content_hashes ) == 0:
( this_is_first_definitions_work, definition_hashes_and_content_types, this_is_first_content_work, content_hashes_and_content_types ) = HG.client_controller.Read( 'repository_update_hashes_to_process', self._service_key, content_types_to_process )
if len( definition_hashes_and_content_types ) == 0 and len( content_hashes_and_content_types ) == 0:
return # no work to do
if len( content_hashes_and_content_types ) > 0:
content_hashes_and_content_types = self._metadata.SortContentHashesAndContentTypes( content_hashes_and_content_types )
HydrusData.Print( title )
num_updates_done = 0
num_updates_to_do = len( definition_hashes ) + len( content_hashes )
num_updates_to_do = len( definition_hashes_and_content_types ) + len( content_hashes_and_content_types )
HG.client_controller.pub( 'message', job_key )
HG.client_controller.frame_splash_status.SetTitleText( title, print_to_log = False )
@ -1817,7 +1846,7 @@ class ServiceRepository( ServiceRestricted ):
try:
for definition_hash in definition_hashes:
for ( definition_hash, content_types ) in definition_hashes_and_content_types:
progress_string = HydrusData.ConvertValueRangeToPrettyString( num_updates_done + 1, num_updates_to_do )
@ -1894,7 +1923,7 @@ class ServiceRepository( ServiceRestricted ):
start_time = HydrusData.GetNowPrecise()
num_rows_done = HG.client_controller.WriteSynchronous( 'process_repository_definitions', self._service_key, definition_hash, iterator_dict, job_key, work_time )
num_rows_done = HG.client_controller.WriteSynchronous( 'process_repository_definitions', self._service_key, definition_hash, iterator_dict, content_types, job_key, work_time )
time_it_took = HydrusData.GetNowPrecise() - start_time
@ -1944,7 +1973,7 @@ class ServiceRepository( ServiceRestricted ):
try:
for content_hash in content_hashes:
for ( content_hash, content_types ) in content_hashes_and_content_types:
progress_string = HydrusData.ConvertValueRangeToPrettyString( num_updates_done + 1, num_updates_to_do )
@ -1991,19 +2020,34 @@ class ServiceRepository( ServiceRestricted ):
raise Exception( 'An unusual error has occurred during repository processing: an update file ({}) has incorrect metadata. Your repository should be paused, and all update files have been scheduled for a metadata rescan. Please permit file maintenance to fix them, or tell it to do so manually, before unpausing your repository.'.format( content_hash.hex() ) )
rows_in_this_update = content_update.GetNumRows()
rows_in_this_update = content_update.GetNumRows( content_types )
rows_done_in_this_update = 0
iterator_dict = {}
iterator_dict[ 'new_files' ] = iter( content_update.GetNewFiles() )
iterator_dict[ 'deleted_files' ] = iter( content_update.GetDeletedFiles() )
iterator_dict[ 'new_mappings' ] = HydrusData.SmoothOutMappingIterator( content_update.GetNewMappings(), 50 )
iterator_dict[ 'deleted_mappings' ] = HydrusData.SmoothOutMappingIterator( content_update.GetDeletedMappings(), 50 )
iterator_dict[ 'new_parents' ] = iter( content_update.GetNewTagParents() )
iterator_dict[ 'deleted_parents' ] = iter( content_update.GetDeletedTagParents() )
iterator_dict[ 'new_siblings' ] = iter( content_update.GetNewTagSiblings() )
iterator_dict[ 'deleted_siblings' ] = iter( content_update.GetDeletedTagSiblings() )
if HC.CONTENT_TYPE_FILES in content_types:
iterator_dict[ 'new_files' ] = iter( content_update.GetNewFiles() )
iterator_dict[ 'deleted_files' ] = iter( content_update.GetDeletedFiles() )
if HC.CONTENT_TYPE_MAPPINGS in content_types:
iterator_dict[ 'new_mappings' ] = HydrusData.SmoothOutMappingIterator( content_update.GetNewMappings(), 50 )
iterator_dict[ 'deleted_mappings' ] = HydrusData.SmoothOutMappingIterator( content_update.GetDeletedMappings(), 50 )
if HC.CONTENT_TYPE_TAG_PARENTS in content_types:
iterator_dict[ 'new_parents' ] = iter( content_update.GetNewTagParents() )
iterator_dict[ 'deleted_parents' ] = iter( content_update.GetDeletedTagParents() )
if HC.CONTENT_TYPE_TAG_SIBLINGS in content_types:
iterator_dict[ 'new_siblings' ] = iter( content_update.GetNewTagSiblings() )
iterator_dict[ 'deleted_siblings' ] = iter( content_update.GetDeletedTagSiblings() )
while len( iterator_dict ) > 0:
@ -2027,7 +2071,7 @@ class ServiceRepository( ServiceRestricted ):
start_time = HydrusData.GetNowPrecise()
num_rows_done = HG.client_controller.WriteSynchronous( 'process_repository_content', self._service_key, content_hash, iterator_dict, job_key, work_time )
num_rows_done = HG.client_controller.WriteSynchronous( 'process_repository_content', self._service_key, content_hash, iterator_dict, content_types, job_key, work_time )
time_it_took = HydrusData.GetNowPrecise() - start_time
@ -2074,14 +2118,14 @@ class ServiceRepository( ServiceRestricted ):
except Exception as e:
message = 'Failed to process updates for the {} repository! The error follows:'.format( self._name )
HydrusData.ShowText( message )
HydrusData.ShowException( e )
with self._lock:
message = 'Failed to process updates for the {} repository! The error follows:'.format( self._name )
HydrusData.ShowText( message )
HydrusData.ShowException( e )
self._do_a_full_metadata_resync = True
self._update_processing_paused = True
@ -2096,13 +2140,16 @@ class ServiceRepository( ServiceRestricted ):
if work_done:
self._is_mostly_caught_up = None
with self._lock:
self._is_mostly_caught_up = None
self._SetDirty()
HG.client_controller.pub( 'notify_new_force_refresh_tags_data' )
HG.client_controller.pub( 'notify_new_tag_display_application' )
self._SetDirty()
job_key.DeleteVariable( 'popup_text_1' )
job_key.DeleteVariable( 'popup_text_2' )
@ -2125,9 +2172,17 @@ class ServiceRepository( ServiceRestricted ):
service_key = self._service_key
( download_value, processing_value, range ) = HG.client_controller.Read( 'repository_progress', service_key )
( num_local_updates, num_updates, content_types_to_num_processed_updates, content_types_to_num_updates ) = HG.client_controller.Read( 'repository_progress', service_key )
return processing_value < range
for ( content_type, num_processed_updates ) in content_types_to_num_processed_updates.items():
if num_processed_updates < content_types_to_num_updates[ content_type ]:
return True
return False
def CanSyncDownload( self ):
@ -2276,7 +2331,13 @@ class ServiceRepository( ServiceRestricted ):
service_key = self._service_key
unprocessed_update_hashes = HG.client_controller.Read( 'repository_unprocessed_hashes', service_key )
content_types_to_process = self._GetContentTypesWeAreProcessing()
( this_is_first_definitions_work, definition_hashes_and_content_types, this_is_first_content_work, content_hashes_and_content_types ) = HG.client_controller.Read( 'repository_update_hashes_to_process', self._service_key, content_types_to_process )
missing_update_hashes = HG.client_controller.Read( 'missing_repository_update_hashes', service_key )
unprocessed_update_hashes = set( ( hash for ( hash, content_types ) in definition_hashes_and_content_types ) ).union( ( hash for ( hash, content_types ) in content_hashes_and_content_types ) ).union( missing_update_hashes )
with self._lock:
@ -2303,11 +2364,18 @@ class ServiceRepository( ServiceRestricted ):
def IsPausedUpdateProcessing( self ):
def IsPausedUpdateProcessing( self, content_type = None ):
with self._lock:
return self._update_processing_paused
if content_type is None:
return self._update_processing_paused
else:
return self._update_processing_content_types_paused[ content_type ]
@ -2330,16 +2398,25 @@ class ServiceRepository( ServiceRestricted ):
def PausePlayUpdateProcessing( self ):
def PausePlayUpdateProcessing( self, content_type = None ):
with self._lock:
self._update_processing_paused = not self._update_processing_paused
if content_type is None:
self._update_processing_paused = not self._update_processing_paused
else:
self._update_processing_content_types_paused[ content_type ] = not self._update_processing_content_types_paused[ content_type ]
self._SetDirty()
paused = self._update_processing_paused
self._is_mostly_caught_up = None
HG.client_controller.pub( 'important_dirt_to_clean' )
@ -2394,7 +2471,10 @@ class ServiceRepository( ServiceRestricted ):
except Exception as e:
self._DelayFutureRequests( str( e ) )
with self._lock:
self._DelayFutureRequests( str( e ) )
HydrusData.ShowText( 'The service "{}" encountered an error while trying to sync! The error was "{}". It will not do any work for a little while. If the fix is not obvious, please elevate this to hydrus dev.'.format( self._name, str( e ) ) )
@ -2486,7 +2566,10 @@ class ServiceRepository( ServiceRestricted ):
except HydrusExceptions.CancelledException as e:
self._DelayFutureRequests( str( e ) )
with self._lock:
self._DelayFutureRequests( str( e ) )
return
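
For callers, the new optional content_type parameter keeps the old all-or-nothing behaviour as the default. A hedged usage sketch, where service stands in for a ServiceRepository instance:

# pause/play all update processing, as before:
service.PausePlayUpdateProcessing()

# pause/play a single content type, e.g. just mappings:
service.PausePlayUpdateProcessing( content_type = HC.CONTENT_TYPE_MAPPINGS )

# queries follow the same convention:
everything_paused = service.IsPausedUpdateProcessing()
mappings_paused = service.IsPausedUpdateProcessing( content_type = HC.CONTENT_TYPE_MAPPINGS )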

File diff suppressed because it is too large

View File

@ -86,6 +86,16 @@ class ClientDBFilesStorage( HydrusDBModule.HydrusDBModule ):
return num_deleted
def ClearFilesTables( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )
self._c.execute( 'DELETE FROM {};'.format( current_files_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( deleted_files_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( pending_files_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( petitioned_files_table_name ) )
def ClearLocalDeleteRecord( self, hash_ids = None ):
# we delete from everywhere, but not for files currently in the trash
@ -142,6 +152,24 @@ class ClientDBFilesStorage( HydrusDBModule.HydrusDBModule ):
self._c.execute( 'CREATE TABLE local_file_deletion_reasons ( hash_id INTEGER PRIMARY KEY, reason_id INTEGER );' )
def DeletePending( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )
self._c.execute( 'DELETE FROM {};'.format( pending_files_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( petitioned_files_table_name ) )
def DropFilesTables( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( current_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( deleted_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( pending_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( petitioned_files_table_name ) )
def FilterAllCurrentHashIds( self, hash_ids, just_these_service_ids = None ):
if just_these_service_ids is None:
@ -232,24 +260,6 @@ class ClientDBFilesStorage( HydrusDBModule.HydrusDBModule ):
return pending_hash_ids
def DeletePending( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )
self._c.execute( 'DELETE FROM {};'.format( pending_files_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( petitioned_files_table_name ) )
def DropFilesTables( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( current_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( deleted_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( pending_files_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( petitioned_files_table_name ) )
def GenerateFilesTables( self, service_id: int ):
( current_files_table_name, deleted_files_table_name, pending_files_table_name, petitioned_files_table_name ) = GenerateFilesTableNames( service_id )

View File

@ -48,6 +48,16 @@ class ClientDBMappingsStorage( HydrusDBModule.HydrusDBModule ):
return expected_table_names
def ClearMappingsTables( self, service_id: int ):
( current_mappings_table_name, deleted_mappings_table_name, pending_mappings_table_name, petitioned_mappings_table_name ) = GenerateMappingsTableNames( service_id )
self._c.execute( 'DELETE FROM {};'.format( current_mappings_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( deleted_mappings_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( pending_mappings_table_name ) )
self._c.execute( 'DELETE FROM {};'.format( petitioned_mappings_table_name ) )
def DropMappingsTables( self, service_id: int ):
( current_mappings_table_name, deleted_mappings_table_name, pending_mappings_table_name, petitioned_mappings_table_name ) = GenerateMappingsTableNames( service_id )

View File

@ -1,3 +1,4 @@
import collections
import itertools
import os
import sqlite3
@ -13,6 +14,7 @@ from hydrus.core.networking import HydrusNetwork
from hydrus.client import ClientFiles
from hydrus.client.db import ClientDBDefinitionsCache
from hydrus.client.db import ClientDBFilesMaintenance
from hydrus.client.db import ClientDBFilesMetadataBasic
from hydrus.client.db import ClientDBFilesStorage
from hydrus.client.db import ClientDBServices
@ -37,11 +39,13 @@ def GenerateRepositoryTagDefinitionTableName( service_id: int ):
return tag_id_map_table_name
def GenerateRepositoryUpdatesTableName( service_id: int ):
def GenerateRepositoryUpdatesTableNames( service_id: int ):
repository_updates_table_name = 'repository_updates_{}'.format( service_id )
repository_unregistered_updates_table_name = 'repository_unregistered_updates_{}'.format( service_id )
repository_updates_processed_table_name = 'repository_updates_processed_{}'.format( service_id )
return repository_updates_table_name
return ( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name )
class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
@ -51,6 +55,7 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
cursor_transaction_wrapper: HydrusDB.DBCursorTransactionWrapper,
modules_services: ClientDBServices.ClientDBMasterServices,
modules_files_storage: ClientDBFilesStorage.ClientDBFilesStorage,
modules_files_metadata_basic: ClientDBFilesMetadataBasic.ClientDBFilesMetadataBasic,
modules_hashes_local_cache: ClientDBDefinitionsCache.ClientDBCacheLocalHashes,
modules_tags_local_cache: ClientDBDefinitionsCache.ClientDBCacheLocalTags,
modules_files_maintenance: ClientDBFilesMaintenance.ClientDBFilesMaintenance
@ -63,10 +68,33 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
self._cursor_transaction_wrapper = cursor_transaction_wrapper
self.modules_services = modules_services
self.modules_files_storage = modules_files_storage
self.modules_files_metadata_basic = modules_files_metadata_basic
self.modules_files_maintenance = modules_files_maintenance
self.modules_hashes_local_cache = modules_hashes_local_cache
self.modules_tags_local_cache = modules_tags_local_cache
self._service_ids_to_content_types_to_outstanding_local_processing = collections.defaultdict( dict )
def _ClearOutstandingWorkCache( self, service_id, content_type = None ):
if service_id not in self._service_ids_to_content_types_to_outstanding_local_processing:
return
if content_type is None:
del self._service_ids_to_content_types_to_outstanding_local_processing[ service_id ]
else:
if content_type in self._service_ids_to_content_types_to_outstanding_local_processing[ service_id ]:
del self._service_ids_to_content_types_to_outstanding_local_processing[ service_id ][ content_type ]
def _GetInitialIndexGenerationTuples( self ):
@ -77,7 +105,7 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
def _HandleCriticalRepositoryDefinitionError( self, service_id, name, bad_ids ):
self._ReprocessRepository( service_id, ( HC.APPLICATION_HYDRUS_UPDATE_DEFINITIONS, ) )
self._ReprocessRepository( service_id, ( HC.CONTENT_TYPE_DEFINITIONS, ) )
self._ScheduleRepositoryUpdateFileMaintenance( service_id, ClientFiles.REGENERATE_FILE_DATA_JOB_FILE_INTEGRITY_DATA )
self._ScheduleRepositoryUpdateFileMaintenance( service_id, ClientFiles.REGENERATE_FILE_DATA_JOB_FILE_METADATA )
@ -91,27 +119,74 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
raise Exception( message )
def _ReprocessRepository( self, service_id, update_mime_types ):
def _RegisterUpdates( self, service_id, hash_ids = None ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
update_hash_ids = set()
for update_mime_type in update_mime_types:
if hash_ids is None:
hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {} NATURAL JOIN files_info WHERE mime = ? AND processed = ?;'.format( repository_updates_table_name ), ( update_mime_type, True ) ) )
hash_ids = self._STS( self._c.execute( 'SELECT hash_id FROM {};'.format( repository_unregistered_updates_table_name ) ) )
update_hash_ids.update( hash_ids )
else:
with HydrusDB.TemporaryIntegerTable( self._c, hash_ids, 'hash_id' ) as temp_hash_ids_table_name:
hash_ids = self._STS( self._c.execute( 'SELECT hash_id FROM {} CROSS JOIN {} USING ( hash_id );'.format( temp_hash_ids_table_name, repository_unregistered_updates_table_name ) ) )
self._c.executemany( 'UPDATE {} SET processed = ? WHERE hash_id = ?;'.format( repository_updates_table_name ), ( ( False, hash_id ) for hash_id in update_hash_ids ) )
if len( hash_ids ) > 0:
self._ClearOutstandingWorkCache( service_id )
service_type = self.modules_services.GetService( service_id ).GetServiceType()
with HydrusDB.TemporaryIntegerTable( self._c, hash_ids, 'hash_id' ) as temp_hash_ids_table_name:
hash_ids_to_mimes = { hash_id : mime for ( hash_id, mime ) in self._c.execute( 'SELECT hash_id, mime FROM {} CROSS JOIN files_info USING ( hash_id );'.format( temp_hash_ids_table_name ) ) }
if len( hash_ids_to_mimes ) > 0:
inserts = []
processed = False
for ( hash_id, mime ) in hash_ids_to_mimes.items():
if mime == HC.APPLICATION_HYDRUS_UPDATE_DEFINITIONS:
content_types = ( HC.CONTENT_TYPE_DEFINITIONS, )
else:
content_types = tuple( HC.REPOSITORY_CONTENT_TYPES[ service_type ] )
inserts.extend( ( ( hash_id, content_type, processed ) for content_type in content_types ) )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( hash_id, content_type, processed ) VALUES ( ?, ?, ? );'.format( repository_updates_processed_table_name ), inserts )
self._c.executemany( 'DELETE FROM {} WHERE hash_id = ?;'.format( repository_unregistered_updates_table_name ), ( ( hash_id, ) for hash_id in hash_ids_to_mimes.keys() ) )
def _ReprocessRepository( self, service_id, content_types ):
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
self._c.executemany( 'UPDATE {} SET processed = ? WHERE content_type = ?;'.format( repository_updates_processed_table_name ), ( ( False, content_type ) for content_type in content_types ) )
self._ClearOutstandingWorkCache( service_id )
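
At the SQL level, this per-content-type reset is just a flag flip in the new processed-tracking table, which is why it is cheap to re-trigger. A sketch, assuming an sqlite3 cursor c and the service_id-suffixed table naming above (service_id 7 here is illustrative):

# reprocess just siblings and parents, leaving the billion-row mappings untouched
content_types_to_reset = ( HC.CONTENT_TYPE_TAG_PARENTS, HC.CONTENT_TYPE_TAG_SIBLINGS )

c.executemany( 'UPDATE repository_updates_processed_7 SET processed = ? WHERE content_type = ?;', ( ( False, content_type ) for content_type in content_types_to_reset ) )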
def _ScheduleRepositoryUpdateFileMaintenance( self, service_id, job_type ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
update_hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {};'.format( repository_updates_table_name ) ) )
table_join = self.modules_files_storage.GetCurrentTableJoinPhrase( self.modules_services.local_update_service_id, repository_updates_table_name )
update_hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {};'.format( table_join ) ) )
self.modules_files_maintenance.AddJobs( update_hash_ids, job_type )
@ -120,23 +195,25 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
service_id = self.modules_services.GetServiceId( service_key )
processed = False
inserts = []
for ( update_index, update_hashes ) in metadata_slice.GetUpdateIndicesAndHashes():
for update_hash in update_hashes:
hash_id = self.modules_hashes_local_cache.GetHashId( update_hash )
inserts.append( ( update_index, hash_id, processed ) )
hash_ids = self.modules_hashes_local_cache.GetHashIds( update_hashes )
inserts.extend( ( ( update_index, hash_id ) for hash_id in hash_ids ) )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
if len( inserts ) > 0:
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( update_index, hash_id ) VALUES ( ?, ? );'.format( repository_updates_table_name ), inserts )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( hash_id ) VALUES ( ? );'.format( repository_unregistered_updates_table_name ), ( ( hash_id, ) for ( update_index, hash_id ) in inserts ) )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( update_index, hash_id, processed ) VALUES ( ?, ?, ? );'.format( repository_updates_table_name ), inserts )
self._RegisterUpdates( service_id )
def CreateInitialTables( self ):
@ -146,22 +223,39 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
def DropRepositoryTables( self, service_id: int ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( repository_updates_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( repository_unregistered_updates_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( repository_updates_processed_table_name ) )
( hash_id_map_table_name, tag_id_map_table_name ) = GenerateRepositoryDefinitionTableNames( service_id )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( hash_id_map_table_name ) )
self._c.execute( 'DROP TABLE IF EXISTS {};'.format( tag_id_map_table_name ) )
self._ClearOutstandingWorkCache( service_id )
def DoOutstandingUpdateRegistration( self ):
for service_id in self.modules_services.GetServiceIds( HC.REPOSITORIES ):
self._RegisterUpdates( service_id )
def GenerateRepositoryTables( self, service_id: int ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
self._c.execute( 'CREATE TABLE IF NOT EXISTS {} ( update_index INTEGER, hash_id INTEGER, processed INTEGER_BOOLEAN, PRIMARY KEY ( update_index, hash_id ) );'.format( repository_updates_table_name ) )
self._CreateIndex( repository_updates_table_name, [ 'hash_id' ] )
self._c.execute( 'CREATE TABLE IF NOT EXISTS {} ( update_index INTEGER, hash_id INTEGER, PRIMARY KEY ( update_index, hash_id ) );'.format( repository_updates_table_name ) )
self._CreateIndex( repository_updates_table_name, [ 'hash_id' ] )
self._c.execute( 'CREATE TABLE IF NOT EXISTS {} ( hash_id INTEGER PRIMARY KEY );'.format( repository_unregistered_updates_table_name ) )
self._c.execute( 'CREATE TABLE IF NOT EXISTS {} ( hash_id INTEGER, content_type INTEGER, processed INTEGER_BOOLEAN, PRIMARY KEY ( hash_id, content_type ) );'.format( repository_updates_processed_table_name ) )
self._CreateIndex( repository_updates_processed_table_name, [ 'content_type' ] )
( hash_id_map_table_name, tag_id_map_table_name ) = GenerateRepositoryDefinitionTableNames( service_id )
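
The bookkeeping splits across the three tables above: repository_updates_x maps update_index to hash_id, repository_unregistered_updates_x holds update files whose mime (and hence content types) is not yet known, and repository_updates_processed_x tracks a processed flag per ( hash_id, content_type ) pair. A sketch of the caught-up test this enables, mirroring the counter logic below (names are illustrative):

def is_fully_processed( content_types_to_num_processed_updates, content_types_to_num_updates ):
    
    # caught up only when every content type has processed all of its updates
    for ( content_type, num_updates ) in content_types_to_num_updates.items():
        
        if content_types_to_num_processed_updates.get( content_type, 0 ) < num_updates:
            
            return False
            
        
    
    return True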
@ -181,137 +275,144 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
service_id = self.modules_services.GetServiceId( service_key )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
( num_updates, ) = self._c.execute( 'SELECT COUNT( * ) FROM {};'.format( repository_updates_table_name ) ).fetchone()
( num_processed_updates, ) = self._c.execute( 'SELECT COUNT( * ) FROM {} WHERE processed = ?;'.format( repository_updates_table_name ), ( True, ) ).fetchone()
( num_updates, ) = self._c.execute( 'SELECT COUNT( * ) FROM {}'.format( repository_updates_table_name ) ).fetchone()
table_join = self.modules_files_storage.GetCurrentTableJoinPhrase( self.modules_services.local_update_service_id, repository_updates_table_name )
( num_local_updates, ) = self._c.execute( 'SELECT COUNT( * ) FROM {};'.format( table_join ) ).fetchone()
return ( num_local_updates, num_processed_updates, num_updates )
content_types_to_num_updates = collections.Counter( dict( self._c.execute( 'SELECT content_type, COUNT( * ) FROM {} GROUP BY content_type;'.format( repository_updates_processed_table_name ) ) ) )
content_types_to_num_processed_updates = collections.Counter( dict( self._c.execute( 'SELECT content_type, COUNT( * ) FROM {} WHERE processed = ? GROUP BY content_type;'.format( repository_updates_processed_table_name ), ( True, ) ) ) )
# make sure every content type gets an entry, even if nothing is processed yet--callers rely on this
for content_type in content_types_to_num_updates:
if content_type not in content_types_to_num_processed_updates:
content_types_to_num_processed_updates[ content_type ] = 0
return ( num_local_updates, num_updates, content_types_to_num_processed_updates, content_types_to_num_updates )
def GetRepositoryUpdateHashesICanProcess( self, service_key: bytes ):
def GetRepositoryUpdateHashesICanProcess( self, service_key: bytes, content_types_to_process ):
# it is important that we use lists and sort by update index!
# otherwise add/delete actions can occur in the wrong order
service_id = self.modules_services.GetServiceId( service_key )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
result = self._c.execute( 'SELECT 1 FROM {} CROSS JOIN files_info USING ( hash_id ) WHERE mime = ? AND processed = ?;'.format( repository_updates_table_name ), ( HC.APPLICATION_HYDRUS_UPDATE_DEFINITIONS, True ) ).fetchone()
result = self._c.execute( 'SELECT 1 FROM {} WHERE content_type = ? AND processed = ?;'.format( repository_updates_processed_table_name ), ( HC.CONTENT_TYPE_DEFINITIONS, True ) ).fetchone()
this_is_first_definitions_work = result is None
result = self._c.execute( 'SELECT 1 FROM {} CROSS JOIN files_info USING ( hash_id ) WHERE mime = ? AND processed = ?;'.format( repository_updates_table_name ), ( HC.APPLICATION_HYDRUS_UPDATE_CONTENT, True ) ).fetchone()
result = self._c.execute( 'SELECT 1 FROM {} WHERE content_type != ? AND processed = ?;'.format( repository_updates_processed_table_name ), ( HC.CONTENT_TYPE_DEFINITIONS, True ) ).fetchone()
this_is_first_content_work = result is None
update_indices_to_unprocessed_hash_ids = HydrusData.BuildKeyToSetDict( self._c.execute( 'SELECT update_index, hash_id FROM {} WHERE processed = ?;'.format( repository_updates_table_name ), ( False, ) ) )
min_unregistered_update_index = None
unprocessed_hash_ids = list( itertools.chain.from_iterable( update_indices_to_unprocessed_hash_ids.values() ) )
result = self._c.execute( 'SELECT MIN( update_index ) FROM {} CROSS JOIN {} USING ( hash_id );'.format( repository_unregistered_updates_table_name, repository_updates_table_name ) ).fetchone()
definition_hashes = []
content_hashes = []
if result is not None:
( min_unregistered_update_index, ) = result
if len( unprocessed_hash_ids ) > 0:
predicate_phrase = 'processed = False AND content_type IN {}'.format( HydrusData.SplayListForDB( content_types_to_process ) )
if min_unregistered_update_index is not None:
local_hash_ids = self.modules_files_storage.FilterCurrentHashIds( self.modules_services.local_update_service_id, unprocessed_hash_ids )
# can't process an update if any of its files are as yet unregistered (these are both unprocessed and unavailable)
# also, we mustn't skip any update indices, so if there is an invalid one, we won't do any after that!
hash_ids_i_can_process = []
predicate_phrase = '{} AND update_index < {}'.format( predicate_phrase, min_unregistered_update_index )
update_indices = sorted( update_indices_to_unprocessed_hash_ids.keys() )
query = 'SELECT update_index, hash_id, content_type FROM {} CROSS JOIN {} USING ( hash_id ) WHERE {};'.format( repository_updates_processed_table_name, repository_updates_table_name, predicate_phrase )
rows = self._c.execute( query ).fetchall()
update_indices_to_unprocessed_hash_ids = HydrusData.BuildKeyToSetDict( ( ( update_index, hash_id ) for ( update_index, hash_id, content_type ) in rows ) )
hash_ids_to_content_types_to_process = HydrusData.BuildKeyToSetDict( ( ( hash_id, content_type ) for ( update_index, hash_id, content_type ) in rows ) )
all_hash_ids = set( itertools.chain.from_iterable( update_indices_to_unprocessed_hash_ids.values() ) )
all_local_hash_ids = self.modules_files_storage.FilterCurrentHashIds( self.modules_services.local_update_service_id, all_hash_ids )
for sorted_update_index in sorted( update_indices_to_unprocessed_hash_ids.keys() ):
for update_index in update_indices:
unprocessed_hash_ids = update_indices_to_unprocessed_hash_ids[ sorted_update_index ]
if not unprocessed_hash_ids.issubset( all_local_hash_ids ):
this_update_unprocessed_hash_ids = update_indices_to_unprocessed_hash_ids[ update_index ]
# can't process an update if any of its unprocessed files are not local
# normally they'll always be available if registered, but just in case a user deletes one manually etc...
# also, we mustn't skip any update indices, so if there is an invalid one, we won't do any after that!
if local_hash_ids.issuperset( this_update_unprocessed_hash_ids ):
# if we have all the updates, we can process this index
hash_ids_i_can_process.extend( this_update_unprocessed_hash_ids )
else:
# if we don't have them all, we shouldn't do any more
break
update_indices_to_unprocessed_hash_ids = { update_index : unprocessed_hash_ids for ( update_index, unprocessed_hash_ids ) in update_indices_to_unprocessed_hash_ids.items() if update_index < sorted_update_index }
break
if len( hash_ids_i_can_process ) > 0:
# all the hashes are now good to go
all_hash_ids = set( itertools.chain.from_iterable( update_indices_to_unprocessed_hash_ids.values() ) )
hash_ids_to_hashes = self.modules_hashes_local_cache.GetHashIdsToHashes( hash_ids = all_hash_ids )
definition_hashes_and_content_types = []
content_hashes_and_content_types = []
definitions_content_types = { HC.CONTENT_TYPE_DEFINITIONS }
if len( update_indices_to_unprocessed_hash_ids ) > 0:
for update_index in sorted( update_indices_to_unprocessed_hash_ids.keys() ):
with HydrusDB.TemporaryIntegerTable( self._c, hash_ids_i_can_process, 'hash_id' ) as temp_hash_ids_table_name:
hash_ids_to_hashes_and_mimes = { hash_id : ( hash, mime ) for ( hash_id, hash, mime ) in self._c.execute( 'SELECT hash_id, hash, mime FROM {} CROSS JOIN hashes USING ( hash_id ) CROSS JOIN files_info USING ( hash_id );'.format( temp_hash_ids_table_name ) ) }
unprocessed_hash_ids = update_indices_to_unprocessed_hash_ids[ update_index ]
if len( hash_ids_to_hashes_and_mimes ) < len( hash_ids_i_can_process ):
self._ScheduleRepositoryUpdateFileMaintenance( service_id, ClientFiles.REGENERATE_FILE_DATA_JOB_FILE_INTEGRITY_DATA )
self._ScheduleRepositoryUpdateFileMaintenance( service_id, ClientFiles.REGENERATE_FILE_DATA_JOB_FILE_METADATA )
self._cursor_transaction_wrapper.CommitAndBegin()
raise Exception( 'An error was discovered during repository processing--some update files are missing file info or hashes. A maintenance routine will try to scan these files and fix this problem, but it may be more complicated to fix. Please contact hydev and let him know the details!' )
definition_hash_ids = { hash_id for hash_id in unprocessed_hash_ids if hash_ids_to_content_types_to_process[ hash_id ] == definitions_content_types }
content_hash_ids = { hash_id for hash_id in unprocessed_hash_ids if hash_id not in definition_hash_ids }
for hash_id in hash_ids_i_can_process:
for ( hash_ids, hashes_and_content_types ) in [
( definition_hash_ids, definition_hashes_and_content_types ),
( content_hash_ids, content_hashes_and_content_types )
]:
( hash, mime ) = hash_ids_to_hashes_and_mimes[ hash_id ]
if mime == HC.APPLICATION_HYDRUS_UPDATE_DEFINITIONS:
definition_hashes.append( hash )
elif mime == HC.APPLICATION_HYDRUS_UPDATE_CONTENT:
content_hashes.append( hash )
hashes_and_content_types.extend( ( ( hash_ids_to_hashes[ hash_id ], hash_ids_to_content_types_to_process[ hash_id ] ) for hash_id in hash_ids ) )
return ( this_is_first_definitions_work, definition_hashes, this_is_first_content_work, content_hashes )
return ( this_is_first_definitions_work, definition_hashes_and_content_types, this_is_first_content_work, content_hashes_and_content_types )
def GetRepositoryUpdateHashesIDoNotHave( self, service_key: bytes ):
service_id = self.modules_services.GetServiceId( service_key )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
desired_hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {} ORDER BY update_index ASC;'.format( repository_updates_table_name ) ) )
all_hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {} ORDER BY update_index ASC;'.format( repository_updates_table_name ) ) )
table_join = self.modules_files_storage.GetCurrentTableJoinPhrase( self.modules_services.local_update_service_id, repository_updates_table_name )
existing_hash_ids = self._STS( self._c.execute( 'SELECT hash_id FROM {};'.format( table_join ) ) )
needed_hash_ids = [ hash_id for hash_id in desired_hash_ids if hash_id not in existing_hash_ids ]
needed_hash_ids = [ hash_id for hash_id in all_hash_ids if hash_id not in existing_hash_ids ]
needed_hashes = self.modules_hashes_local_cache.GetHashes( needed_hash_ids )
return needed_hashes
def GetRepositoryUpdateHashesUnprocessed( self, service_key: bytes ):
service_id = self.modules_services.GetServiceId( service_key )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
unprocessed_hash_ids = self._STL( self._c.execute( 'SELECT hash_id FROM {} WHERE processed = ?;'.format( repository_updates_table_name ), ( False, ) ) )
hashes = self.modules_hashes_local_cache.GetHashes( unprocessed_hash_ids )
return hashes
def GetTablesAndColumnsThatUseDefinitions( self, content_type: int ) -> typing.List[ typing.Tuple[ str, str ] ]:
tables_and_columns = []
@ -320,7 +421,7 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
for service_id in self.modules_services.GetServiceIds( HC.REPOSITORIES ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
hash_id_map_table_name = GenerateRepositoryFileDefinitionTableName( service_id )
tables_and_columns.extend( [
@ -344,6 +445,30 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
return tables_and_columns
def HasLotsOfOutstandingLocalProcessing( self, service_id, content_types ):
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
content_types_to_outstanding_local_processing = self._service_ids_to_content_types_to_outstanding_local_processing[ service_id ]
for content_type in content_types:
if content_type not in content_types_to_outstanding_local_processing:
result = self._STL( self._c.execute( 'SELECT 1 FROM {} WHERE content_type = ? AND processed = ?;'.format( repository_updates_processed_table_name ), ( content_type, False ) ).fetchmany( 20 ) )
content_types_to_outstanding_local_processing[ content_type ] = len( result ) >= 20
if content_types_to_outstanding_local_processing[ content_type ]:
return True
return False
def NormaliseServiceHashId( self, service_id: int, service_hash_id: int ) -> int:
hash_id_map_table_name = GenerateRepositoryFileDefinitionTableName( service_id )
@ -409,7 +534,17 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
return tag_id
def ProcessRepositoryDefinitions( self, service_key: bytes, definition_hash: bytes, definition_iterator_dict, job_key, work_time ):
def NotifyUpdatesImported( self, hash_ids ):
for service_id in self.modules_services.GetServiceIds( HC.REPOSITORIES ):
self._RegisterUpdates( service_id, hash_ids )
def ProcessRepositoryDefinitions( self, service_key: bytes, definition_hash: bytes, definition_iterator_dict, content_types, job_key, work_time ):
# ignore content_types for now
service_id = self.modules_services.GetServiceId( service_key )
@ -484,16 +619,16 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
del definition_iterator_dict[ 'service_tag_ids_to_tags' ]
self.SetUpdateProcessed( service_id, definition_hash )
self.SetUpdateProcessed( service_id, definition_hash, ( HC.CONTENT_TYPE_DEFINITIONS, ) )
return num_rows_processed
def ReprocessRepository( self, service_key: bytes, update_mime_types: typing.Collection[ int ] ):
def ReprocessRepository( self, service_key: bytes, content_types: typing.Collection[ int ] ):
service_id = self.modules_services.GetServiceId( service_key )
self._ReprocessRepository( service_id, update_mime_types )
self._ReprocessRepository( service_id, content_types )
def ScheduleRepositoryUpdateFileMaintenance( self, service_key, job_type ):
@ -507,7 +642,7 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
service_id = self.modules_services.GetServiceId( service_key )
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
current_update_hash_ids = self._STS( self._c.execute( 'SELECT hash_id FROM {};'.format( repository_updates_table_name ) ) )
@ -516,6 +651,8 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
deletee_hash_ids = current_update_hash_ids.difference( all_future_update_hash_ids )
self._c.executemany( 'DELETE FROM {} WHERE hash_id = ?;'.format( repository_updates_table_name ), ( ( hash_id, ) for hash_id in deletee_hash_ids ) )
self._c.executemany( 'DELETE FROM {} WHERE hash_id = ?;'.format( repository_unregistered_updates_table_name ), ( ( hash_id, ) for hash_id in deletee_hash_ids ) )
self._c.executemany( 'DELETE FROM {} WHERE hash_id = ?;'.format( repository_updates_processed_table_name ), ( ( hash_id, ) for hash_id in deletee_hash_ids ) )
inserts = []
@ -525,32 +662,36 @@ class ClientDBRepositories( HydrusDBModule.HydrusDBModule ):
hash_id = self.modules_hashes_local_cache.GetHashId( update_hash )
result = self._c.execute( 'SELECT processed FROM {} WHERE hash_id = ?;'.format( repository_updates_table_name ), ( hash_id, ) ).fetchone()
if result is None:
if hash_id in current_update_hash_ids:
processed = False
inserts.append( ( update_index, hash_id, processed ) )
self._c.execute( 'UPDATE {} SET update_index = ? WHERE hash_id = ?;'.format( repository_updates_table_name ), ( update_index, hash_id ) )
else:
( processed, ) = result
self._c.execute( 'UPDATE {} SET update_index = ?, processed = ? WHERE hash_id = ?;'.format( repository_updates_table_name ), ( update_index, processed, hash_id ) )
inserts.append( ( update_index, hash_id ) )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( update_index, hash_id, processed ) VALUES ( ?, ?, ? );'.format( repository_updates_table_name ), inserts )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( update_index, hash_id ) VALUES ( ?, ? );'.format( repository_updates_table_name ), inserts )
self._c.executemany( 'INSERT OR IGNORE INTO {} ( hash_id ) VALUES ( ? );'.format( repository_unregistered_updates_table_name ), ( ( hash_id, ) for ( update_index, hash_id ) in inserts ) )
self._RegisterUpdates( service_id )
self._ClearOutstandingWorkCache( service_id )
def SetUpdateProcessed( self, service_id, update_hash: bytes ):
def SetUpdateProcessed( self, service_id: int, update_hash: bytes, content_types: typing.Collection[ int ] ):
repository_updates_table_name = GenerateRepositoryUpdatesTableName( service_id )
( repository_updates_table_name, repository_unregistered_updates_table_name, repository_updates_processed_table_name ) = GenerateRepositoryUpdatesTableNames( service_id )
update_hash_id = self.modules_hashes_local_cache.GetHashId( update_hash )
self._c.execute( 'UPDATE {} SET processed = ? WHERE hash_id = ?;'.format( repository_updates_table_name ), ( True, update_hash_id ) )
self._c.executemany( 'UPDATE {} SET processed = ? WHERE hash_id = ? AND content_type = ?;'.format( repository_updates_processed_table_name ), ( ( True, update_hash_id, content_type ) for content_type in content_types ) )
for content_type in content_types:
self._ClearOutstandingWorkCache( service_id, content_type )

View File

@ -130,19 +130,11 @@ def THREADUploadPending( service_key ):
nums_pending_for_this_service = nums_pending[ service_key ]
content_types_for_this_service = set()
if service_type in ( HC.IPFS, HC.FILE_REPOSITORY ):
content_types_for_this_service = { HC.CONTENT_TYPE_FILES }
elif service_type == HC.TAG_REPOSITORY:
content_types_for_this_service = { HC.CONTENT_TYPE_MAPPINGS, HC.CONTENT_TYPE_TAG_PARENTS, HC.CONTENT_TYPE_TAG_SIBLINGS }
content_types_for_this_service = set( HC.REPOSITORY_CONTENT_TYPES[ service_type ] )
if service_type in HC.REPOSITORIES:
paused_content_types = set()
unauthorised_content_types = set()
content_types_to_request = set()
@ -171,7 +163,14 @@ def THREADUploadPending( service_key ):
if account.HasPermission( content_type, permission ):
content_types_to_request.add( content_type )
if service.IsPausedUpdateProcessing( content_type ):
paused_content_types.add( content_type )
else:
content_types_to_request.add( content_type )
else:
@ -212,11 +211,25 @@ def THREADUploadPending( service_key ):
HG.client_controller.pub( 'message', unauthorised_job_key )
if len( paused_content_types ) > 0:
message = 'You have some pending content of type ({}), but processing for that is currently paused! No worries, but I won\'t upload the paused stuff. If you want to upload it, please unpause in _review services_ and then catch up processing.'.format(
', '.join( ( HC.content_type_string_lookup[ content_type ] for content_type in paused_content_types ) )
)
HydrusData.ShowText( message )
else:
content_types_to_request = content_types_for_this_service
if len( content_types_to_request ) == 0:
return
initial_num_pending = sum( nums_pending_for_this_service.values() )
num_to_do = initial_num_pending
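
In condensed form, the new upload-side filtering walks the service's content types and diverts paused ones into a user message instead of the request set. A sketch under the same names as above, eliding the account permission checks:

paused_content_types = set()
content_types_to_request = set()

for content_type in content_types_for_this_service:
    
    if service.IsPausedUpdateProcessing( content_type ):
        
        # paused content stays pending locally--we notify the user rather than upload
        paused_content_types.add( content_type )
        
    else:
        
        content_types_to_request.add( content_type )
        
    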

View File

@ -4955,6 +4955,8 @@ class ReviewTagDisplayMaintenancePanel( ClientGUIScrolledPanels.ReviewPanel ):
num_items_to_regen = num_siblings_to_sync + num_parents_to_sync
sync_halted = False
if num_items_to_regen == 0:
message = 'All synced!'
@ -4972,6 +4974,14 @@ class ReviewTagDisplayMaintenancePanel( ClientGUIScrolledPanels.ReviewPanel ):
message = '{} siblings and {} parents to sync.'.format( HydrusData.ToHumanInt( num_siblings_to_sync ), HydrusData.ToHumanInt( num_parents_to_sync ) )
if len( status[ 'waiting_on_tag_repos' ] ) > 0:
message += os.linesep * 2
message += os.linesep.join( status[ 'waiting_on_tag_repos' ] )
sync_halted = True
self._siblings_and_parents_st.setText( message )
#
@ -4993,7 +5003,7 @@ class ReviewTagDisplayMaintenancePanel( ClientGUIScrolledPanels.ReviewPanel ):
value = 1
range = 1
sync_possible = False
sync_work_to_do = False
else:
@ -5015,15 +5025,15 @@ class ReviewTagDisplayMaintenancePanel( ClientGUIScrolledPanels.ReviewPanel ):
sync_possible = True
sync_work_to_do = True
self._progress.SetValue( message, value, range )
self._refresh_button.setEnabled( True )
self._go_faster_button.setVisible( sync_possible )
self._go_faster_button.setEnabled( sync_possible )
self._go_faster_button.setVisible( sync_work_to_do and not sync_halted )
self._go_faster_button.setEnabled( sync_work_to_do and not sync_halted )
if HG.client_controller.tag_display_maintenance_manager.CurrentlyGoingFaster( self._service_key ):

View File

@ -1589,7 +1589,7 @@ class ReviewServicePanel( QW.QWidget ):
if service_type in HC.REPOSITORIES:
subpanels.append( ( ReviewServiceRepositorySubPanel( self, service ), CC.FLAGS_EXPAND_PERPENDICULAR ) )
subpanels.append( ( ReviewServiceRepositorySubPanel( self, service ), CC.FLAGS_EXPAND_SIZER_PERPENDICULAR ) )
if service_type == HC.IPFS:
@ -2518,19 +2518,19 @@ class ReviewServiceRestrictedSubPanel( ClientGUICommon.StaticBox ):
class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
class ReviewServiceRepositorySubPanel( QW.QWidget ):
def __init__( self, parent, service ):
ClientGUICommon.StaticBox.__init__( self, parent, 'repository sync' )
QW.QWidget.__init__( self, parent )
self._service = service
self._my_updater = ClientGUIAsync.FastThreadToGUIUpdater( self, self._Refresh )
self._content_panel = QW.QWidget( self )
self._network_panel = ClientGUICommon.StaticBox( self, 'network sync' )
self._repo_options_st = ClientGUICommon.BetterStaticText( self )
self._repo_options_st = ClientGUICommon.BetterStaticText( self._network_panel )
tt = 'The update period is how often the repository bundles its recent uploads into a package for users to download. Anything you upload may take this long for other people to see.'
tt += os.linesep * 2
@ -2538,35 +2538,62 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
self._repo_options_st.setToolTip( tt )
self._metadata_st = ClientGUICommon.BetterStaticText( self )
self._metadata_st = ClientGUICommon.BetterStaticText( self._network_panel )
self._download_progress = ClientGUICommon.TextAndGauge( self )
self._download_progress = ClientGUICommon.TextAndGauge( self._network_panel )
self._update_downloading_paused_button = ClientGUICommon.BetterBitmapButton( self, CC.global_pixmaps().pause, self._PausePlayUpdateDownloading )
self._update_downloading_paused_button = ClientGUICommon.BetterBitmapButton( self._network_panel, CC.global_pixmaps().pause, self._PausePlayUpdateDownloading )
self._update_downloading_paused_button.setToolTip( 'pause/play update downloading' )
self._update_processing_paused_button = ClientGUICommon.BetterBitmapButton( self, CC.global_pixmaps().pause, self._PausePlayUpdateProcessing )
self._update_processing_paused_button.setToolTip( 'pause/play update processing' )
self._processing_progress = ClientGUICommon.TextAndGauge( self )
self._is_mostly_caught_up_st = ClientGUICommon.BetterStaticText( self )
self._sync_remote_now_button = ClientGUICommon.BetterButton( self, 'download now', self._SyncRemoteNow )
self._sync_processing_now_button = ClientGUICommon.BetterButton( self, 'process now', self._SyncProcessingNow )
self._export_updates_button = ClientGUICommon.BetterButton( self, 'export updates', self._ExportUpdates )
self._sync_remote_now_button = ClientGUICommon.BetterButton( self._network_panel, 'download now', self._SyncRemoteNow )
reset_menu_items = []
reset_menu_items.append( ( 'normal', 'do a full metadata resync', 'Resync all update information.', self._DoAFullMetadataResync ) )
self._reset_downloading_button = ClientGUIMenuButton.MenuButton( self, 'reset downloading', reset_menu_items )
self._reset_downloading_button = ClientGUIMenuButton.MenuButton( self._network_panel, 'reset downloading', reset_menu_items )
self._export_updates_button = ClientGUICommon.BetterButton( self._network_panel, 'export updates', self._ExportUpdates )
#
self._processing_panel = ClientGUICommon.StaticBox( self, 'processing sync' )
self._update_processing_paused_button = ClientGUICommon.BetterBitmapButton( self._processing_panel, CC.global_pixmaps().pause, self._PausePlayUpdateProcessing )
self._update_processing_paused_button.setToolTip( 'pause/play all update processing' )
self._processing_definitions_progress = ClientGUICommon.TextAndGauge( self._processing_panel )
#
content_types = tuple( HC.REPOSITORY_CONTENT_TYPES[ self._service.GetServiceType() ] )
self._content_types_to_gauges_and_buttons = {}
for content_type in content_types:
processing_progress = ClientGUICommon.TextAndGauge( self._processing_panel )
processing_paused_button = ClientGUICommon.BetterBitmapButton( self._processing_panel, CC.global_pixmaps().pause, self._PausePlayUpdateProcessing, content_type )
processing_paused_button.setToolTip( 'pause/play update processing for {}'.format( HC.content_type_string_lookup[ content_type ] ) )
self._content_types_to_gauges_and_buttons[ content_type ] = ( processing_progress, processing_paused_button )
#
self._is_mostly_caught_up_st = ClientGUICommon.BetterStaticText( self._processing_panel )
self._sync_processing_now_button = ClientGUICommon.BetterButton( self._processing_panel, 'process now', self._SyncProcessingNow )
reset_menu_items = []
reset_menu_items.append( ( 'normal', 'fill in definition gaps', 'Reprocess all definitions.', self._ReprocessDefinitions ) )
reset_menu_items.append( ( 'normal', 'fill in content gaps', 'Reprocess all content.', self._ReprocessContent ) )
reset_menu_items.append( ( 'separator', None, None, None ) )
reset_menu_items.append( ( 'normal', 'wipe database data and reprocess from update files', 'Reset entire repository.', self._Reset ) )
reset_menu_items.append( ( 'normal', 'delete and reprocess specific content', 'Reset some of the repository\'s content.', self._ResetProcessing ) )
reset_menu_items.append( ( 'separator', None, None, None ) )
reset_menu_items.append( ( 'normal', 'wipe all database data and reprocess', 'Reset entire repository.', self._Reset ) )
self._reset_processing_button = ClientGUIMenuButton.MenuButton( self, 'reset processing', reset_menu_items )
@ -2584,22 +2611,55 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
self._reset_processing_button.hide()
self._network_panel.Add( self._repo_options_st, CC.FLAGS_EXPAND_PERPENDICULAR )
self._network_panel.Add( self._metadata_st, CC.FLAGS_EXPAND_PERPENDICULAR )
self._network_panel.Add( self._download_progress, CC.FLAGS_EXPAND_PERPENDICULAR )
self._network_panel.Add( self._update_downloading_paused_button, CC.FLAGS_ON_RIGHT )
hbox = QP.HBoxLayout()
QP.AddToLayout( hbox, self._sync_remote_now_button, CC.FLAGS_CENTER_PERPENDICULAR )
QP.AddToLayout( hbox, self._sync_processing_now_button, CC.FLAGS_CENTER_PERPENDICULAR )
QP.AddToLayout( hbox, self._export_updates_button, CC.FLAGS_CENTER_PERPENDICULAR )
QP.AddToLayout( hbox, self._reset_downloading_button, CC.FLAGS_CENTER_PERPENDICULAR )
QP.AddToLayout( hbox, self._export_updates_button, CC.FLAGS_CENTER_PERPENDICULAR )
self._network_panel.Add( hbox, CC.FLAGS_ON_RIGHT )
#
self._processing_panel.Add( self._processing_definitions_progress, CC.FLAGS_EXPAND_PERPENDICULAR )
hbox = QP.HBoxLayout()
QP.AddToLayout( hbox, ClientGUICommon.BetterStaticText( self._processing_panel, label = 'pause/play all processing: ' ), CC.FLAGS_CENTER )
QP.AddToLayout( hbox, self._update_processing_paused_button, CC.FLAGS_CENTER )
self._processing_panel.Add( hbox, CC.FLAGS_ON_RIGHT )
for content_type in content_types:
( gauge, button ) = self._content_types_to_gauges_and_buttons[ content_type ]
self._processing_panel.Add( gauge, CC.FLAGS_EXPAND_PERPENDICULAR )
self._processing_panel.Add( button, CC.FLAGS_ON_RIGHT )
self._processing_panel.Add( self._is_mostly_caught_up_st, CC.FLAGS_EXPAND_PERPENDICULAR )
hbox = QP.HBoxLayout()
QP.AddToLayout( hbox, self._sync_processing_now_button, CC.FLAGS_CENTER_PERPENDICULAR )
QP.AddToLayout( hbox, self._reset_processing_button, CC.FLAGS_CENTER_PERPENDICULAR )
self.Add( self._repo_options_st, CC.FLAGS_EXPAND_PERPENDICULAR )
self.Add( self._metadata_st, CC.FLAGS_EXPAND_PERPENDICULAR )
self.Add( self._download_progress, CC.FLAGS_EXPAND_PERPENDICULAR )
self.Add( self._update_downloading_paused_button, CC.FLAGS_ON_RIGHT )
self.Add( self._processing_progress, CC.FLAGS_EXPAND_PERPENDICULAR )
self.Add( self._update_processing_paused_button, CC.FLAGS_ON_RIGHT )
self.Add( self._is_mostly_caught_up_st, CC.FLAGS_EXPAND_PERPENDICULAR )
self.Add( hbox, CC.FLAGS_ON_RIGHT )
self._processing_panel.Add( hbox, CC.FLAGS_ON_RIGHT )
#
vbox = QP.VBoxLayout()
QP.AddToLayout( vbox, self._network_panel, CC.FLAGS_EXPAND_PERPENDICULAR )
QP.AddToLayout( vbox, self._processing_panel, CC.FLAGS_EXPAND_PERPENDICULAR )
self.setLayout( vbox )
HG.client_controller.sub( self, 'ServiceUpdated', 'service_updated' )
@@ -2732,9 +2792,9 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
self._service.PausePlayUpdateDownloading()
def _PausePlayUpdateProcessing( self ):
def _PausePlayUpdateProcessing( self, content_type = None ):
self._service.PausePlayUpdateProcessing()
self._service.PausePlayUpdateProcessing( content_type = content_type )
def _Refresh( self ):
@@ -2760,7 +2820,9 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
#
if self._service.IsPausedUpdateProcessing():
all_processing_paused = self._service.IsPausedUpdateProcessing()
if all_processing_paused:
ClientGUIFunctions.SetBitmapButtonBitmap( self._update_processing_paused_button, CC.global_pixmaps().play )
@@ -2769,6 +2831,25 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
ClientGUIFunctions.SetBitmapButtonBitmap( self._update_processing_paused_button, CC.global_pixmaps().pause )
for ( gauge, button ) in self._content_types_to_gauges_and_buttons.values():
button.setEnabled( not all_processing_paused )
#
for ( content_type, ( gauge, button ) ) in self._content_types_to_gauges_and_buttons.items():
if self._service.IsPausedUpdateProcessing( content_type ):
ClientGUIFunctions.SetBitmapButtonBitmap( button, CC.global_pixmaps().play )
else:
ClientGUIFunctions.SetBitmapButtonBitmap( button, CC.global_pixmaps().pause )
#
repo_options_text_components = []
@@ -2808,7 +2889,7 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
service_key = service.GetServiceKey()
HG.client_controller.WriteSynchronous( 'reprocess_repository', service_key, ( HC.APPLICATION_HYDRUS_UPDATE_DEFINITIONS, ) )
HG.client_controller.WriteSynchronous( 'reprocess_repository', service_key, ( HC.CONTENT_TYPE_DEFINITIONS, ) )
my_updater.Update()
@@ -2829,26 +2910,33 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
def _ReprocessContent( self ):
def do_it( service, my_updater ):
def do_it( service, my_updater, content_types_to_reset ):
service_key = service.GetServiceKey()
HG.client_controller.WriteSynchronous( 'reprocess_repository', service_key, ( HC.APPLICATION_HYDRUS_UPDATE_CONTENT, ) )
HG.client_controller.WriteSynchronous( 'reprocess_repository', service_key, content_types_to_reset )
my_updater.Update()
content_types = self._SelectContentTypes()
if len( content_types ) == 0:
return
name = self._service.GetName()
message = 'This will command the client to reprocess all content updates for {}. It will not delete anything.'.format( name )
message = 'This will command the client to reprocess ({}) for {}. It will not delete anything.'.format( ', '.join( ( HC.content_type_string_lookup[ content_type ] for content_type in content_types ) ), name )
message += os.linesep * 2
message += 'This is only useful as a debug tool for filling in \'gaps\'. If you do not understand what this does, turn back now.'
message += 'This is only useful as a debug tool for filling in \'gaps\' caused by processing bugs or database damage. If you do not understand what this does, turn back now.'
result = ClientGUIDialogsQuick.GetYesNo( self, message )
if result == QW.QDialog.Accepted:
HG.client_controller.CallToThread( do_it, self._service, self._my_updater )
HG.client_controller.CallToThread( do_it, self._service, self._my_updater, content_types )
@@ -2856,7 +2944,7 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
name = self._service.GetName()
message = 'This will delete all the processed information for ' + name + ' from the database.' + os.linesep * 2 + 'Once the service is reset, you will have to reprocess everything from your downloaded update files. The client will naturally do this in its idle time as before, just starting over from the beginning.' + os.linesep * 2 + 'If you do not understand what this does, click no!'
message = 'This will delete all the processed information for ' + name + ' from the database, including definitions.' + os.linesep * 2 + 'Once the service is reset, you will have to reprocess everything from your downloaded update files. The client will naturally do this in its idle time as before, just starting over from the beginning.' + os.linesep * 2 + 'This is a severe maintenance task that is only appropriate after trying to recover from a critical database error. If you do not understand what this does, click no!'
result = ClientGUIDialogsQuick.GetYesNo( self, message )
@@ -2873,6 +2961,56 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
def _ResetProcessing( self ):
def do_it( service, my_updater, content_types_to_reset ):
service_key = service.GetServiceKey()
HG.client_controller.WriteSynchronous( 'reset_repository_processing', service_key, content_types_to_reset )
my_updater.Update()
content_types = self._SelectContentTypes()
if len( content_types ) == 0:
return
name = self._service.GetName()
message = 'You are about to delete and reprocess ({}) for {}.'.format( ', '.join( ( HC.content_type_string_lookup[ content_type ] for content_type in content_types ) ), name )
message += os.linesep * 2
message += 'It may take some time to delete it all, and then some future idle time to reprocess. It is only worth doing this if you believe there are logical problems in the initially processed data. If you just want to fill in gaps, use that simpler maintenance task, which runs much faster.'
result = ClientGUIDialogsQuick.GetYesNo( self, message )
if result == QW.QDialog.Accepted:
HG.client_controller.CallToThread( do_it, self._service, self._my_updater, content_types )
def _SelectContentTypes( self ):
choice_tuples = [ ( HC.content_type_string_lookup[ content_type ], content_type, False ) for content_type in self._content_types_to_gauges_and_buttons.keys() ]
try:
result = ClientGUIDialogsQuick.SelectMultipleFromList( self, 'select the content to delete and reprocess', choice_tuples )
except HydrusExceptions.CancelledException:
return []
content_types = result
return content_types
def _SyncRemoteNow( self ):
def do_it( service, my_updater ):
@@ -2922,15 +3060,46 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
def THREADFetchInfo( self, service ):
def qt_code( download_text, download_value, processing_text, processing_value, range, is_mostly_caught_up ):
def qt_code( num_local_updates, num_updates, content_types_to_num_processed_updates, content_types_to_num_updates, is_mostly_caught_up ):
if not self or not QP.isValid( self ):
return
self._download_progress.SetValue( download_text, download_value, range )
self._processing_progress.SetValue( processing_text, processing_value, range )
download_text = 'downloaded {}'.format( HydrusData.ConvertValueRangeToPrettyString( num_local_updates, num_updates ) )
self._download_progress.SetValue( download_text, num_local_updates, num_updates )
processing_work_to_do = False
d_value = content_types_to_num_processed_updates[ HC.CONTENT_TYPE_DEFINITIONS ]
d_range = content_types_to_num_updates[ HC.CONTENT_TYPE_DEFINITIONS ]
if d_value < d_range:
processing_work_to_do = True
definitions_text = 'definitions: {}'.format( HydrusData.ConvertValueRangeToPrettyString( d_value, d_range ) )
self._processing_definitions_progress.SetValue( definitions_text, d_value, d_range )
for ( content_type, ( gauge, button ) ) in self._content_types_to_gauges_and_buttons.items():
c_value = content_types_to_num_processed_updates[ content_type ]
c_range = content_types_to_num_updates[ content_type ]
if not self._service.IsPausedUpdateProcessing( content_type ) and c_value < c_range:
# there is processing work to do on updates we have already downloaded
processing_work_to_do = True
content_text = '{}: {}'.format( HC.content_type_string_lookup[ content_type ], HydrusData.ConvertValueRangeToPrettyString( c_value, c_range ) )
gauge.SetValue( content_text, c_value, c_range )
if is_mostly_caught_up:
@@ -2943,26 +3112,10 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
self._is_mostly_caught_up_st.setText( caught_up_text )
if download_value == 0:
self._export_updates_button.setEnabled( False )
else:
self._export_updates_button.setEnabled( True )
if processing_value == 0:
self._reset_processing_button.setEnabled( False )
else:
self._reset_processing_button.setEnabled( True )
self._export_updates_button.setEnabled( d_value > 0 )
metadata_due = self._service.GetMetadata().UpdateDue( from_client = True )
updates_due = download_value < range
updates_due = num_local_updates < num_updates
download_work_to_do = metadata_due or updates_due
@@ -2977,8 +3130,6 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
self._sync_remote_now_button.setEnabled( False )
processing_work_to_do = processing_value < download_value
can_sync_process = self._service.CanSyncProcess()
if processing_work_to_do and can_sync_process:
@@ -2991,15 +3142,11 @@ class ReviewServiceRepositorySubPanel( ClientGUICommon.StaticBox ):
( download_value, processing_value, range ) = HG.client_controller.Read( 'repository_progress', service.GetServiceKey() )
( num_local_updates, num_updates, content_types_to_num_processed_updates, content_types_to_num_updates ) = HG.client_controller.Read( 'repository_progress', service.GetServiceKey() )
is_mostly_caught_up = service.IsMostlyCaughtUp()
download_text = 'downloaded ' + HydrusData.ConvertValueRangeToPrettyString( download_value, range )
processing_text = 'processed ' + HydrusData.ConvertValueRangeToPrettyString( processing_value, range )
QP.CallAfter( qt_code, download_text, download_value, processing_text, processing_value, range, is_mostly_caught_up )
QP.CallAfter( qt_code, num_local_updates, num_updates, content_types_to_num_processed_updates, content_types_to_num_updates, is_mostly_caught_up )
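Note: the 'repository_progress' read now returns per-content-type dicts rather than the old flat ( value, value, range ) triple. As a rough sketch of the panel logic above, with illustrative names and the pause test applied uniformly (the real qt_code checks definitions without a pause test):

# sketch: is there outstanding processing work, per the new read shape?
# both dicts map content type -> update count, as unpacked in qt_code above.
def processing_work_remains( service, nums_processed, nums_total ):
    
    for ( content_type, c_range ) in nums_total.items():
        
        if service.IsPausedUpdateProcessing( content_type ):
            
            continue # paused content types do not count as pending work
            
        
        if nums_processed[ content_type ] < c_range:
            
            return True # an unpaused content type still has updates to process
            
        
    
    return False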


@@ -404,7 +404,10 @@ class TagDisplayMaintenanceManager( object ):
status = self._controller.Read( 'tag_display_maintenance_status', service_key )
self._service_keys_to_needs_work[ service_key ] = status[ 'num_siblings_to_sync' ] + status[ 'num_parents_to_sync' ] > 0
work_to_do = status[ 'num_siblings_to_sync' ] + status[ 'num_parents_to_sync' ] > 0
sync_halted = len( status[ 'waiting_on_tag_repos' ] ) > 0
self._service_keys_to_needs_work[ service_key ] = work_to_do and not sync_halted
if self._service_keys_to_needs_work[ service_key ]:
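The change above is small but does a lot of work: tag display maintenance is now skipped whenever any tag repository it relies on is not yet process synced, even if sibling or parent work is queued. A self-contained sketch with an illustrative status dict (the keys are the ones read above; the repo name is a stand-in):

# sketch: display maintenance needs work only if there is work AND nothing upstream is blocking
status = { 'num_siblings_to_sync' : 5, 'num_parents_to_sync' : 0, 'waiting_on_tag_repos' : [ 'some tag repo' ] }

work_to_do = status[ 'num_siblings_to_sync' ] + status[ 'num_parents_to_sync' ] > 0 # True
sync_halted = len( status[ 'waiting_on_tag_repos' ] ) > 0 # True, so maintenance waits

needs_work = work_to_do and not sync_halted # False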


@@ -81,7 +81,7 @@ options = {}
# Misc
NETWORK_VERSION = 20
SOFTWARE_VERSION = 448
SOFTWARE_VERSION = 449
CLIENT_API_VERSION = 18
SERVER_THUMBNAIL_DIMENSIONS = ( 200, 200 )
@@ -150,6 +150,7 @@ CONTENT_TYPE_TITLE = 17
CONTENT_TYPE_NOTES = 18
CONTENT_TYPE_FILE_VIEWING_STATS = 19
CONTENT_TYPE_TAG = 20
CONTENT_TYPE_DEFINITIONS = 21
content_type_string_lookup = {}
@@ -173,6 +174,7 @@ content_type_string_lookup[ CONTENT_TYPE_TIMESTAMP ] = 'timestamp'
content_type_string_lookup[ CONTENT_TYPE_TITLE ] = 'title'
content_type_string_lookup[ CONTENT_TYPE_NOTES ] = 'notes'
content_type_string_lookup[ CONTENT_TYPE_FILE_VIEWING_STATS ] = 'file viewing stats'
content_type_string_lookup[ CONTENT_TYPE_DEFINITIONS ] = 'definitions'
REPOSITORY_CONTENT_TYPES = [ CONTENT_TYPE_FILES, CONTENT_TYPE_MAPPINGS, CONTENT_TYPE_TAG_PARENTS, CONTENT_TYPE_TAG_SIBLINGS ]
@@ -436,6 +438,11 @@ ALL_SERVICES = REMOTE_SERVICES + LOCAL_SERVICES + ( COMBINED_FILE, COMBINED_TAG
SERVICES_WITH_THUMBNAILS = [ FILE_REPOSITORY, LOCAL_FILE_DOMAIN ]
REPOSITORY_CONTENT_TYPES = {
FILE_REPOSITORY : ( CONTENT_TYPE_FILES, ),
TAG_REPOSITORY : ( CONTENT_TYPE_MAPPINGS, CONTENT_TYPE_TAG_PARENTS, CONTENT_TYPE_TAG_SIBLINGS )
}
DELETE_FILES_PETITION = 0
DELETE_TAG_PETITION = 1
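REPOSITORY_CONTENT_TYPES changes here from a flat list to a per-service-type dict, which is what lets the review panel earlier in this commit build one gauge and pause button per content type. A minimal lookup sketch using the constants above:

# sketch: each repository service type declares which content types it processes
for content_type in REPOSITORY_CONTENT_TYPES[ TAG_REPOSITORY ]:
    
    print( content_type_string_lookup[ content_type ] ) # mappings, tag parents, tag siblings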


@@ -1,4 +1,5 @@
import collections
import itertools
import threading
import time
import typing
@@ -1553,12 +1554,17 @@ class ContentUpdate( HydrusSerialisable.SerialisableBase ):
return self._GetContent( HC.CONTENT_TYPE_TAG_SIBLINGS, HC.CONTENT_UPDATE_ADD )
def GetNumRows( self ):
def GetNumRows( self, content_types_to_count = None ):
num = 0
for content_type in self._content_data:
if content_types_to_count is not None and content_type not in content_types_to_count:
continue
for action in self._content_data[ content_type ]:
data = self._content_data[ content_type ][ action ]
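GetNumRows gains an optional whitelist so callers can count rows for specific content types only. The hunk cuts off before the actual counting; a standalone sketch of the filtered walk, assuming one row per stored item (a simplification of the real row accounting):

# sketch: count rows in a { content_type : { action : [ row, ... ] } } structure,
# optionally restricted to a set of content types
def get_num_rows( content_data, content_types_to_count = None ):
    
    num = 0
    
    for ( content_type, actions_to_data ) in content_data.items():
        
        if content_types_to_count is not None and content_type not in content_types_to_count:
            
            continue
            
        
        num += sum( ( len( data ) for data in actions_to_data.values() ) )
        
    
    return num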
@@ -1896,6 +1902,7 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
self._next_update_due = next_update_due
self._update_hashes = set()
self._update_hashes_ordered = []
self._biggest_end = self._CalculateBiggestEnd()
@@ -1964,7 +1971,13 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
self._metadata[ update_index ] = ( update_hashes, begin, end )
for update_index in sorted( self._metadata.keys() ):
( update_hashes, begin, end ) = self._metadata[ update_index ]
self._update_hashes.update( update_hashes )
self._update_hashes_ordered.extend( update_hashes )
self._biggest_end = self._CalculateBiggestEnd()
@@ -1979,6 +1992,7 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
self._metadata[ update_index ] = ( update_hashes, begin, end )
self._update_hashes.update( update_hashes )
self._update_hashes_ordered.extend( update_hashes )
self._next_update_due = next_update_due
@@ -2078,9 +2092,7 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
with self._lock:
num_update_hashes = sum( ( len( update_hashes ) for ( update_hashes, begin, end ) in self._metadata.values() ) )
return num_update_hashes
return len( self._update_hashes )
@@ -2100,20 +2112,11 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
if update_index is None:
all_update_hashes = set()
for ( update_hashes, begin, end ) in self._metadata.values():
all_update_hashes.update( update_hashes )
return all_update_hashes
return set( self._update_hashes )
else:
update_hashes = self._GetUpdateHashes( update_index )
return update_hashes
return set( self._GetUpdateHashes( update_index ) )
@@ -2179,6 +2182,18 @@ class Metadata( HydrusSerialisable.SerialisableBase ):
def SortContentHashesAndContentTypes( self, content_hashes_and_content_types ):
with self._lock:
content_hashes_to_content_types = dict( content_hashes_and_content_types )
content_hashes_and_content_types = [ ( update_hash, content_hashes_to_content_types[ update_hash ] ) for update_hash in self._update_hashes_ordered if update_hash in content_hashes_to_content_types ]
return content_hashes_and_content_types
def UpdateASAP( self ):
with self._lock:
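For context on SortContentHashesAndContentTypes above: the new _update_hashes_ordered list records update hashes in the order the metadata delivered them, so the method can put an arbitrary ( update_hash, content_type ) working set back into that order. A minimal sketch on stand-in data:

# sketch: reorder ( update_hash, content_type ) pairs to match the recorded metadata order
update_hashes_ordered = [ 'hash_a', 'hash_b', 'hash_c' ] # stand-ins for real update hashes

content_hashes_and_content_types = [ ( 'hash_c', 'mappings' ), ( 'hash_a', 'definitions' ) ]

lookup = dict( content_hashes_and_content_types )

sorted_pairs = [ ( h, lookup[ h ] ) for h in update_hashes_ordered if h in lookup ]

# sorted_pairs == [ ( 'hash_a', 'definitions' ), ( 'hash_c', 'mappings' ) ]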