Merge pull request #24882 from rzarzynski/wip-bl-hypercombined

common: hypercombined bufferlist

Reviewed-by: Casey Bodley <cbodley@redhat.com>
Reviewed-by: Adam Emerson <aemerson@redhat.com>
This commit is contained in:
Kefu Chai 2018-12-06 00:16:55 +08:00 committed by GitHub
commit 2919ada413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 536 additions and 231 deletions

View File

@ -9255,9 +9255,9 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
int64_t some = in->size - pos;
if (some > left)
some = left;
bufferptr z(some);
z.zero();
bl->push_back(z);
auto z = buffer::ptr_node::create(some);
z->zero();
bl->push_back(std::move(z));
read += some;
pos += some;
left -= some;
@ -13174,10 +13174,10 @@ int Client::ll_write_block(Inode *in, uint64_t blockid,
}
object_t oid = file_object_t(vino.ino, blockid);
SnapContext fakesnap;
bufferptr bp;
if (length > 0) bp = buffer::copy(buf, length);
bufferlist bl;
bl.push_back(bp);
ceph::bufferlist bl;
if (length > 0) {
bl.push_back(buffer::copy(buf, length));
}
ldout(cct, 1) << "ll_block_write for " << vino.ino << "." << blockid
<< dendl;

View File

@ -639,14 +639,18 @@ using namespace ceph;
if (_raw) {
bdout << "ptr " << this << " release " << _raw << bendl;
if (--_raw->nref == 0) {
// BE CAREFUL: this is called also for hypercombined ptr_node. After
// freeing underlying raw, `*this` can become inaccessible as well!
const auto* delete_raw = _raw;
_raw = nullptr;
//cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
ANNOTATE_HAPPENS_AFTER(&_raw->nref);
ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
delete _raw; // dealloc old (if any)
delete delete_raw; // dealloc old (if any)
} else {
ANNOTATE_HAPPENS_BEFORE(&_raw->nref);
_raw = nullptr;
}
_raw = 0;
}
}
@ -1085,13 +1089,11 @@ using namespace ceph;
if (p == ls->end())
seek(off);
unsigned left = len;
for (std::list<ptr>::const_iterator i = otherl._buffers.begin();
i != otherl._buffers.end();
++i) {
unsigned l = (*i).length();
for (const auto& node : otherl._buffers) {
unsigned l = node.length();
if (left < l)
l = left;
copy_in(l, i->c_str());
copy_in(l, node.c_str());
left -= l;
if (left == 0)
break;
@ -1127,10 +1129,10 @@ using namespace ceph;
// buffer-wise comparison
if (true) {
std::list<ptr>::const_iterator a = _buffers.begin();
std::list<ptr>::const_iterator b = other._buffers.begin();
auto a = std::cbegin(_buffers);
auto b = std::cbegin(other._buffers);
unsigned aoff = 0, boff = 0;
while (a != _buffers.end()) {
while (a != std::cend(_buffers)) {
unsigned len = a->length() - aoff;
if (len > b->length() - boff)
len = b->length() - boff;
@ -1147,7 +1149,7 @@ using namespace ceph;
++b;
}
}
ceph_assert(b == other._buffers.end());
ceph_assert(b == std::cend(other._buffers));
return true;
}
@ -1165,50 +1167,49 @@ using namespace ceph;
}
}
bool buffer::list::is_provided_buffer(const char *dst) const
bool buffer::list::is_provided_buffer(const char* const dst) const
{
if (_buffers.empty())
if (_buffers.empty()) {
return false;
}
return (is_contiguous() && (_buffers.front().c_str() == dst));
}
bool buffer::list::is_aligned(unsigned align) const
bool buffer::list::is_aligned(const unsigned align) const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it)
if (!it->is_aligned(align))
for (const auto& node : _buffers) {
if (!node.is_aligned(align)) {
return false;
}
}
return true;
}
bool buffer::list::is_n_align_sized(unsigned align) const
bool buffer::list::is_n_align_sized(const unsigned align) const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it)
if (!it->is_n_align_sized(align))
for (const auto& node : _buffers) {
if (!node.is_n_align_sized(align)) {
return false;
}
}
return true;
}
bool buffer::list::is_aligned_size_and_memory(unsigned align_size,
unsigned align_memory) const
bool buffer::list::is_aligned_size_and_memory(
const unsigned align_size,
const unsigned align_memory) const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it) {
if (!it->is_aligned(align_memory) || !it->is_n_align_sized(align_size))
for (const auto& node : _buffers) {
if (!node.is_aligned(align_memory) || !node.is_n_align_sized(align_size)) {
return false;
}
}
return true;
}
bool buffer::list::is_zero() const {
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it) {
if (!it->is_zero()) {
for (const auto& node : _buffers) {
if (!node.is_zero()) {
return false;
}
}
@ -1217,47 +1218,45 @@ using namespace ceph;
void buffer::list::zero()
{
for (std::list<ptr>::iterator it = _buffers.begin();
it != _buffers.end();
++it)
it->zero();
for (auto& node : _buffers) {
node.zero();
}
}
void buffer::list::zero(unsigned o, unsigned l)
void buffer::list::zero(const unsigned o, const unsigned l)
{
ceph_assert(o+l <= _len);
unsigned p = 0;
for (std::list<ptr>::iterator it = _buffers.begin();
it != _buffers.end();
++it) {
if (p + it->length() > o) {
if (p >= o && p+it->length() <= o+l) {
for (auto& node : _buffers) {
if (p + node.length() > o) {
if (p >= o && p+node.length() <= o+l) {
// 'o'------------- l -----------|
// 'p'-- it->length() --|
it->zero();
// 'p'-- node.length() --|
node.zero();
} else if (p >= o) {
// 'o'------------- l -----------|
// 'p'------- it->length() -------|
it->zero(0, o+l-p);
} else if (p + it->length() <= o+l) {
// 'p'------- node.length() -------|
node.zero(0, o+l-p);
} else if (p + node.length() <= o+l) {
// 'o'------------- l -----------|
// 'p'------- it->length() -------|
it->zero(o-p, it->length()-(o-p));
// 'p'------- node.length() -------|
node.zero(o-p, node.length()-(o-p));
} else {
// 'o'----------- l -----------|
// 'p'---------- it->length() ----------|
it->zero(o-p, l);
// 'p'---------- node.length() ----------|
node.zero(o-p, l);
}
}
p += it->length();
if (o+l <= p)
p += node.length();
if (o+l <= p) {
break; // done
}
}
}
bool buffer::list::is_contiguous() const
{
return &(*_buffers.begin()) == &(*_buffers.rbegin());
return _buffers.size() <= 1;
}
bool buffer::list::is_n_page_sized() const
@ -1327,30 +1326,28 @@ using namespace ceph;
void buffer::list::rebuild()
{
if (_len == 0) {
_buffers.clear();
_buffers.clear_and_dispose();
return;
}
ptr nb;
if ((_len & ~CEPH_PAGE_MASK) == 0)
nb = buffer::create_page_aligned(_len);
rebuild(ptr_node::create(buffer::create_page_aligned(_len)));
else
nb = buffer::create(_len);
rebuild(nb);
rebuild(ptr_node::create(buffer::create(_len)));
}
void buffer::list::rebuild(ptr& nb)
void buffer::list::rebuild(
std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> nb)
{
unsigned pos = 0;
for (std::list<ptr>::iterator it = _buffers.begin();
it != _buffers.end();
++it) {
nb.copy_in(pos, it->length(), it->c_str(), false);
pos += it->length();
for (auto& node : _buffers) {
nb->copy_in(pos, node.length(), node.c_str(), false);
pos += node.length();
}
_memcopy_count += pos;
_buffers.clear();
if (nb.length())
_buffers.push_back(nb);
_buffers.clear_and_dispose();
if (likely(nb->length())) {
_buffers.push_back(*nb.release());
}
invalidate_crc();
last_p = begin();
}
@ -1370,8 +1367,9 @@ using namespace ceph;
&& _len > (max_buffers * align_size)) {
align_size = round_up_to(round_up_to(_len, max_buffers) / max_buffers, align_size);
}
std::list<ptr>::iterator p = _buffers.begin();
while (p != _buffers.end()) {
auto p = std::begin(_buffers);
auto p_prev = _buffers.before_begin();
while (p != std::end(_buffers)) {
// keep anything that's already align and sized aligned
if (p->is_aligned(align_memory) && p->is_n_align_sized(align_size)) {
/*cout << " segment " << (void*)p->c_str()
@ -1379,7 +1377,7 @@ using namespace ceph;
<< " length " << p->length()
<< " " << (p->length() & (align - 1)) << " ok" << std::endl;
*/
++p;
p_prev = p++;
continue;
}
@ -1394,18 +1392,23 @@ using namespace ceph;
<< " not ok" << std::endl;
*/
offset += p->length();
unaligned.push_back(*p);
_buffers.erase(p++);
} while (p != _buffers.end() &&
// no need to reallocate, relinking is enough thankfully to bi::list.
auto p_after = _buffers.erase_after(p_prev);
unaligned._buffers.push_back(*p);
unaligned._len += p->length();
p = p_after;
} while (p != std::end(_buffers) &&
(!p->is_aligned(align_memory) ||
!p->is_n_align_sized(align_size) ||
(offset % align_size)));
if (!(unaligned.is_contiguous() && unaligned._buffers.front().is_aligned(align_memory))) {
ptr nb(buffer::create_aligned(unaligned._len, align_memory));
unaligned.rebuild(nb);
unaligned.rebuild(
ptr_node::create(
buffer::create_aligned(unaligned._len, align_memory)));
_memcopy_count += unaligned._len;
}
_buffers.insert(p, unaligned._buffers.front());
_buffers.insert_after(p_prev, *ptr_node::create(unaligned._buffers.front()).release());
++p_prev;
}
last_p = begin();
@ -1439,7 +1442,7 @@ using namespace ceph;
_len += bl._len;
if (!(flags & CLAIM_ALLOW_NONSHAREABLE))
bl.make_shareable();
_buffers.splice(_buffers.end(), bl._buffers );
_buffers.splice_back(bl._buffers);
bl._len = 0;
bl.last_p = bl.begin();
}
@ -1447,9 +1450,8 @@ using namespace ceph;
void buffer::list::claim_append_piecewise(list& bl)
{
// steal the other guy's buffers
for (std::list<buffer::ptr>::const_iterator i = bl.buffers().begin();
i != bl.buffers().end(); ++i) {
append(*i, 0, i->length());
for (const auto& node : bl.buffers()) {
append(node, 0, node.length());
}
bl.clear();
}
@ -1561,16 +1563,15 @@ using namespace ceph;
}
}
// add new item to list
push_back(ptr(bp, off, len));
push_back(ptr_node::create(bp, off, len));
}
void buffer::list::append(const list& bl)
{
_len += bl._len;
for (std::list<ptr>::const_iterator p = bl._buffers.begin();
p != bl._buffers.end();
++p)
_buffers.push_back(*p);
for (const auto& node : bl._buffers) {
_buffers.push_back(*ptr_node::create(node).release());
}
}
void buffer::list::append(std::istream& in)
@ -1600,15 +1601,15 @@ using namespace ceph;
append_buffer.set_length(append_buffer.length() + len);
append(append_buffer, append_buffer.length() - len, len);
return { std::prev(std::end(_buffers))->end_c_str() - len };
return { _buffers.back().end_c_str() - len };
}
void buffer::list::prepend_zero(unsigned len)
{
ptr bp(len);
bp.zero(false);
auto bp = ptr_node::create(len);
bp->zero(false);
_len += len;
_buffers.emplace_front(std::move(bp));
_buffers.push_front(*bp.release());
}
void buffer::list::append_zero(unsigned len)
@ -1620,9 +1621,9 @@ using namespace ceph;
len -= need;
}
if (len) {
ptr bp = buffer::create_page_aligned(len);
bp.zero(false);
append(std::move(bp));
auto bp = ptr_node::create(buffer::create_page_aligned(len));
bp->zero(false);
push_back(std::move(bp));
}
}
@ -1635,14 +1636,12 @@ using namespace ceph;
if (n >= _len)
throw end_of_buffer();
for (std::list<ptr>::const_iterator p = _buffers.begin();
p != _buffers.end();
++p) {
if (n >= p->length()) {
n -= p->length();
for (const auto& node : _buffers) {
if (n >= node.length()) {
n -= node.length();
continue;
}
return (*p)[n];
return node[n];
}
ceph_abort();
}
@ -1655,22 +1654,21 @@ using namespace ceph;
if (_buffers.empty())
return 0; // no buffers
std::list<ptr>::const_iterator iter = _buffers.begin();
auto iter = std::cbegin(_buffers);
++iter;
if (iter != _buffers.end())
if (iter != std::cend(_buffers)) {
rebuild();
}
return _buffers.front().c_str(); // good, we're already contiguous.
}
string buffer::list::to_str() const {
string s;
s.reserve(length());
for (std::list<ptr>::const_iterator p = _buffers.begin();
p != _buffers.end();
++p) {
if (p->length()) {
s.append(p->c_str(), p->length());
for (const auto& node : _buffers) {
if (node.length()) {
s.append(node.c_str(), node.length());
}
}
return s;
@ -1684,21 +1682,20 @@ using namespace ceph;
clear();
// skip off
std::list<ptr>::const_iterator curbuf = other._buffers.begin();
while (off > 0 &&
off >= curbuf->length()) {
auto curbuf = std::cbegin(other._buffers);
while (off > 0 && off >= curbuf->length()) {
// skip this buffer
//cout << "skipping over " << *curbuf << std::endl;
off -= (*curbuf).length();
++curbuf;
}
ceph_assert(len == 0 || curbuf != other._buffers.end());
ceph_assert(len == 0 || curbuf != std::cend(other._buffers));
while (len > 0) {
// partial?
if (off + len < curbuf->length()) {
//cout << "copying partial of " << *curbuf << std::endl;
_buffers.push_back( ptr( *curbuf, off, len ) );
_buffers.push_back(*ptr_node::create( *curbuf, off, len ).release());
_len += len;
break;
}
@ -1706,7 +1703,7 @@ using namespace ceph;
// through end
//cout << "copying end (all?) of " << *curbuf << std::endl;
unsigned howmuch = curbuf->length() - off;
_buffers.push_back( ptr( *curbuf, off, howmuch ) );
_buffers.push_back(*ptr_node::create( *curbuf, off, howmuch ).release());
_len += howmuch;
len -= howmuch;
off = 0;
@ -1727,14 +1724,15 @@ using namespace ceph;
//cout << "splice off " << off << " len " << len << " ... mylen = " << length() << std::endl;
// skip off
std::list<ptr>::iterator curbuf = _buffers.begin();
auto curbuf = std::begin(_buffers);
auto curbuf_prev = _buffers.before_begin();
while (off > 0) {
ceph_assert(curbuf != _buffers.end());
ceph_assert(curbuf != std::end(_buffers));
if (off >= (*curbuf).length()) {
// skip this buffer
//cout << "off = " << off << " skipping over " << *curbuf << std::endl;
off -= (*curbuf).length();
++curbuf;
curbuf_prev = curbuf++;
} else {
// somewhere in this buffer!
//cout << "off = " << off << " somewhere in " << *curbuf << std::endl;
@ -1746,8 +1744,10 @@ using namespace ceph;
// add a reference to the front bit
// insert it before curbuf (which we'll hose)
//cout << "keeping front " << off << " of " << *curbuf << std::endl;
_buffers.insert( curbuf, ptr( *curbuf, 0, off ) );
_buffers.insert_after(curbuf_prev,
*ptr_node::create(*curbuf, 0, off).release());
_len += off;
++curbuf_prev;
}
while (len > 0) {
@ -1769,7 +1769,7 @@ using namespace ceph;
if (claim_by)
claim_by->append( *curbuf, off, howmuch );
_len -= (*curbuf).length();
_buffers.erase( curbuf++ );
curbuf = _buffers.erase_after_and_dispose(curbuf_prev);
len -= howmuch;
off = 0;
}
@ -1783,19 +1783,11 @@ using namespace ceph;
{
list s;
s.substr_of(*this, off, len);
for (std::list<ptr>::const_iterator it = s._buffers.begin();
it != s._buffers.end();
++it)
if (it->length())
out.write(it->c_str(), it->length());
/*iterator p(this, off);
while (len > 0 && !p.end()) {
int l = p.left_in_this_buf();
if (l > len)
l = len;
out.write(p.c_str(), l);
len -= l;
}*/
for (const auto& node : s._buffers) {
if (node.length()) {
out.write(node.c_str(), node.length());
}
}
}
void buffer::list::encode_base64(buffer::list& o)
@ -1869,11 +1861,11 @@ int buffer::list::read_file(const char *fn, std::string *error)
ssize_t buffer::list::read_fd(int fd, size_t len)
{
bufferptr bp = buffer::create(len);
ssize_t ret = safe_read(fd, (void*)bp.c_str(), len);
auto bp = ptr_node::create(buffer::create(len));
ssize_t ret = safe_read(fd, (void*)bp->c_str(), len);
if (ret >= 0) {
bp.set_length(ret);
append(std::move(bp));
bp->set_length(ret);
push_back(std::move(bp));
}
return ret;
}
@ -1949,8 +1941,8 @@ int buffer::list::write_fd(int fd) const
int iovlen = 0;
ssize_t bytes = 0;
std::list<ptr>::const_iterator p = _buffers.begin();
while (p != _buffers.end()) {
auto p = std::cbegin(_buffers);
while (p != std::cend(_buffers)) {
if (p->length() > 0) {
iov[iovlen].iov_base = (void *)p->c_str();
iov[iovlen].iov_len = p->length();
@ -1998,8 +1990,8 @@ int buffer::list::write_fd(int fd, uint64_t offset) const
{
iovec iov[IOV_MAX];
std::list<ptr>::const_iterator p = _buffers.begin();
uint64_t left_pbrs = _buffers.size();
auto p = std::cbegin(_buffers);
uint64_t left_pbrs = std::size(_buffers);
while (left_pbrs) {
ssize_t bytes = 0;
unsigned iovlen = 0;
@ -2028,12 +2020,10 @@ __u32 buffer::list::crc32c(__u32 crc) const
int cache_hits = 0;
int cache_adjusts = 0;
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it) {
if (it->length()) {
raw *r = it->get_raw();
pair<size_t, size_t> ofs(it->offset(), it->offset() + it->length());
for (const auto& node : _buffers) {
if (node.length()) {
raw* const r = node.get_raw();
pair<size_t, size_t> ofs(node.offset(), node.offset() + node.length());
pair<uint32_t, uint32_t> ccrc;
if (r->get_crc(ofs, &ccrc)) {
if (ccrc.first == crc) {
@ -2049,13 +2039,13 @@ __u32 buffer::list::crc32c(__u32 crc) const
* http://crcutil.googlecode.com/files/crc-doc.1.0.pdf
* note, u for our crc32c implementation is 0
*/
crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, node.length());
cache_adjusts++;
}
} else {
cache_misses++;
uint32_t base = crc;
crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
crc = ceph_crc32c(crc, (unsigned char*)node.c_str(), node.length());
r->set_crc(ofs, make_pair(base, crc));
}
}
@ -2075,8 +2065,8 @@ __u32 buffer::list::crc32c(__u32 crc) const
void buffer::list::invalidate_crc()
{
for (std::list<ptr>::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) {
raw *r = p->get_raw();
for (const auto& node : _buffers) {
raw* const r = node.get_raw();
if (r) {
r->invalidate_crc();
}
@ -2102,9 +2092,9 @@ sha1_digest_t buffer::list::sha1()
*/
void buffer::list::write_stream(std::ostream &out) const
{
for (std::list<ptr>::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) {
if (p->length() > 0) {
out.write(p->c_str(), p->length());
for (const auto& node : _buffers) {
if (node.length() > 0) {
out.write(node.c_str(), node.length());
}
}
}
@ -2184,7 +2174,7 @@ void buffer::list::hexdump(std::ostream &out, bool trailing_newline) const
buffer::list buffer::list::static_from_mem(char* c, size_t l) {
list bl;
bl.push_back(ptr(create_static(l, c)));
bl.push_back(ptr_node::create(create_static(l, c)));
return bl;
}
@ -2200,6 +2190,29 @@ buffer::list buffer::list::static_from_string(string& s) {
// const makes me generally sad.
}
bool buffer::ptr_node::dispose_if_hypercombined(
buffer::ptr_node* const delete_this)
{
const bool is_hypercombined = static_cast<void*>(delete_this) == \
static_cast<void*>(&delete_this->get_raw()->bptr_storage);
if (is_hypercombined) {
delete_this->~ptr_node();
}
return is_hypercombined;
}
std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>
buffer::ptr_node::create_hypercombined(buffer::raw* const r)
{
if (likely(r->nref == 0)) {
return std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>(
new (&r->bptr_storage) ptr_node(r));
} else {
return std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>(
new ptr_node(r));
}
}
std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
}
@ -2219,11 +2232,11 @@ std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
std::ostream& buffer::operator<<(std::ostream& out, const buffer::list& bl) {
out << "buffer::list(len=" << bl.length() << "," << std::endl;
std::list<buffer::ptr>::const_iterator it = bl.buffers().begin();
while (it != bl.buffers().end()) {
out << "\t" << *it;
if (++it == bl.buffers().end()) break;
out << "," << std::endl;
for (const auto& node : bl.buffers()) {
out << "\t" << node;
if (&node != &bl.buffers().back()) {
out << "," << std::endl;
}
}
out << std::endl << ")";
return out;

View File

@ -66,7 +66,7 @@ int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
return -1;
}
for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
i != in.buffers().end();) {
c_in = (unsigned char*) (*i).c_str();
@ -120,7 +120,7 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
isal_deflate_init(&strm);
strm.end_of_stream = 0;
for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
i != in.buffers().end();) {
c_in = (unsigned char*) (*i).c_str();

View File

@ -271,6 +271,8 @@ namespace buffer CEPH_BUFFER_API {
ptr& operator= (const ptr& p);
ptr& operator= (ptr&& p) noexcept;
~ptr() {
// BE CAREFUL: this destructor is called also for hypercombined ptr_node.
// After freeing underlying raw, `*this` can become inaccessible as well!
release();
}
@ -379,21 +381,300 @@ namespace buffer CEPH_BUFFER_API {
};
struct ptr_hook {
mutable ptr_hook* next;
ptr_hook() = default;
ptr_hook(ptr_hook* const next)
: next(next) {
}
};
class ptr_node : public ptr_hook, public ptr {
public:
struct cloner {
ptr_node* operator()(const ptr_node& clone_this) {
return new ptr_node(clone_this);
}
};
struct disposer {
void operator()(ptr_node* const delete_this) {
if (!dispose_if_hypercombined(delete_this)) {
delete delete_this;
}
}
};
~ptr_node() = default;
static std::unique_ptr<ptr_node, disposer> create(raw* const r) {
return create_hypercombined(r);
}
static std::unique_ptr<ptr_node, disposer> create(const unsigned l) {
return create_hypercombined(buffer::create(l));
}
template <class... Args>
static std::unique_ptr<ptr_node, disposer> create(Args&&... args) {
return std::unique_ptr<ptr_node, disposer>(
new ptr_node(std::forward<Args>(args)...));
}
private:
template <class... Args>
ptr_node(Args&&... args) : ptr(std::forward<Args>(args)...) {
}
ptr_node(const ptr_node&) = default;
ptr& operator= (const ptr& p) = delete;
ptr& operator= (ptr&& p) noexcept = delete;
ptr_node& operator= (const ptr_node& p) = delete;
ptr_node& operator= (ptr_node&& p) noexcept = delete;
void swap(ptr& other) noexcept = delete;
void swap(ptr_node& other) noexcept = delete;
static bool dispose_if_hypercombined(ptr_node* delete_this);
static std::unique_ptr<ptr_node, disposer> create_hypercombined(raw* r);
};
/*
* list - the useful bit!
*/
class CEPH_BUFFER_API list {
public:
// this the very low-level implementation of singly linked list
// ceph::buffer::list is built on. We don't use intrusive slist
// of Boost (or any other 3rd party) to save extra dependencies
// in our public headers.
class buffers_t {
// _root.next can be thought as _head
ptr_hook _root;
ptr_hook* _tail;
std::size_t _size;
public:
template <class T>
class buffers_iterator {
typename std::conditional<
std::is_const<T>::value, const ptr_hook*, ptr_hook*>::type cur;
template <class U> friend class buffers_iterator;
public:
using value_type = T;
using reference = typename std::add_lvalue_reference<T>::type;
using pointer = typename std::add_pointer<T>::type;
using difference_type = std::ptrdiff_t;
using iterator_category = std::forward_iterator_tag;
template <class U>
buffers_iterator(U* const p)
: cur(p) {
}
template <class U>
buffers_iterator(const buffers_iterator<U>& other)
: cur(other.cur) {
}
buffers_iterator() = default;
T& operator*() const {
return *reinterpret_cast<T*>(cur);
}
T* operator->() const {
return reinterpret_cast<T*>(cur);
}
buffers_iterator& operator++() {
cur = cur->next;
return *this;
}
buffers_iterator operator++(int) {
const auto temp(*this);
++*this;
return temp;
}
template <class U>
buffers_iterator& operator=(buffers_iterator<U>& other) {
cur = other.cur;
return *this;
}
bool operator==(const buffers_iterator& rhs) const {
return cur == rhs.cur;
}
bool operator!=(const buffers_iterator& rhs) const {
return !(*this==rhs);
}
using citer_t = buffers_iterator<typename std::add_const<T>::type>;
operator citer_t() const {
return citer_t(cur);
}
};
typedef buffers_iterator<const ptr_node> const_iterator;
typedef buffers_iterator<ptr_node> iterator;
typedef const ptr_node& const_reference;
typedef ptr_node& reference;
buffers_t()
: _root(&_root),
_tail(&_root),
_size(0) {
}
buffers_t(const buffers_t&) = delete;
buffers_t(buffers_t&& other)
: _root(other._root.next == &other._root ? &_root : other._root.next),
_tail(other._tail == &other._root ? &_root : other._tail),
_size(other._size) {
other._root.next = &other._root;
other._tail = &other._root;
other._size = 0;
_tail->next = &_root;
}
buffers_t& operator=(buffers_t&& other) {
if (&other != this) {
clear_and_dispose();
swap(other);
}
return *this;
}
void push_back(reference item) {
item.next = &_root;
// this updates _root.next when called on empty
_tail->next = &item;
_tail = &item;
_size++;
}
void push_front(reference item) {
item.next = _root.next;
_root.next = &item;
_tail = _tail == &_root ? &item : _tail;
_size++;
}
// *_after
iterator erase_after(const_iterator it) {
const auto* to_erase = it->next;
it->next = to_erase->next;
_root.next = _root.next == to_erase ? to_erase->next : _root.next;
_tail = _tail == to_erase ? (ptr_hook*)&*it : _tail;
_size--;
return it->next;
}
void insert_after(const_iterator it, reference item) {
item.next = it->next;
it->next = &item;
_root.next = it == end() ? &item : _root.next;
_tail = const_iterator(_tail) == it ? &item : _tail;
_size++;
}
void splice_back(buffers_t& other) {
if (other._size == 0) {
return;
}
other._tail->next = &_root;
// will update root.next if empty() == true
_tail->next = other._root.next;
_tail = other._tail;
_size += other._size;
other._root.next = &other._root;
other._tail = &other._root;
other._size = 0;
}
std::size_t size() const { return _size; }
bool empty() const { return _tail == &_root; }
const_iterator begin() const {
return _root.next;
}
const_iterator before_begin() const {
return &_root;
}
const_iterator end() const {
return &_root;
}
iterator begin() {
return _root.next;
}
iterator before_begin() {
return &_root;
}
iterator end() {
return &_root;
}
reference front() {
return reinterpret_cast<reference>(*_root.next);
}
reference back() {
return reinterpret_cast<reference>(*_tail);
}
const_reference front() const {
return reinterpret_cast<const_reference>(*_root.next);
}
const_reference back() const {
return reinterpret_cast<const_reference>(*_tail);
}
void clone_from(const buffers_t& other) {
clear_and_dispose();
for (auto& node : other) {
ptr_node* clone = ptr_node::cloner()(node);
push_back(*clone);
}
}
void clear_and_dispose() {
for (auto it = begin(); it != end(); /* nop */) {
auto& node = *it;
it = it->next;
ptr_node::disposer()(&node);
}
_root.next = &_root;
_tail = &_root;
_size = 0;
}
iterator erase_after_and_dispose(iterator it) {
auto* to_dispose = &*std::next(it);
auto ret = erase_after(it);
ptr_node::disposer()(to_dispose);
return ret;
}
void swap(buffers_t& other) {
const auto copy_root = _root;
_root.next = \
other._root.next == &other._root ? &this->_root : other._root.next;
other._root.next = \
copy_root.next == &_root ? &other._root : copy_root.next;
const auto copy_tail = _tail;
_tail = other._tail == &other._root ? &this->_root : other._tail;
other._tail = copy_tail == &_root ? &other._root : copy_tail;
_tail->next = &_root;
other._tail->next = &other._root;
std::swap(_size, other._size);
}
};
class iterator;
private:
// my private bits
std::list<ptr> _buffers;
buffers_t _buffers;
unsigned _len;
unsigned _memcopy_count; //the total of memcopy using rebuild().
ptr append_buffer; // where i put small appends.
public:
class iterator;
private:
template <bool is_const>
class CEPH_BUFFER_API iterator_impl {
protected:
@ -401,11 +682,11 @@ namespace buffer CEPH_BUFFER_API {
const list,
list>::type bl_t;
typedef typename std::conditional<is_const,
const std::list<ptr>,
std::list<ptr> >::type list_t;
const buffers_t,
buffers_t >::type list_t;
typedef typename std::conditional<is_const,
typename std::list<ptr>::const_iterator,
typename std::list<ptr>::iterator>::type list_iter_t;
typename buffers_t::const_iterator,
typename buffers_t::iterator>::type list_iter_t;
using iterator_category = std::forward_iterator_tag;
using value_type = typename std::conditional<is_const, const char, char>::type;
using difference_type = std::ptrdiff_t;
@ -702,20 +983,25 @@ namespace buffer CEPH_BUFFER_API {
reserve(prealloc);
}
list(const list& other) : _buffers(other._buffers), _len(other._len),
list(const list& other) : _len(other._len),
_memcopy_count(other._memcopy_count), last_p(this) {
_buffers.clone_from(other._buffers);
make_shareable();
}
list(list&& other) noexcept;
~list() {
_buffers.clear_and_dispose();
}
list& operator= (const list& other) {
if (this != &other) {
_buffers = other._buffers;
_buffers.clone_from(other._buffers);
_len = other._len;
make_shareable();
}
return *this;
}
list& operator= (list&& other) noexcept {
_buffers = std::move(other._buffers);
_len = other._len;
@ -728,8 +1014,8 @@ namespace buffer CEPH_BUFFER_API {
uint64_t get_wasted_space() const;
unsigned get_num_buffers() const { return _buffers.size(); }
const ptr& front() const { return _buffers.front(); }
const ptr& back() const { return _buffers.back(); }
const ptr_node& front() const { return _buffers.front(); }
const ptr_node& back() const { return _buffers.back(); }
int get_mempool() const;
void reassign_to_mempool(int pool);
@ -740,7 +1026,7 @@ namespace buffer CEPH_BUFFER_API {
}
unsigned get_memcopy_count() const {return _memcopy_count; }
const std::list<ptr>& buffers() const { return _buffers; }
const buffers_t& buffers() const { return _buffers; }
void swap(list& other) noexcept;
unsigned length() const {
#if 0
@ -774,7 +1060,7 @@ namespace buffer CEPH_BUFFER_API {
// modifiers
void clear() noexcept {
_buffers.clear();
_buffers.clear_and_dispose();
_len = 0;
_memcopy_count = 0;
last_p = begin();
@ -783,17 +1069,27 @@ namespace buffer CEPH_BUFFER_API {
void push_back(const ptr& bp) {
if (bp.length() == 0)
return;
_buffers.push_back(bp);
_buffers.push_back(*ptr_node::create(bp).release());
_len += bp.length();
}
void push_back(ptr&& bp) {
if (bp.length() == 0)
return;
_len += bp.length();
_buffers.push_back(std::move(bp));
_buffers.push_back(*ptr_node::create(std::move(bp)).release());
}
void push_back(raw *r) {
push_back(ptr(r));
void push_back(const ptr_node&) = delete;
void push_back(ptr_node&) = delete;
void push_back(ptr_node&&) = delete;
void push_back(std::unique_ptr<ptr_node, ptr_node::disposer> bp) {
if (bp->length() == 0)
return;
_len += bp->length();
_buffers.push_back(*bp.release());
}
void push_back(raw* const r) {
_buffers.push_back(*ptr_node::create(r).release());
_len += _buffers.back().length();
}
void zero();
@ -801,7 +1097,7 @@ namespace buffer CEPH_BUFFER_API {
bool is_contiguous() const;
void rebuild();
void rebuild(ptr& nb);
void rebuild(std::unique_ptr<ptr_node, ptr_node::disposer> nb);
bool rebuild_aligned(unsigned align);
// max_buffers = 0 mean don't care _buffers.size(), other
// must make _buffers.size() <= max_buffers after rebuilding.
@ -823,7 +1119,7 @@ namespace buffer CEPH_BUFFER_API {
// clone non-shareable buffers (make shareable)
void make_shareable() {
std::list<buffer::ptr>::iterator pb;
decltype(_buffers)::iterator pb;
for (pb = _buffers.begin(); pb != _buffers.end(); ++pb) {
(void) pb->make_shareable();
}
@ -834,9 +1130,8 @@ namespace buffer CEPH_BUFFER_API {
{
if (this != &bl) {
clear();
std::list<buffer::ptr>::const_iterator pb;
for (pb = bl._buffers.begin(); pb != bl._buffers.end(); ++pb) {
push_back(*pb);
for (const auto& pb : bl._buffers) {
push_back(static_cast<const ptr&>(pb));
}
}
}

View File

@ -18,6 +18,7 @@
#include <atomic>
#include <map>
#include <utility>
#include <type_traits>
#include "include/buffer.h"
#include "include/mempool.h"
#include "include/spinlock.h"
@ -25,6 +26,10 @@
namespace ceph::buffer {
class raw {
public:
// In the future we might want to have a slab allocator here with few
// embedded slots. This would allow to avoid the "if" in dtor of ptr_node.
std::aligned_storage<sizeof(ptr_node),
alignof(ptr_node)>::type bptr_storage;
char *data;
unsigned len;
std::atomic<unsigned> nref { 0 };
@ -36,7 +41,7 @@ namespace ceph::buffer {
mutable ceph::spinlock crc_spinlock;
explicit raw(unsigned l, int mempool=mempool::mempool_buffer_anon)
: data(NULL), len(l), nref(0), mempool(mempool) {
: data(nullptr), len(l), nref(0), mempool(mempool) {
mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
}
raw(char *c, unsigned l, int mempool=mempool::mempool_buffer_anon)

View File

@ -255,10 +255,9 @@ void LevelDBStore::LevelDBTransactionImpl::set(
// make sure the buffer isn't too large or we might crash here...
char* slicebuf = (char*) alloca(bllen);
leveldb::Slice newslice(slicebuf, bllen);
std::list<buffer::ptr>::const_iterator pb;
for (pb = to_set_bl.buffers().begin(); pb != to_set_bl.buffers().end(); ++pb) {
size_t ptrlen = (*pb).length();
memcpy((void*)slicebuf, (*pb).c_str(), ptrlen);
for (const auto& node : to_set_bl.buffers()) {
const size_t ptrlen = node.length();
memcpy(static_cast<void*>(slicebuf), node.c_str(), ptrlen);
slicebuf += ptrlen;
}
bat.Put(leveldb::Slice(key), newslice);

View File

@ -1427,12 +1427,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
});
auto gather_ctx = new C_Gather(m_dest->cct, end_op_ctx);
bufferptr m_ptr(m_bl->length());
m_bl->rebuild(m_ptr);
m_bl->rebuild(buffer::ptr_node::create(m_bl->length()));
size_t write_offset = 0;
size_t write_length = 0;
size_t offset = 0;
size_t length = m_bl->length();
const auto& m_ptr = m_bl->front();
while (offset < length) {
if (util::calc_sparse_extent(m_ptr,
m_sparse_size,
@ -1440,9 +1440,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
&write_offset,
&write_length,
&offset)) {
bufferptr write_ptr(m_ptr, write_offset, write_length);
bufferlist *write_bl = new bufferlist();
write_bl->push_back(write_ptr);
write_bl->push_back(
buffer::ptr_node::create(m_ptr, write_offset, write_length));
Context *ctx = new C_CopyWrite(write_bl, gather_ctx->new_sub());
auto comp = io::AioCompletion::create(ctx);

View File

@ -64,10 +64,8 @@ private:
mdata_hook(&mp);
if (free_data) {
const std::list<buffer::ptr>& buffers = data.buffers();
list<bufferptr>::const_iterator pb;
for (pb = buffers.begin(); pb != buffers.end(); ++pb) {
free((void*) pb->c_str());
for (const auto& node : data.buffers()) {
free(const_cast<void*>(static_cast<const void*>(node.c_str())));
}
}
}

View File

@ -112,8 +112,8 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
ssize_t send(bufferlist &bl, bool more) override {
size_t sent_bytes = 0;
std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
uint64_t left_pbrs = bl.buffers().size();
auto pb = std::cbegin(bl.buffers());
uint64_t left_pbrs = std::size(bl.buffers());
while (left_pbrs) {
struct msghdr msg;
struct iovec msgvec[IOV_MAX];

View File

@ -446,9 +446,10 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
if (!bytes)
return 0;
auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
std::list<bufferptr>::const_iterator &start,
std::list<bufferptr>::const_iterator &end) -> unsigned {
auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
unsigned bytes,
auto& start,
const auto& end) -> unsigned {
ceph_assert(start != end);
auto chunk_idx = tx_buffers.size();
int ret = worker->get_reged_mem(this, tx_buffers, bytes);
@ -481,8 +482,8 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
};
std::vector<Chunk*> tx_buffers;
std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
std::list<bufferptr>::const_iterator copy_it = it;
auto it = std::cbegin(pending_bl.buffers());
auto copy_it = it;
unsigned total = 0;
unsigned need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {

View File

@ -2408,7 +2408,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo
msg.msg_iovlen++;
// payload (front+data)
list<bufferptr>::const_iterator pb = blist.buffers().begin();
auto pb = std::cbegin(blist.buffers());
unsigned b_off = 0; // carry-over buffer offset, if any
unsigned bl_pos = 0; // blist pos
unsigned left = blist.length();

View File

@ -695,15 +695,11 @@ public:
bufferlist& bl,
vector<__le32> &cm,
vector<__le32> &om) {
for (auto& bp : bl.buffers()) {
ceph_assert(bp.length() % sizeof(Op) == 0);
list<bufferptr> list = bl.buffers();
std::list<bufferptr>::iterator p;
for(p = list.begin(); p != list.end(); ++p) {
ceph_assert(p->length() % sizeof(Op) == 0);
char* raw_p = p->c_str();
char* raw_end = raw_p + p->length();
char* raw_p = const_cast<char*>(bp.c_str());
char* raw_end = raw_p + bp.length();
while (raw_p < raw_end) {
_update_op(reinterpret_cast<Op*>(raw_p), cm, om);
raw_p += sizeof(Op);

View File

@ -847,9 +847,9 @@ int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
_aio_log_start(ioc, off, len);
bufferptr p = buffer::create_small_page_aligned(len);
auto p = buffer::ptr_node::create(buffer::create_small_page_aligned(len));
int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET],
p.c_str(), len, off);
p->c_str(), len, off);
if (r < 0) {
r = -errno;
goto out;

View File

@ -1372,11 +1372,9 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
iovec *iov = new iovec[max];
int n = 0;
unsigned len = 0;
for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
n < max;
++p, ++n) {
ceph_assert(p != bl.buffers().end());
iov[n].iov_base = (void *)p->c_str();
for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) {
ceph_assert(p != std::cend(bl.buffers()));
iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str()));
iov[n].iov_len = p->length();
len += p->length();
}

View File

@ -2186,7 +2186,7 @@ public:
struct rgw_vio* get_vio() { return vio; }
const std::list<buffer::ptr>& buffers() { return bl.buffers(); }
const auto& buffers() { return bl.buffers(); }
unsigned /* XXX */ length() { return bl.length(); }

View File

@ -1294,7 +1294,7 @@ void bench_bufferlist_alloc(int size, int num, int per)
for (int i=0; i<num; ++i) {
bufferlist bl;
for (int j=0; j<per; ++j)
bl.append(buffer::create(size));
bl.push_back(buffer::ptr_node::create(buffer::create(size)));
}
utime_t end = ceph_clock_now();
cout << num << " alloc of size " << size

View File

@ -12,7 +12,7 @@ public:
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override
{
auto bl_buffers = bl.buffers();
auto& bl_buffers = bl.buffers();
auto i = bl_buffers.begin();
while (bl_len > 0)
{