Merge branch 'wip-obsync'

Colin Patrick McCabe 2011-05-26 13:17:03 -07:00
commit ae5bbc7b08
4 changed files with 411 additions and 163 deletions

View File

@ -29,6 +29,7 @@ import mimetypes
import os
from StringIO import StringIO
import rados
import rgw
import re
import shutil
import string
@ -44,12 +45,26 @@ global opts
# Translation table mapping users in the source to users in the destination.
global xuser
# Librgw instance
global lrgw
lrgw = None
###### Constants #######
ACL_XATTR = "rados.acl"
META_XATTR_PREFIX = "rados.meta."
CONTENT_TYPE_XATTR = "rados.content_type"
RGW_META_BUCKET_NAME = ".rgw"
RGW_USERS_UID_BUCKET_NAME = ".users.uid"
RGW_META_ETAG = "user.rgw.etag"
RGW_META_PREFIX = "user.x-amz-meta-"
RGW_META_CONTENT_TYPE = "user.rgw.content_type"
RGW_META_ACL = "user.rgw.acl"
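# Illustrative note (a sketch, not part of the commit itself): rgw keeps
# object metadata in xattrs named with the user.rgw.* / user.x-amz-meta-*
# prefixes above, and obsync exposes them locally under the rados.* names.
# For example, an S3 upload with the header "x-amz-meta-color: blue"
# (hypothetical) shows up on the rgw object as
#   user.x-amz-meta-color = "blue"
# and on a local FileStore copy as
#   rados.meta.color = "blue"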
def vvprint(s):
if (opts.more_verbose):
print s
###### Exception classes #######
class InvalidLocalName(Exception):
pass
@ -272,6 +287,12 @@ class AclPolicy(object):
self.owner_display_name = owner_display_name
self.grants = grants # dict of { string -> ACLGrant }
@staticmethod
def create_default(owner_id):
grants = { }
grants[ACL_TYPE_CANON_USER + owner_id] = \
AclGrant(ACL_TYPE_CANON_USER + owner_id, None, "FULL_CONTROL")
return AclPolicy(owner_id, None, grants)
@staticmethod
def from_xml(s):
root = etree.parse(StringIO(s))
owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS))
@ -382,21 +403,27 @@ class Object(object):
self.meta = meta
def equals(self, rhs):
if (self.name != rhs.name):
vvprint("EQUALS: self.name = %s, rhs.name = %s" % (self.name, rhs.name))
return False
if (self.md5 != rhs.md5):
vvprint("EQUALS: self.md5 = %s, rhs.md5 = %s" % (self.md5, rhs.md5))
return False
if (self.size != rhs.size):
vvprint("EQUALS: self.size = %d, rhs.size = %d" % (self.size, rhs.size))
return False
for k,v in self.meta.items():
if (not rhs.meta.has_key(k)):
vvprint("EQUALS: rhs.meta lacks key %s" % k)
return False
if (rhs.meta[k] != v):
vvprint("EQUALS: self.meta[%s] = %s, rhs.meta[%s] = %s" % \
(k, v, k, rhs.meta[k]))
return False
for k,v in rhs.meta.items():
if (not self.meta.has_key(k)):
vvprint("EQUALS: self.meta lacks key %s" % k)
return False
if (self.meta[k] != v):
return False
vvprint("EQUALS: the objects are equal.")
return True
def local_name(self):
return s3_name_to_local_name(self.name)
@ -427,13 +454,19 @@ class Object(object):
###### Store #######
class Store(object):
@staticmethod
def make_store(url, is_dst, create, akey, skey):
s3_url = strip_prefix("s3://", url)
if (s3_url):
return S3Store(s3_url, create, akey, skey)
rados_url = strip_prefix("rados:", url)
rados_url = strip_prefix("rgw:", url)
if (rados_url):
return RadosStore(rados_url, create, akey, skey)
dst_owner = None
if (is_dst):
if not os.environ.has_key("DST_OWNER"):
raise Exception("You must set DST_OWNER when uploading \
files to RgwStore.")
dst_owner = os.environ["DST_OWNER"]
return RgwStore(rados_url, create, akey, skey, dst_owner)
file_url = strip_prefix("file://", url)
if (file_url):
return FileStore(file_url, create)
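# Illustrative URL forms accepted by make_store (hosts, paths, and bucket
# names below are hypothetical):
#   s3://s3.example.com/mybucket             -> S3Store
#   rgw:/etc/ceph/ceph.conf:mybucket:prefix  -> RgwStore (destinations
#                                               additionally need DST_OWNER)
#   file:///tmp/mydir                        -> FileStore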
@ -672,11 +705,7 @@ class FileStoreIterator(object):
# Ignore non-files when iterating.
if (not os.path.isfile(path)):
continue
obj_name = local_name_to_s3_name(path[len(self.base)+1:])
return Object.from_file(obj_name, path)
class FileStore(Store):
@ -744,14 +773,14 @@ class FileStore(Store):
if (opts.more_verbose):
print "FileStore: removed %s" % obj.name
###### Rgw store #######
class RgwStoreIterator(object):
"""RgwStore iterator"""
def __init__(self, it, rgw_store):
self.it = it # has type rados.ObjectIterator
self.rgw_store = rgw_store
self.prefix = self.rgw_store.key_prefix
self.prefix_len = len(self.rgw_store.key_prefix)
def __iter__(self):
return self
def next(self):
@ -763,18 +792,24 @@ class RadosStoreIterator(object):
# do the prefixes match?
if rados_obj.key[:self.prefix_len] == self.prefix:
break
ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key)
if (ret == None):
raise Exception("internal iterator error")
return ret
class RgwStore(Store):
def __init__(self, url, create, akey, skey, owner):
global lrgw
if (lrgw == None):
lrgw = rgw.Rgw()
self.owner = owner
self.user_exists_cache = {}
self.users_uid_ioctx = None
# Parse the rgw url
conf_end = string.find(url, ":")
if (conf_end == -1):
raise Exception("RadosStore URLs are of the form \
rados:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
raise Exception("RgwStore URLs are of the form \
rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
self.conf_file_path = url[0:conf_end]
bucket_end = url.find(":", conf_end+1)
if (bucket_end == -1):
@ -784,72 +819,107 @@ rados:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf."
self.rgw_bucket_name = url[conf_end+1:bucket_end]
self.key_prefix = url[bucket_end+1:]
if (self.rgw_bucket_name == ""):
raise Exception("RadosStore URLs are of the form \
rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
raise Exception("RgwStore URLs are of the form \
rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
if (opts.more_verbose):
print "self.conf_file_path = '" + self.conf_file_path + "', ",
print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
print "self.key_prefix = '" + self.key_prefix + "'"
self.rados = rados.Rados()
self.rados.conf_read_file(self.conf_file_path)
self.rados.connect()
if self.owner != None and not self.user_exists(self.owner):
raise Exception("Unknown owner! DST_OWNER=%s" % self.owner)
if (not self.rados.pool_exists(self.rgw_bucket_name)):
if (create):
self.create_rgw_bucket(self.rgw_bucket_name)
else:
raise NonexistentStore()
elif self.owner == None:
# Figure out what owner we should use when creating objects.
# We use the owner of the destination bucket
ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
try:
bin_ = ioctx.get_xattr(self.rgw_bucket_name, RGW_META_ACL)
xml = lrgw.acl_bin2xml(bin_)
acl = AclPolicy.from_xml(xml)
self.owner = acl.owner_id
if (opts.more_verbose):
print "using owner \"%s\"" % self.owner
finally:
ioctx.close()
self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
Store.__init__(self, "rados:" + url)
Store.__init__(self, "rgw:" + url)
def create_rgw_bucket(self, rgw_bucket_name):
""" Create an rgw bucket named 'rgw_bucket_name' """
global lrgw
if (self.owner == None):
raise Exception("Can't create a bucket without knowing who " +
"should own it. Please set DST_OWNER")
self.rados.create_pool(self.rgw_bucket_name)
ioctx = None
try:
ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
ioctx.write(rgw_bucket_name, "", 0)
print "ioctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
"user.rgw.acl=" + self.owner + ")"
new_bucket_acl = "\
<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \
<Owner><ID>%s</ID></Owner><AccessControlList>\
<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
xsi:type=\"CanonicalUser\"><ID>%s</ID> \
<DisplayName>display-name</DisplayName></Grantee> \
<Permission>FULL_CONTROL</Permission></Grant>\
</AccessControlList></AccessControlPolicy>" % (self.owner, self.owner)
new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl)
ioctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin)
finally:
if (ioctx):
ioctx.close()
def obsync_obj_from_rgw(self, obj_name):
"""Create an obsync object from a Rados object"""
try:
size, tm = self.ioctx.stat(obj_name)
except rados.ObjectNotFound:
return None
md5 = None
meta = {}
for k,v in self.ioctx.get_xattrs(obj_name):
if k == RGW_META_ETAG:
md5 = v
elif k == RGW_META_CONTENT_TYPE:
meta[CONTENT_TYPE_XATTR] = v
elif k[:len(RGW_META_PREFIX)] == RGW_META_PREFIX:
meta["rados.meta." + k[len(RGW_META_PREFIX):]] = v
elif opts.more_verbose:
print "ignoring unknown xattr " + k
if (md5 == None):
raise RuntimeError("error on object %s: expected to find " + \
"extended attribute %s" % (obj_name, RGW_META_ETAG))
if (opts.more_verbose):
print "meta = " + str(meta)
return Object(obj_name, md5, size, meta)
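# Sketch of the translation above (values are hypothetical): an rgw
# object carrying the xattrs
#   user.rgw.etag = "abc123", user.x-amz-meta-color = "blue"
# comes back as
#   Object(obj_name, "abc123", size, { "rados.meta.color" : "blue" })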
def __str__(self):
return "rados:" + self.conf_file_path + ":" + self.rgw_bucket_name + ":" + self.key_prefix
return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name
def get_acl(self, obj):
global lrgw
bin_ = None
try:
bin_ = self.ioctx.get_xattr(obj.name, RGW_META_ACL)
except rados.NoData:
return LocalAcl.get_empty(obj.name)
xml = lrgw.acl_bin2xml(bin_)
return LocalAcl.from_xml(obj.name, xml)
def make_local_copy(self, obj):
temp_file = None
temp_file_f = None
try:
# read the object from rgw in chunks
temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
temp_file_f = open(temp_file.name, 'w')
off = 0
while True:
buf = self.ioctx.read(obj.name, offset = off, length = 8192)
if (len(buf) == 0):
break
temp_file_f.write(buf)
@ -857,7 +927,6 @@ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
break
off += 8192
temp_file_f.close()
except:
if (temp_file_f):
temp_file_f.close()
@ -866,13 +935,26 @@ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
raise
return LocalCopy(obj.name, temp_file.name, True)
def all_objects(self):
it = self.ioctx.list_objects()
return RgwStoreIterator(it, self)
def locate_object(self, obj):
return self.obsync_obj_from_rgw(obj.name)
def user_exists(self, user_id):
if (self.user_exists_cache.has_key(user_id)):
return self.user_exists_cache[user_id]
if (self.users_uid_ioctx == None):
# will be closed in __del__
self.users_uid_ioctx = self.rados.open_ioctx(RGW_USERS_UID_BUCKET_NAME)
try:
self.users_uid_ioctx.stat(user_id)
except rados.ObjectNotFound:
return False
self.user_exists_cache[user_id] = True
return True
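# (Descriptive note: rgw keeps one object per user in the .users.uid
# pool, so a plain stat() is enough to test existence; hits are cached,
# misses are re-checked on every call.)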
def upload(self, local_copy, src_acl, obj):
global lrgw
if (opts.more_verbose):
print "RadosStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
"obj='" + obj.name + "'"
if (opts.dry_run):
return
@ -886,17 +968,28 @@ rados:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
if (len(buf) < 8192):
break
off += 8192
self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
self.ioctx.set_xattr(obj.name, "user.rgw.acl", self.acl_hack)
self.ioctx.set_xattr(obj.name, "user.rgw.content_type",
"application/octet-stream")
if (src_acl.acl_policy == None):
ap = AclPolicy.create_default(self.owner)
else:
ap = src_acl.acl_policy
xml = ap.to_xml()
bin_ = lrgw.acl_xml2bin(xml)
self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_)
content_type = "application/octet-stream"
for k,v in obj.meta.items():
if k == CONTENT_TYPE_XATTR:
content_type = v
elif k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX:
self.ioctx.set_xattr(obj.name,
RGW_META_PREFIX + k[len(META_XATTR_PREFIX):], v)
self.ioctx.set_xattr(obj.name, "user.rgw.content_type", content_type)
def remove(self, obj):
if (opts.dry_run):
return
self.ioctx.remove_object(obj.name)
if (opts.more_verbose):
print "RadosStore: removed %s" % obj.name
print "RgwStore: removed %s" % obj.name
###### Functions #######
def delete_unreferenced(src, dst):
""" delete everything from dst that is not referenced in src """
@ -972,6 +1065,8 @@ parser.add_option("-c", "--create-dest", action="store_true", \
parser.add_option("--delete-before", action="store_true", \
dest="delete_before", help="delete objects that aren't in SOURCE from \
DESTINATION before transferring any objects")
parser.add_option("--boto-retries", dest="boto_retries", type="int",
help="set number of times we'll retry the same S3 operation")
parser.add_option("-d", "--delete-after", action="store_true", \
dest="delete_after", help="delete objects that aren't in SOURCE from \
DESTINATION after doing all transfers.")
@ -998,6 +1093,11 @@ if (opts.run_unit_tests):
test_acl_policy()
sys.exit(0)
if opts.boto_retries != None:
if not boto.config.has_section('Boto'):
boto.config.add_section('Boto')
boto.config.set('Boto', 'num_retries', str(opts.boto_retries))
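# Programmatically setting num_retries is equivalent to this boto
# config stanza (illustrative):
#   [Boto]
#   num_retries = 1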
opts.preserve_acls = not opts.no_preserve_acls
if (opts.create and opts.dry_run):
raise Exception("You can't run with both --create-dest and --dry-run! \
@ -1028,7 +1128,7 @@ dst_name = args[1]
try:
if (opts.more_verbose):
print "SOURCE: " + src_name
src = Store.make_store(src_name, False, False,
getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"))
except NonexistentStore, e:
print >>stderr, "Fatal error: Source " + src_name + " does not exist."
@ -1040,7 +1140,7 @@ except Exception, e:
try:
if (opts.more_verbose):
print "DESTINATION: " + dst_name
dst = Store.make_store(dst_name, True, opts.create,
getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"))
except NonexistentStore, e:
print >>stderr, "Fatal error: Destination " + dst_name + " does " +\

View File

@ -49,8 +49,56 @@ def get_nonce():
else:
return random.randint(9999, 99999)
def get_s3_connection(conf):
return boto.s3.connection.S3Connection(
aws_access_key_id = conf["access_key"],
aws_secret_access_key = conf["secret_key"],
host = conf["host"],
# TODO support & test all variations
calling_format=boto.s3.connection.OrdinaryCallingFormat(),
is_secure=False,
)
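# (OrdinaryCallingFormat builds path-style requests -- host/bucket/key --
# instead of virtual-hosted bucket names, which suits test endpoints
# without wildcard DNS.)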
def read_s3_config(cfg, section, sconfig, name):
# TODO: support 'port', 'is_secure'
sconfig[name] = {}
for var in [ 'access_key', 'host', 'secret_key', 'user_id',
'display_name', 'email', 'consistency', ]:
try:
sconfig[name][var] = cfg.get(section, var)
except ConfigParser.NoOptionError:
pass
# Make sure connection works
try:
conn = get_s3_connection(sconfig[name])
except Exception, e:
print >>stderr, "error initializing connection!"
raise
# Create bucket name
try:
template = cfg.get('fixtures', 'bucket prefix')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
template = 'test-{random}-'
random.seed()
try:
sconfig[name]["bucket_name"] = \
template.format(random=get_nonce())
except:
print >>stderr, "error parsing bucket prefix template"
raise
def read_rgw_config(cfg, section, sconfig, rconfig, name):
rconfig[name] = {}
for var in [ 'ceph_conf' ]:
try:
rconfig[name][var] = cfg.get(section, var)
except ConfigParser.NoOptionError:
pass
def read_config():
sconfig = {}
rconfig = {}
cfg = ConfigParser.RawConfigParser()
try:
path = os.environ['S3TEST_CONF']
@ -65,70 +113,38 @@ def read_config():
(type_, name) = section.split(None, 1)
except ValueError:
continue
if type_ == 's3':
read_s3_config(cfg, section, sconfig, name)
elif type_ == 'rgw':
read_rgw_config(cfg, section, sconfig, rconfig, name)
for k,v in rconfig.items():
if (not sconfig.has_key(k)):
raise Exception("Can't find the S3 bucket associated with \
rgw pool %s" % k)
v["bucket"] = sconfig[k]
return sconfig, rconfig
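# Illustrative S3TEST_CONF excerpt matching the parsing above (values
# are hypothetical):
#   [s3 main]
#   access_key = AKEY
#   secret_key = SKEY
#   host = s3.example.com
#   user_id = foo
#   [rgw main]
#   ceph_conf = /etc/ceph/ceph.conf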
def obsync(src, dst, misc):
env = {}
full = ["./obsync"]
if (isinstance(src, str)):
full.append(src)
else:
src.to_src(env, full)
if (isinstance(dst, str)):
full.append(dst)
else:
dst.to_dst(env, full)
full.extend(misc)
full.append("--boto-retries=1")
if (opts.more_verbose):
for k,v in env.items():
print str(k) + "=" + str(v) + " ",
print
for f in full:
print f,
print
return subprocess.call(full, stderr=opts.error_out, env=env)
def obsync_check(src, dst, opts):
ret = obsync(src, dst, opts)
@ -139,19 +155,38 @@ def cleanup_tempdir():
if tdir != None and opts.keep_tempdir == False:
shutil.rmtree(tdir)
def compare_directories(dir_a, dir_b, expect_same = True, compare_xattr = True):
if (opts.verbose):
print "comparing directories %s and %s" % (dir_a, dir_b)
full = ["diff", "-q"]
full.extend(["-r", dir_a, dir_b])
ret = subprocess.call(full)
if ((ret == 0) and (not expect_same)):
info = []
for root, dirs, files in os.walk(dir_a):
for filename in files:
afile = os.path.join(root, filename)
bfile = dir_b + afile[len(dir_a):]
if (not os.path.exists(bfile)):
info.append("Not found: %s: " % bfile)
else:
ret = subprocess.call(["diff", "-q", afile, bfile])
if (ret != 0):
info.append("Files differ: %s and %s" % (afile, bfile))
elif compare_xattr:
xinfo = xattr_diff(afile, bfile)
info.extend(xinfo)
for root, dirs, files in os.walk(dir_b):
for filename in files:
bfile = os.path.join(root, filename)
afile = dir_a + bfile[len(dir_b):]
if (not os.path.exists(afile)):
info.append("Not found: %s" % afile)
if ((len(info) == 0) and (not expect_same)):
print "expected the directories %s and %s to differ, but \
they were the same!" % (dir_a, dir_b)
print "\n".join(info)
raise Exception("compare_directories failed!")
if ((len(info) != 0) and expect_same):
print "expected the directories %s and %s to be the same, but \
they were different!" % (dir_a, dir_b)
print "\n".join(info)
raise Exception("compare_directories failed!")
def count_obj_in_dir(d):
@ -161,17 +196,35 @@ def count_obj_in_dir(d):
num_objects = num_objects + 1
return num_objects
def xuser(sconfig, src, dst):
return [ "--xuser", sconfig[src]["user_id"] + "=" + sconfig[dst]["user_id"]]
def get_optional(h, k):
if (h.has_key(k)):
print "found " + str(h[k])
return h[k]
else:
print "found nothing"
return None
def xattr_diff(afile, bfile):
def tuple_list_to_hash(tl):
ret = {}
for k,v in tl:
ret[k] = v
return ret
info = []
a_attr = tuple_list_to_hash(xattr.get_all(afile, namespace=xattr.NS_USER))
b_attr = tuple_list_to_hash(xattr.get_all(bfile, namespace=xattr.NS_USER))
for ka,va in a_attr.items():
if b_attr.has_key(ka):
if b_attr[ka] != va:
info.append("xattrs differ for %s" % ka)
else:
info.append("only in %s: %s" % (afile, ka))
for kb,vb in b_attr.items():
if not a_attr.has_key(kb):
info.append("only in %s: %s" % (bfile, kb))
return info
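# Example (hypothetical): if afile carries a user xattr rados.meta.color
# that bfile lacks, xattr_diff returns
#   ["only in <afile>: rados.meta.color"]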
def xattr_sync_impl(file_name, meta):
xlist = xattr.get_all(file_name, namespace=xattr.NS_USER)
to_delete = []
@ -220,12 +273,35 @@ def assert_xattr(file_name, meta):
###### ObSyncTestBucket #######
class ObSyncTestBucket(object):
def __init__(self, conf):
self.conf = conf
self.name = conf["bucket_name"]
self.url = "s3://" + conf["host"] + "/" + conf["bucket_name"]
self.akey = conf["access_key"]
self.skey = conf["secret_key"]
self.consistency = get_optional(conf, "consistency")
def to_src(self, env, args):
env["SRC_AKEY"] = self.akey
env["SRC_SKEY"] = self.skey
args.append(self.url)
def to_dst(self, env, args):
env["DST_AKEY"] = self.akey
env["DST_SKEY"] = self.skey
args.append(self.url)
if (self.consistency != None):
env["DST_CONSISTENCY"] = self.consistency
class ObSyncTestPool(object):
def __init__(self, bucket, ceph_conf):
self.bucket = bucket
self.ceph_conf = ceph_conf
def to_src(self, env, args):
args.append(self.get_url())
def to_dst(self, env, args):
env["DST_OWNER"] = self.bucket["user_id"]
args.append(self.get_url())
def get_url(self):
return "rgw:%s:%s" % (self.ceph_conf, self.bucket["bucket_name"])
###### Main #######
# change directory to obsync directory
@ -254,20 +330,21 @@ if (opts.more_verbose):
opts.verbose = True
# parse configuration file
sconfig, rconfig = read_config()
opts.buckets = []
opts.buckets.append(ObSyncTestBucket(config["main"]["bucket_name"], \
"s3://" + config["main"]["host"] + "/" + config["main"]["bucket_name"], \
config["main"]["access_key"], config["main"]["secret_key"],
get_optional(config["main"], "consistency")))
opts.buckets.append(ObSyncTestBucket(config["alt"]["bucket_name"], \
"s3://" + config["alt"]["host"] + "/" + config["alt"]["bucket_name"], \
config["alt"]["access_key"], config["alt"]["secret_key"],
get_optional(config["alt"], "consistency")))
opts.buckets.append(ObSyncTestBucket(sconfig["main"]))
opts.buckets.append(ObSyncTestBucket(sconfig["alt"]))
if not config["main"]["user_id"]:
opts.pools = []
if (rconfig.has_key("main")):
if (opts.verbose):
print "running rgw target tests..."
opts.pools.append(ObSyncTestPool(sconfig["main"], \
rconfig["main"]["ceph_conf"]))
if not sconfig["main"]["user_id"]:
raise Exception("You must specify a user_id for the main section.")
if not config["alt"]["user_id"]:
if not sconfig["alt"]["user_id"]:
raise Exception("You must specify a user_id for the alt section.")
# set up temporary directory
@ -373,14 +450,14 @@ obsync_check("file://%s/dir1" % tdir, "file://%s/dira" % tdir, ["-c"])
synthetic_xml1 = \
"<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n\
<Owner>\n\
<ID>" + config["main"]["user_id"] + "</ID>\n\
<ID>" + sconfig["main"]["user_id"] + "</ID>\n\
<DisplayName></DisplayName>\n\
</Owner>\n\
<AccessControlList>\n\
<Grant>\n\
<Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
xsi:type=\"CanonicalUser\">\n\
<ID>" + config["main"]["user_id"] + "</ID>\n\
<ID>" + sconfig["main"]["user_id"] + "</ID>\n\
<DisplayName></DisplayName>\n\
</Grantee>\n\
<Permission>FULL_CONTROL</Permission>\n\
@ -389,11 +466,12 @@ xsi:type=\"CanonicalUser\">\n\
</AccessControlPolicy>"
xattr.set("%s/dira/a" % tdir, ACL_XATTR, synthetic_xml1,
namespace=xattr.NS_USER)
print "set attr on %s" % ("%s/dira/a" % tdir)
if (opts.verbose):
print "set attr on %s" % ("%s/dira/a" % tdir)
# test ACL transformations
# canonicalize xml by parse + write out
obsync_check("file://%s/dira" % tdir, "file://%s/dira2" % tdir,
["-d", "-c"] + xuser("main", "alt"))
["-d", "-c"] + xuser(sconfig, "main", "alt"))
# test that ACL is preserved
obsync_check("file://%s/dira2" % tdir, "file://%s/dira3" % tdir,
["-d", "-c"])
@ -405,7 +483,7 @@ if (synthetic_xml2 != synthetic_xml3):
raise Exception("xml not preserved across obsync!")
# test ACL transformation
obsync_check("file://%s/dira3" % tdir, "file://%s/dira4" % tdir,
["-d", "-c"] + xuser("main", "alt"))
["-d", "-c"] + xuser(sconfig, "main", "alt"))
synthetic_xml4 = xattr.get("%s/dira4/a" % tdir, ACL_XATTR,
namespace=xattr.NS_USER)
if (synthetic_xml3 != synthetic_xml4):
@ -418,7 +496,7 @@ if (synthetic_xml4 != synthetic_xml5):
raise Exception("xml not preserved across obsync!")
# test ACL transformation back
obsync_check("file://%s/dira5" % tdir, "file://%s/dira6" % tdir,
["-d", "-c"] + xuser("alt", "main"))
["-d", "-c"] + xuser(sconfig, "alt", "main"))
if (synthetic_xml5 != synthetic_xml2):
raise Exception("expected to transform XML back to original form \
through a double xuser")
@ -443,7 +521,7 @@ obsync_check("file://%s/dir1" % tdir, opts.buckets[0], [])
# make sure that the copy worked
obsync_check(opts.buckets[0], "file://%s/dir3" % tdir, ["-c"])
compare_directories("%s/dir1" % tdir, "%s/dir3" % tdir)
compare_directories("%s/dir1" % tdir, "%s/dir3" % tdir, compare_xattr = False)
if (opts.verbose):
print "successfully copied the sample directory to " + opts.buckets[0].name
@ -465,6 +543,49 @@ if (whole_file != "a"):
if (opts.verbose):
print "successfully copied a directory with --follow-symlinks"
# empty out bucket[0]
obsync_check("file://%s/empty1" % tdir, opts.buckets[0],
["--delete-after"])
def rmbucket(bucket):
conn = get_s3_connection(bucket.conf)
bucket = conn.get_bucket(bucket.name)
bucket.delete()
# rgw target tests
if len(opts.pools) > 0:
rmbucket(opts.buckets[0])
os.mkdir("%s/rgw1" % tdir)
f = open("%s/rgw1/aaa" % tdir, 'w')
f.write("aaa")
f.close()
f = open("%s/rgw1/crazy" % tdir, 'w')
for i in range(0, 1000):
f.write("some crazy text\n")
f.close()
f = open("%s/rgw1/brick" % tdir, 'w')
f.write("br\0ick")
f.close()
# we should fail here, because we didn't supply -c, and the bucket
# doesn't exist
ret = obsync("%s/rgw1" % tdir, opts.pools[0], [])
if (ret == 0):
raise RuntimeError("expected this call to obsync to fail, because \
we didn't supply -c. But it succeeded.")
if (opts.verbose):
print "first rgw: call failed as expected."
print "testing rgw target with --create"
obsync_check("%s/rgw1" % tdir, opts.pools[0], ["--create"])
obsync_check(opts.pools[0], "%s/rgw2" % tdir, ["-c"])
compare_directories("%s/rgw1" % tdir, "%s/rgw2" % tdir, compare_xattr = False)
# some tests with xattrs
xattr_sync("%s/rgw2/brick" % tdir, { CONTENT_TYPE_XATTR : "bricks" })
xattr_sync("%s/rgw2/crazy" % tdir, { CONTENT_TYPE_XATTR : "text/plain",
"rados.meta.froobs" : "quux", "rados.meta.gaz" : "luxx" } )
obsync_check("%s/rgw2" % tdir, opts.pools[0], [])
obsync_check(opts.pools[0], "%s/rgw3" % tdir, ["-c"])
compare_directories("%s/rgw2" % tdir, "%s/rgw3" % tdir, compare_xattr = True)
# test escaping
os.mkdir("%s/escape_dir1" % tdir)
f = open("%s/escape_dir1/$$foo" % tdir, 'w')
@ -475,7 +596,8 @@ f.write("blarg/")
f.close()
obsync_check("file://%s/escape_dir1" % tdir, opts.buckets[0], ["-d"])
obsync_check(opts.buckets[0], "file://%s/escape_dir2" % tdir, ["-c"])
compare_directories("%s/escape_dir1" % tdir, "%s/escape_dir2" % tdir)
compare_directories("%s/escape_dir1" % tdir, "%s/escape_dir2" % tdir,
compare_xattr = False)
# some more tests with --no-preserve-acls
obsync_check("file://%s/dir1" % tdir, opts.buckets[0],
@ -489,11 +611,11 @@ obsync_check("file://%s/dir1" % tdir, opts.buckets[0], ["--delete-before"])
if (opts.verbose):
print "copying " + opts.buckets[0].name + " to " + opts.buckets[1].name
obsync_check(opts.buckets[0], opts.buckets[1], ["-c", "--delete-after"] + \
xuser("main", "alt"))
xuser(sconfig, "main", "alt"))
if (opts.verbose):
print "copying bucket1 to dir4..."
obsync_check(opts.buckets[1], "file://%s/dir4" % tdir, ["-c"])
compare_directories("%s/dir1" % tdir, "%s/dir4" % tdir)
compare_directories("%s/dir1" % tdir, "%s/dir4" % tdir, compare_xattr = False)
if (opts.verbose):
print "successfully copied " + opts.buckets[0].name + " to " + \
opts.buckets[1].name
@ -514,7 +636,7 @@ bucket0_count=%d, bucket1_count=%d" % (bucket0_count, bucket1_count))
if (opts.verbose):
print "copying bucket0 to bucket1..."
obsync_check(opts.buckets[0], opts.buckets[1], ["-c", "--delete-before"] + \
xuser("main", "alt"))
xuser(sconfig, "main", "alt"))
obsync_check(opts.buckets[0], "%s/bucket0_out" % tdir, ["--delete-after"])
obsync_check(opts.buckets[1], "%s/bucket1_out" % tdir, ["--delete-after"])
bucket0_count = count_obj_in_dir("/%s/bucket0_out" % tdir)
@ -557,4 +679,13 @@ obsync_check(opts.buckets[0], "%s/user_defined_md2" % tdir, ["-c"])
assert_xattr("%s/user_defined_md2/spork" % tdir,
{ "rados.meta.tines" : "3", "rados.content_type" : "application/octet-stream" })
# more rgw target tests
if len(opts.pools) > 0:
# synchronize from an s3 bucket to an rgw bucket directly
obsync_check(opts.buckets[1], opts.pools[0], ["--delete-after"] + \
xuser(sconfig, "main", "alt"))
obsync_check(opts.pools[0], "%s/rgw4" % tdir, ["--delete-after", "-c"])
obsync_check(opts.buckets[1], "%s/rgw5" % tdir, ["--delete-after", "-c"])
compare_directories("%s/rgw4" % tdir, "%s/rgw5" % tdir, compare_xattr = True)
sys.exit(0)

View File

@ -23,6 +23,9 @@ class PermissionError(Exception):
class ObjectNotFound(Exception):
pass
class NoData(Exception):
pass
class ObjectExists(Exception):
pass
@ -56,6 +59,8 @@ def make_ex(ret, msg):
return NoSpace(msg)
elif (ret == errno.EEXIST):
return ObjectExists(msg)
elif (ret == errno.ENODATA):
return NoData(msg)
else:
return Error(msg + (": error code %d" % ret))
@ -214,7 +219,7 @@ class ObjectIterator(object):
ret = self.ioctx.librados.rados_objects_list_next(self.ctx, byref(key))
if ret < 0:
raise StopIteration()
return Object(self.ioctx, key.value)
def __del__(self):
self.ioctx.librados.rados_objects_list_close(self.ctx)
@ -378,7 +383,7 @@ written." % (self.name, ret, length))
c_size_t(length), c_uint64(offset))
if ret < 0:
raise make_ex("Ioctx.read(%s): failed to read %s" % (self.name, key))
return ctypes.string_at(ret_buf, ret)
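# (ctypes.string_at(ret_buf, ret) copies exactly ret bytes; the previous
# ret_buf.value stopped at the first NUL byte, truncating binary data.)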
def get_stats(self):
self.require_ioctx_open()
@ -426,7 +431,7 @@ written." % (self.name, ret, length))
c_char_p(xattr_name), ret_buf, c_size_t(ret_length))
if ret < 0:
raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
return ctypes.string_at(ret_buf, ret)
def get_xattrs(self, oid):
self.require_ioctx_open()
@ -497,7 +502,7 @@ class Object(object):
self.state = "exists"
def __str__(self):
return "rados.Object(ioctx=%s,key=%s)" % (str(self.ioctx), self.key.value)
return "rados.Object(ioctx=%s,key=%s)" % (str(self.ioctx), self.key)
def require_object_exists(self):
if self.state != "exists":

View File

@ -60,6 +60,12 @@ got %s" % def_str)
for obj in foo3_ioctx.list_objects():
print str(obj)
foo3_ioctx.write_full("ghi", "g\0h\0i")
ghi_str = foo3_ioctx.read("ghi")
if (ghi_str != "g\0h\0i"):
raise RuntimeError("error reading object ghi: expected value g\\0h\\0\i, \
got %s" % (ghi_str))
# do some things with extended attributes
foo3_ioctx.set_xattr("abc", "a", "1")
foo3_ioctx.set_xattr("def", "b", "2")
@ -83,6 +89,12 @@ if (found["a"] != "1"):
if (found["c"] != "3"):
raise RuntimeError("error: expected object abc to have c=3")
foo3_ioctx.set_xattr("def", "zeroholder", "a\0b")
ret = foo3_ioctx.get_xattr("def", "zeroholder")
if (ret != "a\0b"):
raise RuntimeError("error: set_xattr/get_xattr failed with " +
"an extended attribute containing NULL")
# create some snapshots and do stuff with them
print "creating snap bjork"
foo3_ioctx.create_snap("bjork")