Merge pull request #54215 from kchheda3/wip-fix-kafka-break

rgw/notification: Kafka persistent notifications not retried and removed even when the broker is down 

reviewd-by: cbodley, yuvalif
This commit is contained in:
Yuval Lifshitz 2023-12-06 10:21:42 +02:00 committed by GitHub
commit 70fbeb1746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -100,8 +100,9 @@ struct connection_t {
// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
cb_tag.cb(status);
ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" << cb_tag.tag <<
" for: " << broker << dendl;
ldout(cct, 20) << "Kafka destroy: invoking callback with tag="
<< cb_tag.tag << " for: " << broker
<< " with status: " << status << dendl;
});
callbacks.clear();
delivery_tag = 1;
@ -418,7 +419,9 @@ private:
if (tag) {
auto const q_len = conn->callbacks.size();
if (q_len < max_inflight) {
ldout(conn->cct, 20) << "Kafka publish (with callback, tag=" << *tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
ldout(conn->cct, 20)
<< "Kafka publish (with callback, tag=" << *tag
<< "): OK. Queue has: " << q_len + 1 << " callbacks" << dendl;
conn->callbacks.emplace_back(*tag, message->cb);
} else {
// immediately invoke callback with error - this is not a connection error
@ -463,6 +466,7 @@ private:
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
std::lock_guard lock(connections_lock);
conn->status = STATUS_CONNECTION_IDLE;
conn_it = connections.erase(conn_it);
--connection_count; \
continue;