import cStringIO
import ClientConstants as CC
import ClientNetworkingContexts
import ClientNetworkingDomain
import HydrusConstants as HC
import HydrusData
import HydrusExceptions
import HydrusGlobals as HG
import HydrusNetworking
import os
import requests
import threading
import traceback
import time

def ConvertStatusCodeAndDataIntoExceptionInfo( status_code, data, is_hydrus_service = False ):

    error_text = data

    if len( error_text ) > 1024:

        large_chunk = error_text[:4096]

        smaller_chunk = large_chunk[:256]

        HydrusData.DebugPrint( large_chunk )

        error_text = 'The server\'s error text was too long to display. The first part follows, while a larger chunk has been written to the log.'
        error_text += os.linesep
        error_text += smaller_chunk

    if status_code == 304:

        eclass = HydrusExceptions.NotModifiedException

    elif status_code == 401:

        eclass = HydrusExceptions.PermissionException

    elif status_code == 403:

        eclass = HydrusExceptions.ForbiddenException

    elif status_code == 404:

        eclass = HydrusExceptions.NotFoundException

    elif status_code == 419:

        eclass = HydrusExceptions.SessionException

    elif status_code == 426:

        eclass = HydrusExceptions.NetworkVersionException

    elif status_code == 509:

        eclass = HydrusExceptions.BandwidthException

    elif status_code >= 500:

        if is_hydrus_service and status_code == 503:

            eclass = HydrusExceptions.ServerBusyException

        else:

            eclass = HydrusExceptions.ServerException

    else:

        eclass = HydrusExceptions.NetworkException

    e = eclass( error_text )

    return ( e, error_text )

class NetworkJob( object ):

    IS_HYDRUS_SERVICE = False

    def __init__( self, method, url, body = None, referral_url = None, temp_path = None ):

        self.engine = None

        self._lock = threading.Lock()

        self._method = method
        self._url = url

        self._domain = ClientNetworkingDomain.ConvertURLIntoDomain( self._url )
        self._second_level_domain = ClientNetworkingDomain.ConvertURLIntoSecondLevelDomain( self._url )

        self._body = body
        self._referral_url = referral_url
        self._temp_path = temp_path

        self._files = None
        self._for_login = False

        self._current_connection_attempt_number = 1

        self._additional_headers = {}

        self._creation_time = HydrusData.GetNow()

        self._bandwidth_tracker = HydrusNetworking.BandwidthTracker()

        self._wake_time = 0

        self._content_type = None

        self._stream_io = cStringIO.StringIO()

        self._error_exception = Exception( 'Exception not initialised.' ) # PyLint hint, wew
        self._error_exception = None
        self._error_text = None

        self._is_done_event = threading.Event()

        self._is_started = False
        self._is_done = False
        self._is_cancelled = False

        self._gallery_token_name = None
        self._gallery_token_consumed = False

        self._bandwidth_manual_override = False
        self._bandwidth_manual_override_delayed_timestamp = None

        self._last_time_ongoing_bandwidth_failed = 0

        self._status_text = u'initialising\u2026'
        self._num_bytes_read = 0
        self._num_bytes_to_read = 1

        self._death_time = None

        self._file_import_options = None

        self._network_contexts = self._GenerateNetworkContexts()

        ( self._session_network_context, self._login_network_context ) = self._GenerateSpecificNetworkContexts()

    def _CanReattemptConnection( self ):

        max_attempts_allowed = 3

        return self._current_connection_attempt_number <= max_attempts_allowed

    def _CanReattemptRequest( self ):

        if self._method == 'GET':

            max_attempts_allowed = 5

        elif self._method == 'POST':

            max_attempts_allowed = 1

        else:

            # any other method is treated like POST and is not retried
            # without this branch, max_attempts_allowed would be unbound here
            max_attempts_allowed = 1

        return self._current_connection_attempt_number <= max_attempts_allowed

    def _GenerateNetworkContexts( self ):

        network_contexts = []

        network_contexts.append( ClientNetworkingContexts.GLOBAL_NETWORK_CONTEXT )

        domains = ClientNetworkingDomain.ConvertDomainIntoAllApplicableDomains( self._domain )

        network_contexts.extend( ( ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_DOMAIN, domain ) for domain in domains ) )

        return network_contexts

    def _GenerateSpecificNetworkContexts( self ):

        # we always store cookies in the larger session (even if the cookie itself refers to a subdomain in the session object)
        # but we can login to a specific subdomain
        session_network_context = ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_DOMAIN, self._second_level_domain )
        login_network_context = ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_DOMAIN, self._domain )

        return ( session_network_context, login_network_context )

    def _SendRequestAndGetResponse( self ):

        with self._lock:

            ncs = list( self._network_contexts )

        headers = self.engine.domain_manager.GetHeaders( ncs )

        with self._lock:

            method = self._method
            url = self._url
            data = self._body
            files = self._files

            if self.IS_HYDRUS_SERVICE:

                headers[ 'User-Agent' ] = 'hydrus client/' + str( HC.NETWORK_VERSION )

            if self._referral_url is not None:

                headers[ 'referer' ] = HydrusData.ToByteString( self._referral_url )

            for ( key, value ) in self._additional_headers.items():

                headers[ key ] = HydrusData.ToByteString( value )

            self._status_text = u'sending request\u2026'

            snc = self._session_network_context

        session = self.engine.session_manager.GetSession( snc )

        connect_timeout = HG.client_controller.new_options.GetInteger( 'network_timeout' )

        read_timeout = connect_timeout * 6

        response = session.request( method, url, data = data, files = files, headers = headers, stream = True, timeout = ( connect_timeout, read_timeout ) )

        return response

    def _IsCancelled( self ):

        if self._is_cancelled:

            return True

        if self.engine.controller.ModelIsShutdown():

            return True

        return False

    def _IsDone( self ):

        if self._is_done:

            return True

        if self.engine.controller.ModelIsShutdown():

            return True

        return False

    def _ObeysBandwidth( self ):

        if self._bandwidth_manual_override:

            return False

        if self._bandwidth_manual_override_delayed_timestamp is not None and HydrusData.TimeHasPassed( self._bandwidth_manual_override_delayed_timestamp ):

            return False

        if self._method == 'POST':

            return False

        if self._for_login:

            return False

        return True

    def _OngoingBandwidthOK( self ):

        now = HydrusData.GetNow()

        if now == self._last_time_ongoing_bandwidth_failed: # it won't have changed, so no point spending any cpu checking

            return False

        else:

            result = self.engine.bandwidth_manager.CanContinueDownload( self._network_contexts )

            if not result:

                self._last_time_ongoing_bandwidth_failed = now

            return result

    def _ReadResponse( self, response, stream_dest, max_allowed = None ):

        with self._lock:

            if self._content_type is not None and self._content_type in HC.mime_enum_lookup:

                mime = HC.mime_enum_lookup[ self._content_type ]

            else:

                mime = None

            if 'content-length' in response.headers:

                self._num_bytes_to_read = int( response.headers[ 'content-length' ] )

                if max_allowed is not None and self._num_bytes_to_read > max_allowed:

                    raise HydrusExceptions.NetworkException( 'The url was apparently ' + HydrusData.ConvertIntToBytes( self._num_bytes_to_read ) + ' but the max network size for this type of job is ' + HydrusData.ConvertIntToBytes( max_allowed ) + '!' )

                if self._file_import_options is not None:

                    certain = True

                    self._file_import_options.CheckNetworkDownload( mime, self._num_bytes_to_read, certain )

            else:

                self._num_bytes_to_read = None

        for chunk in response.iter_content( chunk_size = 65536 ):

            if self._IsCancelled():

                return

            stream_dest.write( chunk )

            chunk_length = len( chunk )

            with self._lock:

                self._num_bytes_read += chunk_length

                if max_allowed is not None and self._num_bytes_read > max_allowed:

                    raise HydrusExceptions.NetworkException( 'The url exceeded the max network size for this type of job, which is ' + HydrusData.ConvertIntToBytes( max_allowed ) + '!' )

                if self._file_import_options is not None:

                    certain = False

                    self._file_import_options.CheckNetworkDownload( mime, self._num_bytes_to_read, certain )

            self._ReportDataUsed( chunk_length )
            self._WaitOnOngoingBandwidth()

            if HG.view_shutdown:

                raise HydrusExceptions.ShutdownException()

        if self._num_bytes_to_read is not None and self._num_bytes_read < self._num_bytes_to_read * 0.8:

            raise HydrusExceptions.ShouldReattemptNetworkException( 'Was expecting ' + HydrusData.ConvertIntToBytes( self._num_bytes_to_read ) + ' but only got ' + HydrusData.ConvertIntToBytes( self._num_bytes_read ) + '.' )

    def _ReportDataUsed( self, num_bytes ):

        self._bandwidth_tracker.ReportDataUsed( num_bytes )

        self.engine.bandwidth_manager.ReportDataUsed( self._network_contexts, num_bytes )

    def _SetCancelled( self ):

        self._is_cancelled = True

        self._SetDone()

    def _SetError( self, e, error ):

        self._error_exception = e
        self._error_text = error

        self._SetDone()

    def _SetDone( self ):

        self._is_done = True

        self._is_done_event.set()

    def _Sleep( self, seconds ):

        self._wake_time = HydrusData.GetNow() + seconds

    def _WaitOnOngoingBandwidth( self ):

        while not self._OngoingBandwidthOK() and not self._IsCancelled():

            time.sleep( 0.1 )

    def AddAdditionalHeader( self, key, value ):

        with self._lock:

            self._additional_headers[ key ] = value

    def BandwidthOK( self ):

        with self._lock:

            if self._ObeysBandwidth():

                result = self.engine.bandwidth_manager.TryToStartRequest( self._network_contexts )

                if result:

                    self._bandwidth_tracker.ReportRequestUsed()

                else:

                    bandwidth_waiting_duration = self.engine.bandwidth_manager.GetWaitingEstimate( self._network_contexts )

                    will_override = self._bandwidth_manual_override_delayed_timestamp is not None

                    override_coming_first = False

                    if will_override:

                        override_waiting_duration = self._bandwidth_manual_override_delayed_timestamp - HydrusData.GetNow()

                        override_coming_first = override_waiting_duration < bandwidth_waiting_duration

                    if override_coming_first:

                        waiting_duration = override_waiting_duration
                        prefix = 'overriding bandwidth '

                        waiting_str = HydrusData.TimestampToPrettyTimeDelta( self._bandwidth_manual_override_delayed_timestamp, just_now_string = 'imminently', just_now_threshold = 2 )

                    else:

                        waiting_duration = bandwidth_waiting_duration

                        prefix = 'bandwidth free '

                        waiting_str = HydrusData.TimestampToPrettyTimeDelta( HydrusData.GetNow() + waiting_duration, just_now_string = 'imminently', just_now_threshold = 2 )

                    self._status_text = prefix + waiting_str + u'\u2026'

                    if waiting_duration > 1200:

                        self._Sleep( 30 )

                    elif waiting_duration > 120:

                        self._Sleep( 10 )

                    elif waiting_duration > 10:

                        self._Sleep( 1 )

                return result

            else:

                self._bandwidth_tracker.ReportRequestUsed()

                self.engine.bandwidth_manager.ReportRequestUsed( self._network_contexts )

                return True

    def Cancel( self ):

        with self._lock:

            self._status_text = 'cancelled!'

            self._SetCancelled()

    def CanValidateInPopup( self ):

        with self._lock:

            return self.engine.domain_manager.CanValidateInPopup( self._network_contexts )

    def CheckCanLogin( self ):

        with self._lock:

            if self._for_login:

                raise HydrusExceptions.LoginException( 'Login jobs should not be asked if they can login!' )

            else:

                return self.engine.login_manager.CheckCanLogin( self._login_network_context )

    def GenerateLoginProcess( self ):

        with self._lock:

            if self._for_login:

                raise Exception( 'Login jobs should not be asked to generate login processes!' )

            else:

                return self.engine.login_manager.GenerateLoginProcess( self._login_network_context )

    def GenerateValidationPopupProcess( self ):

        with self._lock:

            return self.engine.domain_manager.GenerateValidationPopupProcess( self._network_contexts )

    def GetContent( self ):

        with self._lock:

            self._stream_io.seek( 0 )

            return self._stream_io.read()

    def GetContentType( self ):

        with self._lock:

            return self._content_type

    def GetCreationTime( self ):

        with self._lock:

            return self._creation_time

    def GetDomain( self ):

        with self._lock:

            return self._domain

    def GetErrorException( self ):

        with self._lock:

            return self._error_exception

    def GetErrorText( self ):

        with self._lock:

            return self._error_text

    def GetNetworkContexts( self ):

        with self._lock:

            return list( self._network_contexts )

    def GetSecondLevelDomain( self ):

        with self._lock:

            return self._second_level_domain

    def GetSession( self ):

        with self._lock:

            snc = self._session_network_context

        session = self.engine.session_manager.GetSession( snc )

        return session

    def GetStatus( self ):

        with self._lock:

            return ( self._status_text, self._bandwidth_tracker.GetUsage( HC.BANDWIDTH_TYPE_DATA, 1 ), self._num_bytes_read, self._num_bytes_to_read )

    def GetTotalDataUsed( self ):

        with self._lock:

            return self._bandwidth_tracker.GetUsage( HC.BANDWIDTH_TYPE_DATA, None )

    def GetURL( self ):

        with self._lock:

            return self._url

    def HasError( self ):

        with self._lock:

            return self._error_exception is not None

    def IsAsleep( self ):

        with self._lock:

            return not HydrusData.TimeHasPassed( self._wake_time )

    def IsCancelled( self ):

        with self._lock:

            return self._IsCancelled()

    def IsDone( self ):

        with self._lock:

            return self._IsDone()

    def IsValid( self ):

        with self._lock:

            return self.engine.domain_manager.IsValid( self._network_contexts )

    def NeedsLogin( self ):

        with self._lock:

            if self._for_login:

                return False

            else:

                return self.engine.login_manager.NeedsLogin( self._login_network_context )

    def NoEngineYet( self ):

        return self.engine is None

    def ObeysBandwidth( self ):

        return self._ObeysBandwidth()

    def OverrideBandwidth( self, delay = None ):

        with self._lock:

            if delay is None:

                self._bandwidth_manual_override = True

                self._wake_time = 0

            else:

                self._bandwidth_manual_override_delayed_timestamp = HydrusData.GetNow() + delay

                self._wake_time = min( self._wake_time, self._bandwidth_manual_override_delayed_timestamp + 1 )

    def OverrideToken( self ):

        with self._lock:

            self._gallery_token_consumed = True

            self._wake_time = 0

    def SetDeathTime( self, death_time ):

        self._death_time = death_time

    def SetError( self, e, error ):

        with self._lock:

            self._SetError( e, error )

    def SetFiles( self, files ):

        with self._lock:

            self._files = files

    def SetFileImportOptions( self, file_import_options ):

        with self._lock:

            self._file_import_options = file_import_options

    def SetForLogin( self, for_login ):

        with self._lock:

            self._for_login = for_login

    def SetGalleryToken( self, token_name ):

        with self._lock:

            self._gallery_token_name = token_name

    def SetStatus( self, text ):

        with self._lock:

            self._status_text = text

    def Sleep( self, seconds ):

        with self._lock:

            self._Sleep( seconds )

    def Start( self ):

        try:

            with self._lock:

                self._is_started = True

                self._status_text = u'job started'

            request_completed = False

            while not request_completed:

                try:

                    response = self._SendRequestAndGetResponse()

                    with self._lock:

                        if self._body is not None:

                            self._ReportDataUsed( len( self._body ) )

                    if 'Content-Type' in response.headers:

                        self._content_type = response.headers[ 'Content-Type' ]

                    if response.ok:

                        with self._lock:

                            self._status_text = u'downloading\u2026'

                        if self._temp_path is None:

                            self._ReadResponse( response, self._stream_io, 104857600 )

                        else:

                            with open( self._temp_path, 'wb' ) as f:

                                self._ReadResponse( response, f )

                        with self._lock:

                            self._status_text = 'done!'
                    else:

                        with self._lock:

                            self._status_text = str( response.status_code ) + ' - ' + str( response.reason )

                        self._ReadResponse( response, self._stream_io, 104857600 )

                        with self._lock:

                            self._stream_io.seek( 0 )

                            data = self._stream_io.read()

                            ( e, error_text ) = ConvertStatusCodeAndDataIntoExceptionInfo( response.status_code, data, self.IS_HYDRUS_SERVICE )

                            self._SetError( e, error_text )

                    request_completed = True

                except HydrusExceptions.ShouldReattemptNetworkException as e:

                    self._current_connection_attempt_number += 1

                    if not self._CanReattemptRequest():

                        raise HydrusExceptions.NetworkException( 'Ran out of reattempts on this error: ' + HydrusData.ToUnicode( e ) )

                    with self._lock:

                        self._status_text = HydrusData.ToUnicode( e ) + '--retrying'

                    time.sleep( 3 )

                except requests.exceptions.ChunkedEncodingError:

                    self._current_connection_attempt_number += 1

                    if not self._CanReattemptRequest():

                        raise HydrusExceptions.ConnectionException( 'Unable to complete request--it broke mid-way!' )

                    with self._lock:

                        self._status_text = u'connection broke mid-request--retrying'

                    time.sleep( 3 )

                except ( requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout ):

                    self._current_connection_attempt_number += 1

                    if not self._CanReattemptConnection():

                        raise HydrusExceptions.ConnectionException( 'Could not connect!' )

                    with self._lock:

                        self._status_text = u'connection failed--retrying'

                    time.sleep( 3 )

                except requests.exceptions.ReadTimeout:

                    self._current_connection_attempt_number += 1

                    if not self._CanReattemptRequest():

                        raise HydrusExceptions.ConnectionException( 'Connection successful, but reading response timed out!' )

                    with self._lock:

                        self._status_text = u'read timed out--retrying'

                    time.sleep( 3 )

        except Exception as e:

            with self._lock:

                self._status_text = 'unexpected error!'
                trace = traceback.format_exc()

                HydrusData.Print( trace )

                self._SetError( e, trace )

        finally:

            with self._lock:

                self._SetDone()

    def TokensOK( self ):

        with self._lock:

            need_token = self._gallery_token_name is not None and not self._gallery_token_consumed

            sld = self._second_level_domain
            gtn = self._gallery_token_name

        if need_token:

            ( consumed, next_timestamp ) = HG.client_controller.network_engine.bandwidth_manager.TryToConsumeAGalleryToken( sld, gtn )

            with self._lock:

                if consumed:

                    self._status_text = 'slot consumed, starting soon'

                    self._gallery_token_consumed = True

                else:

                    self._status_text = 'waiting for a ' + self._gallery_token_name + ' slot: next ' + HydrusData.TimestampToPrettyTimeDelta( next_timestamp, just_now_threshold = 1 )

                    self._Sleep( 1 )

                    return False

        return True

    def WaitUntilDone( self ):

        while True:

            self._is_done_event.wait( 5 )

            with self._lock:

                if not self._is_started and self._death_time is not None and HydrusData.TimeHasPassed( self._death_time ):

                    raise Exception( 'Network job death time reached--not sure what the error was. Maybe a paused service?' )

            if self.IsDone():

                break

        with self._lock:

            if self.engine.controller.ModelIsShutdown():

                raise HydrusExceptions.ShutdownException()

            elif self._error_exception is not None:

                if isinstance( self._error_exception, Exception ):

                    raise self._error_exception

                else:

                    raise Exception( 'Problem in network error handling.' )

            elif self._IsCancelled():

                if self._method == 'POST':

                    message = 'Upload cancelled!'

                else:

                    message = 'Download cancelled!'
                raise HydrusExceptions.CancelledException( message )

class NetworkJobDownloader( NetworkJob ):

    def __init__( self, downloader_page_key, method, url, body = None, referral_url = None, temp_path = None ):

        self._downloader_page_key = downloader_page_key

        NetworkJob.__init__( self, method, url, body = body, referral_url = referral_url, temp_path = temp_path )

    def _GenerateNetworkContexts( self ):

        network_contexts = NetworkJob._GenerateNetworkContexts( self )

        network_contexts.append( ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_DOWNLOADER_PAGE, self._downloader_page_key ) )

        return network_contexts

class NetworkJobSubscription( NetworkJob ):

    def __init__( self, subscription_key, method, url, body = None, referral_url = None, temp_path = None ):

        self._subscription_key = subscription_key

        NetworkJob.__init__( self, method, url, body = body, referral_url = referral_url, temp_path = temp_path )

    def _GenerateNetworkContexts( self ):

        network_contexts = NetworkJob._GenerateNetworkContexts( self )

        network_contexts.append( ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_SUBSCRIPTION, self._subscription_key ) )

        return network_contexts

class NetworkJobHydrus( NetworkJob ):

    IS_HYDRUS_SERVICE = True

    def __init__( self, service_key, method, url, body = None, referral_url = None, temp_path = None ):

        self._service_key = service_key

        NetworkJob.__init__( self, method, url, body = body, referral_url = referral_url, temp_path = temp_path )

    def _CheckHydrusVersion( self, service_type, response ):

        service_string = HC.service_string_lookup[ service_type ]

        headers = response.headers

        if 'server' not in headers or service_string not in headers[ 'server' ]:

            raise HydrusExceptions.WrongServiceTypeException( 'Target was not a ' + service_string + '!' )

        server_header = headers[ 'server' ]

        ( service_string_gumpf, network_version ) = server_header.split( '/' )

        network_version = int( network_version )

        if network_version != HC.NETWORK_VERSION:

            if network_version > HC.NETWORK_VERSION:

                message = 'Your client is out of date; please download the latest release.'

            else:

                message = 'The server is out of date; please ask its admin to update to the latest release.'

            raise HydrusExceptions.NetworkVersionException( 'Network version mismatch! The server\'s network version was ' + str( network_version ) + ', whereas your client\'s is ' + str( HC.NETWORK_VERSION ) + '! ' + message )

    def _GenerateNetworkContexts( self ):

        network_contexts = []

        network_contexts.append( ClientNetworkingContexts.GLOBAL_NETWORK_CONTEXT )
        network_contexts.append( ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_HYDRUS, self._service_key ) )

        return network_contexts

    def _GenerateSpecificNetworkContexts( self ):

        # we store cookies on and login to the same hydrus-specific context

        session_network_context = ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_HYDRUS, self._service_key )
        login_network_context = session_network_context

        return ( session_network_context, login_network_context )

    def _ReportDataUsed( self, num_bytes ):

        service = self.engine.controller.services_manager.GetService( self._service_key )

        service_type = service.GetServiceType()

        if service_type in HC.RESTRICTED_SERVICES:

            account = service.GetAccount()

            account.ReportDataUsed( num_bytes )

        NetworkJob._ReportDataUsed( self, num_bytes )

    def _SendRequestAndGetResponse( self ):

        service = self.engine.controller.services_manager.GetService( self._service_key )

        service_type = service.GetServiceType()

        if service_type in HC.RESTRICTED_SERVICES:

            account = service.GetAccount()

            account.ReportRequestUsed()

        response = NetworkJob._SendRequestAndGetResponse( self )

        if service_type in HC.RESTRICTED_SERVICES:

            self._CheckHydrusVersion( service_type, response )

        return response

class NetworkJobWatcherPage( NetworkJob ):

    def __init__( self, watcher_key, method, url, body = None, referral_url = None, temp_path = None ):

        self._watcher_key = watcher_key

        NetworkJob.__init__( self, method, url, body = body, referral_url = referral_url, temp_path = temp_path )

    def _GenerateNetworkContexts( self ):

        network_contexts = NetworkJob._GenerateNetworkContexts( self )

        network_contexts.append( ClientNetworkingContexts.NetworkContext( CC.NETWORK_CONTEXT_WATCHER_PAGE, self._watcher_key ) )

        return network_contexts