mirror of
https://github.com/ceph/ceph
synced 2024-12-27 14:03:25 +00:00
radosgw-admin: add command to resync encrypted multipart objects
Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
237fa27710
commit
e90bfba99c
@ -5123,6 +5123,140 @@ int RGWRados::bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo&
|
||||
return CLSRGWIssueBucketRebuild(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
|
||||
}
|
||||
|
||||
static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
|
||||
optional_yield y, RGWRados* store,
|
||||
RGWBucketInfo& bucket_info,
|
||||
RGWObjectCtx& obj_ctx,
|
||||
const RGWObjState& state)
|
||||
{
|
||||
// only overwrite if the tag hasn't changed
|
||||
obj_ctx.set_atomic(state.obj);
|
||||
|
||||
// make a tiny adjustment to the existing mtime so that fetch_remote_obj()
|
||||
// won't return ERR_NOT_MODIFIED when resyncing the object
|
||||
const auto set_mtime = state.mtime + std::chrono::nanoseconds(1);
|
||||
|
||||
// use set_attrs() to update the mtime in a bucket index transaction so the
|
||||
// change is recorded in bilog and datalog entries. this will cause any peer
|
||||
// zones to resync the object
|
||||
auto add_attrs = std::map<std::string, bufferlist>{
|
||||
{ RGW_ATTR_PREFIX "resync-encrypted-multipart", bufferlist{} },
|
||||
};
|
||||
|
||||
return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
|
||||
add_attrs, nullptr, y, set_mtime);
|
||||
}
|
||||
|
||||
static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
|
||||
optional_yield y, RGWRados* store,
|
||||
RGWBucketInfo& bucket_info,
|
||||
RGWObjectCtx& obj_ctx,
|
||||
const rgw_bucket_dir_entry& dirent,
|
||||
Formatter* f)
|
||||
{
|
||||
const auto obj = rgw_obj{bucket_info.bucket, dirent.key};
|
||||
|
||||
RGWObjState* astate = nullptr;
|
||||
RGWObjManifest* manifest = nullptr;
|
||||
constexpr bool follow_olh = false; // dirent will have version ids
|
||||
int ret = store->get_obj_state(dpp, &obj_ctx, bucket_info, obj,
|
||||
&astate, &manifest, follow_olh, y);
|
||||
if (ret < 0) {
|
||||
ldpp_dout(dpp, 4) << obj << " does not exist" << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
// check whether the object is encrypted
|
||||
if (auto i = astate->attrset.find(RGW_ATTR_CRYPT_MODE);
|
||||
i == astate->attrset.end()) {
|
||||
ldpp_dout(dpp, 4) << obj << " is not encrypted" << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
// check whether the object is multipart
|
||||
if (!manifest) {
|
||||
ldpp_dout(dpp, 4) << obj << " has no manifest so is not multipart" << dendl;
|
||||
return;
|
||||
}
|
||||
const RGWObjManifest::obj_iterator end = manifest->obj_end(dpp);
|
||||
if (end.get_cur_part_id() == 0) {
|
||||
ldpp_dout(dpp, 4) << obj << " manifest is not multipart" << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
ret = resync_encrypted_multipart(dpp, y, store, bucket_info,
|
||||
obj_ctx, *astate);
|
||||
if (ret < 0) {
|
||||
ldpp_dout(dpp, 0) << "ERROR: failed to update " << obj
|
||||
<< ": " << cpp_strerror(ret) << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
f->open_object_section("object");
|
||||
encode_json("name", obj.key.name, f);
|
||||
if (!obj.key.instance.empty()) {
|
||||
encode_json("version", obj.key.instance, f);
|
||||
}
|
||||
encode_json("mtime", astate->mtime, f);
|
||||
f->close_section(); // "object"
|
||||
}
|
||||
|
||||
int RGWRados::bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
|
||||
optional_yield y,
|
||||
rgw::sal::RadosStore* driver,
|
||||
RGWBucketInfo& bucket_info,
|
||||
const std::string& marker,
|
||||
RGWFormatterFlusher& flusher)
|
||||
{
|
||||
RGWRados::Bucket target(this, bucket_info);
|
||||
RGWRados::Bucket::List list_op(&target);
|
||||
|
||||
list_op.params.marker.name = marker;
|
||||
list_op.params.enforce_ns = true; // only empty ns
|
||||
list_op.params.list_versions = true;
|
||||
list_op.params.allow_unordered = true;
|
||||
|
||||
/* List bucket entries in chunks. */
|
||||
static constexpr int MAX_LIST_OBJS = 100;
|
||||
std::vector<rgw_bucket_dir_entry> entries;
|
||||
entries.reserve(MAX_LIST_OBJS);
|
||||
|
||||
int processed = 0;
|
||||
bool is_truncated = true;
|
||||
|
||||
Formatter* f = flusher.get_formatter();
|
||||
f->open_array_section("progress");
|
||||
|
||||
do {
|
||||
int ret = list_op.list_objects(dpp, MAX_LIST_OBJS, &entries, nullptr,
|
||||
&is_truncated, y);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
f->open_object_section("batch");
|
||||
f->open_array_section("modified");
|
||||
|
||||
for (const auto& dirent : entries) {
|
||||
RGWObjectCtx obj_ctx{driver};
|
||||
try_resync_encrypted_multipart(dpp, y, this, bucket_info,
|
||||
obj_ctx, dirent, f);
|
||||
}
|
||||
|
||||
f->close_section(); // "modified"
|
||||
|
||||
processed += entries.size();
|
||||
encode_json("total processed", processed, f);
|
||||
encode_json("marker", list_op.get_next_marker().name, f);
|
||||
f->close_section(); // "batch"
|
||||
|
||||
flusher.flush(); // flush after each 'chunk'
|
||||
} while (is_truncated);
|
||||
|
||||
f->close_section(); // "progress" array
|
||||
return 0;
|
||||
}
|
||||
|
||||
int RGWRados::bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
|
||||
{
|
||||
RGWSI_RADOS::Pool index_pool;
|
||||
|
@ -1522,6 +1522,17 @@ public:
|
||||
std::map<RGWObjCategory, RGWStorageStats> *existing_stats,
|
||||
std::map<RGWObjCategory, RGWStorageStats> *calculated_stats);
|
||||
int bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info);
|
||||
|
||||
// Search the bucket for encrypted multipart uploads, and increase their mtime
|
||||
// slightly to generate a bilog entry to trigger a resync to repair any
|
||||
// corrupted replicas. See https://tracker.ceph.com/issues/46062
|
||||
int bucket_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
|
||||
optional_yield y,
|
||||
rgw::sal::RadosStore* driver,
|
||||
RGWBucketInfo& bucket_info,
|
||||
const std::string& marker,
|
||||
RGWFormatterFlusher& flusher);
|
||||
|
||||
int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
|
||||
int remove_objs_from_index(const DoutPrefixProvider *dpp,
|
||||
RGWBucketInfo& bucket_info,
|
||||
|
@ -674,6 +674,7 @@ enum class OPT {
|
||||
BUCKET_RADOS_LIST,
|
||||
BUCKET_SHARD_OBJECTS,
|
||||
BUCKET_OBJECT_SHARD,
|
||||
BUCKET_RESYNC_ENCRYPTED_MULTIPART,
|
||||
POLICY,
|
||||
POOL_ADD,
|
||||
POOL_RM,
|
||||
@ -894,6 +895,7 @@ static SimpleCmd::Commands all_cmds = {
|
||||
{ "bucket shard objects", OPT::BUCKET_SHARD_OBJECTS },
|
||||
{ "bucket shard object", OPT::BUCKET_SHARD_OBJECTS },
|
||||
{ "bucket object shard", OPT::BUCKET_OBJECT_SHARD },
|
||||
{ "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART },
|
||||
{ "policy", OPT::POLICY },
|
||||
{ "pool add", OPT::POOL_ADD },
|
||||
{ "pool rm", OPT::POOL_RM },
|
||||
@ -7182,6 +7184,47 @@ int main(int argc, const char **argv)
|
||||
formatter->flush(cout);
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART) {
|
||||
// repair logic for replication of encrypted multipart uploads:
|
||||
// https://tracker.ceph.com/issues/46062
|
||||
if (bucket_name.empty()) {
|
||||
cerr << "ERROR: bucket not specified" << std::endl;
|
||||
return EINVAL;
|
||||
}
|
||||
int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
|
||||
if (ret < 0) {
|
||||
return -ret;
|
||||
}
|
||||
|
||||
auto rados_driver = dynamic_cast<rgw::sal::RadosStore*>(driver);
|
||||
if (!rados_driver) {
|
||||
cerr << "ERROR: this command can only work when the cluster "
|
||||
"has a RADOS backing store." << std::endl;
|
||||
return EPERM;
|
||||
}
|
||||
|
||||
// fail if recovery wouldn't generate replication log entries
|
||||
if (!rados_driver->svc()->zone->need_to_log_data() && !yes_i_really_mean_it) {
|
||||
cerr << "This command is only necessary for replicated buckets." << std::endl;
|
||||
cerr << "do you really mean it? (requires --yes-i-really-mean-it)" << std::endl;
|
||||
return EPERM;
|
||||
}
|
||||
|
||||
formatter->open_object_section("modified");
|
||||
encode_json("bucket", bucket->get_name(), formatter.get());
|
||||
encode_json("bucket_id", bucket->get_bucket_id(), formatter.get());
|
||||
|
||||
ret = rados_driver->getRados()->bucket_resync_encrypted_multipart(
|
||||
dpp(), null_yield, rados_driver, bucket->get_info(),
|
||||
marker, stream_flusher);
|
||||
if (ret < 0) {
|
||||
return -ret;
|
||||
}
|
||||
formatter->close_section();
|
||||
formatter->flush(cout);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (opt_cmd == OPT::BUCKET_CHOWN) {
|
||||
if (bucket_name.empty()) {
|
||||
cerr << "ERROR: bucket name not specified" << std::endl;
|
||||
|
Loading…
Reference in New Issue
Block a user