hydrus/include/ClientNetworkingDomain.py

865 lines
28 KiB
Python
Raw Normal View History

2017-09-13 20:50:41 +00:00
import ClientConstants as CC
2017-09-27 21:52:54 +00:00
import ClientParsing
2017-10-04 17:51:58 +00:00
import ClientThreading
import collections
2017-09-13 20:50:41 +00:00
import HydrusConstants as HC
import HydrusGlobals as HG
import HydrusData
import HydrusExceptions
2017-09-27 21:52:54 +00:00
import HydrusSerialisable
2017-10-11 17:38:14 +00:00
import os
2017-10-25 21:45:15 +00:00
import re
2017-09-13 20:50:41 +00:00
import threading
2017-10-04 17:51:58 +00:00
import time
2017-09-13 20:50:41 +00:00
import urlparse
2017-10-04 17:51:58 +00:00
def ConvertDomainIntoAllApplicableDomains( domain ):
    """Return the domain and every parent domain, most specific first.
    
    e.g. 'maps.google.com' -> [ 'maps.google.com', 'google.com' ].
    'www'-style subdomains are folded into the parent (www.blah.com -> blah.com).
    IP addresses (optionally with a :port) are returned untouched as a
    single-element list, since stripping their 'subdomains' is meaningless.
    """
    
    # is an ip address, possibly with a port
    # NOTE: fixed a stray ')' that used to sit inside this character class
    if re.search( r'^[\d\.:]+$', domain ) is not None:
        
        return [ domain ]
        
    
    domains = []
    
    while domain.count( '.' ) > 0:
        
        # let's discard www.blah.com and www2.blah.com so we don't end up tracking it separately to blah.com--there's not much point!
        startswith_www = domain.count( '.' ) > 1 and domain.startswith( 'www' )
        
        if not startswith_www:
            
            domains.append( domain )
            
        
        domain = '.'.join( domain.split( '.' )[1:] ) # i.e. strip off the leftmost subdomain maps.google.com -> google.com
        
    
    return domains
2017-11-01 20:37:39 +00:00
def ConvertDomainIntoSecondLevelDomain( domain ):
    """Return the most general (second-level) form of the given domain."""
    
    applicable_domains = ConvertDomainIntoAllApplicableDomains( domain )
    
    # the applicable list runs specific -> general, so the last entry is the sld
    return applicable_domains[-1]
2017-10-04 17:51:58 +00:00
def ConvertURLIntoDomain( url ):
    """Return the network location (host, possibly host:port) of the url as a byte string."""
    
    netloc = urlparse.urlparse( url ).netloc
    
    return HydrusData.ToByteString( netloc )
2017-11-01 20:37:39 +00:00
def GetCookie( cookies, search_domain, name ):
    """Fetch the value of the named cookie applicable to search_domain.
    
    cookies is a cookie store exposing list_domains() and get_dict()
    (presumably requests' CookieJar API -- confirm at caller).
    Raises HydrusExceptions.DataMissing if no applicable cookie is found.
    """
    
    existing_domains = cookies.list_domains()
    
    for existing_domain in existing_domains:
        
        # blah.com is viewable by blah.com
        matches_exactly = existing_domain == search_domain
        
        # .blah.com is viewable by blah.com
        matches_dot = existing_domain == '.' + search_domain
        
        # .blah.com applies to subdomain.blah.com, blah.com does not
        # NOTE: fixed 'startwith' typo, which used to raise AttributeError on every call
        valid_subdomain = existing_domain.startswith( '.' ) and search_domain.endswith( existing_domain )
        
        if matches_exactly or matches_dot or valid_subdomain:
            
            cookie_dict = cookies.get_dict( existing_domain )
            
            if name in cookie_dict:
                
                return cookie_dict[ name ]
                
            
        
    
    raise HydrusExceptions.DataMissing( 'Cookie ' + name + ' not found for domain ' + search_domain + '!' )
2017-10-04 17:51:58 +00:00
# approval states for custom headers
VALID_DENIED = 0
VALID_APPROVED = 1
VALID_UNKNOWN = 2

# human-readable names for the approval states above
valid_str_lookup = {
    VALID_DENIED : 'denied',
    VALID_APPROVED : 'approved',
    VALID_UNKNOWN : 'unknown'
}
2017-10-04 17:51:58 +00:00
class NetworkDomainManager( HydrusSerialisable.SerialisableBase ):
    """Manages URL Matches (per-domain url recognition/normalisation rules) and
    per-network-context custom HTTP headers, as one serialisable object.
    
    Public methods take self._lock where they touch shared state.
    """
    
    SERIALISABLE_TYPE = HydrusSerialisable.SERIALISABLE_TYPE_NETWORK_DOMAIN_MANAGER
    SERIALISABLE_NAME = 'Domain Manager'
    SERIALISABLE_VERSION = 2
    
    def __init__( self ):
        
        HydrusSerialisable.SerialisableBase.__init__( self )
        
        # set externally after construction -- presumably by the network engine; confirm at caller
        self.engine = None
        
        self._url_matches = HydrusSerialisable.SerialisableList()
        
        self._network_contexts_to_custom_header_dicts = collections.defaultdict( dict )
        
        # per url match name: whether post urls show in the media viewer, plus
        # links to page/gallery parsing keys (None = not linked yet)
        self._url_match_names_to_display = {}
        self._url_match_names_to_page_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
        self._url_match_names_to_gallery_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
        
        # cache: second-level domain -> url matches, rebuilt by _RecalcCache
        self._domains_to_url_matches = collections.defaultdict( list )
        
        self._dirty = False
        
        self._lock = threading.Lock()
        
        self._RecalcCache()
        
    
    def _GetSerialisableInfo( self ):
        """Flatten current state into plain serialisable structures (version 2 layout)."""
        
        serialisable_url_matches = self._url_matches.GetSerialisableTuple()
        
        serialisable_url_match_names_to_display = self._url_match_names_to_display.items()
        serialisable_url_match_names_to_page_parsing_keys = self._url_match_names_to_page_parsing_keys.GetSerialisableTuple()
        serialisable_url_match_names_to_gallery_parsing_keys = self._url_match_names_to_gallery_parsing_keys.GetSerialisableTuple()
        
        serialisable_network_contexts_to_custom_header_dicts = [ ( network_context.GetSerialisableTuple(), custom_header_dict.items() ) for ( network_context, custom_header_dict ) in self._network_contexts_to_custom_header_dicts.items() ]
        
        return ( serialisable_url_matches, serialisable_url_match_names_to_display, serialisable_url_match_names_to_page_parsing_keys, serialisable_url_match_names_to_gallery_parsing_keys, serialisable_network_contexts_to_custom_header_dicts )
        
    
    def _GetURLMatch( self, url ):
        """Return the first URLMatch whose Test() accepts url, or None. Caller holds the lock."""
        
        domain = ConvertDomainIntoSecondLevelDomain( ConvertURLIntoDomain( url ) )
        
        if domain in self._domains_to_url_matches:
            
            url_matches = self._domains_to_url_matches[ domain ]
            
            # matches are pre-sorted most-specific-first by _RecalcCache
            for url_match in url_matches:
                
                try:
                    
                    url_match.Test( url )
                    
                    return url_match
                    
                except HydrusExceptions.URLMatchException:
                    
                    continue
                    
                
            
        
        return None
        
    
    def _InitialiseFromSerialisableInfo( self, serialisable_info ):
        """Rebuild live state from the version 2 serialised layout."""
        
        ( serialisable_url_matches, serialisable_url_match_names_to_display, serialisable_url_match_names_to_page_parsing_keys, serialisable_url_match_names_to_gallery_parsing_keys, serialisable_network_contexts_to_custom_header_dicts ) = serialisable_info
        
        self._url_matches = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_url_matches )
        
        self._url_match_names_to_display = dict( serialisable_url_match_names_to_display )
        self._url_match_names_to_page_parsing_keys = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_url_match_names_to_page_parsing_keys )
        self._url_match_names_to_gallery_parsing_keys = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_url_match_names_to_gallery_parsing_keys )
        
        self._network_contexts_to_custom_header_dicts = collections.defaultdict( dict )
        
        for ( serialisable_network_context, custom_header_dict_items ) in serialisable_network_contexts_to_custom_header_dicts:
            
            network_context = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_network_context )
            custom_header_dict = dict( custom_header_dict_items )
            
            self._network_contexts_to_custom_header_dicts[ network_context ] = custom_header_dict
            
        
    
    def _RecalcCache( self ):
        """Rebuild the domain -> url matches lookup cache from self._url_matches."""
        
        self._domains_to_url_matches = collections.defaultdict( list )
        
        for url_match in self._url_matches:
            
            domain = url_match.GetDomain()
            
            self._domains_to_url_matches[ domain ].append( url_match )
            
        
        # we now sort them in descending complexity so that
        # post url/manga subpage
        # is before
        # post url
        
        def key( u_m ):
            
            # complexity proxy: number of path separators in the example url
            return u_m.GetExampleURL().count( '/' )
            
        
        for url_matches in self._domains_to_url_matches.values():
            
            url_matches.sort( key = key, reverse = True )
            
        
    
    def _SetDirty( self ):
        # mark that in-memory state has diverged from what was last saved
        
        self._dirty = True
        
    
    def _UpdateSerialisableInfo( self, version, old_serialisable_info ):
        """Migrate serialised data from version 1 to the current version 2 layout."""
        
        if version == 1:
            
            ( serialisable_url_matches, serialisable_network_contexts_to_custom_header_dicts ) = old_serialisable_info
            
            url_matches = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_url_matches )
            
            url_match_names_to_display = {}
            url_match_names_to_page_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
            url_match_names_to_gallery_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
            
            # seed default display/parser-link entries for all existing url matches
            for url_match in url_matches:
                
                name = url_match.GetName()
                
                if url_match.IsPostURL():
                    
                    url_match_names_to_display[ name ] = True
                    
                    url_match_names_to_page_parsing_keys[ name ] = None
                    
                
                if url_match.IsGalleryURL() or url_match.IsWatchableURL():
                    
                    url_match_names_to_gallery_parsing_keys[ name ] = None
                    
                
            
            serialisable_url_match_names_to_display = url_match_names_to_display.items()
            serialisable_url_match_names_to_page_parsing_keys = url_match_names_to_page_parsing_keys.GetSerialisableTuple()
            serialisable_url_match_names_to_gallery_parsing_keys = url_match_names_to_gallery_parsing_keys.GetSerialisableTuple()
            
            new_serialisable_info = ( serialisable_url_matches, serialisable_url_match_names_to_display, serialisable_url_match_names_to_page_parsing_keys, serialisable_url_match_names_to_gallery_parsing_keys, serialisable_network_contexts_to_custom_header_dicts )
            
            return ( 2, new_serialisable_info )
            
        
    
    def _UpdateURLMatchLinks( self ):
        """Ensure every current url match has display/parser-link entries, preserving existing ones."""
        
        for url_match in self._url_matches:
            
            name = url_match.GetName()
            
            if url_match.IsPostURL():
                
                if name not in self._url_match_names_to_display:
                    
                    self._url_match_names_to_display[ name ] = True
                    
                
                if name not in self._url_match_names_to_page_parsing_keys:
                    
                    self._url_match_names_to_page_parsing_keys[ name ] = None
                    
                
            
            if url_match.IsGalleryURL() or url_match.IsWatchableURL():
                
                if name not in self._url_match_names_to_gallery_parsing_keys:
                    
                    self._url_match_names_to_gallery_parsing_keys[ name ] = None
                    
                
            
        
    
    def CanValidateInPopup( self, network_contexts ):
        """Report whether pending validation for these contexts can be asked via popup."""
        
        # we can always do this for headers
        
        return True
        
    
    def ConvertURLsToMediaViewerTuples( self, urls ):
        """Return up to ten sorted ( pretty name or domain, url ) pairs for the media viewer."""
        
        url_tuples = []
        
        with self._lock:
            
            for url in urls:
                
                url_match = self._GetURLMatch( url )
                
                if url_match is None:
                    
                    # unrecognised url: label it with its raw domain
                    domain = ConvertURLIntoDomain( url )
                    
                    url_tuples.append( ( domain, url ) )
                    
                else:
                    
                    name = url_match.GetName()
                    
                    # only post urls flagged displayable get shown under their match name
                    if url_match.IsPostURL() and name in self._url_match_names_to_display:
                        
                        if self._url_match_names_to_display[ name ]:
                            
                            url_tuples.append( ( name, url ) )
                            
                        
                    
                
                if len( url_tuples ) == 10:
                    
                    break
                    
                
            
        
        url_tuples.sort()
        
        return url_tuples
        
    
    def GenerateValidationPopupProcess( self, network_contexts ):
        """Collect headers still pending user approval and wrap them in a popup process."""
        
        with self._lock:
            
            header_tuples = []
            
            for network_context in network_contexts:
                
                if network_context in self._network_contexts_to_custom_header_dicts:
                    
                    custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
                    
                    for ( key, ( value, approved, reason ) ) in custom_header_dict.items():
                        
                        if approved == VALID_UNKNOWN:
                            
                            header_tuples.append( ( network_context, key, value, reason ) )
                            
                        
                    
                
            
            process = DomainValidationPopupProcess( self, header_tuples )
            
            return process
            
        
    
    def GetDownloader( self, url ):
        """Placeholder: will eventually map a url to the downloader that can handle it."""
        
        with self._lock:
            
            # this might be better as getdownloaderkey, but we'll see how it shakes out
            # might also be worth being a getifhasdownloader
            
            # match the url to a url_match, then lookup that in a 'this downloader can handle this url_match type' dict that we'll manage
            
            pass
            
        
    
    def GetHeaders( self, network_contexts ):
        """Return the approved custom headers applicable to the given network contexts."""
        
        with self._lock:
            
            headers = {}
            
            for network_context in network_contexts:
                
                if network_context in self._network_contexts_to_custom_header_dicts:
                    
                    custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
                    
                    for ( key, ( value, approved, reason ) ) in custom_header_dict.items():
                        
                        # denied/unknown headers are simply not added
                        if approved == VALID_APPROVED:
                            
                            headers[ key ] = value
                            
                        
                    
                
            
            return headers
            
        
    
    def GetNetworkContextsToCustomHeaderDicts( self ):
        """Return a shallow copy of the context -> custom header dict mapping."""
        
        with self._lock:
            
            return dict( self._network_contexts_to_custom_header_dicts )
            
        
    
    def GetURLMatches( self ):
        """Return a copy of the current url match list."""
        
        with self._lock:
            
            return list( self._url_matches )
            
        
    
    def GetURLMatchLinks( self ):
        """Return copies of the three name-keyed link dicts: display, page parsing, gallery parsing."""
        
        with self._lock:
            
            return ( dict( self._url_match_names_to_display ), dict( self._url_match_names_to_page_parsing_keys ), dict( self._url_match_names_to_gallery_parsing_keys ) )
            
        
    
    def Initialise( self ):
        # post-load hook: rebuild the lookup cache
        
        self._RecalcCache()
        
    
    def IsDirty( self ):
        """Return True if state has changed since the last save."""
        
        with self._lock:
            
            return self._dirty
            
        
    
    def IsValid( self, network_contexts ):
        """Return False if any applicable custom header is still awaiting user approval."""
        
        # for now, let's say that denied headers are simply not added, not that they invalidate a query
        
        for network_context in network_contexts:
            
            if network_context in self._network_contexts_to_custom_header_dicts:
                
                custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
                
                for ( value, approved, reason ) in custom_header_dict.values():
                    
                    if approved == VALID_UNKNOWN:
                        
                        return False
                        
                    
                
            
        
        return True
        
    
    def NormaliseURL( self, url ):
        """Return the canonical form of url if a url match applies, else the url unchanged."""
        
        with self._lock:
            
            url_match = self._GetURLMatch( url )
            
            if url_match is None:
                
                return url
                
            
            normalised_url = url_match.Normalise( url )
            
            return normalised_url
            
        
    
    def SetClean( self ):
        """Mark state as saved."""
        
        with self._lock:
            
            self._dirty = False
            
        
    
    def SetHeaderValidation( self, network_context, key, approved ):
        """Record the user's approval decision for one custom header, if it still exists."""
        
        with self._lock:
            
            if network_context in self._network_contexts_to_custom_header_dicts:
                
                custom_header_dict = self._network_contexts_to_custom_header_dicts[ network_context ]
                
                if key in custom_header_dict:
                    
                    ( value, old_approved, reason ) = custom_header_dict[ key ]
                    
                    custom_header_dict[ key ] = ( value, approved, reason )
                    
                    self._SetDirty()
                    
                
            
        
    
    def SetNetworkContextsToCustomHeaderDicts( self, network_contexts_to_custom_header_dicts ):
        """Replace the whole context -> custom header mapping."""
        
        with self._lock:
            
            self._network_contexts_to_custom_header_dicts = network_contexts_to_custom_header_dicts
            
            self._SetDirty()
            
        
    
    def SetURLMatches( self, url_matches ):
        """Replace the url match list, refresh link entries and the lookup cache."""
        
        with self._lock:
            
            self._url_matches = HydrusSerialisable.SerialisableList()
            
            self._url_matches.extend( url_matches )
            
            self._UpdateURLMatchLinks()
            self._RecalcCache()
            
            self._SetDirty()
            
        
    
    def SetURLMatchLinks( self, url_match_names_to_display, url_match_names_to_page_parsing_keys, url_match_names_to_gallery_parsing_keys ):
        """Replace all three name-keyed link dicts wholesale."""
        
        with self._lock:
            
            self._url_match_names_to_display = {}
            self._url_match_names_to_page_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
            self._url_match_names_to_gallery_parsing_keys = HydrusSerialisable.SerialisableBytesDictionary()
            
            self._url_match_names_to_display.update( url_match_names_to_display )
            self._url_match_names_to_page_parsing_keys.update( url_match_names_to_page_parsing_keys )
            self._url_match_names_to_gallery_parsing_keys.update( url_match_names_to_gallery_parsing_keys )
            
            self._SetDirty()
2017-10-04 17:51:58 +00:00
# register with the serialisation system so saved domain managers can be deserialised
HydrusSerialisable.SERIALISABLE_TYPES_TO_OBJECT_TYPES[ HydrusSerialisable.SERIALISABLE_TYPE_NETWORK_DOMAIN_MANAGER ] = NetworkDomainManager
2017-10-11 17:38:14 +00:00
class DomainValidationPopupProcess( object ):
    """Asks the user, one yes/no popup at a time, to approve or deny pending custom headers.
    
    Start() blocks its calling thread until every question is answered or the
    client begins shutting down; answers are written back via
    domain_manager.SetHeaderValidation.
    """
    
    def __init__( self, domain_manager, header_tuples ):
        
        self._domain_manager = domain_manager
        
        # list of ( network_context, key, value, reason ) awaiting a decision
        self._header_tuples = header_tuples
        
        self._is_done = False
        
    
    def IsDone( self ):
        
        return self._is_done
        
    
    def Start( self ):
        """Run all validation questions; always marks the process done on exit."""
        
        try:
            
            # NOTE: removed an unused 'results = []' local that was never read
            
            for ( network_context, key, value, reason ) in self._header_tuples:
                
                job_key = ClientThreading.JobKey()
                
                # generate question
                
                question = 'For the network context ' + network_context.ToUnicode() + ', can the client set this header?'
                question += os.linesep * 2
                question += key + ': ' + value
                question += os.linesep * 2
                question += reason
                
                job_key.SetVariable( 'popup_yes_no_question', question )
                
                HG.client_controller.pub( 'message', job_key )
                
                result = job_key.GetIfHasVariable( 'popup_yes_no_answer' )
                
                # poll until the user answers or the view shuts down
                while result is None:
                    
                    if HG.view_shutdown:
                        
                        return
                        
                    
                    time.sleep( 0.25 )
                    
                    result = job_key.GetIfHasVariable( 'popup_yes_no_answer' )
                    
                
                if result:
                    
                    approved = VALID_APPROVED
                    
                else:
                    
                    approved = VALID_DENIED
                    
                
                self._domain_manager.SetHeaderValidation( network_context, key, approved )
                
            
        finally:
            
            self._is_done = True
2017-09-13 20:50:41 +00:00
2017-09-27 21:52:54 +00:00
class URLMatch( HydrusSerialisable.SerialisableBaseNamed ):
    """Recognises, tests, and normalises urls of one particular shape.
    
    A match is defined by a preferred scheme, a netloc, subdomain rules, an
    ordered list of path component StringMatches, and a dict of required
    query parameter StringMatches.
    """
    
    SERIALISABLE_TYPE = HydrusSerialisable.SERIALISABLE_TYPE_URL_MATCH
    SERIALISABLE_NAME = 'URL Match'
    SERIALISABLE_VERSION = 1
    
    def __init__( self, name, url_type = None, preferred_scheme = 'https', netloc = 'hostname.com', allow_subdomains = False, keep_subdomains = False, path_components = None, parameters = None, example_url = 'https://hostname.com/post/page.php?id=123456&s=view' ):
        
        if url_type is None:
            
            url_type = HC.URL_TYPE_POST
            
        
        if path_components is None:
            
            # default example path shape: /post/page.php
            path_components = HydrusSerialisable.SerialisableList()
            
            path_components.append( ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'post', example_string = 'post' ) )
            path_components.append( ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'page.php', example_string = 'page.php' ) )
            
        
        if parameters is None:
            
            # default example query shape: ?s=view&id=<number>
            parameters = HydrusSerialisable.SerialisableDictionary()
            
            parameters[ 's' ] = ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FIXED, match_value = 'view', example_string = 'view' )
            parameters[ 'id' ] = ClientParsing.StringMatch( match_type = ClientParsing.STRING_MATCH_FLEXIBLE, match_value = ClientParsing.NUMERIC, example_string = '123456' )
            
        
        # if the args are not serialisable stuff, lets overwrite here
        
        path_components = HydrusSerialisable.SerialisableList( path_components )
        parameters = HydrusSerialisable.SerialisableDictionary( parameters )
        
        HydrusSerialisable.SerialisableBaseNamed.__init__( self, name )
        
        self._url_type = url_type
        self._preferred_scheme = preferred_scheme
        self._netloc = netloc
        self._allow_subdomains = allow_subdomains
        self._keep_subdomains = keep_subdomains
        self._path_components = path_components
        self._parameters = parameters
        
        self._example_url = example_url
        
    
    def _ClipNetLoc( self, netloc ):
        """Return the canonical netloc: the given one if subdomains are kept, else self._netloc."""
        
        if self._keep_subdomains:
            
            # for domains like artistname.website.com, where removing the subdomain may break the url, we leave it alone
            
            pass
            
        else:
            
            # for domains like mediaserver4.website.com, where multiple subdomains serve the same content as the larger site
            
            netloc = self._netloc
            
        
        return netloc
        
    
    def _GetSerialisableInfo( self ):
        """Flatten state into the version 1 serialisable layout."""
        
        serialisable_path_components = self._path_components.GetSerialisableTuple()
        serialisable_parameters = self._parameters.GetSerialisableTuple()
        
        return ( self._url_type, self._preferred_scheme, self._netloc, self._allow_subdomains, self._keep_subdomains, serialisable_path_components, serialisable_parameters, self._example_url )
        
    
    def _InitialiseFromSerialisableInfo( self, serialisable_info ):
        """Rebuild state from the version 1 serialisable layout."""
        
        ( self._url_type, self._preferred_scheme, self._netloc, self._allow_subdomains, self._keep_subdomains, serialisable_path_components, serialisable_parameters, self._example_url ) = serialisable_info
        
        self._path_components = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_path_components )
        self._parameters = HydrusSerialisable.CreateFromSerialisableTuple( serialisable_parameters )
        
    
    def _ClipPath( self, path ):
        """Truncate a url path to only the components this match cares about."""
        
        # /post/show/1326143/akunim-anthro-armband-armwear-clothed-clothing-fem
        
        while path.startswith( '/' ):
            
            path = path[ 1 : ]
            
        
        # post/show/1326143/akunim-anthro-armband-armwear-clothed-clothing-fem
        
        path_components = path.split( '/' )
        
        path = '/'.join( path_components[ : len( self._path_components ) ] )
        
        # post/show/1326143
        
        if len( path ) > 0:
            
            path = '/' + path
            
        
        # /post/show/1326143
        
        return path
        
    
    def _ClipQuery( self, query ):
        """Keep only the recognised query parameters, sorted by key, rejoined as a query string."""
        
        valid_parameters = []
        
        for ( key, value ) in urlparse.parse_qsl( query ):
            
            if key in self._parameters:
                
                valid_parameters.append( ( key, value ) )
                
            
        
        valid_parameters.sort()
        
        query = '&'.join( ( key + '=' + value for ( key, value ) in valid_parameters ) )
        
        return query
        
    
    def GetDomain( self ):
        """Return the second-level domain this match is registered under."""
        
        return ConvertDomainIntoSecondLevelDomain( HydrusData.ToByteString( self._netloc ) )
        
    
    def GetExampleURL( self ):
        
        return self._example_url
        
    
    def GetURLType( self ):
        
        return self._url_type
        
    
    def IsGalleryURL( self ):
        
        return self._url_type == HC.URL_TYPE_GALLERY
        
    
    def IsPostURL( self ):
        
        return self._url_type == HC.URL_TYPE_POST
        
    
    def IsWatchableURL( self ):
        
        return self._url_type == HC.URL_TYPE_WATCHABLE
        
    
    def Normalise( self, url ):
        """Return the canonical form of url: preferred scheme, clipped netloc/path/query, no fragment."""
        
        p = urlparse.urlparse( url )
        
        scheme = self._preferred_scheme
        netloc = self._ClipNetLoc( p.netloc )
        path = self._ClipPath( p.path )
        params = ''
        query = self._ClipQuery( p.query )
        fragment = ''
        
        r = urlparse.ParseResult( scheme, netloc, path, params, query, fragment )
        
        return r.geturl()
        
    
    def Test( self, url ):
        """Raise HydrusExceptions.URLMatchException if url does not fit this match."""
        
        p = urlparse.urlparse( url )
        
        if self._allow_subdomains:
            
            if p.netloc != self._netloc and not p.netloc.endswith( '.' + self._netloc ):
                
                raise HydrusExceptions.URLMatchException( p.netloc + ' (potentially excluding subdomains) did not match ' + self._netloc )
                
            
        else:
            
            if p.netloc != self._netloc:
                
                raise HydrusExceptions.URLMatchException( p.netloc + ' did not match ' + self._netloc )
                
            
        
        url_path = p.path
        
        while url_path.startswith( '/' ):
            
            url_path = url_path[ 1 : ]
            
        
        url_path_components = url_path.split( '/' )
        
        if len( url_path_components ) < len( self._path_components ):
            
            raise HydrusExceptions.URLMatchException( url_path + ' did not have ' + str( len( self._path_components ) ) + ' components' )
            
        
        # test each expected path component against the url's, in order;
        # extra trailing components in the url are ignored
        for ( url_path_component, expected_path_component ) in zip( url_path_components, self._path_components ):
            
            try:
                
                expected_path_component.Test( url_path_component )
                
            except HydrusExceptions.StringMatchException as e:
                
                raise HydrusExceptions.URLMatchException( unicode( e ) )
                
            
        
        url_parameters_list = urlparse.parse_qsl( p.query )
        
        url_parameters = dict( url_parameters_list )
        
        if len( url_parameters ) < len( self._parameters ):
            
            raise HydrusExceptions.URLMatchException( p.query + ' did not have ' + str( len( self._parameters ) ) + ' value pairs' )
            
        
        # every expected parameter must be present and pass its string match
        for ( key, string_match ) in self._parameters.items():
            
            if key not in url_parameters:
                
                raise HydrusExceptions.URLMatchException( key + ' not found in ' + p.query )
                
            
            value = url_parameters[ key ]
            
            try:
                
                string_match.Test( value )
                
            except HydrusExceptions.StringMatchException as e:
                
                raise HydrusExceptions.URLMatchException( unicode( e ) )
                
            
        
    
    def ToTuple( self ):
        """Return the raw settings tuple, e.g. for ui editing."""
        
        return ( self._url_type, self._preferred_scheme, self._netloc, self._allow_subdomains, self._keep_subdomains, self._path_components, self._parameters, self._example_url )
# register with the serialisation system so saved url matches can be deserialised
HydrusSerialisable.SERIALISABLE_TYPES_TO_OBJECT_TYPES[ HydrusSerialisable.SERIALISABLE_TYPE_URL_MATCH ] = URLMatch
2017-09-27 21:52:54 +00:00