mirror of
https://github.com/ceph/ceph
synced 2024-12-28 14:34:13 +00:00
Merge PR #26696 into master
* refs/pull/26696/head: osdc/Objecter: preserve read-into-existing-buffer behavior osdc/Objecter: don't use post_rx_buffer or revoke_rx_buffer at all msg/Connection: disable {post,revoke}_rx_buffer msg/async/ProtocolV1: disable rx_buffers behavior msg/simple/Pipe: disable rx_buffer code Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
commit
634780fe07
@ -214,14 +214,18 @@ public:
|
||||
}
|
||||
|
||||
void post_rx_buffer(ceph_tid_t tid, bufferlist& bl) {
|
||||
#if 0
|
||||
Mutex::Locker l(lock);
|
||||
++rx_buffers_version;
|
||||
rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
|
||||
#endif
|
||||
}
|
||||
|
||||
void revoke_rx_buffer(ceph_tid_t tid) {
|
||||
#if 0
|
||||
Mutex::Locker l(lock);
|
||||
rx_buffers.erase(tid);
|
||||
#endif
|
||||
}
|
||||
|
||||
utime_t get_last_keepalive() const {
|
||||
|
@ -798,6 +798,9 @@ CtPtr ProtocolV1::read_message_data_prepare() {
|
||||
|
||||
if (data_len) {
|
||||
// get a buffer
|
||||
#if 0
|
||||
// rx_buffers is broken by design... see
|
||||
// http://tracker.ceph.com/issues/22480
|
||||
map<ceph_tid_t, pair<bufferlist, int> >::iterator p =
|
||||
connection->rx_buffers.find(current_header.tid);
|
||||
if (p != connection->rx_buffers.end()) {
|
||||
@ -815,6 +818,12 @@ CtPtr ProtocolV1::read_message_data_prepare() {
|
||||
alloc_aligned_buffer(data_buf, data_len, data_off);
|
||||
data_blp = data_buf.begin();
|
||||
}
|
||||
#else
|
||||
ldout(cct, 20) << __func__ << " allocating new rx buffer at offset "
|
||||
<< data_off << dendl;
|
||||
alloc_aligned_buffer(data_buf, data_len, data_off);
|
||||
data_blp = data_buf.begin();
|
||||
#endif
|
||||
}
|
||||
|
||||
msg_left = data_len;
|
||||
|
@ -2143,7 +2143,7 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
|
||||
|
||||
bufferlist newbuf, rxbuf;
|
||||
bufferlist::iterator blp;
|
||||
int rxbuf_version = 0;
|
||||
// int rxbuf_version = 0;
|
||||
|
||||
while (left > 0) {
|
||||
// wait for data
|
||||
@ -2151,6 +2151,24 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
|
||||
goto out_dethrottle;
|
||||
|
||||
// get a buffer
|
||||
#if 0
|
||||
// The rx_buffers implementation is buggy:
|
||||
// - see http://tracker.ceph.com/issues/22480
|
||||
//
|
||||
// - From inspection, I think that we have problems if we read *part*
|
||||
// of the message into an rx_buffer, then drop the lock, someone revokes,
|
||||
// and then later try to read the rest. In that case our final bufferlist
|
||||
// will have part of the original static_buffer from the first chunk and
|
||||
// partly a piece that we allocated. I think that to make this correct,
|
||||
// we need to keep the bufferlist we are reading into in Connection under
|
||||
// the lock, and on revoke, if the data is partly read, rebuild() to copy
|
||||
// into fresh buffers so that all references to our static buffer are
|
||||
// cleared up.
|
||||
//
|
||||
// - Also... what happens if we fully read into the static
|
||||
// buffer, then revoke? We still have some bufferlist out there
|
||||
// in the process of getting dispatched back to objecter or
|
||||
// librados that references the static buffer.
|
||||
connection_state->lock.Lock();
|
||||
map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
|
||||
if (p != connection_state->rx_buffers.end()) {
|
||||
@ -2180,6 +2198,23 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
|
||||
ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
|
||||
ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
|
||||
connection_state->lock.Unlock();
|
||||
#else
|
||||
// rx_buffer-less implementation
|
||||
if (!newbuf.length()) {
|
||||
ldout(msgr->cct,20) << "reader allocating new rx buffer at offset "
|
||||
<< offset << dendl;
|
||||
alloc_aligned_buffer(newbuf, data_len, data_off);
|
||||
blp = newbuf.begin();
|
||||
blp.advance(offset);
|
||||
}
|
||||
bufferptr bp = blp.get_current_ptr();
|
||||
int read = std::min(bp.length(), left);
|
||||
ldout(msgr->cct,20) << "reader reading nonblocking into "
|
||||
<< (void*)bp.c_str() << " len " << bp.length()
|
||||
<< dendl;
|
||||
ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
|
||||
ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
|
||||
#endif
|
||||
if (got < 0)
|
||||
goto out_dethrottle;
|
||||
if (got > 0) {
|
||||
|
@ -2485,11 +2485,13 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
|
||||
return -ENOENT;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (s->con) {
|
||||
ldout(cct, 20) << " revoking rx buffer for " << tid
|
||||
<< " on " << s->con << dendl;
|
||||
s->con->revoke_rx_buffer(tid);
|
||||
}
|
||||
#endif
|
||||
|
||||
ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
|
||||
<< dendl;
|
||||
@ -3246,6 +3248,7 @@ void Objecter::_send_op(Op *op)
|
||||
ConnectionRef con = op->session->con;
|
||||
ceph_assert(con);
|
||||
|
||||
#if 0
|
||||
// preallocated rx buffer?
|
||||
if (op->con) {
|
||||
ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
|
||||
@ -3261,6 +3264,7 @@ void Objecter::_send_op(Op *op)
|
||||
op->con = con;
|
||||
op->con->post_rx_buffer(op->tid, *op->outbl);
|
||||
}
|
||||
#endif
|
||||
|
||||
op->incarnation = op->session->incarnation;
|
||||
|
||||
@ -3450,9 +3454,30 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
|
||||
|
||||
// got data?
|
||||
if (op->outbl) {
|
||||
#if 0
|
||||
if (op->con)
|
||||
op->con->revoke_rx_buffer(op->tid);
|
||||
m->claim_data(*op->outbl);
|
||||
#endif
|
||||
auto& bl = m->get_data();
|
||||
if (op->outbl->length() == bl.length() &&
|
||||
bl.get_num_buffers() <= 1) {
|
||||
// this is here to keep previous users to *relied* on getting data
|
||||
// read into existing buffers happy. Notably,
|
||||
// libradosstriper::RadosStriperImpl::aio_read().
|
||||
ldout(cct,10) << __func__ << " copying resulting " << bl.length()
|
||||
<< " into existing buffer of length " << op->outbl->length()
|
||||
<< dendl;
|
||||
bufferlist t;
|
||||
t.claim(*op->outbl);
|
||||
t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
|
||||
bl.copy(0, bl.length(), t.c_str());
|
||||
op->outbl->substr_of(t, 0, bl.length());
|
||||
} else {
|
||||
m->claim_data(*op->outbl);
|
||||
}
|
||||
lderr(cct) << __func__ << " data:\n";
|
||||
op->outbl->hexdump(*_dout);
|
||||
*_dout << dendl;
|
||||
op->outbl = 0;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user