mirror of
https://github.com/ceph/ceph
synced 2024-12-19 09:57:05 +00:00
Merge pull request #44699 from ivancich/wip-bucket-index-timeout-config
core,rgw: allow configuration of bi tx timeout; add instrumentation Reviewed-by: Matt Benjamin <mbenjami@redhat.com> Reviewed-by: Adam C. Emerson <aemerson@redhat.com> Reviewed-by: Josh Durgin <jdurgin@redhat.com>
This commit is contained in:
commit
323c4fffdd
@ -13,6 +13,8 @@
|
||||
#include "common/Clock.h"
|
||||
#include "common/strtol.h"
|
||||
#include "common/escape.h"
|
||||
#include "common/config_proxy.h"
|
||||
#include "osd/osd_types.h"
|
||||
|
||||
#include "include/compat.h"
|
||||
#include <boost/lexical_cast.hpp>
|
||||
@ -34,6 +36,13 @@ using ceph::timespan;
|
||||
CLS_VER(1,0)
|
||||
CLS_NAME(rgw)
|
||||
|
||||
// special logging for bucket index transaction instrumentation; if
|
||||
// instrumenting, log at level 0 and include string "BITX" in log
|
||||
// message to make entries easier to find
|
||||
#define CLS_LOG_BITX(is_bitx, level, fmt, ...) \
|
||||
if (is_bitx) \
|
||||
{ CLS_LOG(0, "BITX: " fmt, ##__VA_ARGS__); } \
|
||||
else { CLS_LOG(level, fmt, ##__VA_ARGS__); }
|
||||
|
||||
// No UTF-8 character can begin with 0x80, so this is a safe indicator
|
||||
// of a special bucket-index entry for the first byte. Note: although
|
||||
@ -847,34 +856,57 @@ static int read_key_entry(cls_method_context_t hctx, const cls_rgw_obj_key& key,
|
||||
string *idx, rgw_bucket_dir_entry *entry,
|
||||
bool special_delete_marker_name = false);
|
||||
|
||||
static std::string modify_op_str(RGWModifyOp op) {
|
||||
return std::string(to_string(op));
|
||||
}
|
||||
|
||||
static std::string modify_op_str(uint8_t op) {
|
||||
return modify_op_str((RGWModifyOp) op);
|
||||
}
|
||||
|
||||
int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(10, "entered %s", __func__);
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
// decode request
|
||||
rgw_cls_obj_prepare_op op;
|
||||
auto iter = in->cbegin();
|
||||
try {
|
||||
decode(op, iter);
|
||||
} catch (ceph::buffer::error& err) {
|
||||
CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to decode request\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: failed to decode request", __func__);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
if (op.tag.empty()) {
|
||||
CLS_LOG(1, "ERROR: tag is empty\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: tag is empty", __func__);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
CLS_LOG(1, "rgw_bucket_prepare_op(): request: op=%d name=%s instance=%s tag=%s",
|
||||
op.op, op.key.name.c_str(), op.key.instance.c_str(), op.tag.c_str());
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"INFO: %s: request: op=%s name=%s tag=%s", __func__,
|
||||
modify_op_str(op.op).c_str(), op.key.to_string().c_str(), op.tag.c_str());
|
||||
|
||||
// get on-disk state
|
||||
string idx;
|
||||
std::string idx;
|
||||
|
||||
rgw_bucket_dir_entry entry;
|
||||
int rc = read_key_entry(hctx, op.key, &idx, &entry);
|
||||
if (rc < 0 && rc != -ENOENT)
|
||||
if (rc < 0 && rc != -ENOENT) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s could not read key entry, key=%s, rc=%d",
|
||||
__func__, op.key.to_string().c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
bool noent = (rc == -ENOENT);
|
||||
|
||||
@ -892,13 +924,29 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
info.timestamp = real_clock::now();
|
||||
info.state = CLS_RGW_STATE_PENDING_MODIFY;
|
||||
info.op = op.op;
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: inserting tag %s op %s into pending map for entry %s",
|
||||
__func__, op.tag.c_str(), modify_op_str(info.op).c_str(),
|
||||
entry.key.to_string().c_str());
|
||||
entry.pending_map.insert(pair<string, rgw_bucket_pending_info>(op.tag, info));
|
||||
|
||||
// write out new key to disk
|
||||
bufferlist info_bl;
|
||||
encode(entry, info_bl);
|
||||
return cls_cxx_map_set_val(hctx, idx, &info_bl);
|
||||
}
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &info_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s could not set value for key, key=%s, rc=%d",
|
||||
__func__, escape_str(idx).c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10, "EXITING %s, returning 0", __func__);
|
||||
return 0;
|
||||
} // rgw_bucket_prepare_op
|
||||
|
||||
static void unaccount_entry(rgw_bucket_dir_header& header,
|
||||
rgw_bucket_dir_entry& entry)
|
||||
@ -1032,7 +1080,15 @@ static int complete_remove_obj(cls_method_context_t hctx,
|
||||
|
||||
int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(10, "entered %s", __func__);
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
// decode request
|
||||
rgw_cls_obj_complete_op op;
|
||||
@ -1040,19 +1096,22 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
try {
|
||||
decode(op, iter);
|
||||
} catch (ceph::buffer::error& err) {
|
||||
CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode request\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: failed to decode request", __func__);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
CLS_LOG(1, "rgw_bucket_complete_op(): request: op=%d name=%s instance=%s ver=%lu:%llu tag=%s",
|
||||
op.op, op.key.name.c_str(), op.key.instance.c_str(),
|
||||
(unsigned long)op.ver.pool, (unsigned long long)op.ver.epoch,
|
||||
op.tag.c_str());
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"INFO: %s: request: op=%s name=%s ver=%lu:%llu tag=%s",
|
||||
__func__,
|
||||
modify_op_str(op.op).c_str(), op.key.to_string().c_str(),
|
||||
(unsigned long)op.ver.pool, (unsigned long long)op.ver.epoch,
|
||||
op.tag.c_str());
|
||||
|
||||
rgw_bucket_dir_header header;
|
||||
int rc = read_bucket_header(hctx, &header);
|
||||
if (rc < 0) {
|
||||
CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to read header\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: failed to read header, rc=%d",
|
||||
__func__, rc);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
@ -1068,6 +1127,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
entry.locator = op.locator;
|
||||
ondisk = false;
|
||||
} else if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: read key entry failed, key=%s, rc=%d",
|
||||
__func__, op.key.to_string().c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -1079,17 +1141,23 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
if (op.tag.size()) {
|
||||
auto pinter = entry.pending_map.find(op.tag);
|
||||
if (pinter == entry.pending_map.end()) {
|
||||
CLS_LOG(1, "ERROR: couldn't find tag for pending operation\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: couldn't find tag for pending operation with tag %s",
|
||||
__func__, op.tag.c_str());
|
||||
return -EINVAL;
|
||||
}
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"INFO: %s: removing tag %s from pending map",
|
||||
__func__, op.tag.c_str());
|
||||
entry.pending_map.erase(pinter);
|
||||
}
|
||||
|
||||
if (op.tag.size() && op.op == CLS_RGW_OP_CANCEL) {
|
||||
CLS_LOG(1, "rgw_bucket_complete_op(): cancel requested\n");
|
||||
CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: op is cancel", __func__);
|
||||
} else if (op.ver.pool == entry.ver.pool &&
|
||||
op.ver.epoch && op.ver.epoch <= entry.ver.epoch) {
|
||||
CLS_LOG(1, "rgw_bucket_complete_op(): skipping request, old epoch\n");
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: skipping request, old epoch", __func__);
|
||||
op.op = CLS_RGW_OP_CANCEL;
|
||||
}
|
||||
|
||||
@ -1103,10 +1171,16 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
log_op = false; // don't log cancelation
|
||||
if (op.tag.size()) {
|
||||
// we removed this tag from pending_map so need to write the changes
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: unable to set map val, key=%s, rc=%d",
|
||||
__func__, escape_str(idx).c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
@ -1115,26 +1189,47 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
// unaccount deleted entry
|
||||
unaccount_entry(header, entry);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: delete op, key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
entry.meta = op.meta;
|
||||
if (!ondisk) {
|
||||
// no entry to erase
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: key=%s not on disk, no action",
|
||||
__func__, escape_str(idx).c_str());
|
||||
log_op = false;
|
||||
} else if (!entry.pending_map.size()) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: removing map entry with key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
rc = cls_cxx_map_remove_key(hctx, idx);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: unable to remove map key, key=%s, rc=%d",
|
||||
__func__, escape_str(idx).c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
} else {
|
||||
entry.exists = false;
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: unable to set map val, key=%s, rc=%d",
|
||||
__func__, escape_str(idx).c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
} // CLS_RGW_OP_DEL
|
||||
else if (op.op == CLS_RGW_OP_ADD) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: add op, key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
// unaccount overwritten entry
|
||||
unaccount_entry(header, entry);
|
||||
|
||||
@ -1151,8 +1246,14 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
stats.actual_size += meta.size;
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(idx).c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: unable to set map value at key=%s, rc=%d",
|
||||
__func__, escape_str(idx).c_str(), rc);
|
||||
return rc;
|
||||
}
|
||||
} // CLS_RGW_OP_ADD
|
||||
@ -1163,20 +1264,41 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
header.max_marker, op.bilog_flags, NULL, NULL,
|
||||
&op.zones_trace);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"ERROR: %s: log_index_operation failed with rc=%d",
|
||||
__func__, rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
CLS_LOG(20, "rgw_bucket_complete_op(): remove_objs.size()=%d",
|
||||
(int)op.remove_objs.size());
|
||||
CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: remove_objs.size()=%d",
|
||||
__func__, (int)op.remove_objs.size());
|
||||
for (const auto& remove_key : op.remove_objs) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: completing object remove key=%s",
|
||||
__func__, escape_str(remove_key.to_string()).c_str());
|
||||
rc = complete_remove_obj(hctx, header, remove_key, default_log_op);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"WARNING: %s: complete_remove_obj, failed to remove entry, "
|
||||
"name=%s read_index_entry ret=%d, continuing",
|
||||
__func__, escape_str(remove_key.to_string()).c_str(), rc);
|
||||
continue; // part cleanup errors are not fatal
|
||||
}
|
||||
} // remove loop
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"INFO: %s: writing bucket header", __func__);
|
||||
rc = write_bucket_header(hctx, &header);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"ERROR: %s: failed to write bucket header ret=%d",
|
||||
__func__, rc);
|
||||
}
|
||||
|
||||
return write_bucket_header(hctx, &header);
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"EXITING %s: returning %d", __func__, rc);
|
||||
return rc;
|
||||
} // rgw_bucket_complete_op
|
||||
|
||||
template <class T>
|
||||
@ -2155,7 +2277,15 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
|
||||
int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(1, "entered %s", __func__);
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
bufferlist header_bl;
|
||||
rgw_bucket_dir_header header;
|
||||
@ -2163,13 +2293,23 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
|
||||
int rc = read_bucket_header(hctx, &header);
|
||||
if (rc < 0) {
|
||||
CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to read header\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: failed to read header", __func__);
|
||||
return rc;
|
||||
}
|
||||
|
||||
const uint64_t config_op_expiration =
|
||||
conf->rgw_pending_bucket_index_op_expiration;
|
||||
|
||||
// priority order -- 1) bucket header, 2) global config, 3) DEFAULT;
|
||||
// a value of zero indicates go down the list
|
||||
timespan tag_timeout(
|
||||
std::chrono::seconds(
|
||||
header.tag_timeout ? header.tag_timeout : CEPH_RGW_TAG_TIMEOUT));
|
||||
header.tag_timeout ?
|
||||
header.tag_timeout :
|
||||
(config_op_expiration ?
|
||||
config_op_expiration :
|
||||
CEPH_RGW_DEFAULT_TAG_TIMEOUT)));
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: tag_timeout=%ld", __func__, tag_timeout.count());
|
||||
|
||||
auto in_iter = in->cbegin();
|
||||
|
||||
@ -2181,18 +2321,37 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
decode(op, in_iter);
|
||||
decode(cur_change, in_iter);
|
||||
} catch (ceph::buffer::error& err) {
|
||||
CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"ERROR: %s: failed to decode request", __func__);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
bufferlist cur_disk_bl;
|
||||
// check if the log op flag is set and strip it from the op
|
||||
bool log_op = (op & CEPH_RGW_DIR_SUGGEST_LOG_OP) != 0;
|
||||
op &= CEPH_RGW_DIR_SUGGEST_OP_MASK;
|
||||
|
||||
string cur_change_key;
|
||||
encode_obj_index_key(cur_change.key, &cur_change_key);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"INFO: %s: op=%c, cur_change_key=%s, cur_change.exists=%d",
|
||||
__func__, op, escape_str(cur_change_key).c_str(), cur_change.exists);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
int ret = cls_cxx_map_get_val(hctx, cur_change_key, &cur_disk_bl);
|
||||
if (ret < 0 && ret != -ENOENT)
|
||||
if (ret < 0 && ret != -ENOENT) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"ERROR: %s: accessing map, key=%s error=%d", __func__,
|
||||
escape_str(cur_change_key).c_str(), ret);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
if (ret == -ENOENT) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"WARNING: %s: accessing map, key not found key=%s, continuing",
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -2201,7 +2360,8 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
try {
|
||||
decode(cur_disk, cur_disk_iter);
|
||||
} catch (ceph::buffer::error& error) {
|
||||
CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode cur_disk\n");
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ERROR: %s: failed to decode cur_disk",
|
||||
__func__);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
@ -2209,19 +2369,27 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
// these pending entries will prevent us from applying suggested changes
|
||||
real_time cur_time = real_clock::now();
|
||||
auto iter = cur_disk.pending_map.begin();
|
||||
while(iter != cur_disk.pending_map.end()) {
|
||||
auto cur_iter = iter++;
|
||||
while (iter != cur_disk.pending_map.end()) {
|
||||
auto cur_iter = iter++; // IMPORTANT, cur_iter might be invalidated
|
||||
if (cur_time > (cur_iter->second.timestamp + timespan(tag_timeout))) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"WARNING: %s: expired pending map entry for \"%s\" "
|
||||
"(pending_state=%d, op=%s) expired and was removed",
|
||||
__func__,
|
||||
cur_iter->first.c_str(),
|
||||
cur_iter->second.state,
|
||||
modify_op_str(iter->second.op).c_str());
|
||||
cur_disk.pending_map.erase(cur_iter);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // while
|
||||
} // if
|
||||
|
||||
CLS_LOG(20, "cur_disk.pending_map.empty()=%d op=%d cur_disk.exists=%d "
|
||||
"cur_disk.index_ver=%d cur_change.exists=%d cur_change.index_ver=%d",
|
||||
cur_disk.pending_map.empty(), (int)op, cur_disk.exists,
|
||||
(int)cur_disk.index_ver, cur_change.exists,
|
||||
(int)cur_change.index_ver);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: op=%c cur_disk.pending_map.empty()=%d cur_disk.exists=%d "
|
||||
"cur_disk.index_ver=%d cur_change.exists=%d cur_change.index_ver=%d",
|
||||
__func__, op, cur_disk.pending_map.empty(), cur_disk.exists,
|
||||
(int)cur_disk.index_ver, cur_change.exists,
|
||||
(int)cur_change.index_ver);
|
||||
|
||||
if (cur_change.index_ver < cur_disk.index_ver) {
|
||||
// a pending on-disk entry was completed since this suggestion was made,
|
||||
@ -2231,9 +2399,11 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
}
|
||||
|
||||
if (cur_disk.pending_map.empty()) {
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: cur_disk.pending_map is empty", __func__);
|
||||
if (cur_disk.exists) {
|
||||
rgw_bucket_category_stats& old_stats = header.stats[cur_disk.meta.category];
|
||||
CLS_LOG(10, "total_entries: %" PRId64 " -> %" PRId64 "", old_stats.num_entries, old_stats.num_entries - 1);
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: stats.num_entries: %ld -> %ld",
|
||||
__func__, old_stats.num_entries, old_stats.num_entries - 1);
|
||||
old_stats.num_entries--;
|
||||
old_stats.total_size -= cur_disk.meta.accounted_size;
|
||||
old_stats.total_size_rounded -= cls_rgw_get_rounded_size(cur_disk.meta.accounted_size);
|
||||
@ -2241,26 +2411,38 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
header_changed = true;
|
||||
}
|
||||
rgw_bucket_category_stats& stats = header.stats[cur_change.meta.category];
|
||||
bool log_op = (op & CEPH_RGW_DIR_SUGGEST_LOG_OP) != 0;
|
||||
op &= CEPH_RGW_DIR_SUGGEST_OP_MASK;
|
||||
|
||||
switch(op) {
|
||||
case CEPH_RGW_REMOVE:
|
||||
CLS_LOG(10, "CEPH_RGW_REMOVE name=%s instance=%s", cur_change.key.name.c_str(), cur_change.key.instance.c_str());
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"INFO: %s: CEPH_RGW_REMOVE name=%s encoded=%s",
|
||||
__func__, escape_str(cur_change.key.to_string()).c_str(),
|
||||
escape_str(cur_change_key).c_str());
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: removing map entry with key=%s",
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
ret = cls_cxx_map_remove_key(hctx, cur_change_key);
|
||||
if (ret < 0)
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to remove key, key=%s, error=%d",
|
||||
__func__, escape_str(cur_change_key).c_str(), ret);
|
||||
return ret;
|
||||
}
|
||||
if (log_op && cur_disk.exists && !header.syncstopped) {
|
||||
ret = log_index_operation(hctx, cur_disk.key, CLS_RGW_OP_DEL, cur_disk.tag, cur_disk.meta.mtime,
|
||||
cur_disk.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
|
||||
if (ret < 0) {
|
||||
CLS_LOG(0, "ERROR: %s: failed to log operation ret=%d", __func__, ret);
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: failed to log operation ret=%d",
|
||||
__func__, ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case CEPH_RGW_UPDATE:
|
||||
CLS_LOG(10, "CEPH_RGW_UPDATE name=%s instance=%s total_entries: %" PRId64 " -> %" PRId64 "",
|
||||
cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"INFO: %s: CEPH_RGW_UPDATE name=%s stats.num_entries: %ld -> %ld",
|
||||
__func__, escape_str(cur_change.key.to_string()).c_str(),
|
||||
stats.num_entries, stats.num_entries + 1);
|
||||
|
||||
stats.num_entries++;
|
||||
stats.total_size += cur_change.meta.accounted_size;
|
||||
@ -2270,14 +2452,21 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
cur_change.index_ver = header.ver;
|
||||
bufferlist cur_state_bl;
|
||||
encode(cur_change, cur_state_bl);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(cur_change.key.to_string()).c_str());
|
||||
ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl);
|
||||
if (ret < 0)
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to set value for key, key=%s, error=%d",
|
||||
__func__, escape_str(cur_change_key).c_str(), ret);
|
||||
return ret;
|
||||
}
|
||||
if (log_op && !header.syncstopped) {
|
||||
ret = log_index_operation(hctx, cur_change.key, CLS_RGW_OP_ADD, cur_change.tag, cur_change.meta.mtime,
|
||||
cur_change.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
|
||||
if (ret < 0) {
|
||||
CLS_LOG(0, "ERROR: %s: failed to log operation ret=%d", __func__, ret);
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: failed to log operation ret=%d", __func__, ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
@ -2287,10 +2476,21 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
} // while (!in_iter.end())
|
||||
|
||||
if (header_changed) {
|
||||
return write_bucket_header(hctx, &header);
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: bucket header changed, writing", __func__);
|
||||
int ret = write_bucket_header(hctx, &header);
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"ERROR: %s: failed to write bucket header ret=%d",
|
||||
__func__, ret);
|
||||
} else {
|
||||
CLS_LOG_BITX(bitx_inst, 10, "EXITING %s, returning %d", __func__, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10, "EXITING %s, returning 0", __func__);
|
||||
return 0;
|
||||
}
|
||||
} // rgw_dir_suggest_changes
|
||||
|
||||
static int rgw_obj_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
|
@ -15,12 +15,13 @@
|
||||
|
||||
#include "rgw/rgw_basic_types.h"
|
||||
|
||||
#define CEPH_RGW_REMOVE 'r'
|
||||
#define CEPH_RGW_UPDATE 'u'
|
||||
#define CEPH_RGW_TAG_TIMEOUT 120
|
||||
#define CEPH_RGW_REMOVE 'r' // value 114
|
||||
#define CEPH_RGW_UPDATE 'u' // value 117
|
||||
#define CEPH_RGW_DIR_SUGGEST_LOG_OP 0x80
|
||||
#define CEPH_RGW_DIR_SUGGEST_OP_MASK 0x7f
|
||||
|
||||
constexpr uint64_t CEPH_RGW_DEFAULT_TAG_TIMEOUT = 120; // in seconds
|
||||
|
||||
class JSONObj;
|
||||
|
||||
using ceph::operator <<;
|
||||
|
@ -3304,3 +3304,25 @@ options:
|
||||
services:
|
||||
- rgw
|
||||
with_legacy: true
|
||||
- name: rgw_pending_bucket_index_op_expiration
|
||||
type: uint
|
||||
level: advanced
|
||||
default: 120
|
||||
desc: Number of seconds a pending operation can remain in bucket index shard.
|
||||
long_desc: Number of seconds a pending operation can remain in bucket
|
||||
index shard before it expires. Used for transactional bucket index
|
||||
operations, and if the operation does not complete in this time
|
||||
period, the operation will be dropped.
|
||||
services:
|
||||
- rgw
|
||||
- osd
|
||||
with_legacy: true
|
||||
- name: rgw_bucket_index_transaction_instrumentation
|
||||
type: bool
|
||||
level: dev
|
||||
default: false
|
||||
desc: Turns on extra instrumentation surrounding bucket index transactions.
|
||||
services:
|
||||
- rgw
|
||||
- osd
|
||||
with_legacy: true
|
||||
|
@ -496,6 +496,20 @@ ceph_release_t cls_get_min_compatible_client(cls_method_context_t hctx)
|
||||
return ceph_release_t::nautilus;
|
||||
}
|
||||
|
||||
const ConfigProxy& cls_get_config(cls_method_context_t hctx)
|
||||
{
|
||||
// FIXME ; segfault if ever called
|
||||
static ConfigProxy* dummy = nullptr;
|
||||
return *dummy;
|
||||
}
|
||||
|
||||
const object_info_t& cls_get_object_info(cls_method_context_t hctx)
|
||||
{
|
||||
// FIXME ; segfault if ever called
|
||||
static object_info_t* dummy = nullptr;
|
||||
return *dummy;
|
||||
}
|
||||
|
||||
int cls_get_snapset_seq(cls_method_context_t hctx, uint64_t *snap_seq)
|
||||
{
|
||||
return 0;
|
||||
|
@ -8,6 +8,8 @@
|
||||
#include "common/debug.h"
|
||||
|
||||
#include "objclass/objclass.h"
|
||||
#include "osd/osd_internal_types.h"
|
||||
|
||||
#include "osd/ClassHandler.h"
|
||||
|
||||
#include "auth/Crypto.h"
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
struct obj_list_watch_response_t;
|
||||
class PGLSFilter;
|
||||
class object_info_t;
|
||||
|
||||
extern "C" {
|
||||
#endif
|
||||
@ -142,6 +143,8 @@ extern uint64_t cls_get_features(cls_method_context_t hctx);
|
||||
extern uint64_t cls_get_client_features(cls_method_context_t hctx);
|
||||
extern ceph_release_t cls_get_required_osd_release(cls_method_context_t hctx);
|
||||
extern ceph_release_t cls_get_min_compatible_client(cls_method_context_t hctx);
|
||||
extern const ConfigProxy& cls_get_config(cls_method_context_t hctx);
|
||||
extern const object_info_t& cls_get_object_info(cls_method_context_t hctx);
|
||||
|
||||
/* helpers */
|
||||
extern void cls_cxx_subop_version(cls_method_context_t hctx, std::string *s);
|
||||
|
@ -617,7 +617,6 @@ uint64_t cls_current_version(cls_method_context_t hctx)
|
||||
return ctx->pg->get_last_user_version();
|
||||
}
|
||||
|
||||
|
||||
int cls_current_subop_num(cls_method_context_t hctx)
|
||||
{
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
@ -649,6 +648,18 @@ ceph_release_t cls_get_min_compatible_client(cls_method_context_t hctx)
|
||||
return ctx->pg->get_osdmap()->get_require_min_compat_client();
|
||||
}
|
||||
|
||||
const ConfigProxy& cls_get_config(cls_method_context_t hctx)
|
||||
{
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
return ctx->pg->get_cct()->_conf;
|
||||
}
|
||||
|
||||
const object_info_t& cls_get_object_info(cls_method_context_t hctx)
|
||||
{
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
return ctx->obs->oi;
|
||||
}
|
||||
|
||||
int cls_get_snapset_seq(cls_method_context_t hctx, uint64_t *snap_seq) {
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
if (!ctx->new_obs.exists || (ctx->new_obs.oi.is_whiteout() &&
|
||||
|
@ -55,7 +55,8 @@
|
||||
#define dout_context g_ceph_context
|
||||
#define dout_subsys ceph_subsys_rgw
|
||||
|
||||
#define BUCKET_TAG_TIMEOUT 30
|
||||
// seconds for timeout during RGWBucket::check_object_index
|
||||
constexpr uint64_t BUCKET_TAG_QUICK_TIMEOUT = 30;
|
||||
|
||||
using namespace std;
|
||||
|
||||
@ -601,7 +602,8 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
bucket->set_tag_timeout(dpp, BUCKET_TAG_TIMEOUT);
|
||||
// use a quicker/shorter tag timeout during this process
|
||||
bucket->set_tag_timeout(dpp, BUCKET_TAG_QUICK_TIMEOUT);
|
||||
|
||||
rgw::sal::Bucket::ListResults results;
|
||||
results.is_truncated = true;
|
||||
@ -627,6 +629,7 @@ int RGWBucket::check_object_index(const DoutPrefixProvider *dpp,
|
||||
|
||||
formatter->close_section();
|
||||
|
||||
// restore normal tag timeout for bucket
|
||||
bucket->set_tag_timeout(dpp, 0);
|
||||
|
||||
return 0;
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "common/errno.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "common/Throttle.h"
|
||||
#include "common/BackTrace.h"
|
||||
|
||||
#include "rgw_sal.h"
|
||||
#include "rgw_zone.h"
|
||||
@ -102,6 +103,9 @@
|
||||
using namespace std;
|
||||
using namespace librados;
|
||||
|
||||
#define ldout_bitx(_bitx, _ctx, _level) if(_bitx) { ldout(_ctx, 0) << "BITX: "
|
||||
#define dendl_bitx dendl ; }
|
||||
|
||||
static string shadow_ns = "shadow";
|
||||
static string default_bucket_index_pool_suffix = "rgw.buckets.index";
|
||||
static string default_storage_extra_pool_suffix = "rgw.buckets.non-ec";
|
||||
@ -891,11 +895,22 @@ void RGWIndexCompletionManager::process()
|
||||
|
||||
r = store->guard_reshard(&dpp, &bs, c->obj, bucket_info,
|
||||
[&](RGWRados::BucketShard *bs) -> int {
|
||||
const bool bitx = ctx()->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, ctx(), 10) <<
|
||||
"ENTERING " << __func__ << ": bucket-shard=" << bs <<
|
||||
" obj=" << c->obj << " tag=" << c->tag <<
|
||||
" op=" << c->op << ", remove_objs=" << c->remove_objs << dendl_bitx;
|
||||
ldout_bitx(bitx, ctx(), 25) <<
|
||||
"BACKTRACE: " << __func__ << ": " << ClibBackTrace(1) << dendl_bitx;
|
||||
|
||||
librados::ObjectWriteOperation o;
|
||||
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
|
||||
cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
|
||||
c->log_op, c->bilog_op, &c->zones_trace);
|
||||
return bs->bucket_obj.operate(&dpp, &o, null_yield);
|
||||
int ret = bs->bucket_obj.operate(&dpp, &o, null_yield);
|
||||
ldout_bitx(bitx, ctx(), 10) <<
|
||||
"EXITING " << __func__ << ": ret=" << dendl_bitx;
|
||||
return ret;
|
||||
});
|
||||
if (r < 0) {
|
||||
ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
|
||||
@ -8370,6 +8385,10 @@ bool RGWRados::process_expire_objects(const DoutPrefixProvider *dpp)
|
||||
int RGWRados::cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs, RGWModifyOp op, string& tag,
|
||||
rgw_obj& obj, uint16_t bilog_flags, optional_yield y, rgw_zone_set *_zones_trace)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket-shard=" << bs << " obj=" << obj << " tag=" << tag << " op=" << op << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 25) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl_bitx;
|
||||
|
||||
rgw_zone_set zones_trace;
|
||||
if (_zones_trace) {
|
||||
zones_trace = *_zones_trace;
|
||||
@ -8380,7 +8399,9 @@ int RGWRados::cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs,
|
||||
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
|
||||
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
|
||||
cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), svc.zone->get_zone().log_data, bilog_flags, zones_trace);
|
||||
return bs.bucket_obj.operate(dpp, &o, y);
|
||||
int ret = bs.bucket_obj.operate(dpp, &o, y);
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << ": ret=" << ret << dendl_bitx;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
|
||||
@ -8388,6 +8409,12 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
|
||||
rgw_bucket_dir_entry& ent, RGWObjCategory category,
|
||||
list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket-shard=" << bs <<
|
||||
" obj=" << obj << " tag=" << tag << " op=" << op <<
|
||||
", remove_objs=" << (remove_objs ? *remove_objs : std::list<rgw_obj_index_key>()) << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 25) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl_bitx;
|
||||
|
||||
ObjectWriteOperation o;
|
||||
rgw_bucket_dir_entry_meta dir_meta;
|
||||
dir_meta = ent.meta;
|
||||
@ -8412,6 +8439,8 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
|
||||
librados::AioCompletion *completion = arg->rados_completion;
|
||||
int ret = bs.bucket_obj.aio_operate(arg->rados_completion, &o);
|
||||
completion->release(); /* can't reference arg here, as it might have already been released */
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << ": ret=" << ret << dendl_bitx;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -8509,14 +8538,15 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
optional_yield y,
|
||||
RGWBucketListNameFilter force_check_filter)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
/* expansion_factor allows the number of entries to read to grow
|
||||
* exponentially; this is used when earlier reads are producing too
|
||||
* few results, perhaps due to filtering or to a series of
|
||||
* namespaced entries */
|
||||
|
||||
ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ": " <<
|
||||
bucket_info.bucket <<
|
||||
" start_after=\"" << start_after <<
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": " << bucket_info.bucket <<
|
||||
" start_after=\"" << start_after.to_string() <<
|
||||
"\", prefix=\"" << prefix <<
|
||||
", delimiter=\"" << delimiter <<
|
||||
"\", shard_id=" << shard_id <<
|
||||
@ -8524,7 +8554,8 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
", list_versions=" << list_versions <<
|
||||
", expansion_factor=" << expansion_factor <<
|
||||
", force_check_filter is " <<
|
||||
(force_check_filter ? "set" : "unset") << dendl;
|
||||
(force_check_filter ? "set" : "unset") << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 25) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl_bitx;
|
||||
|
||||
m.clear();
|
||||
|
||||
@ -8688,6 +8719,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
* well. */
|
||||
librados::IoCtx sub_ctx;
|
||||
sub_ctx.dup(ioctx);
|
||||
ldout_bitx(bitx, cct, 20) << "INFO: " << __func__ <<
|
||||
" calling check_disk_state bucket=" << bucket_info.bucket <<
|
||||
" entry=" << dirent.key << dendl_bitx;
|
||||
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent,
|
||||
updates[tracker.oid_name], y);
|
||||
if (r < 0 && r != -ENOENT) {
|
||||
@ -8747,6 +8781,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
// we don't care if we lose suggested updates, send them off blindly
|
||||
AioCompletion *c =
|
||||
librados::Rados::aio_create_completion(nullptr, nullptr);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
": doing dir_suggest on " << miter.first << dendl_bitx;
|
||||
ioctx.aio_operate(miter.first, c, &o);
|
||||
c->release();
|
||||
}
|
||||
@ -8781,6 +8818,7 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
": returning, last_entry NOT SET" << dendl;
|
||||
}
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -8814,15 +8852,16 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
|
||||
rgw_obj_index_key *last_entry,
|
||||
optional_yield y,
|
||||
RGWBucketListNameFilter force_check_filter) {
|
||||
ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << " " <<
|
||||
bucket_info.bucket <<
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": " << bucket_info.bucket <<
|
||||
" start_after=\"" << start_after <<
|
||||
"\", prefix=\"" << prefix <<
|
||||
"\", shard_id=" << shard_id <<
|
||||
"\", num_entries=" << num_entries <<
|
||||
", list_versions=" << list_versions <<
|
||||
", force_check_filter is " <<
|
||||
(force_check_filter ? "set" : "unset") << dendl;
|
||||
(force_check_filter ? "set" : "unset") << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 25) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl_bitx;
|
||||
|
||||
ent_list.clear();
|
||||
static MultipartMetaFilter multipart_meta_filter;
|
||||
@ -8907,7 +8946,7 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
|
||||
r = rgw_rados_operate(dpp, ioctx, oid, &op, nullptr, null_yield);
|
||||
if (r < 0) {
|
||||
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
|
||||
" error in rgw_rados_operate (bucket list op), r=" << r << dendl;
|
||||
": error in rgw_rados_operate (bucket list op), r=" << r << dendl;
|
||||
return r;
|
||||
}
|
||||
|
||||
@ -8923,10 +8962,13 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
|
||||
* and if the tags are old we need to do cleanup as well. */
|
||||
librados::IoCtx sub_ctx;
|
||||
sub_ctx.dup(ioctx);
|
||||
ldout_bitx(bitx, cct, 20) << "INFO: " << __func__ <<
|
||||
": calling check_disk_state bucket=" << bucket_info.bucket <<
|
||||
" entry=" << dirent.key << dendl_bitx;
|
||||
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, updates[oid], y);
|
||||
if (r < 0 && r != -ENOENT) {
|
||||
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
|
||||
" error in check_disk_state, r=" << r << dendl;
|
||||
": error in check_disk_state, r=" << r << dendl;
|
||||
return r;
|
||||
}
|
||||
} else {
|
||||
@ -8974,6 +9016,9 @@ check_updates:
|
||||
cls_rgw_suggest_changes(o, miter->second);
|
||||
// we don't care if we lose suggested updates, send them off blindly
|
||||
AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
" doing dir_suggest on " << miter->first << dendl_bitx;
|
||||
ioctx.aio_operate(miter->first, c, &o);
|
||||
c->release();
|
||||
}
|
||||
@ -8983,6 +9028,7 @@ check_updates:
|
||||
*last_entry = last_added_entry;
|
||||
}
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
} // RGWRados::cls_bucket_list_unordered
|
||||
|
||||
@ -9075,6 +9121,11 @@ int RGWRados::cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, string& oid
|
||||
|
||||
int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket=" << bucket_info.bucket <<
|
||||
" oid_list.size()=" << oid_list.size() << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 25) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl_bitx;
|
||||
|
||||
RGWSI_RADOS::Pool index_pool;
|
||||
string dir_oid;
|
||||
|
||||
@ -9089,16 +9140,22 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInf
|
||||
for (auto iter = oid_list.begin(); iter != oid_list.end(); ++iter) {
|
||||
rgw_bucket_dir_entry entry;
|
||||
entry.key = *iter;
|
||||
ldpp_dout(dpp, 2) << "RGWRados::remove_objs_from_index bucket=" << bucket_info.bucket << " obj=" << entry.key.name << ":" << entry.key.instance << dendl;
|
||||
ldout_bitx(bitx, cct, 5) << "INFO: " << __func__ <<
|
||||
": encoding removal of bucket=" << bucket_info.bucket <<
|
||||
" entry=" << entry.key << " in updates" << dendl_bitx;
|
||||
entry.ver.epoch = (uint64_t)-1; // ULLONG_MAX, needed to that objclass doesn't skip out request
|
||||
updates.append(CEPH_RGW_REMOVE | suggest_flag);
|
||||
encode(entry, updates);
|
||||
}
|
||||
|
||||
bufferlist out;
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
": calling dir_suggest on shards of dir=" << bucket_info.bucket << dendl_bitx;
|
||||
|
||||
bufferlist out;
|
||||
r = index_pool.ioctx().exec(dir_oid, RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates, out);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) <<
|
||||
"EXITING " << __func__ << " and returning " << r << dendl_bitx;
|
||||
return r;
|
||||
}
|
||||
|
||||
@ -9110,6 +9167,10 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
bufferlist& suggested_updates,
|
||||
optional_yield y)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket=" <<
|
||||
bucket_info.bucket << " dir_entry=" << list_state.key << dendl_bitx;
|
||||
|
||||
std::unique_ptr<rgw::sal::Bucket> bucket;
|
||||
store->get_bucket(nullptr, bucket_info, &bucket);
|
||||
uint8_t suggest_flag = (svc.zone->get_zone().log_data ? CEPH_RGW_DIR_SUGGEST_LOG_OP : 0);
|
||||
@ -9136,9 +9197,11 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
|
||||
list_state.pending_map.clear(); // we don't need this and it inflates size
|
||||
if (!list_state.is_delete_marker() && !astate->exists) {
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << ": disk state exists" << dendl_bitx;
|
||||
/* object doesn't exist right now -- hopefully because it's
|
||||
* marked as !exists and got deleted */
|
||||
if (list_state.exists) {
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << ": index list state exists" << dendl_bitx;
|
||||
/* FIXME: what should happen now? Work out if there are any
|
||||
* non-bad ways this could happen (there probably are, but annoying
|
||||
* to handle!) */
|
||||
@ -9147,6 +9210,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
// encode a suggested removal of that key
|
||||
list_state.ver.epoch = io_ctx.get_last_version();
|
||||
list_state.ver.pool = io_ctx.get_id();
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << ": encoding remove of " << list_state.key << " on suggested_updates" << dendl_bitx;
|
||||
cls_rgw_encode_suggestion(CEPH_RGW_REMOVE | suggest_flag, list_state, suggested_updates);
|
||||
return -ENOENT;
|
||||
}
|
||||
@ -9183,10 +9247,11 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
RGWSI_Tier_RADOS::raw_obj_to_obj(manifest->get_obj().bucket, raw_loc, &loc);
|
||||
|
||||
if (loc.key.ns == RGW_OBJ_NS_MULTIPART) {
|
||||
ldpp_dout(dpp, 0) << "check_disk_state(): removing manifest part from index: " << loc << dendl;
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << " removing manifest part from index loc=" << loc << dendl_bitx;
|
||||
r = delete_obj_index(loc, astate->mtime, dpp);
|
||||
if (r < 0) {
|
||||
ldpp_dout(dpp, 0) << "WARNING: delete_obj_index() returned r=" << r << dendl;
|
||||
ldout_bitx(bitx, cct, 0) <<
|
||||
"WARNING: " << __func__ << ": delete_obj_index returned r=" << r << dendl_bitx;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -9226,9 +9291,13 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
|
||||
list_state.exists = true;
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
": encoding update of " << list_state.key << " on suggested_updates" << dendl_bitx;
|
||||
cls_rgw_encode_suggestion(CEPH_RGW_UPDATE | suggest_flag, list_state, suggested_updates);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
}
|
||||
} // RGWRados::check_disk_state
|
||||
|
||||
int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
|
||||
{
|
||||
|
@ -4,6 +4,7 @@
|
||||
#ifndef CEPH_RGWRADOS_H
|
||||
#define CEPH_RGWRADOS_H
|
||||
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <boost/container/flat_map.hpp>
|
||||
|
||||
@ -658,6 +659,13 @@ public:
|
||||
int init(const rgw_bucket& _bucket, int sid, const rgw::bucket_index_layout_generation& idx_layout, RGWBucketInfo* out, const DoutPrefixProvider *dpp);
|
||||
int init(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj);
|
||||
int init(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int sid);
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const BucketShard& bs) {
|
||||
out << "BucketShard:{ bucket=" << bs.bucket <<
|
||||
", shard_id=" << bs.shard_id <<
|
||||
", bucket_ojb=" << bs.bucket_obj << "}";
|
||||
return out;
|
||||
}
|
||||
};
|
||||
|
||||
class Object {
|
||||
|
Loading…
Reference in New Issue
Block a user