Merge pull request #24031 from rzarzynski/wip-common-bl-drop-0c

common: drop the unused zero-copy facilities in ceph::bufferlist

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Josh Durgin 2018-09-17 13:48:41 -07:00 committed by GitHub
commit 202b27363d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1 additions and 451 deletions

View File

@ -314,183 +314,6 @@ using namespace ceph;
};
#endif
#ifdef CEPH_HAVE_SPLICE
class buffer::raw_pipe : public buffer::raw {
public:
MEMPOOL_CLASS_HELPERS();
explicit raw_pipe(unsigned len) : raw(len), source_consumed(false) {
size_t max = get_max_pipe_size();
if (len > max) {
bdout << "raw_pipe: requested length " << len
<< " > max length " << max << bendl;
throw malformed_input("length larger than max pipe size");
}
pipefds[0] = -1;
pipefds[1] = -1;
int r;
if (::pipe(pipefds) == -1) {
r = -errno;
bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
throw error_code(r);
}
r = set_nonblocking(pipefds);
if (r < 0) {
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
r = set_pipe_size(pipefds, len);
if (r < 0) {
bdout << "raw_pipe: could not set pipe size" << bendl;
// continue, since the pipe should become large enough as needed
}
inc_total_alloc(len);
inc_history_alloc(len);
bdout << "raw_pipe " << this << " alloc " << len << " "
<< buffer::get_total_alloc() << bendl;
}
~raw_pipe() override {
if (data)
free(data);
close_pipe(pipefds);
dec_total_alloc(len);
bdout << "raw_pipe " << this << " free " << (void *)data << " "
<< buffer::get_total_alloc() << bendl;
}
bool can_zero_copy() const override {
return true;
}
int set_source(int fd, loff_t *off) {
int flags = SPLICE_F_NONBLOCK;
ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
if (r < 0) {
bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
<< bendl;
return r;
}
// update length with actual amount read
_set_len(r);
return 0;
}
int zero_copy_to_fd(int fd, loff_t *offset) override {
ceph_assert(!source_consumed);
int flags = SPLICE_F_NONBLOCK;
ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
if (r < 0) {
bdout << "raw_pipe: error splicing from pipe to fd: "
<< cpp_strerror(r) << bendl;
return r;
}
source_consumed = true;
return 0;
}
buffer::raw* clone_empty() override {
// cloning doesn't make sense for pipe-based buffers,
// and is only used by unit tests for other types of buffers
return NULL;
}
char *get_data() override {
if (data)
return data;
return copy_pipe(pipefds);
}
private:
int set_pipe_size(int *fds, long length) {
#ifdef CEPH_HAVE_SETPIPE_SZ
if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
int r = -errno;
if (r == -EPERM) {
// pipe limit must have changed - EPERM means we requested
// more than the maximum size as an unprivileged user
update_max_pipe_size();
throw malformed_input("length larger than new max pipe size");
}
return r;
}
#endif
return 0;
}
int set_nonblocking(int *fds) {
if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
return -errno;
if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
return -errno;
return 0;
}
static void close_pipe(const int fds[2]) {
if (fds[0] >= 0)
VOID_TEMP_FAILURE_RETRY(::close(fds[0]));
if (fds[1] >= 0)
VOID_TEMP_FAILURE_RETRY(::close(fds[1]));
}
char *copy_pipe(int *fds) {
/* preserve original pipe contents by copying into a temporary
* pipe before reading.
*/
int tmpfd[2];
int r;
ceph_assert(!source_consumed);
ceph_assert(fds[0] >= 0);
if (::pipe(tmpfd) == -1) {
r = -errno;
bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
<< bendl;
throw error_code(r);
}
auto sg = make_scope_guard([=] { close_pipe(tmpfd); });
r = set_nonblocking(tmpfd);
if (r < 0) {
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
<< cpp_strerror(r) << bendl;
throw error_code(r);
}
r = set_pipe_size(tmpfd, len);
if (r < 0) {
bdout << "raw_pipe: error setting pipe size on temp pipe: "
<< cpp_strerror(r) << bendl;
}
int flags = SPLICE_F_NONBLOCK;
if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
r = errno;
bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
<< bendl;
throw error_code(r);
}
data = (char *)malloc(len);
if (!data) {
throw bad_alloc();
}
r = safe_read(tmpfd[0], data, len);
if (r < (ssize_t)len) {
bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
<< bendl;
free(data);
data = NULL;
throw error_code(r);
}
return data;
}
bool source_consumed;
int pipefds[2];
};
#endif // CEPH_HAVE_SPLICE
/*
* primitive buffer types
*/
@ -703,20 +526,6 @@ using namespace ceph;
return create_aligned(len, CEPH_PAGE_SIZE);
}
buffer::raw* buffer::create_zero_copy(unsigned len, int fd, int64_t *offset) {
#ifdef CEPH_HAVE_SPLICE
buffer::raw_pipe* buf = new raw_pipe(len);
int r = buf->set_source(fd, (loff_t*)offset);
if (r < 0) {
delete buf;
throw error_code(r);
}
return buf;
#else
throw error_code(-ENOTSUP);
#endif
}
buffer::raw* buffer::create_unshareable(unsigned len) {
return new raw_unshareable(len);
}
@ -1013,15 +822,6 @@ using namespace ceph;
_raw->invalidate_crc();
memset(c_str()+o, 0, l);
}
bool buffer::ptr::can_zero_copy() const
{
return _raw->can_zero_copy();
}
int buffer::ptr::zero_copy_to_fd(int fd, int64_t *offset) const
{
return _raw->zero_copy_to_fd(fd, (loff_t*)offset);
}
// -- buffer::list::iterator --
/*
@ -1469,16 +1269,6 @@ using namespace ceph;
}
}
bool buffer::list::can_zero_copy() const
{
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end();
++it)
if (!it->can_zero_copy())
return false;
return true;
}
bool buffer::list::is_provided_buffer(const char *dst) const
{
if (_buffers.empty())
@ -2185,12 +1975,6 @@ int buffer::list::read_file(const char *fn, std::string *error)
ssize_t buffer::list::read_fd(int fd, size_t len)
{
// try zero copy first
if (false && read_fd_zero_copy(fd, len) == 0) {
// TODO fix callers to not require correct read size, which is not
// available for raw_pipe until we actually inspect the data
return 0;
}
bufferptr bp = buffer::create(len);
ssize_t ret = safe_read(fd, (void*)bp.c_str(), len);
if (ret >= 0) {
@ -2200,22 +1984,6 @@ ssize_t buffer::list::read_fd(int fd, size_t len)
return ret;
}
int buffer::list::read_fd_zero_copy(int fd, size_t len)
{
#ifdef CEPH_HAVE_SPLICE
try {
append(buffer::create_zero_copy(len, fd, NULL));
} catch (buffer::error_code &e) {
return e.code;
} catch (buffer::malformed_input &e) {
return -EIO;
}
return 0;
#else
return -ENOTSUP;
#endif
}
int buffer::list::write_file(const char *fn, int mode)
{
int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
@ -2282,9 +2050,6 @@ static int do_writev(int fd, struct iovec *vec, uint64_t offset, unsigned veclen
int buffer::list::write_fd(int fd) const
{
if (can_zero_copy())
return write_fd_zero_copy(fd);
// use writev!
iovec iov[IOV_MAX];
int iovlen = 0;
@ -2363,30 +2128,6 @@ int buffer::list::write_fd(int fd, uint64_t offset) const
return 0;
}
int buffer::list::write_fd_zero_copy(int fd) const
{
if (!can_zero_copy())
return -ENOTSUP;
/* pass offset to each call to avoid races updating the fd seek
* position, since the I/O may be non-blocking
*/
int64_t offset = ::lseek(fd, 0, SEEK_CUR);
int64_t *off_p = &offset;
if (offset < 0 && errno != ESPIPE)
return -errno;
if (errno == ESPIPE)
off_p = NULL;
for (std::list<ptr>::const_iterator it = _buffers.begin();
it != _buffers.end(); ++it) {
int r = it->zero_copy_to_fd(fd, off_p);
if (r < 0)
return r;
if (off_p)
offset += it->length();
}
return 0;
}
__u32 buffer::list::crc32c(__u32 crc) const
{
int cache_misses = 0;
@ -2615,9 +2356,6 @@ MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_malloc, buffer_raw_malloc,
buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
buffer_raw_posix_aligned, buffer_meta);
#ifdef CEPH_HAVE_SPLICE
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta);
#endif
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
buffer_meta);

View File

@ -150,7 +150,6 @@ namespace buffer CEPH_BUFFER_API {
class raw_hack_aligned;
class raw_char;
class raw_claimed_char;
class raw_pipe;
class raw_unshareable; // diagnostic, unshareable char buffer
class raw_combined;
class raw_claim_buffer;
@ -173,7 +172,6 @@ namespace buffer CEPH_BUFFER_API {
raw* create_aligned_in_mempool(unsigned len, unsigned align, int mempool);
raw* create_page_aligned(unsigned len);
raw* create_small_page_aligned(unsigned len);
raw* create_zero_copy(unsigned len, int fd, int64_t *offset);
raw* create_unshareable(unsigned len);
raw* create_static(unsigned len, char *buf);
raw* claim_buffer(unsigned len, char *buf, deleter del);
@ -337,9 +335,6 @@ namespace buffer CEPH_BUFFER_API {
void copy_out(unsigned o, unsigned l, char *dest) const;
bool can_zero_copy() const;
int zero_copy_to_fd(int fd, int64_t *offset) const;
unsigned wasted() const;
int cmp(const ptr& o) const;
@ -701,7 +696,6 @@ namespace buffer CEPH_BUFFER_API {
private:
mutable iterator last_p;
int zero_copy_to_fd(int fd) const;
public:
// cons/des
@ -771,7 +765,6 @@ namespace buffer CEPH_BUFFER_API {
bool contents_equal(buffer::list& other);
bool contents_equal(const buffer::list& other) const;
bool can_zero_copy() const;
bool is_provided_buffer(const char *dst) const;
bool is_aligned(unsigned align) const;
bool is_page_aligned() const;
@ -934,11 +927,9 @@ namespace buffer CEPH_BUFFER_API {
void hexdump(std::ostream &out, bool trailing_newline = true) const;
int read_file(const char *fn, std::string *error);
ssize_t read_fd(int fd, size_t len);
int read_fd_zero_copy(int fd, size_t len);
int write_file(const char *fn, int mode=0644);
int write_fd(int fd) const;
int write_fd(int fd, uint64_t offset) const;
int write_fd_zero_copy(int fd) const;
template<typename VectorT>
void prepare_iov(VectorT *piov) const {
#ifdef __CEPH__

View File

@ -77,7 +77,7 @@ private:
raw(const raw &other) = delete;
const raw& operator=(const raw &other) = delete;
public:
virtual char *get_data() {
char *get_data() {
return data;
}
virtual raw* clone_empty() = 0;
@ -86,12 +86,6 @@ public:
memcpy(c->data, data, len);
return c;
}
virtual bool can_zero_copy() const {
return false;
}
virtual int zero_copy_to_fd(int fd, loff_t *offset) {
return -ENOTSUP;
}
virtual bool is_shareable() {
// true if safe to reference/share the existing buffer copy
// false if it is not safe to share the buffer, e.g., due to special

View File

@ -203,36 +203,6 @@ TEST(Buffer, constructors) {
EXPECT_EQ(history_alloc_num, buffer::get_history_alloc_num());
}
}
#ifdef CEPH_HAVE_SPLICE
if (ceph_buffer_track) {
EXPECT_EQ(0, buffer::get_total_alloc());
}
{
// no fd
EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);
history_alloc_bytes += len;
history_alloc_num++;
unsigned zc_len = 4;
::unlink(FILENAME);
snprintf(cmd, sizeof(cmd), "echo ABC > %s", FILENAME);
EXPECT_EQ(0, ::system(cmd));
int fd = ::open(FILENAME, O_RDONLY);
assert (fd >= 0);
bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
history_alloc_bytes += zc_len;
history_alloc_num++;
EXPECT_EQ(zc_len, ptr.length());
if (ceph_buffer_track) {
EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
EXPECT_EQ(history_alloc_bytes, buffer::get_history_alloc_bytes());
EXPECT_EQ(history_alloc_num, buffer::get_history_alloc_num());
}
::close(fd);
::unlink(FILENAME);
}
#endif
if (ceph_buffer_track) {
EXPECT_EQ(0, buffer::get_total_alloc());
}
@ -267,149 +237,6 @@ TEST(BufferRaw, ostream) {
EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
}
#ifdef CEPH_HAVE_SPLICE
class TestRawPipe : public ::testing::Test {
protected:
void SetUp() override {
len = 4;
::unlink(FILENAME);
snprintf(cmd, sizeof(cmd), "echo ABC > %s", FILENAME);
EXPECT_EQ(0, ::system(cmd));
fd = ::open(FILENAME, O_RDONLY);
ceph_assert(fd >= 0);
}
void TearDown() override {
::close(fd);
::unlink(FILENAME);
}
int fd;
unsigned len;
};
TEST_F(TestRawPipe, create_zero_copy) {
bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len, ptr.length());
if (get_env_bool("CEPH_BUFFER_TRACK")) {
EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
}
}
TEST_F(TestRawPipe, c_str_no_fd) {
EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
buffer::error_code);
}
TEST_F(TestRawPipe, c_str_basic) {
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
EXPECT_EQ(len, ptr.length());
}
TEST_F(TestRawPipe, c_str_twice) {
// make sure we're creating a copy of the data and not consuming it
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
}
TEST_F(TestRawPipe, c_str_basic_offset) {
loff_t offset = len - 1;
::lseek(fd, offset, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
EXPECT_EQ(len - offset, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
}
TEST_F(TestRawPipe, c_str_dest_short) {
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}
TEST_F(TestRawPipe, c_str_source_short) {
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}
TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
int64_t offset = 0;
::lseek(fd, 1, SEEK_SET);
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
}
TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
int64_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
&offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}
TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
int64_t offset = len;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
&offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(0u, ptr.length());
}
TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
int64_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
EXPECT_EQ(len, offset);
EXPECT_EQ(len - 1, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
}
TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
int64_t offset = 1;
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset));
EXPECT_EQ(3, offset);
EXPECT_EQ(2u, ptr.length());
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
}
TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) {
bufferlist bl;
EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len));
bl = bufferlist();
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
EXPECT_EQ(len, bl.length());
EXPECT_EQ(0u, bl.front().unused_tail_length());
EXPECT_EQ(1u, bl.get_num_buffers());
EXPECT_EQ(len, bl.front().raw_length());
EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len));
EXPECT_TRUE(bl.can_zero_copy());
}
TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) {
::unlink(FILENAME);
bufferlist bl;
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
EXPECT_TRUE(bl.can_zero_copy());
int out_fd = ::open(FILENAME, O_RDWR|O_CREAT|O_TRUNC, 0600);
ASSERT_NE(-1, out_fd);
EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd));
struct stat st;
memset(&st, 0, sizeof(st));
ASSERT_EQ(0, ::stat(FILENAME, &st));
EXPECT_EQ(len, st.st_size);
char buf[len + 1];
EXPECT_EQ((int)len, safe_read(out_fd, buf, len + 1));
EXPECT_EQ(0, memcmp(buf, "ABC\n", len));
::close(out_fd);
::unlink(FILENAME);
}
#endif // CEPH_HAVE_SPLICE
//
// +-----------+ +-----+
// | | | |