hydrus/include/ClientNetworkingDomain.py

586 lines
19 KiB
Python

import ClientConstants as CC
import ClientParsing
import ClientThreading
import collections
import HydrusConstants as HC
import HydrusGlobals as HG
import HydrusData
import HydrusExceptions
import HydrusSerialisable
import os
import threading
import time
import urlparse
def ConvertDomainIntoAllApplicableDomains( domain ):
    """Return domain plus each of its parent domains, most specific first.
    
    e.g. 'maps.google.com' -> [ 'maps.google.com', 'google.com' ]
    
    Single-label names ( no '.' ) are never included, so the bare TLD is not returned.
    A candidate that starts with 'www' and still has more than two labels is skipped,
    so www.blah.com is not tracked separately from blah.com.
    """
    
    labels = domain.split( '.' )
    
    applicable_domains = []
    
    # the last start index is len - 2, which leaves a two-label candidate such as 'google.com'
    for start_index in range( len( labels ) - 1 ):
        
        remaining_labels = labels[ start_index : ]
        
        candidate = '.'.join( remaining_labels )
        
        is_www_subdomain = len( remaining_labels ) > 2 and candidate.startswith( 'www' )
        
        if is_www_subdomain:
            
            continue
            
        
        applicable_domains.append( candidate )
        
    
    return applicable_domains
def ConvertURLIntoDomain( url ):
    """Return the netloc part of url, passed through HydrusData.ToByteString."""
    
    netloc = urlparse.urlparse( url ).netloc
    
    return HydrusData.ToByteString( netloc )
# approval states stored alongside each custom header
VALID_DENIED = 0
VALID_APPROVED = 1
VALID_UNKNOWN = 2

# human-readable label for each approval state
valid_str_lookup = {
    VALID_DENIED : 'denied',
    VALID_APPROVED : 'approved',
    VALID_UNKNOWN : 'unknown'
}
class NetworkDomainManager( HydrusSerialisable.SerialisableBase ):
    """Serialisable store of URL matches and per-network-context custom headers.
    
    Custom headers are kept as network_context -> { key : ( value, approved, reason ) }, where
    approved is one of the VALID_* constants.
    
    Public methods that read or write the shared dicts take self._lock. The underscore helpers
    do not lock and are meant to be called by code that already holds it ( or during construction ).
    """
    
    SERIALISABLE_TYPE = HydrusSerialisable.SERIALISABLE_TYPE_NETWORK_DOMAIN_MANAGER
    SERIALISABLE_VERSION = 1
    
    def __init__( self ):
        
        HydrusSerialisable.SerialisableBase.__init__( self )
        
        self.engine = None
        
        self._url_matches = HydrusSerialisable.SerialisableList()
        self._network_contexts_to_custom_header_dicts = collections.defaultdict( dict )
        
        # derived lookup, rebuilt from self._url_matches by _RecalcCache
        self._domains_to_url_matches = collections.defaultdict( list )
        
        self._dirty = False
        
        self._lock = threading.Lock()
        
        self._RecalcCache()
        
    
    def _GetSerialisableInfo( self ):
        """Return ( serialisable url matches, list of ( serialisable network context, header items ) )."""
        
        serialisable_url_matches = self._url_matches.GetSerialisableTuple()
        serialisable_network_contexts_to_custom_header_dicts = [ ( network_context.GetSerialisableTuple(), custom_header_dict.items() ) for ( network_context, custom_header_dict ) in self._network_contexts_to_custom_header_dicts.items() ]
        
        return ( serialisable_url_matches, serialisable_network_contexts_to_custom_header_dicts )
        
    
    def _GetURLMatch( self, url ):
        """Return the first url match registered for url's domain whose Test passes, else None."""
        
        domain = ConvertURLIntoDomain( url )
        
        # membership test first so the defaultdict does not grow an empty list for every unknown domain
        if domain in self._domains_to_url_matches:
            
            url_matches = self._domains_to_url_matches[ domain ]
            
            # it would be nice to somehow sort these based on descending complexity
            # maybe by length of example url
            # in this way, url matches can have overlapping domains
            # e.g. 'post url' vs 'post url, manga subpage'
            
            for url_match in url_matches:
                
                ( result_bool, result_reason ) = url_match.Test( url )
                
                if result_bool:
                    
                    return url_match
                    
                
            
        
        return None
        
    
    def _InitialiseFromSerialisableInfo( self, serialisable_info ):
        """Restore state from the tuple produced by _GetSerialisableInfo."""
        
        ( serialisable_url_matches, serialisable_network_contexts_to_custom_header_dicts ) = serialisable_info
        
        self._url_matches = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_url_matches )
        
        self._network_contexts_to_custom_header_dicts = collections.defaultdict( dict )
        
        for ( serialisable_network_context, custom_header_dict_items ) in serialisable_network_contexts_to_custom_header_dicts:
            
            network_context = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_network_context )
            custom_header_dict = dict( custom_header_dict_items )
            
            self._network_contexts_to_custom_header_dicts[ network_context ] = custom_header_dict
            
        
        # the domain cache built in __init__ was derived from the empty default list, so it has to be
        # rebuilt now that self._url_matches has been replaced, or _GetURLMatch would never find anything
        self._RecalcCache()
        
    
    def _IterCustomHeaders( self, network_contexts ):
        """Yield ( network_context, key, value, approved, reason ) for every stored header of the given contexts.
        
        Does not lock; the caller is expected to hold self._lock while consuming the generator.
        """
        
        for network_context in network_contexts:
            
            # membership test first: indexing a missing key would insert an empty entry into the defaultdict
            if network_context not in self._network_contexts_to_custom_header_dicts:
                
                continue
                
            
            custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
            
            for ( key, ( value, approved, reason ) ) in custom_header_dict.items():
                
                yield ( network_context, key, value, approved, reason )
                
            
        
    
    def _RecalcCache( self ):
        """Rebuild the domain -> [ url_match ] lookup from self._url_matches."""
        
        self._domains_to_url_matches = collections.defaultdict( list )
        
        for url_match in self._url_matches:
            
            domain = url_match.GetDomain()
            
            self._domains_to_url_matches[ domain ].append( url_match )
            
        
    
    def _SetDirty( self ):
        
        self._dirty = True
        
    
    def CanValidateInPopup( self, network_contexts ):
        
        # we can always do this for headers
        
        return True
        
    
    def GenerateValidationPopupProcess( self, network_contexts ):
        """Return a DomainValidationPopupProcess covering every header of these contexts still marked VALID_UNKNOWN."""
        
        with self._lock:
            
            header_tuples = [ ( network_context, key, value, reason ) for ( network_context, key, value, approved, reason ) in self._IterCustomHeaders( network_contexts ) if approved == VALID_UNKNOWN ]
            
            process = DomainValidationPopupProcess( self, header_tuples )
            
            return process
            
        
    
    def GetDownloader( self, url ):
        """Placeholder: not implemented yet, always returns None."""
        
        with self._lock:
            
            # this might be better as getdownloaderkey, but we'll see how it shakes out
            # might also be worth being a getifhasdownloader
            
            # match the url to a url_match, then lookup that in a 'this downloader can handle this url_match type' dict that we'll manage
            
            pass
            
        
    
    def GetHeaders( self, network_contexts ):
        """Return { key : value } of the VALID_APPROVED headers for these contexts.
        
        When the same key is approved in more than one context, the later context in network_contexts wins.
        """
        
        with self._lock:
            
            headers = {}
            
            for ( network_context, key, value, approved, reason ) in self._IterCustomHeaders( network_contexts ):
                
                if approved == VALID_APPROVED:
                    
                    headers[ key ] = value
                    
                
            
            return headers
            
        
    
    def GetNetworkContextsToCustomHeaderDicts( self ):
        """Return a shallow copy of the context -> header dict mapping ( the inner dicts are shared )."""
        
        with self._lock:
            
            return dict( self._network_contexts_to_custom_header_dicts )
            
        
    
    def IsDirty( self ):
        
        with self._lock:
            
            return self._dirty
            
        
    
    def IsValid( self, network_contexts ):
        """Return False if any header of these contexts is still VALID_UNKNOWN, else True."""
        
        # for now, let's say that denied headers are simply not added, not that they invalidate a query
        
        # locked like every other accessor, since SetHeaderValidation can mutate these dicts from another thread
        with self._lock:
            
            for ( network_context, key, value, approved, reason ) in self._IterCustomHeaders( network_contexts ):
                
                if approved == VALID_UNKNOWN:
                    
                    return False
                    
                
            
            return True
            
        
    
    def NormaliseURL( self, url ):
        """Return url normalised by its matching url match, or url unchanged if nothing matches."""
        
        # call this before an entry into a seed cache or the db
        # use it in the dialog to review mass db-level changes
        
        with self._lock:
            
            url_match = self._GetURLMatch( url )
            
            if url_match is None:
                
                return url
                
            
            normalised_url = url_match.Normalise( url )
            
            return normalised_url
            
        
    
    def SetClean( self ):
        
        with self._lock:
            
            self._dirty = False
            
        
    
    def SetHeaderValidation( self, network_context, key, approved ):
        """Store a new approval state for an existing header; unknown contexts or keys are ignored."""
        
        with self._lock:
            
            if network_context in self._network_contexts_to_custom_header_dicts:
                
                custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
                
                if key in custom_header_dict:
                    
                    ( value, old_approved, reason ) = custom_header_dict[ key ]
                    
                    custom_header_dict[ key ] = ( value, approved, reason )
                    
                
            
            # marked dirty even when nothing matched, as before
            self._SetDirty()
            
        
    
    def SetNetworkContextsToCustomHeaderDicts( self, network_contexts_to_custom_header_dicts ):
        """Replace the whole context -> header dict mapping with the given object ( stored by reference )."""
        
        with self._lock:
            
            self._network_contexts_to_custom_header_dicts = network_contexts_to_custom_header_dicts
            
            self._SetDirty()
            
        
    
HydrusSerialisable.SERIALISABLE_TYPES_TO_OBJECT_TYPES[ HydrusSerialisable.SERIALISABLE_TYPE_NETWORK_DOMAIN_MANAGER ] = NetworkDomainManager
class DomainValidationPopupProcess( object ):
    """Asks the user, one yes/no popup per header, whether each pending custom header may be sent.
    
    header_tuples is a list of ( network_context, key, value, reason ). Each answer is written
    back through domain_manager.SetHeaderValidation.
    """
    
    def __init__( self, domain_manager, header_tuples ):
        
        self._domain_manager = domain_manager
        
        self._header_tuples = header_tuples
        
        self._is_done = False
        
    
    def _GenerateQuestion( self, network_context, key, value, reason ):
        """Build the popup text: the context line, the header itself, then the reason, separated by blank lines."""
        
        paragraphs = []
        
        paragraphs.append( 'For the network context ' + network_context.ToUnicode() + ', can the client set this header?' )
        paragraphs.append( key + ': ' + value )
        paragraphs.append( reason )
        
        return ( os.linesep * 2 ).join( paragraphs )
        
    
    def _WaitForAnswer( self, job_key ):
        """Poll job_key until 'popup_yes_no_answer' is set and return it; return None if the view shuts down first."""
        
        answer = job_key.GetIfHasVariable( 'popup_yes_no_answer' )
        
        while answer is None:
            
            if HG.view_shutdown:
                
                return None
                
            
            time.sleep( 0.25 )
            
            answer = job_key.GetIfHasVariable( 'popup_yes_no_answer' )
            
        
        return answer
        
    
    def IsDone( self ):
        
        return self._is_done
        
    
    def Start( self ):
        """Ask about every header in turn, blocking until each is answered or the view shuts down.
        
        IsDone() reports True once this returns, including on early exit or an exception.
        """
        
        try:
            
            for ( network_context, key, value, reason ) in self._header_tuples:
                
                job_key = ClientThreading.JobKey()
                
                question = self._GenerateQuestion( network_context, key, value, reason )
                
                job_key.SetVariable( 'popup_yes_no_question', question )
                
                HG.client_controller.pub( 'message', job_key )
                
                answer = self._WaitForAnswer( job_key )
                
                if answer is None:
                    
                    # shutdown interrupted the wait: stop here and leave the remaining headers undecided
                    return
                    
                
                approved = VALID_APPROVED if answer else VALID_DENIED
                
                self._domain_manager.SetHeaderValidation( network_context, key, approved )
                
            
        finally:
            
            self._is_done = True
            
        
    
class URLMatch( HydrusSerialisable.SerialisableBaseNamed ):
    """A named description of one kind of URL: scheme, netloc, fixed-length path and query parameters.
    
    Test says whether a URL fits the description; Normalise rewrites a URL to its canonical form
    ( preferred scheme, clipped netloc/path, only the known parameters in sorted order ).
    """
    
    SERIALISABLE_TYPE = HydrusSerialisable.SERIALISABLE_TYPE_URL_MATCH
    SERIALISABLE_VERSION = 1
    
    def __init__( self, name, preferred_scheme = 'https', netloc = 'hostname.com', subdomain_is_important = False, path_components = None, parameters = None, example_url = 'https://hostname.com/post/page.php?id=123456&s=view' ):
        """path_components is a list of StringMatch, one per path segment; parameters maps query key -> StringMatch.
        
        When either is None, defaults matching the default example_url are created.
        """
        
        if path_components is None:
            
            path_components = HydrusSerialisable.SerialisableList()
            
            path_components.append( ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'post', example_string = 'post' ) )
            path_components.append( ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'page.php', example_string = 'page.php' ) )
            
        
        if parameters is None:
            
            parameters = HydrusSerialisable.SerialisableDictionary()
            
            parameters[ 's' ] = ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'view', example_string = 'view' )
            parameters[ 'id' ] = ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FLEXIBLE, match_value = ClientParsing.NUMERIC, example_string = '123456' )
            
        
        # an edit dialog panel for this that has example url and testing of current values
        # a parent panel or something that lists all current urls in the db that match and how they will be clipped, is this ok? kind of thing.
        
        HydrusSerialisable.SerialisableBaseNamed.__init__( self, name )
        
        self._preferred_scheme = preferred_scheme
        self._netloc = netloc
        self._subdomain_is_important = subdomain_is_important
        self._path_components = path_components
        self._parameters = parameters
        
        self._example_url = example_url
        
    
    def _ClipNetLoc( self, netloc ):
        """Return netloc untouched if the subdomain matters, otherwise this match's own netloc."""
        
        if self._subdomain_is_important:
            
            # for domains like artistname.website.com, where removing the subdomain may break the url, we leave it alone
            
            return netloc
            
        
        # for domains like mediaserver4.website.com, where multiple subdomains serve the same content as the larger site
        # if the main site doesn't deliver the same content as the subdomain, then subdomain_is_important
        
        return self._netloc
        
    
    def _GetSerialisableInfo( self ):
        
        serialisable_path_components = self._path_components.GetSerialisableTuple()
        serialisable_parameters = self._parameters.GetSerialisableTuple()
        
        return ( self._preferred_scheme, self._netloc, self._subdomain_is_important, serialisable_path_components, serialisable_parameters, self._example_url )
        
    
    def _InitialiseFromSerialisableInfo( self, serialisable_info ):
        
        ( self._preferred_scheme, self._netloc, self._subdomain_is_important, serialisable_path_components, serialisable_parameters, self._example_url ) = serialisable_info
        
        self._path_components = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_path_components )
        self._parameters = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_parameters )
        
    
    def _ClipPath( self, path ):
        """Keep only as many leading path segments as this match defines, with a single leading slash."""
        
        # /post/show/1326143/akunim-anthro-armband-armwear-clothed-clothing-fem
        
        path = path.lstrip( '/' )
        
        # post/show/1326143/akunim-anthro-armband-armwear-clothed-clothing-fem
        
        path_components = path.split( '/' )
        
        path = '/'.join( path_components[ : len( self._path_components ) ] )
        
        # post/show/1326143
        
        if len( path ) > 0:
            
            path = '/' + path
            
        
        # /post/show/1326143
        
        return path
        
    
    def _ClipQuery( self, query ):
        """Drop query parameters this match does not define and return the rest sorted by ( key, value )."""
        
        valid_parameters = [ ( key, value ) for ( key, value ) in urlparse.parse_qsl( query ) if key in self._parameters ]
        
        valid_parameters.sort()
        
        # NOTE(review): parse_qsl percent-decodes, and the values are rejoined here without re-quoting,
        # so a value containing '&', '=' or a space would not round-trip. left as-is because changing the
        # encoding would alter already-normalised urls -- confirm before switching to urllib.urlencode
        query = '&'.join( ( key + '=' + value for ( key, value ) in valid_parameters ) )
        
        return query
        
    
    def GetDomain( self ):
        """Return the domain ( netloc ) of the example url."""
        
        return ConvertURLIntoDomain( self._example_url )
        
    
    def Normalise( self, url ):
        """Return url rebuilt with the preferred scheme, clipped netloc, path and query, and no params or fragment."""
        
        p = urlparse.urlparse( url )
        
        scheme = self._preferred_scheme
        netloc = self._ClipNetLoc( p.netloc )
        path = self._ClipPath( p.path )
        params = ''
        query = self._ClipQuery( p.query )
        fragment = ''
        
        r = urlparse.ParseResult( scheme, netloc, path, params, query, fragment )
        
        return r.geturl()
        
    
    def Test( self, url ):
        """Return ( True, 'good' ) if url fits this match, else ( False, reason ).
        
        The path must have at least as many segments as this match defines and each leading segment
        must pass its StringMatch. Every defined query parameter must be present and pass its
        StringMatch; extra parameters are allowed, since Normalise clips them.
        """
        
        # split the url into parts according to urlparse
        
        p = urlparse.urlparse( url )
        
        # TODO: test p.netloc with netloc, taking subdomain_is_important into account
        
        url_path = p.path.lstrip( '/' )
        
        # split the stripped path, not p.path: the leading '/' would otherwise produce an empty first
        # component and shift every segment one place against the expected components
        url_path_components = url_path.split( '/' )
        
        if len( url_path_components ) < len( self._path_components ):
            
            return ( False, p.path + ' did not have ' + str( len( self._path_components ) ) + ' components' )
            
        
        for ( url_path_component, expected_path_component ) in zip( url_path_components, self._path_components ):
            
            ( bool_result, reason ) = expected_path_component.Test( url_path_component )
            
            if not bool_result:
                
                return ( bool_result, reason )
                
            
        
        # for a repeated key the last value wins here
        url_parameters = dict( urlparse.parse_qsl( p.query ) )
        
        if len( url_parameters ) < len( self._parameters ):
            
            return ( False, p.query + ' did not have ' + str( len( self._parameters ) ) + ' value pairs' )
            
        
        # walk the expected parameters, not the url's: this checks that every required key is really
        # present ( duplicates can no longer satisfy the count ) and tolerates the extra keys that
        # _ClipQuery is there to remove
        for key in self._parameters:
            
            if key not in url_parameters:
                
                return ( False, key + ' not found in ' + p.query )
                
            
            expected_value = self._parameters[ key ]
            
            ( bool_result, reason ) = expected_value.Test( url_parameters[ key ] )
            
            if not bool_result:
                
                return ( bool_result, reason )
                
            
        
        return ( True, 'good' )
        
    
# URLMatch declares SERIALISABLE_TYPE_URL_MATCH as its SERIALISABLE_TYPE, so it has to be registered under
# that same key. registering it under SERIALISABLE_TYPE_URLS_IMPORT left URL_MATCH without an object type
# and pointed the URLS_IMPORT key at the wrong class
HydrusSerialisable.SERIALISABLE_TYPES_TO_OBJECT_TYPES[ HydrusSerialisable.SERIALISABLE_TYPE_URL_MATCH ] = URLMatch