Merge pull request #34788 from yuvalif/cls_q_has_empty_markers

cls/queue: fix empty markers when listing entries

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Kefu Chai 2020-05-05 12:47:27 +08:00 committed by GitHub
commit 91253038f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 0 deletions

View File

@ -297,6 +297,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
uint64_t data_size = 0, num_ops = 0;
uint16_t entry_start = 0;
bufferlist bl;
string last_marker;
do
{
CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);
@ -337,6 +338,10 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
cls_queue_entry entry;
ceph_assert(it.get_off() == index);
//Use the last marker saved in previous iteration as the marker for this entry
if (offset_populated) {
entry.marker = last_marker;
}
//Populate offset if not done in previous iteration
if (! offset_populated) {
cls_queue_marker marker = {entry_start_offset + index, gen};
@ -370,6 +375,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
// Copy unprocessed data to bl
bl_chunk.splice(index, size_to_process, &bl);
offset_populated = true;
last_marker = entry.marker;
CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
break;
}
@ -386,6 +392,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
it.copy(size_to_process, bl);
offset_populated = true;
entry_start_processed = true;
last_marker = entry.marker;
CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
break;
}
@ -396,6 +403,7 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
data_size = 0;
entry_start = 0;
num_ops++;
last_marker.clear();
if (num_ops == op.max) {
CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
break;

View File

@ -161,6 +161,50 @@ TEST_F(TestClsQueue, Dequeue)
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
}
TEST_F(TestClsQueue, DequeueMarker)
{
const std::string queue_name = "my-queue";
const uint64_t queue_size = 1024*1024;
librados::ObjectWriteOperation op;
op.create(true);
cls_queue_init(op, queue_name, queue_size);
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
// test multiple enqueues
test_enqueue(queue_name, 10, 1000, 0);
const auto remove_elements = 1024;
const std::string marker;
bool truncated;
std::string end_marker;
std::vector<cls_queue_entry> entries;
auto ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
ASSERT_EQ(truncated, true);
cls_queue_marker after_deleted_marker;
// remove specific markers
for (const auto& entry : entries) {
cls_queue_marker marker;
marker.from_str(entry.marker.c_str());
ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
if (marker.offset > 0 && marker.offset % 2 == 0) {
after_deleted_marker = marker;
cls_queue_remove_entries(op, marker.to_str());
}
}
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
entries.clear();
ret = cls_queue_list_entries(ioctx, queue_name, marker, remove_elements, entries, &truncated, end_marker);
ASSERT_EQ(0, ret);
for (const auto& entry : entries) {
cls_queue_marker marker;
marker.from_str(entry.marker.c_str());
ASSERT_EQ(marker.from_str(entry.marker.c_str()), 0);
ASSERT_GE(marker.gen, after_deleted_marker.gen);
ASSERT_GE(marker.offset, after_deleted_marker.offset);
}
}
TEST_F(TestClsQueue, ListEmpty)
{
const std::string queue_name = "my-queue";