test/rgw: add test for logrecord reshard

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
This commit is contained in:
liangmingyuan 2024-03-31 21:54:07 +08:00
parent 72997836c2
commit cec5e83d8e
5 changed files with 100 additions and 1 deletion

View File

@ -14,6 +14,7 @@ overrides:
rgw bucket counters cache: true
rgw sts key: abcdefghijklmnop
rgw s3 auth use sts: true
rgw reshard progress judge interval: 10
rgw:
compression type: random
storage classes: LUKEWARM, FROZEN

View File

@ -76,6 +76,16 @@ def get_bucket_num_shards(bucket_name, bucket_id):
num_shards = json_op['data']['bucket_info']['num_shards']
return num_shards
def get_bucket_reshard_status(bucket_name):
    """
    Return the reshard status string of the given bucket.

    Runs ``radosgw-admin bucket stats`` and extracts the
    ``reshard_status`` field from its JSON output
    (e.g. "None" or "InLogrecord").

    :param bucket_name: name of the bucket to query
    :return: value of the ``reshard_status`` field
    """
    cmd = exec_cmd("radosgw-admin bucket stats --bucket {}".format(bucket_name))
    json_op = json.loads(cmd)
    # removed leftover commented-out debug print of the full stats dump
    reshard_status = json_op['reshard_status']
    return reshard_status
def run_bucket_reshard_cmd(bucket_name, num_shards, **kwargs):
cmd = 'radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(bucket_name, num_shards)
cmd += ' --rgw-reshard-bucket-lock-duration 30' # reduce to minimum
@ -139,6 +149,11 @@ def test_bucket_reshard(conn, name, **fault):
bucket.delete_objects(Delete={'Objects':[{'Key':o.key} for o in objs]})
bucket.delete()
def calc_reshardlog_count(json_op):
    """
    Return the total number of reshard-log entries across all shards.

    :param json_op: parsed ``reshardlog list`` output — a list of shard
        dicts, each carrying a ``shard_entries`` list
    :return: sum of the lengths of every shard's ``shard_entries``
    """
    return sum(len(shard['shard_entries']) for shard in json_op)
def main():
"""
@ -235,6 +250,68 @@ def main():
log.debug('TEST: reshard bucket with abort at change_reshard_state\n')
test_bucket_reshard(connection, 'abort-at-change-reshard-state', abort_at='change_reshard_state')
# TESTCASE 'logrecord could be stopped after reshard failed'
log.debug(' test: logrecord could be stopped after reshard failed')
num_shards = get_bucket_stats(BUCKET_NAME).num_shards
assert "None" == get_bucket_reshard_status(BUCKET_NAME)
_, ret = run_bucket_reshard_cmd(BUCKET_NAME, num_shards + 1, check_retcode=False, abort_at='change_reshard_state')
assert(ret != 0 and ret != errno.EBUSY)
assert "InLogrecord" == get_bucket_reshard_status(BUCKET_NAME)
bucket.put_object(Key='put_during_logrecord', Body=b"some_data")
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 1
# end up with logrecord status, the logrecord will be purged
time.sleep(30)
assert "InLogrecord" == get_bucket_reshard_status(BUCKET_NAME)
bucket.put_object(Key='put_during_logrecord1', Body=b"some_data1")
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 0
assert "None" == get_bucket_reshard_status(BUCKET_NAME)
# TESTCASE 'duplicated entries should be purged before reshard'
log.debug(' test: duplicated entries should be purged before reshard')
num_shards = get_bucket_stats(BUCKET_NAME).num_shards
_, ret = run_bucket_reshard_cmd(BUCKET_NAME, num_shards + 1, check_retcode=False, abort_at='do_reshard')
assert(ret != 0 and ret != errno.EBUSY)
assert "InLogrecord" == get_bucket_reshard_status(BUCKET_NAME)
bucket.put_object(Key='put_during_logrecord2', Body=b"some_data2")
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 1
# begin to reshard again, the duplicated entries will be purged
time.sleep(30)
_, ret = run_bucket_reshard_cmd(BUCKET_NAME, num_shards + 1, check_retcode=False, abort_at='logrecord_writes')
assert(ret != 0 and ret != errno.EBUSY)
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 0
# TESTCASE 'duplicated entries can be purged manually'
log.debug(' test: duplicated entries can be purged manually')
time.sleep(30)
num_shards = get_bucket_stats(BUCKET_NAME).num_shards
_, ret = run_bucket_reshard_cmd(BUCKET_NAME, num_shards + 1, check_retcode=False, abort_at='do_reshard')
assert(ret != 0 and ret != errno.EBUSY)
assert "InLogrecord" == get_bucket_reshard_status(BUCKET_NAME)
bucket.put_object(Key='put_during_logrecord3', Body=b"some_data3")
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 1
time.sleep(30)
exec_cmd('radosgw-admin reshardlog purge --bucket %s' % BUCKET_NAME)
cmd = exec_cmd('radosgw-admin reshardlog list --bucket %s' % BUCKET_NAME)
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert calc_reshardlog_count(json_op) == 0
log.debug('check reshard logrecord successfully')
# TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds'
log.debug(' test: reshard versioned bucket')
num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1
@ -288,6 +365,8 @@ def main():
time.sleep(1)
ver_bucket.put_object(Key='put_during_reshard', Body=b"some_data")
log.debug('put object successful')
    # wait for the delayed reshard to finish
time.sleep(5)
# TESTCASE 'check that bucket stats are correct after reshard with unlinked entries'
log.debug('TEST: check that bucket stats are correct after reshard with unlinked entries\n')

View File

@ -11074,7 +11074,7 @@ next:
for (; i < max_shards; i++) {
formatter->open_object_section("shard");
encode_json("shard_id", i, formatter.get());
formatter->open_array_section("single shard entries");
formatter->open_array_section("shard_entries");
RGWRados::BucketShard bs(static_cast<rgw::sal::RadosStore*>(driver)->getRados());
int ret = bs.init(dpp(), bucket->get_info(), index, i, null_yield);
if (ret < 0) {

View File

@ -182,6 +182,8 @@
reshard cancel cancel resharding a bucket
reshard stale-instances list list stale-instances from bucket resharding
reshard stale-instances delete cleanup stale-instances from bucket resharding
reshardlog list list bucket reshard newest generation log
reshardlog purge trim all bucket resharding log
sync error list list sync error
sync error trim trim sync error
mfa create create a new MFA TOTP token

View File

@ -1389,6 +1389,7 @@ TEST_F(cls_rgw, reshardlog_list)
// record a log in prepare
cls_rgw_obj_key obj2 = str_int("obj2", 0);
entries.clear();
index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc);
ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
ASSERT_FALSE(is_truncated);
@ -1400,4 +1401,20 @@ TEST_F(cls_rgw, reshardlog_list)
ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
ASSERT_FALSE(is_truncated);
ASSERT_EQ(1u, entries.size());
// record a log in deleting obj
entries.clear();
index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, obj1, loc);
index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 1, obj1, meta);
ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
ASSERT_FALSE(is_truncated);
ASSERT_EQ(2u, entries.size());
  // overwrite the log written
entries.clear();
index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, obj2, loc);
index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 1, obj2, meta);
ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
ASSERT_FALSE(is_truncated);
ASSERT_EQ(2u, entries.size());
}