mirror of
https://github.com/ceph/ceph
synced 2025-02-24 11:37:37 +00:00
obsync: check if ACLs match
If all other aspects of two objects match, we should make sure the ACLs match before deciding that there's nothing for us to do. Restructure LocalCopy so that it no longer contains the ACL. Create LocalAcl to represent a cached local copy of the ACL. Add get_acl methods to all stores. Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
This commit is contained in:
parent
90ee7cf911
commit
f175d27a48
@ -367,21 +367,56 @@ Cannot handle this URL.")
|
||||
|
||||
###### LocalCopy ######
|
||||
class LocalCopy(object):
|
||||
def __init__(self, obj_name, path, path_is_temp, acl_path, acl_is_temp):
|
||||
def __init__(self, obj_name, path, path_is_temp):
|
||||
self.obj_name = obj_name
|
||||
self.path = path
|
||||
self.path_is_temp = path_is_temp
|
||||
self.acl_path = acl_path
|
||||
self.acl_is_temp = acl_is_temp
|
||||
def remove(self):
|
||||
if ((self.path_is_temp == True) and (self.path != None)):
|
||||
os.unlink(self.path)
|
||||
self.path = None
|
||||
self.path_is_temp = False
|
||||
def __del__(self):
|
||||
self.remove()
|
||||
|
||||
class LocalAcl(object):
|
||||
def __init__(self, obj_name):
|
||||
self.obj_name = obj_name
|
||||
self.acl_path = None
|
||||
self.acl_is_temp = False
|
||||
def set_acl_xml(self, acl_xml):
|
||||
self.remove()
|
||||
self.acl_is_temp = True
|
||||
self.acl_path = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
|
||||
temp_acl_file_f = open(self.acl_path, 'w')
|
||||
try:
|
||||
temp_acl_file_f.write(acl_xml)
|
||||
finally:
|
||||
temp_acl_file_f.close()
|
||||
def __del__(self):
|
||||
self.remove()
|
||||
def remove(self):
|
||||
if (self.path_is_temp and self.path):
|
||||
os.unlink(self.path)
|
||||
self.path = None
|
||||
if (self.acl_is_temp and self.acl_path):
|
||||
if ((self.acl_is_temp == True) and (self.acl_path != None)):
|
||||
os.unlink(self.acl_path)
|
||||
self.acl_path = None
|
||||
self.acl_is_temp = False
|
||||
def equals(self, rhs):
|
||||
""" Compare two cached ACL files """
|
||||
if (self.acl_path == None):
|
||||
return (rhs.acl_path == None)
|
||||
if (rhs.acl_path == None):
|
||||
return (self.acl_path == None)
|
||||
f = open(self.acl_path, 'r')
|
||||
try:
|
||||
my_xml = f.read()
|
||||
finally:
|
||||
f.close()
|
||||
f = open(rhs.acl_path, 'r')
|
||||
try:
|
||||
rhs_xml = f.read()
|
||||
finally:
|
||||
f.close()
|
||||
return my_xml == rhs_xml
|
||||
def translate_users(self, xusers):
|
||||
# Do we even have an ACL?
|
||||
if (self.acl_path == None):
|
||||
@ -462,28 +497,21 @@ s3://host/bucket/key_prefix. Failed to find the bucket.")
|
||||
Store.__init__(self, "s3://" + url)
|
||||
def __str__(self):
|
||||
return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix
|
||||
def get_acl(self, obj):
|
||||
acl = LocalAcl(obj.name)
|
||||
acl_xml = self.bucket.get_xml_acl(obj.name)
|
||||
acl.set_acl_xml(acl_xml)
|
||||
return acl
|
||||
def make_local_copy(self, obj):
|
||||
k = Key(self.bucket)
|
||||
k.key = obj.name
|
||||
temp_file = None
|
||||
temp_acl_file = None
|
||||
temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
|
||||
try:
|
||||
temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
|
||||
k.get_contents_to_filename(temp_file.name)
|
||||
if (opts.preserve_acls):
|
||||
temp_acl_file = tempfile.NamedTemporaryFile(mode='w+b',
|
||||
delete=False).name
|
||||
acl_xml = self.bucket.get_xml_acl(k)
|
||||
temp_acl_file_f = open(temp_acl_file, 'w')
|
||||
temp_acl_file_f.write(acl_xml)
|
||||
temp_acl_file_f.close()
|
||||
k.get_contents_to_filename(temp_file)
|
||||
except:
|
||||
if (temp_file):
|
||||
os.unlink(temp_file.name)
|
||||
if (temp_acl_file):
|
||||
os.unlink(temp_acl_file)
|
||||
os.unlink(temp_file)
|
||||
raise
|
||||
return LocalCopy(obj.name, temp_file.name, True, temp_acl_file, True)
|
||||
return LocalCopy(obj.name, temp_file, True)
|
||||
def all_objects(self):
|
||||
blrs = self.bucket.list(prefix = self.key_prefix)
|
||||
return S3StoreIterator(blrs.__iter__())
|
||||
@ -492,18 +520,12 @@ s3://host/bucket/key_prefix. Failed to find the bucket.")
|
||||
if (k == None):
|
||||
return None
|
||||
return Object(obj.name, etag_to_md5(k.etag), k.size)
|
||||
def upload(self, local_copy, obj):
|
||||
def upload(self, local_copy, src_acl, obj):
|
||||
if (opts.more_verbose):
|
||||
print "UPLOAD: local_copy.path='" + local_copy.path + "' " + \
|
||||
print "S3Store.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
|
||||
"obj='" + obj.name + "'"
|
||||
if (opts.dry_run):
|
||||
return
|
||||
if (opts.preserve_acls and local_copy.acl_path):
|
||||
f = open(local_copy.acl_path, 'r')
|
||||
try:
|
||||
acl_xml = f.read()
|
||||
finally:
|
||||
f.close()
|
||||
# mime = mimetypes.guess_type(local_copy.path)[0]
|
||||
# if (mime == NoneType):
|
||||
# mime = "application/octet-stream"
|
||||
@ -511,7 +533,12 @@ s3://host/bucket/key_prefix. Failed to find the bucket.")
|
||||
k.key = obj.name
|
||||
#k.set_metadata("Content-Type", mime)
|
||||
k.set_contents_from_filename(local_copy.path)
|
||||
if (opts.preserve_acls and local_copy.acl_path):
|
||||
if (src_acl.acl_path != None):
|
||||
f = open(src_acl.acl_path, 'r')
|
||||
try:
|
||||
acl_xml = f.read()
|
||||
finally:
|
||||
f.close()
|
||||
self.bucket.set_acl(acl_xml, k)
|
||||
|
||||
def remove(self, obj):
|
||||
@ -566,15 +593,15 @@ class FileStore(Store):
|
||||
Store.__init__(self, "file://" + url)
|
||||
def __str__(self):
|
||||
return "file://" + self.base
|
||||
def get_acl(self, obj):
|
||||
acl = LocalAcl(obj.name)
|
||||
acl_name = get_local_acl_file_name(obj.local_name())
|
||||
if (os.path.exists(acl_name)):
|
||||
acl.acl_path = self.base + "/" + acl_name
|
||||
return acl
|
||||
def make_local_copy(self, obj):
|
||||
local_name = obj.local_name()
|
||||
acl_name = get_local_acl_file_name(local_name)
|
||||
if (opts.preserve_acls and os.path.exists(acl_name)):
|
||||
full_acl_name = self.base + "/" + acl_name
|
||||
else:
|
||||
full_acl_name = None
|
||||
return LocalCopy(obj.name, self.base + "/" + local_name, False,
|
||||
full_acl_name, False)
|
||||
return LocalCopy(obj.name, self.base + "/" + local_name, False)
|
||||
def all_objects(self):
|
||||
return FileStoreIterator(self.base)
|
||||
def locate_object(self, obj):
|
||||
@ -590,7 +617,10 @@ class FileStore(Store):
|
||||
if (not found):
|
||||
return None
|
||||
return Object.from_file(obj.name, path)
|
||||
def upload(self, local_copy, obj):
|
||||
def upload(self, local_copy, src_acl, obj):
|
||||
if (opts.more_verbose):
|
||||
print "FileStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
|
||||
"obj='" + obj.name + "'"
|
||||
if (opts.dry_run):
|
||||
return
|
||||
s = local_copy.path
|
||||
@ -599,8 +629,8 @@ class FileStore(Store):
|
||||
#print "s='" + s +"', d='" + d + "'"
|
||||
mkdir_p(os.path.dirname(d))
|
||||
shutil.copy(s, d)
|
||||
if (opts.preserve_acls and local_copy.acl_path):
|
||||
shutil.copy(local_copy.acl_path,
|
||||
if (src_acl.acl_path != None):
|
||||
shutil.copy(src_acl.acl_path,
|
||||
self.base + "/" + get_local_acl_file_name(lname))
|
||||
def remove(self, obj):
|
||||
if (opts.dry_run):
|
||||
@ -701,6 +731,10 @@ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
|
||||
return Object(key, md5, size)
|
||||
def __str__(self):
|
||||
return "rados:" + self.conf_file_path + ":" + self.rgw_bucket_name + ":" + self.key_prefix
|
||||
def get_acl(self, obj):
|
||||
acl = LocalAcl(obj.name)
|
||||
# todo: set XML ACL
|
||||
return acl
|
||||
def make_local_copy(self, obj):
|
||||
temp_file = None
|
||||
temp_file_f = None
|
||||
@ -724,15 +758,15 @@ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
|
||||
if (temp_file):
|
||||
os.unlink(temp_file.name)
|
||||
raise
|
||||
return LocalCopy(obj.name, temp_file.name, True, None, True)
|
||||
return LocalCopy(obj.name, temp_file.name, True)
|
||||
def all_objects(self):
|
||||
it = self.bucket.list_objects()
|
||||
return RadosStoreIterator(it, self.key_prefix)
|
||||
def locate_object(self, obj):
|
||||
return self.obsync_obj_from_rgw(obj.name)
|
||||
def upload(self, local_copy, obj):
|
||||
def upload(self, local_copy, src_acl, obj):
|
||||
if (opts.more_verbose):
|
||||
print "UPLOAD: local_copy.path='" + local_copy.path + "' " + \
|
||||
print "RadosStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
|
||||
"obj='" + obj.name + "'"
|
||||
if (opts.dry_run):
|
||||
return
|
||||
@ -915,6 +949,8 @@ for sobj in src.all_objects():
|
||||
print "handling " + sobj.name
|
||||
dobj = dst.locate_object(sobj)
|
||||
upload = False
|
||||
src_acl = None
|
||||
dst_acl = None
|
||||
if (dobj == None):
|
||||
if (opts.verbose):
|
||||
print "+ " + sobj.name
|
||||
@ -923,17 +959,32 @@ for sobj in src.all_objects():
|
||||
if (opts.verbose):
|
||||
print "> " + sobj.name
|
||||
upload = True
|
||||
elif (opts.preserve_acls):
|
||||
# Do the ACLs match?
|
||||
src_acl = src.get_acl(sobj)
|
||||
dst_acl = dst.get_acl(dobj)
|
||||
src_acl.translate_users(xuser)
|
||||
if (not src_acl.equals(dst_acl)):
|
||||
upload = True
|
||||
if (opts.verbose):
|
||||
print "^ " + sobj.name
|
||||
dst_acl.remove()
|
||||
else:
|
||||
if (opts.verbose):
|
||||
print ". " + sobj.name
|
||||
if (upload):
|
||||
if (opts.preserve_acls and src_acl == None):
|
||||
src_acl = src.get_acl(sobj)
|
||||
src_acl.translate_users(xuser)
|
||||
else:
|
||||
# Just default to an empty ACL
|
||||
src_acl = LocalAcl(sobj.name)
|
||||
local_copy = src.make_local_copy(sobj)
|
||||
try:
|
||||
if (opts.preserve_acls):
|
||||
local_copy.translate_users(xuser)
|
||||
dst.upload(local_copy, sobj)
|
||||
dst.upload(local_copy, src_acl, sobj)
|
||||
finally:
|
||||
local_copy.remove()
|
||||
src_acl.remove()
|
||||
|
||||
if (opts.delete_after):
|
||||
delete_unreferenced(src, dst)
|
||||
|
Loading…
Reference in New Issue
Block a user