1031 lines
27 KiB
Python
1031 lines
27 KiB
Python
import collections
|
|
import functools
|
|
import os
|
|
import typing
|
|
|
|
import psutil
|
|
import re
|
|
import send2trash
|
|
import shlex
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import threading
|
|
import traceback
|
|
|
|
from hydrus.core import HydrusConstants as HC
|
|
from hydrus.core import HydrusData
|
|
from hydrus.core import HydrusGlobals as HG
|
|
from hydrus.core import HydrusThreading
|
|
from hydrus.core import HydrusTime
|
|
|
|
mimes_to_default_thumbnail_paths = collections.defaultdict( lambda: os.path.join( HC.STATIC_DIR, 'hydrus.png' ) )
|
|
|
|
mimes_to_default_thumbnail_paths[ HC.APPLICATION_PDF ] = os.path.join( HC.STATIC_DIR, 'pdf.png' )
|
|
mimes_to_default_thumbnail_paths[ HC.APPLICATION_PSD ] = os.path.join( HC.STATIC_DIR, 'psd.png' )
|
|
mimes_to_default_thumbnail_paths[ HC.APPLICATION_CLIP ] = os.path.join( HC.STATIC_DIR, 'clip.png' )
|
|
mimes_to_default_thumbnail_paths[ HC.APPLICATION_SAI2 ] = os.path.join( HC.STATIC_DIR, 'sai.png' )
|
|
mimes_to_default_thumbnail_paths[ HC.APPLICATION_KRITA ] = os.path.join( HC.STATIC_DIR, 'krita.png' )
|
|
mimes_to_default_thumbnail_paths[ HC.IMAGE_SVG ] = os.path.join( HC.STATIC_DIR, 'svg.png' )
|
|
|
|
for mime in HC.AUDIO:
|
|
|
|
path = os.path.join( HC.STATIC_DIR, 'audio.png' )
|
|
|
|
mimes_to_default_thumbnail_paths[ mime ] = os.path.join( path )
|
|
|
|
|
|
for mime in HC.VIDEO:
|
|
|
|
path = os.path.join( HC.STATIC_DIR, 'video.png' )
|
|
|
|
mimes_to_default_thumbnail_paths[ mime ] = os.path.join( path )
|
|
|
|
|
|
for mime in HC.ARCHIVES:
|
|
|
|
path = os.path.join( HC.STATIC_DIR, 'zip.png' )
|
|
|
|
mimes_to_default_thumbnail_paths[ mime ] = os.path.join( path )
|
|
|
|
|
|
def AppendPathUntilNoConflicts( path ):
|
|
|
|
( path_absent_ext, ext ) = os.path.splitext( path )
|
|
|
|
good_path_absent_ext = path_absent_ext
|
|
|
|
i = 0
|
|
|
|
while os.path.exists( good_path_absent_ext + ext ):
|
|
|
|
good_path_absent_ext = path_absent_ext + '_' + str( i )
|
|
|
|
i += 1
|
|
|
|
|
|
return good_path_absent_ext + ext
|
|
|
|
def ConvertAbsPathToPortablePath( abs_path, base_dir_override = None ):
|
|
|
|
try:
|
|
|
|
if base_dir_override is None:
|
|
|
|
base_dir = HG.controller.GetDBDir()
|
|
|
|
else:
|
|
|
|
base_dir = base_dir_override
|
|
|
|
|
|
portable_path = os.path.relpath( abs_path, base_dir )
|
|
|
|
if portable_path.startswith( '..' ):
|
|
|
|
portable_path = abs_path
|
|
|
|
|
|
except:
|
|
|
|
portable_path = abs_path
|
|
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
portable_path = portable_path.replace( '\\', '/' ) # store seps as /, to maintain multiplatform uniformity
|
|
|
|
|
|
return portable_path
|
|
|
|
def ConvertPortablePathToAbsPath( portable_path, base_dir_override = None ):
|
|
|
|
portable_path = os.path.normpath( portable_path ) # collapses .. stuff and converts / to \\ for windows only
|
|
|
|
if os.path.isabs( portable_path ):
|
|
|
|
abs_path = portable_path
|
|
|
|
else:
|
|
|
|
if base_dir_override is None:
|
|
|
|
base_dir = HG.controller.GetDBDir()
|
|
|
|
else:
|
|
|
|
base_dir = base_dir_override
|
|
|
|
|
|
abs_path = os.path.normpath( os.path.join( base_dir, portable_path ) )
|
|
|
|
|
|
if not HC.PLATFORM_WINDOWS and not os.path.exists( abs_path ):
|
|
|
|
abs_path = abs_path.replace( '\\', '/' )
|
|
|
|
|
|
return abs_path
|
|
|
|
def CopyAndMergeTree( source, dest ):
|
|
|
|
pauser = HydrusThreading.BigJobPauser()
|
|
|
|
MakeSureDirectoryExists( dest )
|
|
|
|
num_errors = 0
|
|
|
|
for ( root, dirnames, filenames ) in os.walk( source ):
|
|
|
|
dest_root = root.replace( source, dest )
|
|
|
|
for dirname in dirnames:
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, dirname )
|
|
dest_path = os.path.join( dest_root, dirname )
|
|
|
|
MakeSureDirectoryExists( dest_path )
|
|
|
|
shutil.copystat( source_path, dest_path )
|
|
|
|
|
|
for filename in filenames:
|
|
|
|
if num_errors > 5:
|
|
|
|
raise Exception( 'Too many errors, directory copy abandoned.' )
|
|
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, filename )
|
|
dest_path = os.path.join( dest_root, filename )
|
|
|
|
ok = MirrorFile( source_path, dest_path )
|
|
|
|
if not ok:
|
|
|
|
num_errors += 1
|
|
|
|
|
|
|
|
|
|
|
|
def CopyFileLikeToFileLike( f_source, f_dest ):
|
|
|
|
for block in ReadFileLikeAsBlocks( f_source ): f_dest.write( block )
|
|
|
|
|
|
def DeletePath( path ) -> bool:
|
|
|
|
if HG.file_report_mode:
|
|
|
|
HydrusData.ShowText( 'Deleting {}'.format( path ) )
|
|
|
|
HydrusData.ShowText( ''.join( traceback.format_stack() ) )
|
|
|
|
|
|
if os.path.lexists( path ):
|
|
|
|
TryToMakeFileWriteable( path )
|
|
|
|
try:
|
|
|
|
if os.path.isdir( path ):
|
|
|
|
shutil.rmtree( path )
|
|
|
|
else:
|
|
|
|
os.remove( path )
|
|
|
|
|
|
except Exception as e:
|
|
|
|
if 'Error 32' in str( e ):
|
|
|
|
# file in use by another process
|
|
|
|
HydrusData.DebugPrint( 'Trying to delete ' + path + ' failed because it was in use by another process.' )
|
|
|
|
else:
|
|
|
|
HydrusData.ShowText( 'Trying to delete ' + path + ' caused the following error:' )
|
|
HydrusData.ShowException( e )
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
return True
|
|
|
|
def DirectoryIsWriteable( path ):
|
|
|
|
while not os.path.exists( path ):
|
|
|
|
try:
|
|
|
|
path = os.path.dirname( path )
|
|
|
|
except:
|
|
|
|
return False
|
|
|
|
|
|
|
|
if not os.access( path, os.W_OK | os.X_OK ):
|
|
|
|
return False
|
|
|
|
|
|
# we'll actually do a file, since Program Files passes the above test lmaoooo
|
|
|
|
try:
|
|
|
|
# also, using tempfile.TemporaryFile actually loops on PermissionError from Windows lmaaaooooo, thinking this is an already existing file
|
|
# so, just do it manually!
|
|
|
|
test_path = os.path.join( path, 'hydrus_permission_test' )
|
|
|
|
with open( test_path, 'wb' ) as f:
|
|
|
|
f.write( b'If this file still exists, this directory can be written to but not deleted from.' )
|
|
|
|
|
|
os.unlink( test_path )
|
|
|
|
except:
|
|
|
|
return False
|
|
|
|
|
|
return True
|
|
|
|
def FileisWriteable( path: str ):
|
|
|
|
return os.access( path, os.W_OK )
|
|
|
|
def FilterFreePaths( paths ):
|
|
|
|
free_paths = []
|
|
|
|
for path in paths:
|
|
|
|
HydrusThreading.CheckIfThreadShuttingDown()
|
|
|
|
if PathIsFree( path ):
|
|
|
|
free_paths.append( path )
|
|
|
|
|
|
|
|
return free_paths
|
|
|
|
def GetDefaultLaunchPath():
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
return 'windows is called directly'
|
|
|
|
elif HC.PLATFORM_MACOS:
|
|
|
|
return 'open "%path%"'
|
|
|
|
elif HC.PLATFORM_LINUX:
|
|
|
|
return 'xdg-open "%path%"'
|
|
|
|
elif HC.PLATFORM_HAIKU:
|
|
|
|
return 'open "%path%"'
|
|
|
|
|
|
|
|
@functools.lru_cache( maxsize = 128 )
|
|
def GetPartitionInfo( path ) -> typing.Optional[ typing.NamedTuple ]:
|
|
|
|
path = path.lower()
|
|
|
|
try:
|
|
|
|
for scan_network in ( False, True ):
|
|
|
|
partition_infos = psutil.disk_partitions( all = scan_network )
|
|
|
|
def sort_descending_mountpoint( partition_info ): # i.e. put '/home' before '/'
|
|
|
|
return - len( partition_info.mountpoint )
|
|
|
|
|
|
partition_infos.sort( key = sort_descending_mountpoint )
|
|
|
|
for partition_info in partition_infos:
|
|
|
|
if path.startswith( partition_info.mountpoint.lower() ):
|
|
|
|
return partition_info
|
|
|
|
|
|
|
|
|
|
except UnicodeDecodeError: # wew lad psutil on some russian lad's fun filesystem
|
|
|
|
return None
|
|
|
|
|
|
return None
|
|
|
|
|
|
def GetDevice( path ) -> typing.Optional[ str ]:
|
|
|
|
partition_info = GetPartitionInfo( path )
|
|
|
|
if partition_info is None:
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
return partition_info.device
|
|
|
|
|
|
|
|
def GetFileSystemType( path: str ) -> typing.Optional[ str ]:
|
|
|
|
partition_info = GetPartitionInfo( path )
|
|
|
|
if partition_info is None:
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
return partition_info.fstype
|
|
|
|
|
|
|
|
def GetFreeSpace( path ):
|
|
|
|
disk_usage = psutil.disk_usage( path )
|
|
|
|
return disk_usage.free
|
|
|
|
|
|
def GetTotalSpace( path ):
|
|
|
|
disk_usage = psutil.disk_usage( path )
|
|
|
|
return disk_usage.total
|
|
|
|
|
|
def LaunchDirectory( path ):
|
|
|
|
def do_it():
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
os.startfile( path )
|
|
|
|
else:
|
|
|
|
if HC.PLATFORM_MACOS:
|
|
|
|
cmd = [ 'open', path ]
|
|
|
|
elif HC.PLATFORM_LINUX:
|
|
|
|
cmd = [ 'xdg-open', path ]
|
|
|
|
elif HC.PLATFORM_HAIKU:
|
|
|
|
cmd = [ 'open', path ]
|
|
|
|
|
|
# setsid call un-childs this new process
|
|
|
|
sbp_kwargs = HydrusData.GetSubprocessKWArgs()
|
|
|
|
preexec_fn = getattr( os, 'setsid', None )
|
|
|
|
HydrusData.CheckProgramIsNotShuttingDown()
|
|
|
|
process = subprocess.Popen( cmd, preexec_fn = preexec_fn, **sbp_kwargs )
|
|
|
|
HydrusThreading.SubprocessCommunicate( process )
|
|
|
|
|
|
|
|
thread = threading.Thread( target = do_it )
|
|
|
|
thread.daemon = True
|
|
|
|
thread.start()
|
|
|
|
def LaunchFile( path, launch_path = None ):
|
|
|
|
def do_it( launch_path ):
|
|
|
|
if HC.PLATFORM_WINDOWS and launch_path is None:
|
|
|
|
os.startfile( path )
|
|
|
|
else:
|
|
|
|
if launch_path is None:
|
|
|
|
launch_path = GetDefaultLaunchPath()
|
|
|
|
|
|
complete_launch_path = launch_path.replace( '%path%', path )
|
|
|
|
hide_terminal = False
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
cmd = complete_launch_path
|
|
|
|
preexec_fn = None
|
|
|
|
else:
|
|
|
|
cmd = shlex.split( complete_launch_path )
|
|
|
|
preexec_fn = getattr( os, 'setsid', None )
|
|
|
|
|
|
if HG.subprocess_report_mode:
|
|
|
|
message = 'Attempting to launch ' + path + ' using command ' + repr( cmd ) + '.'
|
|
|
|
HydrusData.ShowText( message )
|
|
|
|
|
|
try:
|
|
|
|
sbp_kwargs = HydrusData.GetSubprocessKWArgs( hide_terminal = hide_terminal, text = True )
|
|
|
|
HydrusData.CheckProgramIsNotShuttingDown()
|
|
|
|
process = subprocess.Popen( cmd, preexec_fn = preexec_fn, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, **sbp_kwargs )
|
|
|
|
( stdout, stderr ) = HydrusThreading.SubprocessCommunicate( process )
|
|
|
|
if HG.subprocess_report_mode:
|
|
|
|
if stdout is None and stderr is None:
|
|
|
|
HydrusData.ShowText( 'No stdout or stderr came back.' )
|
|
|
|
|
|
if stdout is not None:
|
|
|
|
HydrusData.ShowText( 'stdout: ' + repr( stdout ) )
|
|
|
|
|
|
if stderr is not None:
|
|
|
|
HydrusData.ShowText( 'stderr: ' + repr( stderr ) )
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
HydrusData.ShowText( 'Could not launch a file! Command used was:' + os.linesep + str( cmd ) )
|
|
|
|
HydrusData.ShowException( e )
|
|
|
|
|
|
|
|
|
|
thread = threading.Thread( target = do_it, args = ( launch_path, ) )
|
|
|
|
thread.daemon = True
|
|
|
|
thread.start()
|
|
|
|
def MakeSureDirectoryExists( path ):
|
|
|
|
it_exists_already = os.path.exists( path )
|
|
|
|
if it_exists_already:
|
|
|
|
if os.path.isdir( path ):
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
raise Exception( 'Sorry, the desired directory "{}" already exists as a normal file!'.format( path ) )
|
|
|
|
|
|
|
|
os.makedirs( path, exist_ok = True )
|
|
|
|
|
|
def FileModifiedTimeIsOk( mtime: int ):
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
# this is 1980-01-01 UTC, before which Windows can have trouble copying lmaoooooo
|
|
# This is the 'DOS' epoch
|
|
if mtime < 315532800:
|
|
|
|
return False
|
|
|
|
|
|
else:
|
|
|
|
# Epoch obviously
|
|
if mtime < 0:
|
|
|
|
return False
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
def safe_copy2( source, dest ):
|
|
|
|
copy_metadata = True
|
|
|
|
mtime = os.path.getmtime( source )
|
|
|
|
if FileModifiedTimeIsOk( mtime ):
|
|
|
|
# this overwrites on conflict without hassle
|
|
shutil.copy2( source, dest )
|
|
|
|
else:
|
|
|
|
shutil.copy( source, dest )
|
|
|
|
|
|
def MergeFile( source, dest ):
|
|
|
|
# this can merge a file, but if it is given a dir it will just straight up overwrite not merge
|
|
|
|
if os.path.exists( source ) and os.path.exists( dest ) and os.path.samefile( source, dest ):
|
|
|
|
raise Exception( f'Woah, "{source}" and "{dest}" are the same file!' )
|
|
|
|
|
|
if not os.path.isdir( source ):
|
|
|
|
if PathsHaveSameSizeAndDate( source, dest ):
|
|
|
|
DeletePath( source )
|
|
|
|
return True
|
|
|
|
|
|
|
|
try:
|
|
|
|
# this overwrites on conflict without hassle
|
|
shutil.move( source, dest, copy_function = safe_copy2 )
|
|
|
|
except Exception as e:
|
|
|
|
HydrusData.ShowText( 'Trying to move ' + source + ' to ' + dest + ' caused the following problem:' )
|
|
|
|
HydrusData.ShowException( e )
|
|
|
|
return False
|
|
|
|
|
|
return True
|
|
|
|
|
|
def MergeTree( source, dest, text_update_hook = None ):
|
|
|
|
if os.path.exists( source ) and os.path.exists( dest ) and os.path.samefile( source, dest ):
|
|
|
|
raise Exception( f'Woah, "{source}" and "{dest}" are the same directory!' )
|
|
|
|
|
|
pauser = HydrusThreading.BigJobPauser()
|
|
|
|
if not os.path.exists( dest ):
|
|
|
|
try:
|
|
|
|
shutil.move( source, dest, copy_function = safe_copy2 )
|
|
|
|
except OSError:
|
|
|
|
# if there were read only files in source and this was partition to partition, the copy2 goes ok but the subsequent source unlink fails
|
|
# so, if it seems this has happened, let's just try a walking mergetree, which should be able to deal with these readonlies on a file-by-file basis
|
|
if os.path.exists( dest ):
|
|
|
|
MergeTree( source, dest, text_update_hook = text_update_hook )
|
|
|
|
|
|
|
|
else:
|
|
|
|
# I had a thing here that tried to optimise if dest existed but was empty, but it wasn't neat
|
|
|
|
num_errors = 0
|
|
|
|
for ( root, dirnames, filenames ) in os.walk( source ):
|
|
|
|
if text_update_hook is not None:
|
|
|
|
text_update_hook( 'Copying ' + root + '.' )
|
|
|
|
|
|
dest_root = root.replace( source, dest )
|
|
|
|
for dirname in dirnames:
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, dirname )
|
|
dest_path = os.path.join( dest_root, dirname )
|
|
|
|
MakeSureDirectoryExists( dest_path )
|
|
|
|
shutil.copystat( source_path, dest_path )
|
|
|
|
|
|
for filename in filenames:
|
|
|
|
if num_errors > 5:
|
|
|
|
raise Exception( 'Too many errors, directory move abandoned.' )
|
|
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, filename )
|
|
dest_path = os.path.join( dest_root, filename )
|
|
|
|
ok = MergeFile( source_path, dest_path )
|
|
|
|
if not ok:
|
|
|
|
num_errors += 1
|
|
|
|
|
|
|
|
|
|
if num_errors == 0:
|
|
|
|
DeletePath( source )
|
|
|
|
|
|
|
|
|
|
def MirrorFile( source, dest ):
|
|
|
|
if os.path.exists( source ) and os.path.exists( dest ) and os.path.samefile( source, dest ):
|
|
|
|
return True
|
|
|
|
|
|
if not PathsHaveSameSizeAndDate( source, dest ):
|
|
|
|
try:
|
|
|
|
TryToMakeFileWriteable( dest )
|
|
|
|
safe_copy2( source, dest )
|
|
|
|
except Exception as e:
|
|
|
|
HydrusData.ShowText( 'Trying to copy ' + source + ' to ' + dest + ' caused the following problem:' )
|
|
|
|
HydrusData.ShowException( e )
|
|
|
|
return False
|
|
|
|
|
|
|
|
return True
|
|
|
|
def MirrorTree( source, dest, text_update_hook = None, is_cancelled_hook = None ):
|
|
|
|
if os.path.exists( source ) and os.path.exists( dest ) and os.path.samefile( source, dest ):
|
|
|
|
return
|
|
|
|
|
|
pauser = HydrusThreading.BigJobPauser()
|
|
|
|
MakeSureDirectoryExists( dest )
|
|
|
|
num_errors = 0
|
|
|
|
for ( root, dirnames, filenames ) in os.walk( source ):
|
|
|
|
if is_cancelled_hook is not None and is_cancelled_hook():
|
|
|
|
return
|
|
|
|
|
|
if text_update_hook is not None:
|
|
|
|
text_update_hook( 'Copying ' + root + '.' )
|
|
|
|
|
|
dest_root = root.replace( source, dest )
|
|
|
|
surplus_dest_paths = { os.path.join( dest_root, dest_filename ) for dest_filename in os.listdir( dest_root ) }
|
|
|
|
for dirname in dirnames:
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, dirname )
|
|
dest_path = os.path.join( dest_root, dirname )
|
|
|
|
surplus_dest_paths.discard( dest_path )
|
|
|
|
MakeSureDirectoryExists( dest_path )
|
|
|
|
shutil.copystat( source_path, dest_path )
|
|
|
|
|
|
for filename in filenames:
|
|
|
|
if num_errors > 5:
|
|
|
|
raise Exception( 'Too many errors, directory copy abandoned.' )
|
|
|
|
|
|
pauser.Pause()
|
|
|
|
source_path = os.path.join( root, filename )
|
|
|
|
dest_path = os.path.join( dest_root, filename )
|
|
|
|
surplus_dest_paths.discard( dest_path )
|
|
|
|
ok = MirrorFile( source_path, dest_path )
|
|
|
|
if not ok:
|
|
|
|
num_errors += 1
|
|
|
|
|
|
|
|
for dest_path in surplus_dest_paths:
|
|
|
|
pauser.Pause()
|
|
|
|
DeletePath( dest_path )
|
|
|
|
|
|
|
|
|
|
def OpenFileLocation( path ):
|
|
|
|
def do_it():
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
cmd = [ 'explorer', '/select,', path ]
|
|
|
|
elif HC.PLATFORM_MACOS:
|
|
|
|
cmd = [ 'open', '-R', path ]
|
|
|
|
elif HC.PLATFORM_LINUX:
|
|
|
|
raise NotImplementedError( 'Linux cannot open file locations!' )
|
|
|
|
elif HC.PLATFORM_HAIKU:
|
|
|
|
raise NotImplementedError( 'Haiku cannot open file locations!' )
|
|
|
|
|
|
sbp_kwargs = HydrusData.GetSubprocessKWArgs( hide_terminal = False )
|
|
|
|
HydrusData.CheckProgramIsNotShuttingDown()
|
|
|
|
process = subprocess.Popen( cmd, **sbp_kwargs )
|
|
|
|
HydrusThreading.SubprocessCommunicate( process )
|
|
|
|
|
|
thread = threading.Thread( target = do_it )
|
|
|
|
thread.daemon = True
|
|
|
|
thread.start()
|
|
|
|
def PathsHaveSameSizeAndDate( path1, path2 ):
|
|
|
|
if os.path.exists( path1 ) and os.path.exists( path2 ):
|
|
|
|
same_size = os.path.getsize( path1 ) == os.path.getsize( path2 )
|
|
same_modified_time = int( os.path.getmtime( path1 ) ) == int( os.path.getmtime( path2 ) )
|
|
|
|
if same_size and same_modified_time:
|
|
|
|
return True
|
|
|
|
|
|
|
|
return False
|
|
|
|
def PathIsFree( path ):
|
|
|
|
try:
|
|
|
|
stat_result = os.stat( path )
|
|
|
|
current_bits = stat_result.st_mode
|
|
|
|
if not current_bits & stat.S_IWRITE:
|
|
|
|
# read-only file, cannot do the rename check
|
|
|
|
return True
|
|
|
|
|
|
os.rename( path, path ) # rename a path to itself
|
|
|
|
return True
|
|
|
|
except OSError as e: # 'already in use by another process' or an odd filename too long error
|
|
|
|
HydrusData.Print( 'Already in use/inaccessible: ' + path )
|
|
|
|
|
|
return False
|
|
|
|
def ReadFileLikeAsBlocks( f ):
|
|
|
|
next_block = f.read( HC.READ_BLOCK_SIZE )
|
|
|
|
while len( next_block ) > 0:
|
|
|
|
yield next_block
|
|
|
|
next_block = f.read( HC.READ_BLOCK_SIZE )
|
|
|
|
|
|
def RecyclePath( path ):
|
|
|
|
if HG.file_report_mode:
|
|
|
|
HydrusData.ShowText( 'Recycling {}'.format( path ) )
|
|
|
|
HydrusData.ShowText( ''.join( traceback.format_stack() ) )
|
|
|
|
|
|
if os.path.lexists( path ):
|
|
|
|
TryToMakeFileWriteable( path )
|
|
|
|
try:
|
|
|
|
send2trash.send2trash( path )
|
|
|
|
except:
|
|
|
|
HydrusData.Print( 'Trying to recycle ' + path + ' created this error:' )
|
|
|
|
HydrusData.DebugPrint( traceback.format_exc() )
|
|
|
|
HydrusData.Print( 'It has been fully deleted instead.' )
|
|
|
|
DeletePath( path )
|
|
|
|
|
|
|
|
def SanitizeFilename( filename, force_ntfs = False ) -> str:
|
|
|
|
if HC.PLATFORM_WINDOWS or force_ntfs:
|
|
|
|
# \, /, :, *, ?, ", <, >, |
|
|
bad_characters = r'[\\/:*?"<>|]'
|
|
|
|
else:
|
|
|
|
bad_characters = '/'
|
|
|
|
|
|
return re.sub( bad_characters, '_', filename )
|
|
|
|
|
|
def SanitizePathForExport( directory_path, directories_and_filename ):
|
|
|
|
# this does not figure out the situation where the suffix directories cross a mount point to a new file system, but at that point it is user's job to fix
|
|
|
|
components = directories_and_filename.split( os.path.sep )
|
|
|
|
filename = components[-1]
|
|
|
|
suffix_directories = components[:-1]
|
|
|
|
fst = GetFileSystemType( directory_path )
|
|
|
|
if fst is None:
|
|
|
|
force_ntfs = False
|
|
|
|
else:
|
|
|
|
force_ntfs = fst.lower() in ( 'ntfs', 'exfat' )
|
|
|
|
|
|
suffix_directories = [ SanitizeFilename( suffix_directory, force_ntfs = force_ntfs ) for suffix_directory in suffix_directories ]
|
|
filename = SanitizeFilename( filename, force_ntfs = force_ntfs )
|
|
|
|
sanitized_components = suffix_directories
|
|
sanitized_components.append( filename )
|
|
|
|
return os.path.join( *sanitized_components )
|
|
|
|
|
|
def TryToGiveFileNicePermissionBits( path ):
|
|
|
|
if not os.path.exists( path ):
|
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
|
stat_result = os.stat( path )
|
|
|
|
current_bits = stat_result.st_mode
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
# this is actually the same value as S_IWUSR, but let's not try to second guess ourselves
|
|
desired_bits = stat.S_IREAD | stat.S_IWRITE
|
|
|
|
else:
|
|
|
|
# typically guarantee 644 for regular files m8, but now we also take umask into account
|
|
|
|
try:
|
|
|
|
umask = os.umask( 0o022 )
|
|
os.umask( umask )
|
|
|
|
except:
|
|
|
|
umask = 0o022
|
|
|
|
|
|
desired_bits = ( stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) & ~umask
|
|
|
|
|
|
if not ( desired_bits & current_bits ) == desired_bits:
|
|
|
|
os.chmod( path, current_bits | desired_bits )
|
|
|
|
|
|
except Exception as e:
|
|
|
|
HydrusData.Print( 'Wanted to add read and write permission to "{}", but had an error: {}'.format( path, str( e ) ) )
|
|
|
|
|
|
def TryToMakeFileWriteable( path ):
|
|
|
|
if not os.path.exists( path ):
|
|
|
|
return
|
|
|
|
|
|
if FileisWriteable( path ):
|
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
|
stat_result = os.stat( path )
|
|
|
|
current_bits = stat_result.st_mode
|
|
|
|
if HC.PLATFORM_WINDOWS:
|
|
|
|
# this is actually the same value as S_IWUSR, but let's not try to second guess ourselves
|
|
desired_bits = stat.S_IREAD | stat.S_IWRITE
|
|
|
|
else:
|
|
|
|
# this only does what we want if we own the file, but only owners can non-sudo change permission anyway
|
|
desired_bits = stat.S_IWUSR
|
|
|
|
|
|
if not ( desired_bits & current_bits ) == desired_bits:
|
|
|
|
os.chmod( path, current_bits | desired_bits )
|
|
|
|
|
|
except Exception as e:
|
|
|
|
HydrusData.Print( 'Wanted to add user write permission to "{}", but had an error: {}'.format( path, str( e ) ) )
|
|
|
|
|