diff --git a/configure.ac b/configure.ac index 8523c3fb06e..d1d291bb5ef 100644 --- a/configure.ac +++ b/configure.ac @@ -584,6 +584,20 @@ AC_CHECK_MEMBER([struct stat.st_mtimespec.tv_nsec], [AC_DEFINE(HAVE_STAT_ST_MTIMESPEC_TV_NSEC, 1, [Define if you have struct stat.st_mtimespec.tv_nsec])]) +# splice/tee +AC_CHECK_FUNC([splice], + [AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])], + []) + + +AC_CHECK_HEADERS([arpa/nameser_compat.h]) + +AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include +F_SETPIPE_SZ]])], + [AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])], + [AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])]) + + AC_CHECK_HEADERS([arpa/nameser_compat.h]) AC_CHECK_HEADERS([sys/prctl.h]) AC_CHECK_FUNCS([prctl]) diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 819d767d006..69e4c85b1b2 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -18,6 +18,7 @@ #include "common/errno.h" #include "common/safe_io.h" #include "common/simple_spin.h" +#include "common/strtol.h" #include "include/atomic.h" #include "include/types.h" #include "include/compat.h" @@ -69,6 +70,52 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE return buffer_cached_crc_adjusted.read(); } + atomic_t buffer_c_str_accesses; + bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK"); + + void buffer::track_c_str(bool b) { + buffer_track_c_str = b; + } + int buffer::get_c_str_accesses() { + return buffer_c_str_accesses.read(); + } + + atomic_t buffer_max_pipe_size; + int update_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + char buf[32]; + int r; + std::string err; + struct stat stat_result; + if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1) + return -errno; + r = safe_read_file("/proc/sys/fs/", "pipe-max-size", + buf, sizeof(buf) - 1); + if (r < 0) + return r; + buf[r] = '\0'; + size = strict_strtol(buf, 10, &err); + if (!err.empty()) + return -EIO; + buffer_max_pipe_size.set(size); +#endif + return 0; + } + + size_t get_max_pipe_size() { +#ifdef CEPH_HAVE_SETPIPE_SZ + size_t size = buffer_max_pipe_size.read(); + if (size) + return size; + if (update_max_pipe_size() == 0) + return buffer_max_pipe_size.read() +#endif + // this is the max size hardcoded in linux before 2.6.35 + return 65536; + } + + buffer::error_code::error_code(int error) : + buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {} class buffer::raw { public: @@ -89,18 +136,22 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE raw(const raw &other); const raw& operator=(const raw &other); + virtual char *get_data() { + return data; + } virtual raw* clone_empty() = 0; raw *clone() { raw *c = clone_empty(); memcpy(c->data, data, len); return c; } - - unsigned length() const { - return len; + virtual bool can_zero_copy() const { + return false; } - - bool is_page_aligned() { + virtual int zero_copy_to_fd(int fd, loff_t *offset) { + return -ENOTSUP; + } + virtual bool is_page_aligned() { return ((long)data & ~CEPH_PAGE_MASK) == 0; } bool is_n_page_sized() { @@ -228,6 +279,187 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE }; #endif +#ifdef CEPH_HAVE_SPLICE + class buffer::raw_pipe : public buffer::raw { + public: + raw_pipe(unsigned len) : raw(len), source_consumed(false) { + size_t max = get_max_pipe_size(); + if (len > max) { + bdout << "raw_pipe: requested length " << len + << " > max length " << max << bendl; + throw malformed_input("length larger than max pipe size"); + } + pipefds[0] = -1; + pipefds[1] = -1; + + int r; + if (::pipe(pipefds) == -1) { + r = -errno; + bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; + throw error_code(r); + } + + r = set_nonblocking(pipefds); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + + r = set_pipe_size(pipefds, len); + if (r < 0) { + bdout << "raw_pipe: could not set pipe size" << bendl; + // continue, since the pipe should become large enough as needed + } + + inc_total_alloc(len); + bdout << "raw_pipe " << this << " alloc " << len << " " + << buffer::get_total_alloc() << bendl; + } + + ~raw_pipe() { + if (data) + delete data; + close_pipe(pipefds); + dec_total_alloc(len); + bdout << "raw_pipe " << this << " free " << (void *)data << " " + << buffer::get_total_alloc() << bendl; + } + + bool can_zero_copy() const { + return true; + } + + bool is_page_aligned() { + return false; + } + + int set_source(int fd, loff_t *off) { + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r) + << bendl; + return r; + } + // update length with actual amount read + len = r; + return 0; + } + + int zero_copy_to_fd(int fd, loff_t *offset) { + assert(!source_consumed); + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing from pipe to fd: " + << cpp_strerror(r) << bendl; + return r; + } + source_consumed = true; + return 0; + } + + buffer::raw* clone_empty() { + // cloning doesn't make sense for pipe-based buffers, + // and is only used by unit tests for other types of buffers + return NULL; + } + + char *get_data() { + if (data) + return data; + return copy_pipe(pipefds); + } + + private: + int set_pipe_size(int *fds, long length) { +#ifdef CEPH_HAVE_SETPIPE_SZ + if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) { + int r = -errno; + if (r == -EPERM) { + // pipe limit must have changed - EPERM means we requested + // more than the maximum size as an unprivileged user + update_max_pipe_size(); + throw malformed_input("length larger than new max pipe size"); + } + return r; + } +#endif + return 0; + } + + int set_nonblocking(int *fds) { + if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) + return -errno; + if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1) + return -errno; + return 0; + } + + void close_pipe(int *fds) { + if (fds[0] >= 0) + TEMP_FAILURE_RETRY(::close(fds[0])); + if (fds[1] >= 0) + TEMP_FAILURE_RETRY(::close(fds[1])); + } + char *copy_pipe(int *fds) { + /* preserve original pipe contents by copying into a temporary + * pipe before reading. + */ + int tmpfd[2]; + int r; + + assert(!source_consumed); + assert(fds[0] >= 0); + + if (::pipe(tmpfd) == -1) { + r = -errno; + bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r) + << bendl; + throw error_code(r); + } + r = set_nonblocking(tmpfd); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + r = set_pipe_size(tmpfd, len); + if (r < 0) { + bdout << "raw_pipe: error setting pipe size on temp pipe: " + << cpp_strerror(r) << bendl; + } + int flags = SPLICE_F_NONBLOCK; + if (::tee(fds[0], tmpfd[1], len, flags) == -1) { + r = errno; + bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r) + << bendl; + close_pipe(tmpfd); + throw error_code(r); + } + data = (char *)malloc(len); + if (!data) { + close_pipe(tmpfd); + throw bad_alloc(); + } + r = safe_read(tmpfd[0], data, len); + if (r < (ssize_t)len) { + bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r) + << bendl; + delete data; + data = NULL; + close_pipe(tmpfd); + throw error_code(r); + } + close_pipe(tmpfd); + return data; + } + bool source_consumed; + int pipefds[2]; + }; +#endif // CEPH_HAVE_SPLICE + /* * primitive buffer types */ @@ -293,6 +525,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE #endif } + buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) { +#ifdef CEPH_HAVE_SPLICE + buffer::raw_pipe* buf = new raw_pipe(len); + int r = buf->set_source(fd, offset); + if (r < 0) { + delete buf; + throw error_code(r); + } + return buf; +#else + throw error_code(-ENOTSUP); +#endif + } + buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw. { r->nref.inc(); @@ -375,8 +621,18 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE bool buffer::ptr::at_buffer_tail() const { return _off + _len == _raw->len; } - const char *buffer::ptr::c_str() const { assert(_raw); return _raw->data + _off; } - char *buffer::ptr::c_str() { assert(_raw); return _raw->data + _off; } + const char *buffer::ptr::c_str() const { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses.inc(); + return _raw->get_data() + _off; + } + char *buffer::ptr::c_str() { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses.inc(); + return _raw->get_data() + _off; + } unsigned buffer::ptr::unused_tail_length() const { @@ -389,13 +645,13 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE { assert(_raw); assert(n < _len); - return _raw->data[_off + n]; + return _raw->get_data()[_off + n]; } char& buffer::ptr::operator[](unsigned n) { assert(_raw); assert(n < _len); - return _raw->data[_off + n]; + return _raw->get_data()[_off + n]; } const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; } @@ -472,6 +728,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE memset(c_str()+o, 0, l); } + bool buffer::ptr::can_zero_copy() const + { + return _raw->can_zero_copy(); + } + + int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const + { + return _raw->zero_copy_to_fd(fd, offset); + } // -- buffer::list::iterator -- /* @@ -734,6 +999,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE } } + bool buffer::list::can_zero_copy() const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->can_zero_copy()) + return false; + return true; + } + bool buffer::list::is_page_aligned() const { for (std::list::const_iterator it = _buffers.begin(); @@ -1232,8 +1507,14 @@ int buffer::list::read_file(const char *fn, std::string *error) return 0; } -ssize_t buffer::list::read_fd(int fd, size_t len) +ssize_t buffer::list::read_fd(int fd, size_t len) { + // try zero copy first + if (false && read_fd_zero_copy(fd, len) == 0) { + // TODO fix callers to not require correct read size, which is not + // available for raw_pipe until we actually inspect the data + return 0; + } int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE); bufferptr bp = buffer::create_page_aligned(s); ssize_t ret = safe_read(fd, (void*)bp.c_str(), len); @@ -1244,6 +1525,23 @@ ssize_t buffer::list::read_fd(int fd, size_t len) return ret; } +int buffer::list::read_fd_zero_copy(int fd, size_t len) +{ +#ifdef CEPH_HAVE_SPLICE + try { + bufferptr bp = buffer::create_zero_copy(len, fd, NULL); + append(bp); + } catch (buffer::error_code e) { + return e.code; + } catch (buffer::malformed_input) { + return -EIO; + } + return 0; +#else + return -ENOTSUP; +#endif +} + int buffer::list::write_file(const char *fn, int mode) { int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode)); @@ -1271,12 +1569,15 @@ int buffer::list::write_file(const char *fn, int mode) int buffer::list::write_fd(int fd) const { + if (can_zero_copy()) + return write_fd_zero_copy(fd); + // use writev! iovec iov[IOV_MAX]; int iovlen = 0; ssize_t bytes = 0; - std::list::const_iterator p = _buffers.begin(); + std::list::const_iterator p = _buffers.begin(); while (p != _buffers.end()) { if (p->length() > 0) { iov[iovlen].iov_base = (void *)p->c_str(); @@ -1321,6 +1622,30 @@ int buffer::list::write_fd(int fd) const return 0; } +int buffer::list::write_fd_zero_copy(int fd) const +{ + if (!can_zero_copy()) + return -ENOTSUP; + /* pass offset to each call to avoid races updating the fd seek + * position, since the I/O may be non-blocking + */ + loff_t offset = ::lseek(fd, 0, SEEK_CUR); + loff_t *off_p = &offset; + if (offset < 0 && offset != ESPIPE) + return (int) offset; + if (offset == ESPIPE) + off_p = NULL; + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); ++it) { + int r = it->zero_copy_to_fd(fd, off_p); + if (r < 0) + return r; + if (off_p) + offset += it->length(); + } + return 0; +} + __u32 buffer::list::crc32c(__u32 crc) const { for (std::list::const_iterator it = _buffers.begin(); diff --git a/src/common/safe_io.c b/src/common/safe_io.c index afee82edf07..16cc7293d9c 100644 --- a/src/common/safe_io.c +++ b/src/common/safe_io.c @@ -117,6 +117,40 @@ ssize_t safe_pwrite(int fd, const void *buf, size_t count, off_t offset) return 0; } +#ifdef CEPH_HAVE_SPLICE +ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, + size_t len, unsigned int flags) +{ + size_t cnt = 0; + + while (cnt < len) { + ssize_t r = splice(fd_in, off_in, fd_out, off_out, len - cnt, flags); + if (r <= 0) { + if (r == 0) { + // EOF + return cnt; + } + if (errno == EINTR) + continue; + return -errno; + } + cnt += r; + } + return cnt; +} + +ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags) +{ + ssize_t ret = safe_splice(fd_in, off_in, fd_out, off_out, len, flags); + if (ret < 0) + return ret; + if ((size_t)ret != len) + return -EDOM; + return 0; +} +#endif + int safe_write_file(const char *base, const char *file, const char *val, size_t vallen) { diff --git a/src/common/safe_io.h b/src/common/safe_io.h index a4c9bc7a72f..c45589eee7e 100644 --- a/src/common/safe_io.h +++ b/src/common/safe_io.h @@ -15,6 +15,7 @@ #ifndef CEPH_SAFE_IO #define CEPH_SAFE_IO +#include "acconfig.h" #include "common/compiler_extensions.h" #include @@ -35,6 +36,18 @@ extern "C" { WARN_UNUSED_RESULT; ssize_t safe_pwrite(int fd, const void *buf, size_t count, off_t offset) WARN_UNUSED_RESULT; +#ifdef CEPH_HAVE_SPLICE + /* + * Similar to the above (non-exact version) and below (exact version). + * See splice(2) for parameter descriptions. + */ + ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, + size_t len, unsigned int flags) + WARN_UNUSED_RESULT; + ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out, + loff_t *off_out, size_t len, unsigned int flags) + WARN_UNUSED_RESULT; +#endif /* * Same as the above functions, but return -EDOM unless exactly the requested diff --git a/src/include/buffer.h b/src/include/buffer.h index 657e66e2cc3..2d2e8e467fa 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -101,6 +101,10 @@ public: private: char buf[256]; }; + struct error_code : public malformed_input { + explicit error_code(int error); + int code; + }; /// total bytes allocated @@ -116,6 +120,10 @@ public: /// enable/disable tracking of cached crcs static void track_cached_crc(bool b); + /// count of calls to buffer::ptr::c_str() + static int get_c_str_accesses(); + /// enable/disable tracking of buffer::ptr::c_str() calls + static void track_c_str(bool b); private: @@ -133,6 +141,7 @@ private: class raw_posix_aligned; class raw_hack_aligned; class raw_char; + class raw_pipe; friend std::ostream& operator<<(std::ostream& out, const raw &r); @@ -148,8 +157,8 @@ public: static raw* claim_malloc(unsigned len, char *buf); static raw* create_static(unsigned len, char *buf); static raw* create_page_aligned(unsigned len); - - + static raw* create_zero_copy(unsigned len, int fd, loff_t *offset); + /* * a buffer pointer. references (a subsequence of) a raw buffer. */ @@ -206,6 +215,9 @@ public: memcpy(dest, c_str()+o, l); } + bool can_zero_copy() const; + int zero_copy_to_fd(int fd, loff_t *offset) const; + unsigned wasted(); int cmp(const ptr& o); @@ -305,6 +317,7 @@ public: private: mutable iterator last_p; + int zero_copy_to_fd(int fd) const; public: // cons/des @@ -342,6 +355,7 @@ public: } bool contents_equal(buffer::list& other); + bool can_zero_copy() const; bool is_page_aligned() const; bool is_n_page_sized() const; @@ -430,8 +444,10 @@ public: void hexdump(std::ostream &out) const; int read_file(const char *fn, std::string *error); ssize_t read_fd(int fd, size_t len); + int read_fd_zero_copy(int fd, size_t len); int write_file(const char *fn, int mode=0644); int write_fd(int fd) const; + int write_fd_zero_copy(int fd) const; uint32_t crc32c(uint32_t crc) const; }; diff --git a/src/test/bufferlist.cc b/src/test/bufferlist.cc index 167f4cb4a23..8962575f6ec 100644 --- a/src/test/bufferlist.cc +++ b/src/test/bufferlist.cc @@ -29,6 +29,7 @@ #include "include/encoding.h" #include "common/environment.h" #include "common/Clock.h" +#include "common/safe_io.h" #include "gtest/gtest.h" #include "stdlib.h" @@ -141,6 +142,25 @@ TEST(Buffer, constructors) { bufferptr clone = ptr.clone(); EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len)); } +#ifdef CEPH_HAVE_SPLICE + if (ceph_buffer_track) + EXPECT_EQ(0, buffer::get_total_alloc()); + { + // no fd + EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code); + + unsigned zc_len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + int fd = ::open("testfile", O_RDONLY); + bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL)); + EXPECT_EQ(zc_len, ptr.length()); + if (ceph_buffer_track) + EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc()); + ::close(fd); + ::unlink("testfile"); + } +#endif if (ceph_buffer_track) EXPECT_EQ(0, buffer::get_total_alloc()); } @@ -153,6 +173,146 @@ TEST(BufferRaw, ostream) { EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)")); } +#ifdef CEPH_HAVE_SPLICE +class TestRawPipe : public ::testing::Test { +protected: + virtual void SetUp() { + len = 4; + ::unlink("testfile"); + ::system("echo ABC > testfile"); + fd = ::open("testfile", O_RDONLY); + assert(fd >= 0); + } + virtual void TearDown() { + ::close(fd); + ::unlink("testfile"); + } + int fd; + unsigned len; +}; + +TEST_F(TestRawPipe, create_zero_copy) { + bufferptr ptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + if (get_env_bool("CEPH_BUFFER_TRACK")) + EXPECT_EQ(len, (unsigned)buffer::get_total_alloc()); +} + +TEST_F(TestRawPipe, c_str_no_fd) { + EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)), + buffer::error_code); +} + +TEST_F(TestRawPipe, c_str_basic) { + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(len, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_twice) { + // make sure we're creating a copy of the data and not consuming it + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_basic_offset) { + loff_t offset = len - 1; + ::lseek(fd, offset, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL)); + EXPECT_EQ(len - offset, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset)); +} + +TEST_F(TestRawPipe, c_str_dest_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL)); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} + +TEST_F(TestRawPipe, c_str_source_short) { + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL)); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_zero_offset) { + loff_t offset = 0; + ::lseek(fd, 1, SEEK_SET); + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) { + loff_t offset = len; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, + &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(0u, ptr.length()); +} + +TEST_F(TestRawPipe, c_str_source_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset)); + EXPECT_EQ(len, offset); + EXPECT_EQ(len - 1, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1)); +} + +TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) { + loff_t offset = 1; + bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset)); + EXPECT_EQ(3, offset); + EXPECT_EQ(2u, ptr.length()); + EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2)); +} + +TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) { + bufferlist bl; + EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len)); + bl = bufferlist(); + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_EQ(len, bl.length()); + EXPECT_EQ(0u, bl.buffers().front().unused_tail_length()); + EXPECT_EQ(1u, bl.buffers().size()); + EXPECT_EQ(len, bl.buffers().front().raw_length()); + EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len)); + EXPECT_TRUE(bl.can_zero_copy()); +} + +TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) { + ::unlink("testfile_out"); + bufferlist bl; + EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len)); + EXPECT_TRUE(bl.can_zero_copy()); + int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600); + EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd)); + struct stat st; + memset(&st, 0, sizeof(st)); + EXPECT_EQ(0, ::stat("testfile_out", &st)); + EXPECT_EQ(len, st.st_size); + char buf[len + 1]; + EXPECT_EQ(len, safe_read(out_fd, buf, len + 1)); + EXPECT_EQ(0, memcmp(buf, "ABC\n", len)); + ::close(out_fd); + ::unlink("testfile_out"); +} +#endif // CEPH_HAVE_SPLICE + // // +-----------+ +-----+ // | | | | @@ -1615,9 +1775,14 @@ TEST(BufferList, read_fd) { bufferlist bl; EXPECT_EQ(-EBADF, bl.read_fd(fd, len)); fd = ::open("testfile", O_RDONLY); +#ifdef CEPH_HAVE_SPLICE + EXPECT_EQ(0, bl.read_fd(fd, len)); + EXPECT_EQ(0u, bl.buffers().front().unused_tail_length()); +#else EXPECT_EQ(len, (unsigned)bl.read_fd(fd, len)); - EXPECT_EQ(len, bl.length()); EXPECT_EQ(CEPH_PAGE_SIZE - len, bl.buffers().front().unused_tail_length()); +#endif + EXPECT_EQ(len, bl.length()); ::close(fd); ::unlink("testfile"); }