commit 9d64b3f3e6 (parent a58490c1bd)
Author: Ali Masarwa <ali.saed.masarwa@gmail.com>
Date:   2023-06-06 16:20:40 +03:00

RGW: Add observability over the persistent topics queue

Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>

10 changed files with 268 additions and 13 deletions
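In short: the commit adds a new CLS method, 2pc_queue_get_topic_stats, which reports the number of committed entries and the used size (bytes) of a persistent topic queue, threads it through the cls client API and rgw::notify, and exposes it as a new `radosgw-admin topic stats` command. A minimal caller sketch of the new synchronous client API follows; io_ctx, queue_name, print_topic_stats, and the include path are illustrative assumptions, not part of this commit:

#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include <iostream>

// Minimal sketch: query a queue's stats synchronously via the new client call.
// Returns 0 on success, a negative error code otherwise (e.g. -EIO on decode failure).
int print_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name) {
  uint32_t committed_entries = 0;
  uint64_t size = 0;
  const int r = cls_2pc_queue_get_topic_stats(io_ctx, queue_name, committed_entries, size);
  if (r < 0) {
    return r;
  }
  std::cout << "entries=" << committed_entries << " bytes=" << size << std::endl;
  return 0;
}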


@@ -55,6 +55,36 @@ static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
return 0;
}
static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
cls_queue_get_stats_ret op_ret;
// get head
cls_queue_head head;
auto ret = queue_read_head(hctx, head);
if (ret < 0) {
return ret;
}
// compute the remaining free space in the circular queue
const auto remaining_size = (head.tail.offset >= head.front.offset) ?
(head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
head.front.offset - head.tail.offset;
// used size = capacity (excluding the head) minus the remaining free space
op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size;
cls_2pc_urgent_data urgent_data;
try {
auto in_iter = head.bl_urgent_data.cbegin();
decode(urgent_data, in_iter);
} catch (ceph::buffer::error& err) {
CLS_LOG(1, "ERROR: cls_2pc_queue_get_topic_stats: failed to decode urgent data of queue head: %s", err.what());
return -EINVAL;
}
op_ret.queue_entries = urgent_data.committed_entries;
encode(op_ret, *out);
return 0;
}
static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
cls_2pc_queue_reserve_op res_op;
try {
@@ -112,7 +142,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
cls_2pc_reservations::iterator last_reservation;
std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
@@ -148,7 +178,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
}
std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
@@ -268,6 +298,7 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
}
urgent_data.reserved_size -= res.size;
urgent_data.committed_entries += res.entries;
if (xattr_reservations.empty()) {
// remove the reservation from urgent data
@@ -577,6 +608,7 @@ CLS_INIT(2pc_queue)
cls_handle_t h_class;
cls_method_handle_t h_2pc_queue_init;
cls_method_handle_t h_2pc_queue_get_capacity;
cls_method_handle_t h_2pc_queue_get_topic_stats;
cls_method_handle_t h_2pc_queue_reserve;
cls_method_handle_t h_2pc_queue_commit;
cls_method_handle_t h_2pc_queue_abort;
@@ -589,6 +621,7 @@ CLS_INIT(2pc_queue)
cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats);
cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
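// Aside (editorial, not part of this commit): a worked example of the
// free/used-space arithmetic in cls_2pc_queue_get_topic_stats above,
// with illustrative numbers:
//   queue_size = 1000, max_head_size = 100 -> data region is bytes [100, 1000)
//   non-wrapped (tail >= front): front = 300, tail = 900
//     free = (1000 - 900) + (300 - 100) = 300
//     used = 1000 - 100 - 300 = 600  (== tail - front)
//   wrapped (tail < front): front = 800, tail = 200
//     free = 800 - 200 = 600
//     used = 1000 - 100 - 600 = 300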


@@ -31,6 +31,21 @@ int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
return 0;
}
int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) {
cls_queue_get_stats_ret op_ret;
auto iter = bl.cbegin();
try {
decode(op_ret, iter);
} catch (buffer::error& err) {
return -EIO;
}
committed_entries = op_ret.queue_entries;
size = op_ret.queue_size;
return 0;
}
#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
bufferlist in, out;
@@ -44,12 +59,31 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
#endif
// optionally async method for getting capacity (bytes)
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
}
#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) {
bufferlist in, out;
const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out);
if (r < 0) {
return r;
}
return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size);
}
#endif
// optionally async method for getting the number of committed entries and size (bytes)
// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval);
}
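// Aside (editorial sketch, not part of this commit): driving the async-style
// variant above; io_ctx, queue_name, and the helper name are placeholders.
static int topic_stats_via_read_op(librados::IoCtx& io_ctx, const std::string& queue_name,
                                   uint32_t& committed_entries, uint64_t& size) {
  librados::ObjectReadOperation op;
  bufferlist obl;
  int prval = 0;
  cls_2pc_queue_get_topic_stats(op, &obl, &prval);
  // dispatch the read op synchronously for brevity; aio_operate() works the same way
  const int r = io_ctx.operate(queue_name, &op, nullptr);
  if (r < 0) {
    return r;
  }
  if (prval < 0) {
    return prval;
  }
  return cls_2pc_queue_get_topic_stats_result(obl, committed_entries, size);
}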
int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
cls_2pc_queue_reserve_ret op_ret;


@@ -19,6 +19,8 @@ void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& q
#ifndef CLS_CLIENT_HIDE_IOCTX
// return capacity (bytes)
int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size);
// return the number of committed entries and size (bytes)
int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size);
// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// return a reservation id if reservation is possible, 0 otherwise
@@ -37,7 +39,12 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string&
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
// optionally async method for getting the number of committed entries and size (bytes)
// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size);
// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// notes:


@@ -4,6 +4,7 @@
#define TPC_QUEUE_INIT "2pc_queue_init"
#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats"
#define TPC_QUEUE_RESERVE "2pc_queue_reserve"
#define TPC_QUEUE_COMMIT "2pc_queue_commit"
#define TPC_QUEUE_ABORT "2pc_queue_abort"


@@ -8,25 +8,30 @@ struct cls_2pc_reservation
{
using id_t = uint32_t;
inline static const id_t NO_ID{0};
uint64_t size; // how much size to reserve (bytes)
ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations)
uint32_t entries{0}; // how many entries are reserved (0 for blobs encoded before v2)
cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) :
size(_size), timestamp(_timestamp), entries(_entries) {}
cls_2pc_reservation() = default;
void encode(ceph::buffer::list& bl) const {
ENCODE_START(2, 1, bl);
encode(size, bl);
encode(timestamp, bl);
encode(entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(2, bl);
decode(size, bl);
decode(timestamp, bl);
if (struct_v >= 2) {
decode(entries, bl);
}
DECODE_FINISH(bl);
}
};
@@ -40,22 +45,27 @@ struct cls_2pc_urgent_data
cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
cls_2pc_reservations reservations; // reservation list (keyed by id)
bool has_xattrs{false};
uint32_t committed_entries{0}; // how many entries have been committed so far
void encode(ceph::buffer::list& bl) const {
ENCODE_START(2, 1, bl);
encode(reserved_size, bl);
encode(last_id, bl);
encode(reservations, bl);
encode(has_xattrs, bl);
encode(committed_entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(2, bl);
decode(reserved_size, bl);
decode(last_id, bl);
decode(reservations, bl);
decode(has_xattrs, bl);
if (struct_v >= 2) {
decode(committed_entries, bl);
}
DECODE_FINISH(bl);
}
};
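// Aside (editorial sketch, not part of this commit): both structs bump the
// encoded version to 2 while keeping compat at 1, so a v1 blob still decodes;
// the struct_v guard simply leaves the new field at its default. A v2 round
// trip, assuming the usual WRITE_CLASS_ENCODER free functions for the struct:
static void reservation_round_trip_example() {
  ceph::buffer::list bl;
  const cls_2pc_reservation in(1024, ceph::coarse_real_clock::now(), 5);
  encode(in, bl);
  cls_2pc_reservation out;
  auto it = bl.cbegin();
  decode(out, it); // out.size == 1024, out.entries == 5
}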


@@ -136,4 +136,26 @@ struct cls_queue_get_capacity_ret {
};
WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
struct cls_queue_get_stats_ret {
uint64_t queue_size{0};
uint32_t queue_entries{0};
cls_queue_get_stats_ret() = default;
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(queue_size, bl);
encode(queue_entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(queue_size, bl);
decode(queue_entries, bl);
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(cls_queue_get_stats_ret)
#endif /* CEPH_CLS_QUEUE_OPS_H */
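// Aside (editorial sketch, not part of this commit): this struct is the wire
// contract -- the CLS method encodes it into *out, and the client parses it
// with cls_2pc_queue_get_topic_stats_result() (declared in the client header):
static void stats_ret_round_trip_example() {
  cls_queue_get_stats_ret op_ret;
  op_ret.queue_size = 512;  // illustrative values
  op_ret.queue_entries = 3;
  ceph::buffer::list out;
  encode(op_ret, out);
  uint32_t entries = 0;
  uint64_t size = 0;
  [[maybe_unused]] const int r = cls_2pc_queue_get_topic_stats_result(out, entries, size); // r == 0
}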


@@ -977,6 +977,26 @@ int publish_abort(reservation_t& res) {
return 0;
}
int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
{
cls_2pc_reservations reservations;
auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to list queue reservations: " << ret << dendl;
return ret;
}
stats.queue_reservations = reservations.size();
ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
return ret;
}
return 0;
}
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
const req_state* _s,
@@ -1020,4 +1040,12 @@ reservation_t::~reservation_t() {
publish_abort(*this);
}
void rgw_topic_stats::dump(Formatter *f) const {
f->open_object_section("Topic Stats");
f->dump_int("Reservations", queue_reservations);
f->dump_int("Size", queue_size);
f->dump_int("Entries", queue_entries);
f->close_section();
}
} // namespace rgw::notify
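// Aside (editorial sketch, not part of this commit): a typical caller, mirroring
// the radosgw-admin handler below; dpp, rados_store, topic_name, and the helper
// name are placeholders.
static int sample_topic_stats_caller(const DoutPrefixProvider* dpp,
                                     rgw::sal::RadosStore* rados_store,
                                     const std::string& topic_name) {
  rgw::notify::rgw_topic_stats stats;
  const int r = rgw::notify::get_persistent_queue_stats_by_topic_name(
      dpp, rados_store->getRados()->get_notif_pool_ctx(), topic_name, stats, null_yield);
  if (r < 0) {
    return r;
  }
  // stats.queue_reservations / stats.queue_size / stats.queue_entries are now populated
  return 0;
}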


@@ -98,8 +98,18 @@ struct reservation_t {
~reservation_t();
};
struct rgw_topic_stats {
std::size_t queue_reservations; // number of reservations
uint64_t queue_size; // in bytes
uint32_t queue_entries; // number of entries
void dump(Formatter *f) const;
};
// create a reservation on the 2-phase-commit queue
int publish_reserve(const DoutPrefixProvider *dpp,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
@@ -117,5 +127,8 @@ int publish_commit(rgw::sal::Object* obj,
// cancel the reservation
int publish_abort(reservation_t& reservation);
int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
}


@@ -840,6 +840,7 @@ enum class OPT {
PUBSUB_NOTIFICATION_LIST,
PUBSUB_NOTIFICATION_GET,
PUBSUB_NOTIFICATION_RM,
PUBSUB_TOPIC_STATS,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
@@ -1074,6 +1075,7 @@ static SimpleCmd::Commands all_cmds = {
{ "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
{ "topic stats", OPT::PUBSUB_TOPIC_STATS },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
@@ -4198,6 +4200,7 @@ int main(int argc, const char **argv)
OPT::PUBSUB_NOTIFICATION_LIST,
OPT::PUBSUB_TOPIC_GET,
OPT::PUBSUB_NOTIFICATION_GET,
OPT::PUBSUB_TOPIC_STATS,
OPT::SCRIPT_GET,
};
@@ -4284,7 +4287,8 @@ int main(int argc, const char **argv)
&& opt_cmd != OPT::PUBSUB_TOPIC_GET
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
&& opt_cmd != OPT::PUBSUB_TOPIC_STATS) {
cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
return EINVAL;
}
@@ -10513,7 +10517,8 @@ next:
return EINVAL;
}
ret = rgw::notify::remove_persistent_topic(
dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
if (ret < 0) {
cerr << "ERROR: could not remove persistent topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
@@ -10558,6 +10563,24 @@ next:
}
}
if (opt_cmd == OPT::PUBSUB_TOPIC_STATS) {
if (topic_name.empty()) {
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
rgw::notify::rgw_topic_stats stats;
ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
stats, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get persistent topic stats: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("", stats, formatter.get());
formatter->flush(cout);
}
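// Example (editorial, values illustrative -- not actual output): the new
// command in action; the JSON shape follows rgw_topic_stats::dump() above.
//   $ radosgw-admin topic stats --topic mytopic
//   {
//       "Topic Stats": {
//           "Reservations": 0,
//           "Size": 10485760,
//           "Entries": 10
//       }
//   }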
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;


@@ -3023,6 +3023,90 @@ def test_ps_s3_persistent_cleanup():
http_server.close()
@attr('basic_test')
def test_ps_s3_persistent_topic_stats():
    """ test persistent topic stats """
    conn = connection()
    zonegroup = 'default'
    # pick a random port for the http endpoint; no server is started,
    # so notifications are never delivered and remain queued
    host = get_ip()
    port = random.randint(10000, 20000)
    # create bucket
    bucket_name = gen_bucket_name()
    bucket = conn.create_bucket(bucket_name)
    topic_name = bucket_name + TOPIC_SUFFIX
    # create s3 topic
    endpoint_address = 'http://'+host+':'+str(port)
    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
    topic_arn = topic_conf.set_config()
    # create s3 notification
    notification_name = bucket_name + NOTIFICATION_SUFFIX
    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                        'Events': []
                       }]
    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
    response, status = s3_notification_conf.set_config()
    assert_equal(status/100, 2)
    # topic stats: the queue starts empty
    result = admin(['topic', 'stats', '--topic', topic_name])
    parsed_result = json.loads(result[0])
    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
    assert_equal(result[1], 0)
    # create objects in the bucket (async)
    number_of_objects = 10
    client_threads = []
    start_time = time.time()
    for i in range(number_of_objects):
        key = bucket.new_key('key-'+str(i))
        content = str(os.urandom(1024*1024))
        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
        thr.start()
        client_threads.append(thr)
    [thr.join() for thr in client_threads]
    time_diff = time.time() - start_time
    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
    # topic stats: one entry per created object
    result = admin(['topic', 'stats', '--topic', topic_name])
    parsed_result = json.loads(result[0])
    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
    assert_equal(result[1], 0)
    # delete objects from the bucket
    client_threads = []
    start_time = time.time()
    count = 0
    for key in bucket.list():
        count += 1
        thr = threading.Thread(target = key.delete, args=())
        thr.start()
        client_threads.append(thr)
        if count % 100 == 0:
            [thr.join() for thr in client_threads]
            client_threads = []
    # join the remaining deletion threads before checking the stats
    [thr.join() for thr in client_threads]
    time_diff = time.time() - start_time
    print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
    # topic stats: one more entry per deleted object
    result = admin(['topic', 'stats', '--topic', topic_name])
    parsed_result = json.loads(result[0])
    assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
    assert_equal(result[1], 0)
    # cleanup
    s3_notification_conf.del_config()
    topic_conf.del_config()
    # delete the bucket
    conn.delete_bucket(bucket_name)
@attr('manual_test')
def test_ps_s3_persistent_notification_pushback():
""" test pushing persistent notification pushback """