diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc index fba76395542..45710f9abe3 100644 --- a/src/cls/2pc_queue/cls_2pc_queue.cc +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -55,6 +55,36 @@ static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, return 0; } +static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_stats_ret op_ret; + + // get head + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + const auto remaining_size = (head.tail.offset >= head.front.offset) ? + (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) : + head.front.offset - head.tail.offset; + op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size; + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what()); + return -EINVAL; + } + op_ret.queue_entries = urgent_data.committed_entries; + + encode(op_ret, *out); + + return 0; +} + static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { cls_2pc_queue_reserve_op res_op; try { @@ -112,7 +142,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff cls_2pc_reservations::iterator last_reservation; std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct, std::forward_as_tuple(urgent_data.last_id), - std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries)); if (!result) { // an old reservation that was never committed or aborted is in the map // caller should try again assuming other IDs are ok @@ -148,7 +178,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff } std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct, std::forward_as_tuple(urgent_data.last_id), - std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries)); if (!result) { // an old reservation that was never committed or aborted is in the map // caller should try again assuming other IDs are ok @@ -268,6 +298,7 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe } urgent_data.reserved_size -= res.size; + urgent_data.committed_entries += res.entries; if (xattr_reservations.empty()) { // remove the reservation from urgent data @@ -577,6 +608,7 @@ CLS_INIT(2pc_queue) cls_handle_t h_class; cls_method_handle_t h_2pc_queue_init; cls_method_handle_t h_2pc_queue_get_capacity; + cls_method_handle_t h_2pc_queue_get_topic_stats; cls_method_handle_t h_2pc_queue_reserve; cls_method_handle_t h_2pc_queue_commit; cls_method_handle_t h_2pc_queue_abort; @@ -589,6 +621,7 @@ CLS_INIT(2pc_queue) cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init); cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity); + cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats); cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve); cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit); cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort); diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc index 6868b2b6f83..42632cba61a 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.cc +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -31,6 +31,21 @@ int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) { return 0; } +int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) { + cls_queue_get_stats_ret op_ret; + auto iter = bl.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + committed_entries = op_ret.queue_entries; + size = op_ret.queue_size; + + return 0; +} + #ifndef CLS_CLIENT_HIDE_IOCTX int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) { bufferlist in, out; @@ -44,12 +59,31 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uin #endif // optionally async method for getting capacity (bytes) -// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results +// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) { bufferlist in; op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval); } +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) { + bufferlist in, out; + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out); + if (r < 0 ) { + return r; + } + + return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size); +} +#endif + +// optionally async method for getting number of commited entries and size (bytes) +// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results +void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval); +} + int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) { cls_2pc_queue_reserve_ret op_ret; diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h index e0bdeafd590..20043edd200 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_client.h +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -19,6 +19,8 @@ void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& q #ifndef CLS_CLIENT_HIDE_IOCTX // return capacity (bytes) int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size); +// return the number of committed entries and size (bytes) +int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size); // make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) // return a reservation id if reservations is possible, 0 otherwise @@ -37,7 +39,12 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& // after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results +void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size); +int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size); // optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) // notes: diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h index 160c5b66e9f..ea7afa943ca 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_const.h +++ b/src/cls/2pc_queue/cls_2pc_queue_const.h @@ -4,6 +4,7 @@ #define TPC_QUEUE_INIT "2pc_queue_init" #define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity" +#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats" #define TPC_QUEUE_RESERVE "2pc_queue_reserve" #define TPC_QUEUE_COMMIT "2pc_queue_commit" #define TPC_QUEUE_ABORT "2pc_queue_abort" diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h index 7c94cdebfe0..2413fd7043d 100644 --- a/src/cls/2pc_queue/cls_2pc_queue_types.h +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -8,25 +8,30 @@ struct cls_2pc_reservation { using id_t = uint32_t; inline static const id_t NO_ID{0}; - uint64_t size; // how many entries are reserved + uint64_t size; // how much size to reserve (bytes) ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations) + uint32_t entries; // how many entries are reserved - cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) : - size(_size), timestamp(_timestamp) {} + cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) : + size(_size), timestamp(_timestamp), entries(_entries) {} cls_2pc_reservation() = default; void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(size, bl); encode(timestamp, bl); + encode(entries, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(size, bl); decode(timestamp, bl); + if (struct_v >= 2) { + decode(entries, bl); + } DECODE_FINISH(bl); } }; @@ -40,22 +45,27 @@ struct cls_2pc_urgent_data cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id cls_2pc_reservations reservations; // reservation list (keyed by id) bool has_xattrs{false}; + uint32_t committed_entries{0}; // how many entries have been committed so far void encode(ceph::buffer::list& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(reserved_size, bl); encode(last_id, bl); encode(reservations, bl); encode(has_xattrs, bl); + encode(committed_entries, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(reserved_size, bl); decode(last_id, bl); decode(reservations, bl); decode(has_xattrs, bl); + if (struct_v >= 2) { + decode(committed_entries, bl); + } DECODE_FINISH(bl); } }; diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h index 64891cffb39..8209659bda9 100644 --- a/src/cls/queue/cls_queue_ops.h +++ b/src/cls/queue/cls_queue_ops.h @@ -136,4 +136,26 @@ struct cls_queue_get_capacity_ret { }; WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret) +struct cls_queue_get_stats_ret { + uint64_t queue_size; + uint32_t queue_entries; + + cls_queue_get_stats_ret() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(queue_size, bl); + encode(queue_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(queue_size, bl); + decode(queue_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_get_stats_ret) + #endif /* CEPH_CLS_QUEUE_OPS_H */ diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index b1835016ec0..324fb4460bb 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -977,6 +977,26 @@ int publish_abort(reservation_t& res) { return 0; } +int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, + const std::string &topic_name, rgw_topic_stats &stats, optional_yield y) +{ + cls_2pc_reservations reservations; + auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl; + return ret; + } + stats.queue_reservations = reservations.size(); + + ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl; + return ret; + } + + return 0; +} + reservation_t::reservation_t(const DoutPrefixProvider* _dpp, rgw::sal::RadosStore* _store, const req_state* _s, @@ -1020,4 +1040,12 @@ reservation_t::~reservation_t() { publish_abort(*this); } +void rgw_topic_stats::dump(Formatter *f) const { + f->open_object_section("Topic Stats"); + f->dump_int("Reservations", queue_reservations); + f->dump_int("Size", queue_size); + f->dump_int("Entries", queue_entries); + f->close_section(); +} + } // namespace rgw::notify diff --git a/src/rgw/driver/rados/rgw_notify.h b/src/rgw/driver/rados/rgw_notify.h index 9269611e4a6..460a7bacb5d 100644 --- a/src/rgw/driver/rados/rgw_notify.h +++ b/src/rgw/driver/rados/rgw_notify.h @@ -98,8 +98,18 @@ struct reservation_t { ~reservation_t(); }; + + +struct rgw_topic_stats { + std::size_t queue_reservations; // number of reservations + uint64_t queue_size; // in bytes + uint32_t queue_entries; // number of entries + + void dump(Formatter *f) const; +}; + // create a reservation on the 2-phase-commit queue - int publish_reserve(const DoutPrefixProvider *dpp, +int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, reservation_t& reservation, const RGWObjTags* req_tags); @@ -117,5 +127,8 @@ int publish_commit(rgw::sal::Object* obj, // cancel the reservation int publish_abort(reservation_t& reservation); +int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx, + const std::string &topic_name, rgw_topic_stats &stats, optional_yield y); + } diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 96742ad20ba..dbf28c5a22a 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -840,6 +840,7 @@ enum class OPT { PUBSUB_NOTIFICATION_LIST, PUBSUB_NOTIFICATION_GET, PUBSUB_NOTIFICATION_RM, + PUBSUB_TOPIC_STATS, SCRIPT_PUT, SCRIPT_GET, SCRIPT_RM, @@ -1074,6 +1075,7 @@ static SimpleCmd::Commands all_cmds = { { "notification list", OPT::PUBSUB_NOTIFICATION_LIST }, { "notification get", OPT::PUBSUB_NOTIFICATION_GET }, { "notification rm", OPT::PUBSUB_NOTIFICATION_RM }, + { "topic stats", OPT::PUBSUB_TOPIC_STATS }, { "script put", OPT::SCRIPT_PUT }, { "script get", OPT::SCRIPT_GET }, { "script rm", OPT::SCRIPT_RM }, @@ -4198,6 +4200,7 @@ int main(int argc, const char **argv) OPT::PUBSUB_NOTIFICATION_LIST, OPT::PUBSUB_TOPIC_GET, OPT::PUBSUB_NOTIFICATION_GET, + OPT::PUBSUB_TOPIC_STATS , OPT::SCRIPT_GET, }; @@ -4284,7 +4287,8 @@ int main(int argc, const char **argv) && opt_cmd != OPT::PUBSUB_TOPIC_GET && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET && opt_cmd != OPT::PUBSUB_TOPIC_RM - && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) { + && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM + && opt_cmd != OPT::PUBSUB_TOPIC_STATS ) { cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl; return EINVAL; } @@ -10513,7 +10517,8 @@ next: return EINVAL; } - ret = rgw::notify::remove_persistent_topic(dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield); + ret = rgw::notify::remove_persistent_topic( + dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield); if (ret < 0) { cerr << "ERROR: could not remove persistent topic: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -10558,6 +10563,24 @@ next: } } + if (opt_cmd == OPT::PUBSUB_TOPIC_STATS) { + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + + rgw::notify::rgw_topic_stats stats; + ret = rgw::notify::get_persistent_queue_stats_by_topic_name( + dpp(), static_cast(driver)->getRados()->get_notif_pool_ctx(), topic_name, + stats, null_yield); + if (ret < 0) { + cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("", stats, formatter.get()); + formatter->flush(cout); + } + if (opt_cmd == OPT::SCRIPT_PUT) { if (!str_script_ctx) { cerr << "ERROR: context was not provided (via --context)" << std::endl; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 8a9b2932198..3fd249f61f0 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -3023,6 +3023,90 @@ def test_ps_s3_persistent_cleanup(): http_server.close() +@attr('basic_test') +def test_ps_s3_persistent_topic_stats(): + """ test persistent topic stats """ + conn = connection() + zonegroup = 'default' + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) + assert_equal(result[1], 0) + + # delete objects from the bucket + client_threads = [] + start_time = time.time() + count = 0 + for key in bucket.list(): + count += 1 + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + if count%100 == 0: + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + client_threads = [] + start_time = time.time() + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name]) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects) + assert_equal(result[1], 0) + + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + conn.delete_bucket(bucket_name) + @attr('manual_test') def test_ps_s3_persistent_notification_pushback(): """ test pushing persistent notification pushback """