Merge pull request #32934 from rzarzynski/wip-bl-32bytes

include, common: make ceph::bufferlist 32 bytes long on x86

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-02-09 18:41:03 +08:00 committed by GitHub
commit 1867afcef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 87 additions and 208 deletions

View File

@ -263,29 +263,6 @@ static ceph::spinlock debug_lock;
}
};
class buffer::raw_unshareable : public buffer::raw {
public:
MEMPOOL_CLASS_HELPERS();
explicit raw_unshareable(unsigned l) : raw(l) {
if (len)
data = new char[len];
else
data = 0;
}
raw_unshareable(unsigned l, char *b) : raw(b, l) {
}
raw* clone_empty() override {
return new raw_char(len);
}
bool is_shareable() const override {
return false; // !shareable, will force make_shareable()
}
~raw_unshareable() override {
delete[] data;
}
};
class buffer::raw_static : public buffer::raw {
public:
MEMPOOL_CLASS_HELPERS();
@ -382,10 +359,6 @@ static ceph::spinlock debug_lock;
}
}
ceph::unique_leakable_ptr<buffer::raw> buffer::create_unshareable(unsigned len) {
return ceph::unique_leakable_ptr<buffer::raw>(new raw_unshareable(len));
}
buffer::ptr::ptr(ceph::unique_leakable_ptr<raw> r)
: _raw(r.release()),
_off(0),
@ -955,14 +928,14 @@ static ceph::spinlock debug_lock;
: _buffers(std::move(other._buffers)),
_carriage(&always_empty_bptr),
_len(other._len),
_memcopy_count(other._memcopy_count) {
_num(other._num) {
other.clear();
}
void buffer::list::swap(list& other) noexcept
{
std::swap(_len, other._len);
std::swap(_memcopy_count, other._memcopy_count);
std::swap(_num, other._num);
std::swap(_carriage, other._carriage);
_buffers.swap(other._buffers);
}
@ -1121,7 +1094,7 @@ static ceph::spinlock debug_lock;
bool buffer::list::is_contiguous() const
{
return _buffers.size() <= 1;
return _num <= 1;
}
bool buffer::list::is_n_page_sized() const
@ -1158,11 +1131,11 @@ static ceph::spinlock debug_lock;
uint64_t buffer::list::get_wasted_space() const
{
if (_buffers.size() == 1)
if (_num == 1)
return _buffers.back().wasted();
std::vector<const raw*> raw_vec;
raw_vec.reserve(_buffers.size());
raw_vec.reserve(_num);
for (const auto& p : _buffers)
raw_vec.push_back(p._raw);
std::sort(raw_vec.begin(), raw_vec.end());
@ -1187,6 +1160,7 @@ static ceph::spinlock debug_lock;
if (_len == 0) {
_carriage = &always_empty_bptr;
_buffers.clear_and_dispose();
_num = 0;
return;
}
if ((_len & ~CEPH_PAGE_MASK) == 0)
@ -1203,12 +1177,14 @@ static ceph::spinlock debug_lock;
nb->copy_in(pos, node.length(), node.c_str(), false);
pos += node.length();
}
_memcopy_count += pos;
_carriage = &always_empty_bptr;
_buffers.clear_and_dispose();
if (likely(nb->length())) {
_carriage = nb.get();
_buffers.push_back(*nb.release());
_num = 1;
} else {
_carriage = &always_empty_bptr;
_num = 0;
}
invalidate_crc();
}
@ -1222,10 +1198,9 @@ static ceph::spinlock debug_lock;
unsigned align_memory,
unsigned max_buffers)
{
unsigned old_memcopy_count = _memcopy_count;
bool had_to_rebuild = false;
if (max_buffers && _buffers.size() > max_buffers
&& _len > (max_buffers * align_size)) {
if (max_buffers && _num > max_buffers && _len > (max_buffers * align_size)) {
align_size = round_up_to(round_up_to(_len, max_buffers) / max_buffers, align_size);
}
auto p = std::begin(_buffers);
@ -1255,8 +1230,10 @@ static ceph::spinlock debug_lock;
offset += p->length();
// no need to reallocate, relinking is enough thankfully to bi::list.
auto p_after = _buffers.erase_after(p_prev);
_num -= 1;
unaligned._buffers.push_back(*p);
unaligned._len += p->length();
unaligned._num += 1;
p = p_after;
} while (p != std::end(_buffers) &&
(!p->is_aligned(align_memory) ||
@ -1266,13 +1243,13 @@ static ceph::spinlock debug_lock;
unaligned.rebuild(
ptr_node::create(
buffer::create_aligned(unaligned._len, align_memory)));
_memcopy_count += unaligned._len;
had_to_rebuild = true;
}
_buffers.insert_after(p_prev, *ptr_node::create(unaligned._buffers.front()).release());
_num += 1;
++p_prev;
}
return (old_memcopy_count != _memcopy_count);
return had_to_rebuild;
}
bool buffer::list::rebuild_page_aligned()
@ -1287,41 +1264,28 @@ static ceph::spinlock debug_lock;
ptr->set_length(0); // unused, so far.
_carriage = ptr.get();
_buffers.push_back(*ptr.release());
_num += 1;
}
}
// sort-of-like-assignment-op
void buffer::list::claim(list& bl, unsigned int flags)
void buffer::list::claim(list& bl)
{
// free my buffers
clear();
claim_append(bl, flags);
claim_append(bl);
}
void buffer::list::claim_append(list& bl, unsigned int flags)
void buffer::list::claim_append(list& bl)
{
// steal the other guy's buffers
_len += bl._len;
if (!(flags & CLAIM_ALLOW_NONSHAREABLE)) {
auto curbuf = bl._buffers.begin();
auto curbuf_prev = bl._buffers.before_begin();
while (curbuf != bl._buffers.end()) {
const auto* const raw = curbuf->_raw;
if (unlikely(raw && !raw->is_shareable())) {
auto* clone = ptr_node::copy_hypercombined(*curbuf);
curbuf = bl._buffers.erase_after_and_dispose(curbuf_prev);
bl._buffers.insert_after(curbuf_prev, *clone);
++curbuf_prev;
} else {
curbuf_prev = curbuf++;
}
}
}
_num += bl._num;
_buffers.splice_back(bl._buffers);
bl._carriage = &always_empty_bptr;
bl._buffers.clear_and_dispose();
bl._len = 0;
bl._num = 0;
}
void buffer::list::claim_append_piecewise(list& bl)
@ -1344,10 +1308,12 @@ static ceph::spinlock debug_lock;
buf->set_length(0); // unused, so far.
_carriage = buf.get();
_buffers.push_back(*buf.release());
_num += 1;
} else if (unlikely(_carriage != &_buffers.back())) {
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
_carriage->append(c);
_len++;
@ -1367,6 +1333,7 @@ static ceph::spinlock debug_lock;
new_back->set_length(0); // unused, so far.
_carriage = new_back.get();
_buffers.push_back(*new_back.release());
_num += 1;
return _buffers.back();
}
@ -1385,6 +1352,7 @@ static ceph::spinlock debug_lock;
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
_carriage->append(data, first_round);
}
@ -1412,6 +1380,7 @@ static ceph::spinlock debug_lock;
buffer::ptr_node::create(buffer::create(len)).release();
new_back->set_length(0); // unused, so far.
_buffers.push_back(*new_back);
_num += 1;
_carriage = new_back;
return { new_back->c_str(), &new_back->_len, &_len };
} else {
@ -1419,6 +1388,7 @@ static ceph::spinlock debug_lock;
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
return { _carriage->end_c_str(), &_carriage->_len, &_len };
}
@ -1449,11 +1419,13 @@ static ceph::spinlock debug_lock;
// add new item to list
_buffers.push_back(*ptr_node::create(bp, off, len).release());
_len += len;
_num += 1;
}
void buffer::list::append(const list& bl)
{
_len += bl._len;
_num += bl._num;
for (const auto& node : bl._buffers) {
_buffers.push_back(*ptr_node::create(node).release());
}
@ -1484,6 +1456,7 @@ static ceph::spinlock debug_lock;
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
_carriage->set_length(_carriage->length() + len);
return { _carriage->end_c_str() - len };
@ -1494,6 +1467,7 @@ static ceph::spinlock debug_lock;
auto bp = ptr_node::create(len);
bp->zero(false);
_len += len;
_num += 1;
_buffers.push_front(*bp.release());
}
@ -1508,6 +1482,7 @@ static ceph::spinlock debug_lock;
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
_carriage->append_zeros(first_round);
}
@ -1590,6 +1565,7 @@ static ceph::spinlock debug_lock;
//cout << "copying partial of " << *curbuf << std::endl;
_buffers.push_back(*ptr_node::create( *curbuf, off, len ).release());
_len += len;
_num += 1;
break;
}
@ -1598,6 +1574,7 @@ static ceph::spinlock debug_lock;
unsigned howmuch = curbuf->length() - off;
_buffers.push_back(*ptr_node::create( *curbuf, off, howmuch ).release());
_len += howmuch;
_num += 1;
len -= howmuch;
off = 0;
++curbuf;
@ -1640,6 +1617,7 @@ static ceph::spinlock debug_lock;
_buffers.insert_after(curbuf_prev,
*ptr_node::create(*curbuf, 0, off).release());
_len += off;
_num += 1;
++curbuf_prev;
}
@ -1664,6 +1642,7 @@ static ceph::spinlock debug_lock;
if (claim_by)
claim_by->append( *curbuf, off, howmuch );
_len -= (*curbuf).length();
_num -= 1;
curbuf = _buffers.erase_after_and_dispose(curbuf_prev);
len -= howmuch;
off = 0;
@ -1943,7 +1922,7 @@ int buffer::list::write_fd(int fd, uint64_t offset) const
iovec iov[IOV_MAX];
auto p = std::cbegin(_buffers);
uint64_t left_pbrs = std::size(_buffers);
uint64_t left_pbrs = get_num_buffers();
while (left_pbrs) {
ssize_t bytes = 0;
unsigned iovlen = 0;
@ -2161,26 +2140,10 @@ buffer::ptr_node::create_hypercombined(ceph::unique_leakable_ptr<buffer::raw> r)
new ptr_node(std::move(r)));
}
buffer::ptr_node* buffer::ptr_node::copy_hypercombined(
const buffer::ptr_node& copy_this)
{
// FIXME: we don't currently hypercombine buffers due to crashes
// observed in the rados suite. After fixing we'll use placement
// new to create ptr_node on buffer::raw::bptr_storage.
auto raw_new = copy_this._raw->clone();
return new ptr_node(copy_this, std::move(raw_new));
}
buffer::ptr_node* buffer::ptr_node::cloner::operator()(
const buffer::ptr_node& clone_this)
{
const raw* const raw_this = clone_this._raw;
if (likely(!raw_this || raw_this->is_shareable())) {
return new ptr_node(clone_this);
} else {
// clone non-shareable buffers (make shareable)
return copy_hypercombined(clone_this);
}
return new ptr_node(clone_this);
}
std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
@ -2224,8 +2187,6 @@ MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable,
buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static,
buffer_meta);

View File

@ -74,7 +74,7 @@ ptr::operator seastar::temporary_buffer<char>() &&
list::operator seastar::net::packet() &&
{
seastar::net::packet p;
p.reserve(_buffers.size());
p.reserve(_num);
for (auto& ptr : _buffers) {
// append each ptr as a temporary_buffer
p = seastar::net::packet(std::move(p), std::move(ptr));

View File

@ -166,7 +166,6 @@ inline namespace v14_2_0 {
ceph::unique_leakable_ptr<raw> create_aligned_in_mempool(unsigned len, unsigned align, int mempool);
ceph::unique_leakable_ptr<raw> create_page_aligned(unsigned len);
ceph::unique_leakable_ptr<raw> create_small_page_aligned(unsigned len);
ceph::unique_leakable_ptr<raw> create_unshareable(unsigned len);
ceph::unique_leakable_ptr<raw> claim_buffer(unsigned len, char *buf, deleter del);
#ifdef HAVE_SEASTAR
@ -443,7 +442,6 @@ inline namespace v14_2_0 {
// _root.next can be thought as _head
ptr_hook _root;
ptr_hook* _tail;
std::size_t _size;
public:
template <class T>
@ -512,17 +510,14 @@ inline namespace v14_2_0 {
buffers_t()
: _root(&_root),
_tail(&_root),
_size(0) {
_tail(&_root) {
}
buffers_t(const buffers_t&) = delete;
buffers_t(buffers_t&& other)
: _root(other._root.next == &other._root ? &_root : other._root.next),
_tail(other._tail == &other._root ? &_root : other._tail),
_size(other._size) {
_tail(other._tail == &other._root ? &_root : other._tail) {
other._root.next = &other._root;
other._tail = &other._root;
other._size = 0;
_tail->next = &_root;
}
@ -539,14 +534,12 @@ inline namespace v14_2_0 {
// this updates _root.next when called on empty
_tail->next = &item;
_tail = &item;
_size++;
}
void push_front(reference item) {
item.next = _root.next;
_root.next = &item;
_tail = _tail == &_root ? &item : _tail;
_size++;
}
// *_after
@ -556,7 +549,6 @@ inline namespace v14_2_0 {
it->next = to_erase->next;
_root.next = _root.next == to_erase ? to_erase->next : _root.next;
_tail = _tail == to_erase ? (ptr_hook*)&*it : _tail;
_size--;
return it->next;
}
@ -565,11 +557,10 @@ inline namespace v14_2_0 {
it->next = &item;
_root.next = it == end() ? &item : _root.next;
_tail = const_iterator(_tail) == it ? &item : _tail;
_size++;
}
void splice_back(buffers_t& other) {
if (other._size == 0) {
if (other.empty()) {
return;
}
@ -577,14 +568,11 @@ inline namespace v14_2_0 {
// will update root.next if empty() == true
_tail->next = other._root.next;
_tail = other._tail;
_size += other._size;
other._root.next = &other._root;
other._tail = &other._root;
other._size = 0;
}
std::size_t size() const { return _size; }
bool empty() const { return _tail == &_root; }
const_iterator begin() const {
@ -634,7 +622,6 @@ inline namespace v14_2_0 {
}
_root.next = &_root;
_tail = &_root;
_size = 0;
}
iterator erase_after_and_dispose(iterator it) {
auto* to_dispose = &*std::next(it);
@ -656,7 +643,6 @@ inline namespace v14_2_0 {
_tail->next = &_root;
other._tail->next = &other._root;
std::swap(_size, other._size);
}
};
@ -670,8 +656,7 @@ inline namespace v14_2_0 {
// bufferlist holds have this trait -- if somebody ::push_back(const ptr&),
// he expects it won't change.
ptr* _carriage;
unsigned _len;
unsigned _memcopy_count; //the total of memcopy using rebuild().
unsigned _len, _num;
template <bool is_const>
class CEPH_BUFFER_API iterator_impl {
@ -951,21 +936,21 @@ inline namespace v14_2_0 {
list()
: _carriage(&always_empty_bptr),
_len(0),
_memcopy_count(0) {
_num(0) {
}
// cppcheck-suppress noExplicitConstructor
// cppcheck-suppress noExplicitConstructor
list(unsigned prealloc)
: _carriage(&always_empty_bptr),
_len(0),
_memcopy_count(0) {
_num(0) {
reserve(prealloc);
}
list(const list& other)
: _carriage(&always_empty_bptr),
_len(other._len),
_memcopy_count(other._memcopy_count) {
_num(other._num) {
_buffers.clone_from(other._buffers);
}
list(list&& other) noexcept;
@ -979,6 +964,7 @@ inline namespace v14_2_0 {
_carriage = &always_empty_bptr;
_buffers.clone_from(other._buffers);
_len = other._len;
_num = other._num;
}
return *this;
}
@ -986,13 +972,13 @@ inline namespace v14_2_0 {
_buffers = std::move(other._buffers);
_carriage = other._carriage;
_len = other._len;
_memcopy_count = other._memcopy_count;
_num = other._num;
other.clear();
return *this;
}
uint64_t get_wasted_space() const;
unsigned get_num_buffers() const { return _buffers.size(); }
unsigned get_num_buffers() const { return _num; }
const ptr_node& front() const { return _buffers.front(); }
const ptr_node& back() const { return _buffers.back(); }
@ -1004,7 +990,6 @@ inline namespace v14_2_0 {
return _carriage->unused_tail_length();
}
unsigned get_memcopy_count() const {return _memcopy_count; }
const buffers_t& buffers() const { return _buffers; }
void swap(list& other) noexcept;
unsigned length() const {
@ -1043,18 +1028,20 @@ inline namespace v14_2_0 {
_carriage = &always_empty_bptr;
_buffers.clear_and_dispose();
_len = 0;
_memcopy_count = 0;
_num = 0;
}
void push_back(const ptr& bp) {
if (bp.length() == 0)
return;
_buffers.push_back(*ptr_node::create(bp).release());
_len += bp.length();
_num += 1;
}
void push_back(ptr&& bp) {
if (bp.length() == 0)
return;
_len += bp.length();
_num += 1;
_buffers.push_back(*ptr_node::create(std::move(bp)).release());
_carriage = &always_empty_bptr;
}
@ -1066,6 +1053,7 @@ inline namespace v14_2_0 {
return;
_carriage = bp.get();
_len += bp->length();
_num += 1;
_buffers.push_back(*bp.release());
}
void push_back(raw* const r) = delete;
@ -1073,6 +1061,7 @@ inline namespace v14_2_0 {
_buffers.push_back(*ptr_node::create(std::move(r)).release());
_carriage = &_buffers.back();
_len += _buffers.back().length();
_num += 1;
}
void zero();
@ -1091,12 +1080,8 @@ inline namespace v14_2_0 {
void reserve(size_t prealloc);
// assignment-op with move semantics
const static unsigned int CLAIM_DEFAULT = 0;
const static unsigned int CLAIM_ALLOW_NONSHAREABLE = 1;
void claim(list& bl, unsigned int flags = CLAIM_DEFAULT);
void claim_append(list& bl, unsigned int flags = CLAIM_DEFAULT);
void claim(list& bl);
void claim_append(list& bl);
// only for bl is bufferlist::page_aligned_appender
void claim_append_piecewise(list& bl);
@ -1109,6 +1094,7 @@ inline namespace v14_2_0 {
_buffers.push_back(*ptr_node::create(bp).release());
}
_len = bl._len;
_num = bl._num;
}
}
@ -1191,11 +1177,11 @@ inline namespace v14_2_0 {
template<typename VectorT>
void prepare_iov(VectorT *piov) const {
#ifdef __CEPH__
ceph_assert(_buffers.size() <= IOV_MAX);
ceph_assert(_num <= IOV_MAX);
#else
assert(_buffers.size() <= IOV_MAX);
assert(_num <= IOV_MAX);
#endif
piov->resize(_buffers.size());
piov->resize(_num);
unsigned n = 0;
for (auto& p : _buffers) {
(*piov)[n].iov_base = (void *)p.c_str();

View File

@ -93,12 +93,6 @@ public:
memcpy(c->data, data, len);
return ceph::unique_leakable_ptr<raw>(c);
}
virtual bool is_shareable() const {
// true if safe to reference/share the existing buffer copy
// false if it is not safe to share the buffer, e.g., due to special
// and/or registered memory that is scarce
return true;
}
bool get_crc(const std::pair<size_t, size_t> &fromto,
std::pair<uint32_t, uint32_t> *crc) const {
std::lock_guard lg(crc_spinlock);

View File

@ -899,7 +899,7 @@ void RocksDBStore::RocksDBTransactionImpl::put_bat(
to_set_bl.length()));
} else {
rocksdb::Slice key_slice(key);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
bat.Put(cf,
rocksdb::SliceParts(&key_slice, 1),
prepare_sliceparts(to_set_bl, &value_slices));
@ -1055,7 +1055,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
} else {
// make a copy
rocksdb::Slice key_slice(k);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
prepare_sliceparts(to_set_bl, &value_slices));
}
@ -1070,7 +1070,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
} else {
// make a copy
rocksdb::Slice key_slice(key);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
vector<rocksdb::Slice> value_slices(to_set_bl.get_num_buffers());
bat.Merge(
db->default_cf,
rocksdb::SliceParts(&key_slice, 1),

View File

@ -396,7 +396,7 @@ public:
void set_payload(ceph::buffer::list& bl) {
if (byte_throttler)
byte_throttler->put(payload.length());
payload.claim(bl, ceph::buffer::list::CLAIM_ALLOW_NONSHAREABLE);
payload.claim(bl);
if (byte_throttler)
byte_throttler->take(payload.length());
}
@ -404,7 +404,7 @@ public:
void set_middle(ceph::buffer::list& bl) {
if (byte_throttler)
byte_throttler->put(middle.length());
middle.claim(bl, ceph::buffer::list::CLAIM_ALLOW_NONSHAREABLE);
middle.claim(bl);
if (byte_throttler)
byte_throttler->take(middle.length());
}
@ -420,11 +420,10 @@ public:
const ceph::buffer::list& get_data() const { return data; }
ceph::buffer::list& get_data() { return data; }
void claim_data(ceph::buffer::list& bl,
unsigned int flags = ceph::buffer::list::CLAIM_DEFAULT) {
void claim_data(ceph::buffer::list& bl) {
if (byte_throttler)
byte_throttler->put(data.length());
bl.claim(data, flags);
bl.claim(data);
}
off_t get_data_len() const { return data.length(); }

View File

@ -109,7 +109,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
ssize_t send(bufferlist &bl, bool more) override {
size_t sent_bytes = 0;
auto pb = std::cbegin(bl.buffers());
uint64_t left_pbrs = std::size(bl.buffers());
uint64_t left_pbrs = bl.get_num_buffers();
while (left_pbrs) {
struct msghdr msg;
struct iovec msgvec[IOV_MAX];

View File

@ -1134,7 +1134,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
<< " front=" << header.front_len << " data=" << header.data_len
<< " off " << header.data_off << dendl;
if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.get_num_buffers() > 1)) {
for (const auto &pb : bl.buffers()) {
connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
}

View File

@ -345,7 +345,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
std::lock_guard l{lock};
size_t bytes = pending_bl.length();
ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
<< pending_bl.buffers().size() << dendl;
<< pending_bl.get_num_buffers() << dendl;
if (!bytes)
return 0;
@ -388,7 +388,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
}
ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
<< pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
<< pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
int r = post_work_request(tx_buffers);
if (r < 0)

View File

@ -1579,7 +1579,7 @@ int FileJournal::prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist
ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
}
// payload
ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
ebl.claim_append(bl);
if (h.post_pad) {
ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
}

View File

@ -64,7 +64,7 @@ public:
ZTracer::Trace trace;
write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
seq(s), orig_len(ol), tracked_op(opref) {
bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
bl.claim(b);
}
write_item() : seq(0), orig_len(0) {}
};

View File

@ -1714,7 +1714,7 @@ TEST(BufferList, push_back) {
bufferptr ptr;
bl.push_back(std::move(ptr));
EXPECT_EQ((unsigned)0, bl.length());
EXPECT_EQ((unsigned)0, bl.buffers().size());
EXPECT_EQ((unsigned)0, bl.get_num_buffers());
}
{
bufferlist bl;
@ -1723,7 +1723,7 @@ TEST(BufferList, push_back) {
ptr.c_str()[0] = 'B';
bl.push_back(std::move(ptr));
EXPECT_EQ((unsigned)(1 + len), bl.length());
EXPECT_EQ((unsigned)2, bl.buffers().size());
EXPECT_EQ((unsigned)2, bl.get_num_buffers());
EXPECT_EQ('B', bl.buffers().back()[0]);
EXPECT_FALSE(static_cast<instrumented_bptr&>(ptr).get_raw());
}
@ -2047,18 +2047,18 @@ TEST(BufferList, append) {
//
{
bufferlist bl;
EXPECT_EQ((unsigned)0, bl.buffers().size());
EXPECT_EQ((unsigned)0, bl.get_num_buffers());
EXPECT_EQ((unsigned)0, bl.length());
{
bufferptr ptr;
bl.append(std::move(ptr));
EXPECT_EQ((unsigned)0, bl.buffers().size());
EXPECT_EQ((unsigned)0, bl.get_num_buffers());
EXPECT_EQ((unsigned)0, bl.length());
}
{
bufferptr ptr(3);
bl.append(std::move(ptr));
EXPECT_EQ((unsigned)1, bl.buffers().size());
EXPECT_EQ((unsigned)1, bl.get_num_buffers());
EXPECT_EQ((unsigned)3, bl.length());
EXPECT_FALSE(static_cast<instrumented_bptr&>(ptr).get_raw());
}
@ -2752,48 +2752,6 @@ TEST(BufferList, TestDirectAppend) {
ASSERT_EQ(memcmp(bl.c_str(), correct, curpos), 0);
}
TEST(BufferList, TestCloneNonShareable) {
bufferlist bl;
std::string str = "sharetest";
bl.append(str.c_str(), 9);
bufferlist bl_share;
bl_share.share(bl);
bufferlist bl_noshare;
buffer::ptr unraw = buffer::create_unshareable(10);
unraw.copy_in(0, 9, str.c_str());
bl_noshare.append(unraw);
bufferlist bl_copied_share = bl_share;
bufferlist bl_copied_noshare = bl_noshare;
// assert shared bufferlist has same buffers
bufferlist::iterator iter_bl = bl.begin();
bufferlist::iterator iter_bl_share = bl_share.begin();
// ok, this considers ptr::off, but it's still a true assertion (and below)
ASSERT_TRUE(iter_bl.get_current_ptr().c_str() ==
iter_bl_share.get_current_ptr().c_str());
// assert copy of shareable bufferlist has same buffers
iter_bl = bl.begin();
bufferlist::iterator iter_bl_copied_share = bl_copied_share.begin();
ASSERT_TRUE(iter_bl.get_current_ptr().c_str() ==
iter_bl_copied_share.get_current_ptr().c_str());
// assert copy of non-shareable bufferlist has different buffers
bufferlist::iterator iter_bl_copied_noshare = bl_copied_noshare.begin();
ASSERT_FALSE(iter_bl.get_current_ptr().c_str() ==
iter_bl_copied_noshare.get_current_ptr().c_str());
// assert that claim with CLAIM_ALLOW_NONSHAREABLE overrides safe-sharing
bufferlist bl_claim_noshare_override;
void* addr = bl_noshare.begin().get_current_ptr().c_str();
bl_claim_noshare_override.claim(bl_noshare,
buffer::list::CLAIM_ALLOW_NONSHAREABLE);
bufferlist::iterator iter_bl_noshare_override =
bl_claim_noshare_override.begin();
ASSERT_TRUE(addr /* the original first segment of bl_noshare() */ ==
iter_bl_noshare_override.get_current_ptr().c_str());
}
TEST(BufferList, TestCopyAll) {
const static size_t BIG_SZ = 10737414;
std::shared_ptr <unsigned char> big(
@ -2863,13 +2821,16 @@ TEST(BufferList, TestIsProvidedBuffer) {
ASSERT_FALSE(bl.is_provided_buffer(buff));
}
TEST(BufferList, DanglingLastP) {
TEST(BufferList, DISABLED_DanglingLastP) {
bufferlist bl;
{
// going with the unsharable buffer type to distinguish this problem
// from the generic crosstalk issue we had since the very beginning:
// previously we were using the unshareable buffer type to distinguish
// the last_p-specific problem from the generic crosstalk issues we
// had since the very beginning:
// https://gist.github.com/rzarzynski/aed18372e88aed392101adac3bd87bbc
bufferptr bp(buffer::create_unshareable(10));
// this is no longer possible as `buffer::create_unshareable()` has
// been dropped.
bufferptr bp(buffer::create(10));
bp.copy_in(0, 3, "XXX");
bl.push_back(std::move(bp));
EXPECT_EQ(0, ::memcmp("XXX", bl.c_str(), 3));
@ -2892,28 +2853,6 @@ TEST(BufferList, DanglingLastP) {
EXPECT_EQ(0, ::memcmp("12C", bl.c_str(), 3));
}
TEST(BufferList, ClaimingTwoUnsharablePtrs) {
// two or more consecutive, to be precise. Otherwise the problem
// is nonexistent or self-healing.
// See: https://tracker.ceph.com/issues/43814.
bufferlist to_claim;
{
bufferptr one(buffer::create_unshareable(3));
one.copy_in(0, 3, "ABC");
to_claim.push_back(std::move(one));
bufferptr two(buffer::create_unshareable(3));
two.copy_in(0, 3, "123");
to_claim.push_back(std::move(two));
}
bufferlist claimer;
// this is supposed to not fail because of destructing wrong bptr:
// [ RUN ] BufferList.ClaimingTwoUnsharablePtrs
// *** Error in `./bin/unittest_bufferlist': free(): invalid pointer: 0x00007ffe20f03e20 ***
claimer.claim_append(to_claim);
EXPECT_EQ(0, ::memcmp("ABC123", claimer.c_str(), 6));
}
TEST(BufferHash, all) {
{
bufferlist bl;