cls/fifo: Journal is flat_set, not multimap

We don't really need the overhead and complexity of a multimap.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
This commit is contained in:
Adam C. Emerson 2022-11-30 17:43:15 -05:00
parent 32b514c52b
commit fcaa45d32d
2 changed files with 75 additions and 35 deletions

View File

@ -23,6 +23,8 @@
#include <string>
#include <vector>
#include <boost/container/flat_set.hpp>
#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/format.h>
@ -117,7 +119,7 @@ inline std::ostream& operator <<(std::ostream& m, const data_params& d) {
struct journal_entry {
enum class Op {
unknown = 0,
unknown = -1,
create = 1,
set_head = 2,
remove = 3,
@ -125,11 +127,25 @@ struct journal_entry {
std::int64_t part_num{-1};
bool valid() const {
using enum Op;
switch (op) {
case create: [[fallthrough]];
case set_head: [[fallthrough]];
case remove:
return part_num >= 0;
default:
return false;
}
}
journal_entry() = default;
journal_entry(Op op, std::int64_t part_num)
: op(op), part_num(part_num) {}
void encode(ceph::buffer::list& bl) const {
ceph_assert(valid());
ENCODE_START(1, 1, bl);
encode((int)op, bl);
encode(part_num, bl);
@ -308,8 +324,39 @@ struct info {
std::int64_t min_push_part_num{0};
std::int64_t max_push_part_num{-1};
std::multimap<int64_t, journal_entry> journal;
boost::container::flat_set<journal_entry> journal;
static_assert(journal_entry::Op::create < journal_entry::Op::set_head);
// So we can get rid of the multimap without breaking compatibility
void encode_journal(bufferlist& bl) const {
using ceph::encode;
assert(journal.size() <= std::numeric_limits<uint32_t>::max());
uint32_t n = static_cast<uint32_t>(journal.size());
encode(n, bl);
for (const auto& entry : journal) {
encode(entry.part_num, bl);
encode(entry, bl);
}
}
void decode_journal( bufferlist::const_iterator& p) {
using enum journal_entry::Op;
using ceph::decode;
uint32_t n;
decode(n, p);
journal.clear();
while (n--) {
decltype(journal_entry::part_num) dummy;
decode(dummy, p);
journal_entry e;
decode(e, p);
if (!e.valid()) {
throw ceph::buffer::malformed_input();
} else {
journal.insert(std::move(e));
}
}
}
bool need_new_head() const {
return (head_part_num < min_push_part_num);
}
@ -332,7 +379,7 @@ struct info {
std::map<int64_t, std::string> tags;
encode(tags, bl);
encode(head_tag, bl);
encode(journal, bl);
encode_journal(bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
@ -349,7 +396,7 @@ struct info {
std::map<int64_t, std::string> tags;
decode(tags, bl);
decode(head_tag, bl);
decode(journal, bl);
decode_journal(bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter* f) const;
@ -379,19 +426,17 @@ struct info {
}
for (const auto& entry : update.journal_entries_add()) {
if (std::find_if(journal.begin(), journal.end(),
[&entry](const auto &x) { return x.second == entry; })
!= journal.end()) {
continue;
} else {
journal.emplace(entry.part_num, entry);
auto [iter, inserted] = journal.insert(entry);
if (inserted) {
changed = true;
}
}
for (const auto& entry : update.journal_entries_rm()) {
journal.erase(entry.part_num);
changed = true;
auto count = journal.erase(entry);
if (count > 0) {
changed = true;
}
}
if (update.head_part_num() && (head_part_num != *update.head_part_num())) {

View File

@ -13,6 +13,7 @@
*
*/
#include <algorithm>
#include <cstdint>
#include <numeric>
#include <optional>
@ -602,7 +603,7 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
l.unlock();
int r = 0;
for (auto& [n, entry] : tmpjournal) {
for (auto& entry : tmpjournal) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << entry << " tid=" << tid
<< dendl;
@ -689,13 +690,9 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
<< " update canceled, retrying: i=" << i << " tid="
<< tid << dendl;
for (auto& e : processed) {
auto jiter = info.journal.find(e.part_num);
/* journal entry was already processed */
if (jiter == info.journal.end() ||
!(jiter->second == e)) {
continue;
if (info.journal.contains(e)) {
new_processed.push_back(e);
}
new_processed.push_back(e);
}
processed = std::move(new_processed);
}
@ -721,7 +718,7 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
if (info.journal.contains(jentries.front())) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
@ -753,8 +750,8 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
r = _update_meta(dpp, u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
auto found = (info.journal.find(jentries.front().part_num) !=
info.journal.end());
auto found = (info.journal.contains({create, jentries.front().part_num}) ||
info.journal.contains({set_head, jentries.front().part_num}));
if ((info.max_push_part_num >= jentries.front().part_num &&
info.head_part_num >= new_head_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@ -880,12 +877,13 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
}
if (canceled) {
using enum fifo::journal_entry::Op;
std::unique_lock l(f->m);
auto iter = f->info.journal.find(jentries.front().part_num);
auto found = (f->info.journal.contains({create, jentries.front().part_num}) ||
f->info.journal.contains({set_head, jentries.front().part_num}));
auto max_push_part_num = f->info.max_push_part_num;
auto head_part_num = f->info.head_part_num;
auto version = f->info.version;
auto found = (iter != f->info.journal.end());
l.unlock();
if ((max_push_part_num >= jentries.front().part_num &&
head_part_num >= new_head_part_num)) {
@ -926,7 +924,8 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
if (info.journal.contains({create, jentries.front().part_num}) &&
(!is_head || info.journal.contains({set_head, jentries.front().part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
@ -2014,8 +2013,8 @@ private:
FIFO* const fifo;
std::vector<fifo::journal_entry> processed;
std::multimap<std::int64_t, fifo::journal_entry> journal;
std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
decltype(fifo->info.journal) journal;
decltype(journal)::iterator iter;
std::int64_t new_tail;
std::int64_t new_head;
std::int64_t new_max;
@ -2173,13 +2172,9 @@ public:
std::vector<fifo::journal_entry> new_processed;
std::unique_lock l(fifo->m);
for (auto& e : processed) {
auto jiter = fifo->info.journal.find(e.part_num);
/* journal entry was already processed */
if (jiter == fifo->info.journal.end() ||
!(jiter->second == e)) {
continue;
if (fifo->info.journal.contains(e)) {
new_processed.push_back(e);
}
new_processed.push_back(e);
}
processed = std::move(new_processed);
}
@ -2231,7 +2226,7 @@ public:
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << *iter
<< " tid=" << tid << dendl;
const auto entry = iter->second;
const auto entry = *iter;
switch (entry.op) {
using enum fifo::journal_entry::Op;
case create:
@ -2264,7 +2259,7 @@ public:
<< " entering: tid=" << tid << dendl;
switch (state) {
case entry_callback:
finish_je(dpp, std::move(p), r, iter->second);
finish_je(dpp, std::move(p), r, *iter);
return;
case pp_callback:
auto c = canceled;