mirror of
https://github.com/ceph/ceph
synced 2025-03-18 16:36:12 +00:00
rgw: additional configurable CLS instrumentation on bucket index TXs
This supplements an earlier commit to add additional instrumentation on bucket index transactions on the CLS side. The instrumentation is triggered by setting the configuration option rgw_bucket_index_transaction_instrumentation in the [global] section. Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
This commit is contained in:
parent
cdfc29ff5a
commit
85f4ee7811
@ -14,6 +14,7 @@
|
||||
#include "common/strtol.h"
|
||||
#include "common/escape.h"
|
||||
#include "common/config_proxy.h"
|
||||
#include "osd/osd_types.h"
|
||||
|
||||
#include "include/compat.h"
|
||||
#include <boost/lexical_cast.hpp>
|
||||
@ -857,14 +858,16 @@ static int read_key_entry(cls_method_context_t hctx, const cls_rgw_obj_key& key,
|
||||
|
||||
int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(10, "entered %s", __func__);
|
||||
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
// decode request
|
||||
rgw_cls_obj_prepare_op op;
|
||||
auto iter = in->cbegin();
|
||||
@ -882,7 +885,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"%s: request: op=%d name=%s instance=%s tag=%s", __func__,
|
||||
"INFO: %s: request: op=%d name=%s instance=%s tag=%s", __func__,
|
||||
op.op, op.key.name.c_str(), op.key.instance.c_str(), op.tag.c_str());
|
||||
|
||||
// get on-disk state
|
||||
@ -913,11 +916,17 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
info.timestamp = real_clock::now();
|
||||
info.state = CLS_RGW_STATE_PENDING_MODIFY;
|
||||
info.op = op.op;
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: inserting tag %s op %c into pending map for entry %s",
|
||||
__func__, op.tag, info.op, entry.key.to_string().c_str());
|
||||
entry.pending_map.insert(pair<string, rgw_bucket_pending_info>(op.tag, info));
|
||||
|
||||
// write out new key to disk
|
||||
bufferlist info_bl;
|
||||
encode(entry, info_bl);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, idx.c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &info_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
@ -926,6 +935,8 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1, "EXITING %s, returning 0", __func__);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1061,14 +1072,16 @@ static int complete_remove_obj(cls_method_context_t hctx,
|
||||
|
||||
int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(10, "entered %s", __func__);
|
||||
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
// decode request
|
||||
rgw_cls_obj_complete_op op;
|
||||
auto iter = in->cbegin();
|
||||
@ -1080,7 +1093,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"%s: request: op=%d name=%s instance=%s ver=%lu:%llu tag=%s",
|
||||
"INFO: %s: request: op=%d name=%s instance=%s ver=%lu:%llu tag=%s",
|
||||
__func__,
|
||||
op.op, op.key.name.c_str(), op.key.instance.c_str(),
|
||||
(unsigned long)op.ver.pool, (unsigned long long)op.ver.epoch,
|
||||
@ -1124,14 +1137,17 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
__func__, op.tag.c_str());
|
||||
return -EINVAL;
|
||||
}
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"INFO: %s: removing tag %s from pending map",
|
||||
__func__, op.tag.c_str());
|
||||
entry.pending_map.erase(pinter);
|
||||
}
|
||||
|
||||
if (op.tag.size() && op.op == CLS_RGW_OP_CANCEL) {
|
||||
CLS_LOG_BITX(bitx_inst, 1, "%s: cancel requested", __func__);
|
||||
CLS_LOG_BITX(bitx_inst, 20, "INFO: %s: op is cancel", __func__);
|
||||
} else if (op.ver.pool == entry.ver.pool &&
|
||||
op.ver.epoch && op.ver.epoch <= entry.ver.epoch) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"%s: skipping request, old epoch", __func__);
|
||||
op.op = CLS_RGW_OP_CANCEL;
|
||||
}
|
||||
@ -1146,6 +1162,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
log_op = false; // don't log cancelation
|
||||
if (op.tag.size()) {
|
||||
// we removed this tag from pending_map so need to write the changes
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, idx.c_str());
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
@ -1161,11 +1180,20 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
// unaccount deleted entry
|
||||
unaccount_entry(header, entry);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: delete op, key=%s",
|
||||
__func__, idx.c_str());
|
||||
entry.meta = op.meta;
|
||||
if (!ondisk) {
|
||||
// no entry to erase
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: key=%s not on disk, no action",
|
||||
__func__, idx.c_str());
|
||||
log_op = false;
|
||||
} else if (!entry.pending_map.size()) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: removing map entry with key=%s",
|
||||
__func__, idx.c_str());
|
||||
rc = cls_cxx_map_remove_key(hctx, idx);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
@ -1177,6 +1205,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
entry.exists = false;
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, idx.c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
@ -1187,6 +1218,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
}
|
||||
} // CLS_RGW_OP_DEL
|
||||
else if (op.op == CLS_RGW_OP_ADD) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: add op, key=%s",
|
||||
__func__, idx.c_str());
|
||||
// unaccount overwritten entry
|
||||
unaccount_entry(header, entry);
|
||||
|
||||
@ -1203,6 +1237,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
stats.actual_size += meta.size;
|
||||
bufferlist new_key_bl;
|
||||
encode(entry, new_key_bl);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, idx.c_str());
|
||||
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
@ -1218,9 +1255,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
header.max_marker, op.bilog_flags, NULL, NULL,
|
||||
&op.zones_trace);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"%s: log_index_operation failed with rc=%d",
|
||||
__func__, rc);
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"%s: log_index_operation failed with rc=%d",
|
||||
__func__, rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
@ -1228,17 +1265,22 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
CLS_LOG(20, "rgw_bucket_complete_op(): remove_objs.size()=%d",
|
||||
(int)op.remove_objs.size());
|
||||
for (const auto& remove_key : op.remove_objs) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: completing object remove key=%s",
|
||||
__func__, remove_key.to_string().c_str());
|
||||
rc = complete_remove_obj(hctx, header, remove_key, default_log_op);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 1,
|
||||
"%s: cls_cxx_map_remove_key, failed to remove entry, "
|
||||
"name=%s instance=%s read_index_entry ret=%d",
|
||||
"WARNING: %s: complete_remove_obj, failed to remove entry, "
|
||||
"name=%s instance=%s read_index_entry ret=%d, continuing",
|
||||
__func__, remove_key.name.c_str(),
|
||||
remove_key.instance.c_str(), rc);
|
||||
continue; // part cleanup errors are not fatal
|
||||
}
|
||||
}
|
||||
} // remove loop
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"INFO: %s: writing bucket header", __func__);
|
||||
rc = write_bucket_header(hctx, &header);
|
||||
if (rc < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
@ -1246,8 +1288,10 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
|
||||
__func__, rc);
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"EXITING %s: returning %d", __func__, rc);
|
||||
return rc;
|
||||
}
|
||||
} // rgw_bucket_complete_op
|
||||
|
||||
template <class T>
|
||||
static int write_entry(cls_method_context_t hctx, T& entry, const string& key)
|
||||
@ -2225,14 +2269,16 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
|
||||
int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
bufferlist *in, bufferlist *out)
|
||||
{
|
||||
CLS_LOG(1, "entered %s", __func__);
|
||||
|
||||
const ConfigProxy& conf = cls_get_config(hctx);
|
||||
const object_info_t& oi = cls_get_object_info(hctx);
|
||||
|
||||
// bucket index transaction instrumentation
|
||||
const bool bitx_inst =
|
||||
conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1, "ENTERING %s for object oid=%s key=%s",
|
||||
__func__, oi.soid.oid.name.c_str(), oi.soid.get_key().c_str());
|
||||
|
||||
bufferlist header_bl;
|
||||
rgw_bucket_dir_header header;
|
||||
bool header_changed = false;
|
||||
@ -2255,6 +2301,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
(config_op_expiration ?
|
||||
config_op_expiration :
|
||||
CEPH_RGW_DEFAULT_TAG_TIMEOUT)));
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: tag_timeout=%ld", __func__, tag_timeout.count());
|
||||
|
||||
auto in_iter = in->cbegin();
|
||||
|
||||
@ -2272,20 +2319,31 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
}
|
||||
|
||||
bufferlist cur_disk_bl;
|
||||
// check if the log op flag is set and strip it from the op
|
||||
bool log_op = (op & CEPH_RGW_DIR_SUGGEST_LOG_OP) != 0;
|
||||
op &= CEPH_RGW_DIR_SUGGEST_OP_MASK;
|
||||
|
||||
string cur_change_key;
|
||||
encode_obj_index_key(cur_change.key, &cur_change_key);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"INFO: %s: op=%c, cur_change_key=%s, cur_change.exists=%d",
|
||||
__func__, op, escape_str(cur_change_key).c_str(), cur_change.exists);
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
int ret = cls_cxx_map_get_val(hctx, cur_change_key, &cur_disk_bl);
|
||||
if (ret < 0 && ret != -ENOENT) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"ERROR: %s: accessing map, key=%s error=%d", __func__,
|
||||
cur_change_key.c_str(), ret);
|
||||
escape_str(cur_change_key).c_str(), ret);
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
if (ret == -ENOENT) {
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"WARNING: %s: accessing map, key not found key=%s, continuing",
|
||||
__func__, cur_change_key.c_str());
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -2318,9 +2376,10 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
} // while
|
||||
} // if
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20, "cur_disk.pending_map.empty()=%d op=%d cur_disk.exists=%d "
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: cur_disk.pending_map.empty()=%d op=%c cur_disk.exists=%d "
|
||||
"cur_disk.index_ver=%d cur_change.exists=%d cur_change.index_ver=%d",
|
||||
cur_disk.pending_map.empty(), (int)op, cur_disk.exists,
|
||||
__func__, cur_disk.pending_map.empty(), op, cur_disk.exists,
|
||||
(int)cur_disk.index_ver, cur_change.exists,
|
||||
(int)cur_change.index_ver);
|
||||
|
||||
@ -2332,9 +2391,10 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
}
|
||||
|
||||
if (cur_disk.pending_map.empty()) {
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: cur_disk.pending_map is empty", __func__);
|
||||
if (cur_disk.exists) {
|
||||
rgw_bucket_category_stats& old_stats = header.stats[cur_disk.meta.category];
|
||||
CLS_LOG_BITX(bitx_inst, 10, "total_entries: %" PRId64 " -> %" PRId64 "\n", old_stats.num_entries, old_stats.num_entries - 1);
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: stats.num_entries: %ld -> %ld", __func__, old_stats.num_entries, old_stats.num_entries - 1);
|
||||
old_stats.num_entries--;
|
||||
old_stats.total_size -= cur_disk.meta.accounted_size;
|
||||
old_stats.total_size_rounded -= cls_rgw_get_rounded_size(cur_disk.meta.accounted_size);
|
||||
@ -2342,17 +2402,22 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
header_changed = true;
|
||||
}
|
||||
rgw_bucket_category_stats& stats = header.stats[cur_change.meta.category];
|
||||
bool log_op = (op & CEPH_RGW_DIR_SUGGEST_LOG_OP) != 0;
|
||||
op &= CEPH_RGW_DIR_SUGGEST_OP_MASK;
|
||||
|
||||
switch(op) {
|
||||
case CEPH_RGW_REMOVE:
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"CEPH_RGW_REMOVE name=%s instance=%s",
|
||||
cur_change.key.name.c_str(), cur_change.key.instance.c_str());
|
||||
"INFO: %s: CEPH_RGW_REMOVE name=%s instance=%s encoded=%s",
|
||||
__func__, cur_change.key.name.c_str(),
|
||||
cur_change.key.instance.c_str(),
|
||||
escape_str(cur_change_key).c_str());
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: removing map entry with key=%s",
|
||||
__func__, escape_str(cur_change_key).c_str());
|
||||
ret = cls_cxx_map_remove_key(hctx, cur_change_key);
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to remove key, key=%s, error=%d",
|
||||
__func__, cur_change_key.c_str(), ret);
|
||||
__func__, escape_str(cur_change_key).c_str(), ret);
|
||||
return ret;
|
||||
}
|
||||
if (log_op && cur_disk.exists && !header.syncstopped) {
|
||||
@ -2367,8 +2432,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
break;
|
||||
case CEPH_RGW_UPDATE:
|
||||
CLS_LOG_BITX(bitx_inst, 10,
|
||||
"CEPH_RGW_UPDATE name=%s instance=%s total_entries: %" PRId64 " -> %" PRId64 "\n",
|
||||
cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
|
||||
"INFO: %s: CEPH_RGW_UPDATE name=%s instance=%s stats.num_entries: %ld -> %ld",
|
||||
__func__, cur_change.key.name.c_str(), cur_change.key.instance.c_str(),
|
||||
stats.num_entries, stats.num_entries + 1);
|
||||
|
||||
stats.num_entries++;
|
||||
stats.total_size += cur_change.meta.accounted_size;
|
||||
@ -2378,10 +2444,14 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
cur_change.index_ver = header.ver;
|
||||
bufferlist cur_state_bl;
|
||||
encode(cur_change, cur_state_bl);
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 20,
|
||||
"INFO: %s: setting map entry at key=%s",
|
||||
__func__, cur_change.key.name.c_str());
|
||||
ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl);
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0, "ERROR: %s: unable to set value for key, key=%s, error=%d",
|
||||
__func__, cur_change_key.c_str(), ret);
|
||||
__func__, escape_str(cur_change_key).c_str(), ret);
|
||||
return ret;
|
||||
}
|
||||
if (log_op && !header.syncstopped) {
|
||||
@ -2398,15 +2468,19 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
|
||||
} // while (!in_iter.end())
|
||||
|
||||
if (header_changed) {
|
||||
CLS_LOG_BITX(bitx_inst, 10, "INFO: %s: header changed, writing", __func__);
|
||||
int ret = write_bucket_header(hctx, &header);
|
||||
if (ret < 0) {
|
||||
CLS_LOG_BITX(bitx_inst, 0,
|
||||
"ERROR: %s: failed to write bucket header ret=%d",
|
||||
__func__, ret);
|
||||
} else {
|
||||
CLS_LOG_BITX(bitx_inst, 1, "EXITING %s, returning %d", __func__, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
CLS_LOG_BITX(bitx_inst, 1, "EXITING %s, returning 0", __func__);
|
||||
return 0;
|
||||
} // rgw_dir_suggest_changes
|
||||
|
||||
|
@ -15,8 +15,8 @@
|
||||
|
||||
#include "rgw/rgw_basic_types.h"
|
||||
|
||||
#define CEPH_RGW_REMOVE 'r'
|
||||
#define CEPH_RGW_UPDATE 'u'
|
||||
#define CEPH_RGW_REMOVE 'r' // value 114
|
||||
#define CEPH_RGW_UPDATE 'u' // value 117
|
||||
#define CEPH_RGW_DIR_SUGGEST_LOG_OP 0x80
|
||||
#define CEPH_RGW_DIR_SUGGEST_OP_MASK 0x7f
|
||||
|
||||
|
@ -503,6 +503,13 @@ const ConfigProxy& cls_get_config(cls_method_context_t hctx)
|
||||
return *dummy;
|
||||
}
|
||||
|
||||
const object_info_t& cls_get_object_info(cls_method_context_t hctx)
|
||||
{
|
||||
// FIXME ; segfault if ever called
|
||||
static object_info_t* dummy = nullptr;
|
||||
return *dummy;
|
||||
}
|
||||
|
||||
int cls_get_snapset_seq(cls_method_context_t hctx, uint64_t *snap_seq)
|
||||
{
|
||||
return 0;
|
||||
|
@ -8,6 +8,8 @@
|
||||
#include "common/debug.h"
|
||||
|
||||
#include "objclass/objclass.h"
|
||||
#include "osd/osd_internal_types.h"
|
||||
|
||||
#include "osd/ClassHandler.h"
|
||||
|
||||
#include "auth/Crypto.h"
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
struct obj_list_watch_response_t;
|
||||
class PGLSFilter;
|
||||
class object_info_t;
|
||||
|
||||
extern "C" {
|
||||
#endif
|
||||
@ -143,6 +144,7 @@ extern uint64_t cls_get_client_features(cls_method_context_t hctx);
|
||||
extern ceph_release_t cls_get_required_osd_release(cls_method_context_t hctx);
|
||||
extern ceph_release_t cls_get_min_compatible_client(cls_method_context_t hctx);
|
||||
extern const ConfigProxy& cls_get_config(cls_method_context_t hctx);
|
||||
extern const object_info_t& cls_get_object_info(cls_method_context_t hctx);
|
||||
|
||||
/* helpers */
|
||||
extern void cls_cxx_subop_version(cls_method_context_t hctx, std::string *s);
|
||||
|
@ -617,7 +617,6 @@ uint64_t cls_current_version(cls_method_context_t hctx)
|
||||
return ctx->pg->get_last_user_version();
|
||||
}
|
||||
|
||||
|
||||
int cls_current_subop_num(cls_method_context_t hctx)
|
||||
{
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
@ -655,6 +654,12 @@ const ConfigProxy& cls_get_config(cls_method_context_t hctx)
|
||||
return ctx->pg->get_cct()->_conf;
|
||||
}
|
||||
|
||||
const object_info_t& cls_get_object_info(cls_method_context_t hctx)
|
||||
{
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
return ctx->obs->oi;
|
||||
}
|
||||
|
||||
int cls_get_snapset_seq(cls_method_context_t hctx, uint64_t *snap_seq) {
|
||||
PrimaryLogPG::OpContext *ctx = *(PrimaryLogPG::OpContext **)hctx;
|
||||
if (!ctx->new_obs.exists || (ctx->new_obs.oi.is_whiteout() &&
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "common/errno.h"
|
||||
#include "common/Formatter.h"
|
||||
#include "common/Throttle.h"
|
||||
#include "common/BackTrace.h"
|
||||
|
||||
#include "rgw_sal.h"
|
||||
#include "rgw_zone.h"
|
||||
@ -102,6 +103,9 @@
|
||||
using namespace std;
|
||||
using namespace librados;
|
||||
|
||||
#define ldout_bitx(_bitx, _ctx, _level) if(_bitx) { ldout(_ctx, 0) << "BITX: "
|
||||
#define dendl_bitx dendl ; }
|
||||
|
||||
static string shadow_ns = "shadow";
|
||||
static string default_bucket_index_pool_suffix = "rgw.buckets.index";
|
||||
static string default_storage_extra_pool_suffix = "rgw.buckets.non-ec";
|
||||
@ -8370,6 +8374,10 @@ bool RGWRados::process_expire_objects(const DoutPrefixProvider *dpp)
|
||||
int RGWRados::cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs, RGWModifyOp op, string& tag,
|
||||
rgw_obj& obj, uint16_t bilog_flags, optional_yield y, rgw_zone_set *_zones_trace)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket-shard=" << bs << " obj=" << obj << " tag=" << tag << " op=" << op << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 10) << "BACKTRACE: " << __func__ << ": " << BackTrace(0) << dendl_bitx;
|
||||
|
||||
rgw_zone_set zones_trace;
|
||||
if (_zones_trace) {
|
||||
zones_trace = *_zones_trace;
|
||||
@ -8380,7 +8388,9 @@ int RGWRados::cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs,
|
||||
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
|
||||
cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
|
||||
cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), svc.zone->get_zone().log_data, bilog_flags, zones_trace);
|
||||
return bs.bucket_obj.operate(dpp, &o, y);
|
||||
int ret = bs.bucket_obj.operate(dpp, &o, y);
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << ": ret=" << ret << dendl_bitx;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
|
||||
@ -8388,6 +8398,12 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
|
||||
rgw_bucket_dir_entry& ent, RGWObjCategory category,
|
||||
list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket-shard=" << bs <<
|
||||
" obj=" << obj << " tag=" << tag << " op=" << op <<
|
||||
", remove_objs=" << (remove_objs ? *remove_objs : std::list<rgw_obj_index_key>()) << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 10) << "BACKTRACE: " << __func__ << ": " << BackTrace(0) << dendl_bitx;
|
||||
|
||||
ObjectWriteOperation o;
|
||||
rgw_bucket_dir_entry_meta dir_meta;
|
||||
dir_meta = ent.meta;
|
||||
@ -8412,6 +8428,8 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
|
||||
librados::AioCompletion *completion = arg->rados_completion;
|
||||
int ret = bs.bucket_obj.aio_operate(arg->rados_completion, &o);
|
||||
completion->release(); /* can't reference arg here, as it might have already been released */
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << ": ret=" << ret << dendl_bitx;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -8509,22 +8527,25 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
optional_yield y,
|
||||
RGWBucketListNameFilter force_check_filter)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
/* expansion_factor allows the number of entries to read to grow
|
||||
* exponentially; this is used when earlier reads are producing too
|
||||
* few results, perhaps due to filtering or to a series of
|
||||
* namespaced entries */
|
||||
|
||||
ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << ": " <<
|
||||
bucket_info.bucket <<
|
||||
" start_after=\"" << start_after <<
|
||||
"\", prefix=\"" << prefix <<
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": " << bucket_info.bucket <<
|
||||
" start_after=\"" << start_after.name <<
|
||||
"[" << start_after.instance <<
|
||||
"]\", prefix=\"" << prefix <<
|
||||
", delimiter=\"" << delimiter <<
|
||||
"\", shard_id=" << shard_id <<
|
||||
"\", num_entries=" << num_entries <<
|
||||
"\" num_entries=" << num_entries <<
|
||||
", list_versions=" << list_versions <<
|
||||
", expansion_factor=" << expansion_factor <<
|
||||
", force_check_filter is " <<
|
||||
(force_check_filter ? "set" : "unset") << dendl;
|
||||
(force_check_filter ? "set" : "unset") << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 20) << "BACKTRACE: " << __func__ << ": " << BackTrace(0) << dendl_bitx;
|
||||
|
||||
m.clear();
|
||||
|
||||
@ -8688,6 +8709,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
* well. */
|
||||
librados::IoCtx sub_ctx;
|
||||
sub_ctx.dup(ioctx);
|
||||
ldout_bitx(bitx, cct, 20) << "INFO: " << __func__ <<
|
||||
" calling check_disk_state bucket=" << bucket_info.bucket <<
|
||||
" entry=" << dirent.key << dendl_bitx;
|
||||
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent,
|
||||
updates[tracker.oid_name], y);
|
||||
if (r < 0 && r != -ENOENT) {
|
||||
@ -8747,6 +8771,9 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
// we don't care if we lose suggested updates, send them off blindly
|
||||
AioCompletion *c =
|
||||
librados::Rados::aio_create_completion(nullptr, nullptr);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
" doing dir_suggest on " << miter.first << dendl_bitx;
|
||||
ioctx.aio_operate(miter.first, c, &o);
|
||||
c->release();
|
||||
}
|
||||
@ -8781,6 +8808,7 @@ int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
|
||||
": returning, last_entry NOT SET" << dendl;
|
||||
}
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -8814,15 +8842,16 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
|
||||
rgw_obj_index_key *last_entry,
|
||||
optional_yield y,
|
||||
RGWBucketListNameFilter force_check_filter) {
|
||||
ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ << " " <<
|
||||
bucket_info.bucket <<
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": " << bucket_info.bucket <<
|
||||
" start_after=\"" << start_after <<
|
||||
"\", prefix=\"" << prefix <<
|
||||
"\", shard_id=" << shard_id <<
|
||||
"\", num_entries=" << num_entries <<
|
||||
", list_versions=" << list_versions <<
|
||||
", force_check_filter is " <<
|
||||
(force_check_filter ? "set" : "unset") << dendl;
|
||||
(force_check_filter ? "set" : "unset") << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 20) << "BACKTRACE: " << __func__ << ": " << BackTrace(0) << dendl_bitx;
|
||||
|
||||
ent_list.clear();
|
||||
static MultipartMetaFilter multipart_meta_filter;
|
||||
@ -8923,6 +8952,9 @@ int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
|
||||
* and if the tags are old we need to do cleanup as well. */
|
||||
librados::IoCtx sub_ctx;
|
||||
sub_ctx.dup(ioctx);
|
||||
ldout_bitx(bitx, cct, 20) << "INFO: " << __func__ <<
|
||||
" calling check_disk_state bucket=" << bucket_info.bucket <<
|
||||
" entry=" << dirent.key << dendl_bitx;
|
||||
r = check_disk_state(dpp, sub_ctx, bucket_info, dirent, dirent, updates[oid], y);
|
||||
if (r < 0 && r != -ENOENT) {
|
||||
ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
|
||||
@ -8974,6 +9006,9 @@ check_updates:
|
||||
cls_rgw_suggest_changes(o, miter->second);
|
||||
// we don't care if we lose suggested updates, send them off blindly
|
||||
AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
" doing dir_suggest on " << miter->first << dendl_bitx;
|
||||
ioctx.aio_operate(miter->first, c, &o);
|
||||
c->release();
|
||||
}
|
||||
@ -8983,6 +9018,7 @@ check_updates:
|
||||
*last_entry = last_added_entry;
|
||||
}
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
} // RGWRados::cls_bucket_list_unordered
|
||||
|
||||
@ -9075,6 +9111,11 @@ int RGWRados::cls_obj_usage_log_clear(const DoutPrefixProvider *dpp, string& oid
|
||||
|
||||
int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << ": bucket=" << bucket_info.bucket <<
|
||||
" entry_key_list.size()=" << entry_key_list.size() << dendl_bitx;
|
||||
ldout_bitx(bitx, cct, 20) << "BACKTRACE: " << __func__ << ": " << BackTrace(0) << dendl_bitx;
|
||||
|
||||
RGWSI_RADOS::Pool index_pool;
|
||||
string dir_oid;
|
||||
|
||||
@ -9089,16 +9130,23 @@ int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInf
|
||||
for (auto iter = oid_list.begin(); iter != oid_list.end(); ++iter) {
|
||||
rgw_bucket_dir_entry entry;
|
||||
entry.key = *iter;
|
||||
ldpp_dout(dpp, 2) << "RGWRados::remove_objs_from_index bucket=" << bucket_info.bucket << " obj=" << entry.key.name << ":" << entry.key.instance << dendl;
|
||||
ldout_bitx(bitx, cct, 5) << "INFO: " << __func__ <<
|
||||
": encoding removal of bucket=" << bucket_info.bucket <<
|
||||
" shard=" << shard <<
|
||||
" entry=" << entry.key << " in updates" << dendl_bitx;
|
||||
entry.ver.epoch = (uint64_t)-1; // ULLONG_MAX, needed to that objclass doesn't skip out request
|
||||
updates.append(CEPH_RGW_REMOVE | suggest_flag);
|
||||
encode(entry, updates);
|
||||
}
|
||||
|
||||
bufferlist out;
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
": calling dir_suggest on shards of dir=" << bucket_info.bucket << dendl_bitx;
|
||||
|
||||
bufferlist out;
|
||||
r = index_pool.ioctx().exec(dir_oid, RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates, out);
|
||||
|
||||
ldout_bitx(bitx, cct, 20) <<
|
||||
"EXITING " << __func__ << " and returning " << r << dendl_bitx;
|
||||
return r;
|
||||
}
|
||||
|
||||
@ -9110,6 +9158,10 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
bufferlist& suggested_updates,
|
||||
optional_yield y)
|
||||
{
|
||||
const bool bitx = cct->_conf->rgw_bucket_index_transaction_instrumentation;
|
||||
ldout_bitx(bitx, cct, 10) << "ENTERING " << __func__ << " bucket=" <<
|
||||
bucket_info.bucket << " dir_entry=" << list_state.key << dendl_bitx;
|
||||
|
||||
std::unique_ptr<rgw::sal::Bucket> bucket;
|
||||
store->get_bucket(nullptr, bucket_info, &bucket);
|
||||
uint8_t suggest_flag = (svc.zone->get_zone().log_data ? CEPH_RGW_DIR_SUGGEST_LOG_OP : 0);
|
||||
@ -9136,9 +9188,11 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
|
||||
list_state.pending_map.clear(); // we don't need this and it inflates size
|
||||
if (!list_state.is_delete_marker() && !astate->exists) {
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << " disk state exists" << dendl_bitx;
|
||||
/* object doesn't exist right now -- hopefully because it's
|
||||
* marked as !exists and got deleted */
|
||||
if (list_state.exists) {
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << " index list state exists" << dendl_bitx;
|
||||
/* FIXME: what should happen now? Work out if there are any
|
||||
* non-bad ways this could happen (there probably are, but annoying
|
||||
* to handle!) */
|
||||
@ -9147,6 +9201,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
// encode a suggested removal of that key
|
||||
list_state.ver.epoch = io_ctx.get_last_version();
|
||||
list_state.ver.pool = io_ctx.get_id();
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << " encoding remove of " << list_state.key << " on suggested_updates" << dendl_bitx;
|
||||
cls_rgw_encode_suggestion(CEPH_RGW_REMOVE | suggest_flag, list_state, suggested_updates);
|
||||
return -ENOENT;
|
||||
}
|
||||
@ -9183,10 +9238,11 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
RGWSI_Tier_RADOS::raw_obj_to_obj(manifest->get_obj().bucket, raw_loc, &loc);
|
||||
|
||||
if (loc.key.ns == RGW_OBJ_NS_MULTIPART) {
|
||||
ldpp_dout(dpp, 0) << "check_disk_state(): removing manifest part from index: " << loc << dendl;
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ << " removing manifest part from index loc=" << loc << dendl_bitx;
|
||||
r = delete_obj_index(loc, astate->mtime, dpp);
|
||||
if (r < 0) {
|
||||
ldpp_dout(dpp, 0) << "WARNING: delete_obj_index() returned r=" << r << dendl;
|
||||
ldout_bitx(bitx, cct, 0) <<
|
||||
"WARNING: " << __func__ << ": delete_obj_index returned r=" << r << dendl_bitx;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -9226,9 +9282,13 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
|
||||
|
||||
list_state.exists = true;
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "INFO: " << __func__ <<
|
||||
" encoding update of " << list_state.key << " on suggested_updates" << dendl_bitx;
|
||||
cls_rgw_encode_suggestion(CEPH_RGW_UPDATE | suggest_flag, list_state, suggested_updates);
|
||||
|
||||
ldout_bitx(bitx, cct, 10) << "EXITING " << __func__ << dendl_bitx;
|
||||
return 0;
|
||||
}
|
||||
} // RGWRados::check_disk_state
|
||||
|
||||
int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
|
||||
{
|
||||
|
@ -4,6 +4,7 @@
|
||||
#ifndef CEPH_RGWRADOS_H
|
||||
#define CEPH_RGWRADOS_H
|
||||
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <boost/container/flat_map.hpp>
|
||||
|
||||
@ -658,6 +659,13 @@ public:
|
||||
int init(const rgw_bucket& _bucket, int sid, const rgw::bucket_index_layout_generation& idx_layout, RGWBucketInfo* out, const DoutPrefixProvider *dpp);
|
||||
int init(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj);
|
||||
int init(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int sid);
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const BucketShard& bs) {
|
||||
out << "BucketShard:{ bucket=" << bs.bucket <<
|
||||
", shard_id=" << bs.shard_id <<
|
||||
", bucket_ojb=" << bs.bucket_obj << "}";
|
||||
return out;
|
||||
}
|
||||
};
|
||||
|
||||
class Object {
|
||||
|
Loading…
Reference in New Issue
Block a user