mirror of
https://github.com/ceph/ceph
synced 2024-12-17 17:05:42 +00:00
Merge branch 'wip-zero-copy-bufferlist-last'
Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
commit
29ef3d4607
14
configure.ac
14
configure.ac
@ -584,6 +584,20 @@ AC_CHECK_MEMBER([struct stat.st_mtimespec.tv_nsec],
|
||||
[AC_DEFINE(HAVE_STAT_ST_MTIMESPEC_TV_NSEC, 1,
|
||||
[Define if you have struct stat.st_mtimespec.tv_nsec])])
|
||||
|
||||
# splice/tee
|
||||
AC_CHECK_FUNC([splice],
|
||||
[AC_DEFINE([CEPH_HAVE_SPLICE], [], [splice(2) is supported])],
|
||||
[])
|
||||
|
||||
|
||||
AC_CHECK_HEADERS([arpa/nameser_compat.h])
|
||||
|
||||
AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include <fcntl.h>
|
||||
F_SETPIPE_SZ]])],
|
||||
[AC_DEFINE([CEPH_HAVE_SETPIPE_SZ], [], [F_SETPIPE_SZ is supported])],
|
||||
[AC_MSG_NOTICE(["F_SETPIPE_SZ not found, zero-copy may be less efficent"])])
|
||||
|
||||
|
||||
AC_CHECK_HEADERS([arpa/nameser_compat.h])
|
||||
AC_CHECK_HEADERS([sys/prctl.h])
|
||||
AC_CHECK_FUNCS([prctl])
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "common/errno.h"
|
||||
#include "common/safe_io.h"
|
||||
#include "common/simple_spin.h"
|
||||
#include "common/strtol.h"
|
||||
#include "include/atomic.h"
|
||||
#include "include/types.h"
|
||||
#include "include/compat.h"
|
||||
@ -69,6 +70,52 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
return buffer_cached_crc_adjusted.read();
|
||||
}
|
||||
|
||||
atomic_t buffer_c_str_accesses;
|
||||
bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
|
||||
|
||||
void buffer::track_c_str(bool b) {
|
||||
buffer_track_c_str = b;
|
||||
}
|
||||
int buffer::get_c_str_accesses() {
|
||||
return buffer_c_str_accesses.read();
|
||||
}
|
||||
|
||||
atomic_t buffer_max_pipe_size;
|
||||
int update_max_pipe_size() {
|
||||
#ifdef CEPH_HAVE_SETPIPE_SZ
|
||||
char buf[32];
|
||||
int r;
|
||||
std::string err;
|
||||
struct stat stat_result;
|
||||
if (::stat("/proc/sys/fs/pipe-max-size", &stat_result) == -1)
|
||||
return -errno;
|
||||
r = safe_read_file("/proc/sys/fs/", "pipe-max-size",
|
||||
buf, sizeof(buf) - 1);
|
||||
if (r < 0)
|
||||
return r;
|
||||
buf[r] = '\0';
|
||||
size = strict_strtol(buf, 10, &err);
|
||||
if (!err.empty())
|
||||
return -EIO;
|
||||
buffer_max_pipe_size.set(size);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t get_max_pipe_size() {
|
||||
#ifdef CEPH_HAVE_SETPIPE_SZ
|
||||
size_t size = buffer_max_pipe_size.read();
|
||||
if (size)
|
||||
return size;
|
||||
if (update_max_pipe_size() == 0)
|
||||
return buffer_max_pipe_size.read()
|
||||
#endif
|
||||
// this is the max size hardcoded in linux before 2.6.35
|
||||
return 65536;
|
||||
}
|
||||
|
||||
buffer::error_code::error_code(int error) :
|
||||
buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}
|
||||
|
||||
class buffer::raw {
|
||||
public:
|
||||
@ -89,18 +136,22 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
raw(const raw &other);
|
||||
const raw& operator=(const raw &other);
|
||||
|
||||
virtual char *get_data() {
|
||||
return data;
|
||||
}
|
||||
virtual raw* clone_empty() = 0;
|
||||
raw *clone() {
|
||||
raw *c = clone_empty();
|
||||
memcpy(c->data, data, len);
|
||||
return c;
|
||||
}
|
||||
|
||||
unsigned length() const {
|
||||
return len;
|
||||
virtual bool can_zero_copy() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool is_page_aligned() {
|
||||
virtual int zero_copy_to_fd(int fd, loff_t *offset) {
|
||||
return -ENOTSUP;
|
||||
}
|
||||
virtual bool is_page_aligned() {
|
||||
return ((long)data & ~CEPH_PAGE_MASK) == 0;
|
||||
}
|
||||
bool is_n_page_sized() {
|
||||
@ -228,6 +279,187 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
};
|
||||
#endif
|
||||
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
class buffer::raw_pipe : public buffer::raw {
|
||||
public:
|
||||
raw_pipe(unsigned len) : raw(len), source_consumed(false) {
|
||||
size_t max = get_max_pipe_size();
|
||||
if (len > max) {
|
||||
bdout << "raw_pipe: requested length " << len
|
||||
<< " > max length " << max << bendl;
|
||||
throw malformed_input("length larger than max pipe size");
|
||||
}
|
||||
pipefds[0] = -1;
|
||||
pipefds[1] = -1;
|
||||
|
||||
int r;
|
||||
if (::pipe(pipefds) == -1) {
|
||||
r = -errno;
|
||||
bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
|
||||
throw error_code(r);
|
||||
}
|
||||
|
||||
r = set_nonblocking(pipefds);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
|
||||
<< cpp_strerror(r) << bendl;
|
||||
throw error_code(r);
|
||||
}
|
||||
|
||||
r = set_pipe_size(pipefds, len);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: could not set pipe size" << bendl;
|
||||
// continue, since the pipe should become large enough as needed
|
||||
}
|
||||
|
||||
inc_total_alloc(len);
|
||||
bdout << "raw_pipe " << this << " alloc " << len << " "
|
||||
<< buffer::get_total_alloc() << bendl;
|
||||
}
|
||||
|
||||
~raw_pipe() {
|
||||
if (data)
|
||||
delete data;
|
||||
close_pipe(pipefds);
|
||||
dec_total_alloc(len);
|
||||
bdout << "raw_pipe " << this << " free " << (void *)data << " "
|
||||
<< buffer::get_total_alloc() << bendl;
|
||||
}
|
||||
|
||||
bool can_zero_copy() const {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool is_page_aligned() {
|
||||
return false;
|
||||
}
|
||||
|
||||
int set_source(int fd, loff_t *off) {
|
||||
int flags = SPLICE_F_NONBLOCK;
|
||||
ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
|
||||
<< bendl;
|
||||
return r;
|
||||
}
|
||||
// update length with actual amount read
|
||||
len = r;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zero_copy_to_fd(int fd, loff_t *offset) {
|
||||
assert(!source_consumed);
|
||||
int flags = SPLICE_F_NONBLOCK;
|
||||
ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: error splicing from pipe to fd: "
|
||||
<< cpp_strerror(r) << bendl;
|
||||
return r;
|
||||
}
|
||||
source_consumed = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
buffer::raw* clone_empty() {
|
||||
// cloning doesn't make sense for pipe-based buffers,
|
||||
// and is only used by unit tests for other types of buffers
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char *get_data() {
|
||||
if (data)
|
||||
return data;
|
||||
return copy_pipe(pipefds);
|
||||
}
|
||||
|
||||
private:
|
||||
int set_pipe_size(int *fds, long length) {
|
||||
#ifdef CEPH_HAVE_SETPIPE_SZ
|
||||
if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
|
||||
int r = -errno;
|
||||
if (r == -EPERM) {
|
||||
// pipe limit must have changed - EPERM means we requested
|
||||
// more than the maximum size as an unprivileged user
|
||||
update_max_pipe_size();
|
||||
throw malformed_input("length larger than new max pipe size");
|
||||
}
|
||||
return r;
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int set_nonblocking(int *fds) {
|
||||
if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
|
||||
return -errno;
|
||||
if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
|
||||
return -errno;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void close_pipe(int *fds) {
|
||||
if (fds[0] >= 0)
|
||||
TEMP_FAILURE_RETRY(::close(fds[0]));
|
||||
if (fds[1] >= 0)
|
||||
TEMP_FAILURE_RETRY(::close(fds[1]));
|
||||
}
|
||||
char *copy_pipe(int *fds) {
|
||||
/* preserve original pipe contents by copying into a temporary
|
||||
* pipe before reading.
|
||||
*/
|
||||
int tmpfd[2];
|
||||
int r;
|
||||
|
||||
assert(!source_consumed);
|
||||
assert(fds[0] >= 0);
|
||||
|
||||
if (::pipe(tmpfd) == -1) {
|
||||
r = -errno;
|
||||
bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
|
||||
<< bendl;
|
||||
throw error_code(r);
|
||||
}
|
||||
r = set_nonblocking(tmpfd);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
|
||||
<< cpp_strerror(r) << bendl;
|
||||
throw error_code(r);
|
||||
}
|
||||
r = set_pipe_size(tmpfd, len);
|
||||
if (r < 0) {
|
||||
bdout << "raw_pipe: error setting pipe size on temp pipe: "
|
||||
<< cpp_strerror(r) << bendl;
|
||||
}
|
||||
int flags = SPLICE_F_NONBLOCK;
|
||||
if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
|
||||
r = errno;
|
||||
bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
|
||||
<< bendl;
|
||||
close_pipe(tmpfd);
|
||||
throw error_code(r);
|
||||
}
|
||||
data = (char *)malloc(len);
|
||||
if (!data) {
|
||||
close_pipe(tmpfd);
|
||||
throw bad_alloc();
|
||||
}
|
||||
r = safe_read(tmpfd[0], data, len);
|
||||
if (r < (ssize_t)len) {
|
||||
bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
|
||||
<< bendl;
|
||||
delete data;
|
||||
data = NULL;
|
||||
close_pipe(tmpfd);
|
||||
throw error_code(r);
|
||||
}
|
||||
close_pipe(tmpfd);
|
||||
return data;
|
||||
}
|
||||
bool source_consumed;
|
||||
int pipefds[2];
|
||||
};
|
||||
#endif // CEPH_HAVE_SPLICE
|
||||
|
||||
/*
|
||||
* primitive buffer types
|
||||
*/
|
||||
@ -293,6 +525,20 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
#endif
|
||||
}
|
||||
|
||||
buffer::raw* buffer::create_zero_copy(unsigned len, int fd, loff_t *offset) {
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
buffer::raw_pipe* buf = new raw_pipe(len);
|
||||
int r = buf->set_source(fd, offset);
|
||||
if (r < 0) {
|
||||
delete buf;
|
||||
throw error_code(r);
|
||||
}
|
||||
return buf;
|
||||
#else
|
||||
throw error_code(-ENOTSUP);
|
||||
#endif
|
||||
}
|
||||
|
||||
buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw.
|
||||
{
|
||||
r->nref.inc();
|
||||
@ -375,8 +621,18 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
|
||||
bool buffer::ptr::at_buffer_tail() const { return _off + _len == _raw->len; }
|
||||
|
||||
const char *buffer::ptr::c_str() const { assert(_raw); return _raw->data + _off; }
|
||||
char *buffer::ptr::c_str() { assert(_raw); return _raw->data + _off; }
|
||||
const char *buffer::ptr::c_str() const {
|
||||
assert(_raw);
|
||||
if (buffer_track_c_str)
|
||||
buffer_c_str_accesses.inc();
|
||||
return _raw->get_data() + _off;
|
||||
}
|
||||
char *buffer::ptr::c_str() {
|
||||
assert(_raw);
|
||||
if (buffer_track_c_str)
|
||||
buffer_c_str_accesses.inc();
|
||||
return _raw->get_data() + _off;
|
||||
}
|
||||
|
||||
unsigned buffer::ptr::unused_tail_length() const
|
||||
{
|
||||
@ -389,13 +645,13 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
{
|
||||
assert(_raw);
|
||||
assert(n < _len);
|
||||
return _raw->data[_off + n];
|
||||
return _raw->get_data()[_off + n];
|
||||
}
|
||||
char& buffer::ptr::operator[](unsigned n)
|
||||
{
|
||||
assert(_raw);
|
||||
assert(n < _len);
|
||||
return _raw->data[_off + n];
|
||||
return _raw->get_data()[_off + n];
|
||||
}
|
||||
|
||||
const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
|
||||
@ -472,6 +728,15 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
memset(c_str()+o, 0, l);
|
||||
}
|
||||
|
||||
bool buffer::ptr::can_zero_copy() const
|
||||
{
|
||||
return _raw->can_zero_copy();
|
||||
}
|
||||
|
||||
int buffer::ptr::zero_copy_to_fd(int fd, loff_t *offset) const
|
||||
{
|
||||
return _raw->zero_copy_to_fd(fd, offset);
|
||||
}
|
||||
|
||||
// -- buffer::list::iterator --
|
||||
/*
|
||||
@ -734,6 +999,16 @@ static uint32_t simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZE
|
||||
}
|
||||
}
|
||||
|
||||
bool buffer::list::can_zero_copy() const
|
||||
{
|
||||
for (std::list<ptr>::const_iterator it = _buffers.begin();
|
||||
it != _buffers.end();
|
||||
++it)
|
||||
if (!it->can_zero_copy())
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool buffer::list::is_page_aligned() const
|
||||
{
|
||||
for (std::list<ptr>::const_iterator it = _buffers.begin();
|
||||
@ -1232,8 +1507,14 @@ int buffer::list::read_file(const char *fn, std::string *error)
|
||||
return 0;
|
||||
}
|
||||
|
||||
ssize_t buffer::list::read_fd(int fd, size_t len)
|
||||
ssize_t buffer::list::read_fd(int fd, size_t len)
|
||||
{
|
||||
// try zero copy first
|
||||
if (false && read_fd_zero_copy(fd, len) == 0) {
|
||||
// TODO fix callers to not require correct read size, which is not
|
||||
// available for raw_pipe until we actually inspect the data
|
||||
return 0;
|
||||
}
|
||||
int s = ROUND_UP_TO(len, CEPH_PAGE_SIZE);
|
||||
bufferptr bp = buffer::create_page_aligned(s);
|
||||
ssize_t ret = safe_read(fd, (void*)bp.c_str(), len);
|
||||
@ -1244,6 +1525,23 @@ ssize_t buffer::list::read_fd(int fd, size_t len)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int buffer::list::read_fd_zero_copy(int fd, size_t len)
|
||||
{
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
try {
|
||||
bufferptr bp = buffer::create_zero_copy(len, fd, NULL);
|
||||
append(bp);
|
||||
} catch (buffer::error_code e) {
|
||||
return e.code;
|
||||
} catch (buffer::malformed_input) {
|
||||
return -EIO;
|
||||
}
|
||||
return 0;
|
||||
#else
|
||||
return -ENOTSUP;
|
||||
#endif
|
||||
}
|
||||
|
||||
int buffer::list::write_file(const char *fn, int mode)
|
||||
{
|
||||
int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
|
||||
@ -1271,12 +1569,15 @@ int buffer::list::write_file(const char *fn, int mode)
|
||||
|
||||
int buffer::list::write_fd(int fd) const
|
||||
{
|
||||
if (can_zero_copy())
|
||||
return write_fd_zero_copy(fd);
|
||||
|
||||
// use writev!
|
||||
iovec iov[IOV_MAX];
|
||||
int iovlen = 0;
|
||||
ssize_t bytes = 0;
|
||||
|
||||
std::list<ptr>::const_iterator p = _buffers.begin();
|
||||
std::list<ptr>::const_iterator p = _buffers.begin();
|
||||
while (p != _buffers.end()) {
|
||||
if (p->length() > 0) {
|
||||
iov[iovlen].iov_base = (void *)p->c_str();
|
||||
@ -1321,6 +1622,30 @@ int buffer::list::write_fd(int fd) const
|
||||
return 0;
|
||||
}
|
||||
|
||||
int buffer::list::write_fd_zero_copy(int fd) const
|
||||
{
|
||||
if (!can_zero_copy())
|
||||
return -ENOTSUP;
|
||||
/* pass offset to each call to avoid races updating the fd seek
|
||||
* position, since the I/O may be non-blocking
|
||||
*/
|
||||
loff_t offset = ::lseek(fd, 0, SEEK_CUR);
|
||||
loff_t *off_p = &offset;
|
||||
if (offset < 0 && offset != ESPIPE)
|
||||
return (int) offset;
|
||||
if (offset == ESPIPE)
|
||||
off_p = NULL;
|
||||
for (std::list<ptr>::const_iterator it = _buffers.begin();
|
||||
it != _buffers.end(); ++it) {
|
||||
int r = it->zero_copy_to_fd(fd, off_p);
|
||||
if (r < 0)
|
||||
return r;
|
||||
if (off_p)
|
||||
offset += it->length();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
__u32 buffer::list::crc32c(__u32 crc) const
|
||||
{
|
||||
for (std::list<ptr>::const_iterator it = _buffers.begin();
|
||||
|
@ -117,6 +117,40 @@ ssize_t safe_pwrite(int fd, const void *buf, size_t count, off_t offset)
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out,
|
||||
size_t len, unsigned int flags)
|
||||
{
|
||||
size_t cnt = 0;
|
||||
|
||||
while (cnt < len) {
|
||||
ssize_t r = splice(fd_in, off_in, fd_out, off_out, len - cnt, flags);
|
||||
if (r <= 0) {
|
||||
if (r == 0) {
|
||||
// EOF
|
||||
return cnt;
|
||||
}
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
return -errno;
|
||||
}
|
||||
cnt += r;
|
||||
}
|
||||
return cnt;
|
||||
}
|
||||
|
||||
ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out,
|
||||
loff_t *off_out, size_t len, unsigned int flags)
|
||||
{
|
||||
ssize_t ret = safe_splice(fd_in, off_in, fd_out, off_out, len, flags);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
if ((size_t)ret != len)
|
||||
return -EDOM;
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int safe_write_file(const char *base, const char *file,
|
||||
const char *val, size_t vallen)
|
||||
{
|
||||
|
@ -15,6 +15,7 @@
|
||||
#ifndef CEPH_SAFE_IO
|
||||
#define CEPH_SAFE_IO
|
||||
|
||||
#include "acconfig.h"
|
||||
#include "common/compiler_extensions.h"
|
||||
#include <sys/types.h>
|
||||
|
||||
@ -35,6 +36,18 @@ extern "C" {
|
||||
WARN_UNUSED_RESULT;
|
||||
ssize_t safe_pwrite(int fd, const void *buf, size_t count, off_t offset)
|
||||
WARN_UNUSED_RESULT;
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
/*
|
||||
* Similar to the above (non-exact version) and below (exact version).
|
||||
* See splice(2) for parameter descriptions.
|
||||
*/
|
||||
ssize_t safe_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out,
|
||||
size_t len, unsigned int flags)
|
||||
WARN_UNUSED_RESULT;
|
||||
ssize_t safe_splice_exact(int fd_in, loff_t *off_in, int fd_out,
|
||||
loff_t *off_out, size_t len, unsigned int flags)
|
||||
WARN_UNUSED_RESULT;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Same as the above functions, but return -EDOM unless exactly the requested
|
||||
|
@ -101,6 +101,10 @@ public:
|
||||
private:
|
||||
char buf[256];
|
||||
};
|
||||
struct error_code : public malformed_input {
|
||||
explicit error_code(int error);
|
||||
int code;
|
||||
};
|
||||
|
||||
|
||||
/// total bytes allocated
|
||||
@ -116,6 +120,10 @@ public:
|
||||
/// enable/disable tracking of cached crcs
|
||||
static void track_cached_crc(bool b);
|
||||
|
||||
/// count of calls to buffer::ptr::c_str()
|
||||
static int get_c_str_accesses();
|
||||
/// enable/disable tracking of buffer::ptr::c_str() calls
|
||||
static void track_c_str(bool b);
|
||||
|
||||
private:
|
||||
|
||||
@ -133,6 +141,7 @@ private:
|
||||
class raw_posix_aligned;
|
||||
class raw_hack_aligned;
|
||||
class raw_char;
|
||||
class raw_pipe;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const raw &r);
|
||||
|
||||
@ -148,8 +157,8 @@ public:
|
||||
static raw* claim_malloc(unsigned len, char *buf);
|
||||
static raw* create_static(unsigned len, char *buf);
|
||||
static raw* create_page_aligned(unsigned len);
|
||||
|
||||
|
||||
static raw* create_zero_copy(unsigned len, int fd, loff_t *offset);
|
||||
|
||||
/*
|
||||
* a buffer pointer. references (a subsequence of) a raw buffer.
|
||||
*/
|
||||
@ -206,6 +215,9 @@ public:
|
||||
memcpy(dest, c_str()+o, l);
|
||||
}
|
||||
|
||||
bool can_zero_copy() const;
|
||||
int zero_copy_to_fd(int fd, loff_t *offset) const;
|
||||
|
||||
unsigned wasted();
|
||||
|
||||
int cmp(const ptr& o);
|
||||
@ -305,6 +317,7 @@ public:
|
||||
|
||||
private:
|
||||
mutable iterator last_p;
|
||||
int zero_copy_to_fd(int fd) const;
|
||||
|
||||
public:
|
||||
// cons/des
|
||||
@ -342,6 +355,7 @@ public:
|
||||
}
|
||||
bool contents_equal(buffer::list& other);
|
||||
|
||||
bool can_zero_copy() const;
|
||||
bool is_page_aligned() const;
|
||||
bool is_n_page_sized() const;
|
||||
|
||||
@ -430,8 +444,10 @@ public:
|
||||
void hexdump(std::ostream &out) const;
|
||||
int read_file(const char *fn, std::string *error);
|
||||
ssize_t read_fd(int fd, size_t len);
|
||||
int read_fd_zero_copy(int fd, size_t len);
|
||||
int write_file(const char *fn, int mode=0644);
|
||||
int write_fd(int fd) const;
|
||||
int write_fd_zero_copy(int fd) const;
|
||||
uint32_t crc32c(uint32_t crc) const;
|
||||
};
|
||||
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "include/encoding.h"
|
||||
#include "common/environment.h"
|
||||
#include "common/Clock.h"
|
||||
#include "common/safe_io.h"
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
#include "stdlib.h"
|
||||
@ -141,6 +142,25 @@ TEST(Buffer, constructors) {
|
||||
bufferptr clone = ptr.clone();
|
||||
EXPECT_EQ(0, ::memcmp(clone.c_str(), ptr.c_str(), len));
|
||||
}
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
if (ceph_buffer_track)
|
||||
EXPECT_EQ(0, buffer::get_total_alloc());
|
||||
{
|
||||
// no fd
|
||||
EXPECT_THROW(buffer::create_zero_copy(len, -1, NULL), buffer::error_code);
|
||||
|
||||
unsigned zc_len = 4;
|
||||
::unlink("testfile");
|
||||
::system("echo ABC > testfile");
|
||||
int fd = ::open("testfile", O_RDONLY);
|
||||
bufferptr ptr(buffer::create_zero_copy(zc_len, fd, NULL));
|
||||
EXPECT_EQ(zc_len, ptr.length());
|
||||
if (ceph_buffer_track)
|
||||
EXPECT_EQ(zc_len, (unsigned)buffer::get_total_alloc());
|
||||
::close(fd);
|
||||
::unlink("testfile");
|
||||
}
|
||||
#endif
|
||||
if (ceph_buffer_track)
|
||||
EXPECT_EQ(0, buffer::get_total_alloc());
|
||||
}
|
||||
@ -153,6 +173,146 @@ TEST(BufferRaw, ostream) {
|
||||
EXPECT_GT(stream.str().size(), stream.str().find("len 1 nref 1)"));
|
||||
}
|
||||
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
class TestRawPipe : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
len = 4;
|
||||
::unlink("testfile");
|
||||
::system("echo ABC > testfile");
|
||||
fd = ::open("testfile", O_RDONLY);
|
||||
assert(fd >= 0);
|
||||
}
|
||||
virtual void TearDown() {
|
||||
::close(fd);
|
||||
::unlink("testfile");
|
||||
}
|
||||
int fd;
|
||||
unsigned len;
|
||||
};
|
||||
|
||||
TEST_F(TestRawPipe, create_zero_copy) {
|
||||
bufferptr ptr(buffer::create_zero_copy(len, fd, NULL));
|
||||
EXPECT_EQ(len, ptr.length());
|
||||
if (get_env_bool("CEPH_BUFFER_TRACK"))
|
||||
EXPECT_EQ(len, (unsigned)buffer::get_total_alloc());
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_no_fd) {
|
||||
EXPECT_THROW(bufferptr ptr(buffer::create_zero_copy(len, -1, NULL)),
|
||||
buffer::error_code);
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_basic) {
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
|
||||
EXPECT_EQ(len, ptr.length());
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_twice) {
|
||||
// make sure we're creating a copy of the data and not consuming it
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
|
||||
EXPECT_EQ(len, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_basic_offset) {
|
||||
loff_t offset = len - 1;
|
||||
::lseek(fd, offset, SEEK_SET);
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd, NULL));
|
||||
EXPECT_EQ(len - offset, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "\n", len - offset));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_dest_short) {
|
||||
::lseek(fd, 1, SEEK_SET);
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, NULL));
|
||||
EXPECT_EQ(2u, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_source_short) {
|
||||
::lseek(fd, 1, SEEK_SET);
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, NULL));
|
||||
EXPECT_EQ(len - 1, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_explicit_zero_offset) {
|
||||
loff_t offset = 0;
|
||||
::lseek(fd, 1, SEEK_SET);
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
|
||||
EXPECT_EQ(len, offset);
|
||||
EXPECT_EQ(len, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "ABC\n", len));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_explicit_positive_offset) {
|
||||
loff_t offset = 1;
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
|
||||
&offset));
|
||||
EXPECT_EQ(len, offset);
|
||||
EXPECT_EQ(len - 1, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_explicit_positive_empty_result) {
|
||||
loff_t offset = len;
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len - offset, fd,
|
||||
&offset));
|
||||
EXPECT_EQ(len, offset);
|
||||
EXPECT_EQ(0u, ptr.length());
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_source_short_explicit_offset) {
|
||||
loff_t offset = 1;
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(len, fd, &offset));
|
||||
EXPECT_EQ(len, offset);
|
||||
EXPECT_EQ(len - 1, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC\n", len - 1));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, c_str_dest_short_explicit_offset) {
|
||||
loff_t offset = 1;
|
||||
bufferptr ptr = bufferptr(buffer::create_zero_copy(2, fd, &offset));
|
||||
EXPECT_EQ(3, offset);
|
||||
EXPECT_EQ(2u, ptr.length());
|
||||
EXPECT_EQ(0, memcmp(ptr.c_str(), "BC", 2));
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, buffer_list_read_fd_zero_copy) {
|
||||
bufferlist bl;
|
||||
EXPECT_EQ(-EBADF, bl.read_fd_zero_copy(-1, len));
|
||||
bl = bufferlist();
|
||||
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
|
||||
EXPECT_EQ(len, bl.length());
|
||||
EXPECT_EQ(0u, bl.buffers().front().unused_tail_length());
|
||||
EXPECT_EQ(1u, bl.buffers().size());
|
||||
EXPECT_EQ(len, bl.buffers().front().raw_length());
|
||||
EXPECT_EQ(0, memcmp(bl.c_str(), "ABC\n", len));
|
||||
EXPECT_TRUE(bl.can_zero_copy());
|
||||
}
|
||||
|
||||
TEST_F(TestRawPipe, buffer_list_write_fd_zero_copy) {
|
||||
::unlink("testfile_out");
|
||||
bufferlist bl;
|
||||
EXPECT_EQ(0, bl.read_fd_zero_copy(fd, len));
|
||||
EXPECT_TRUE(bl.can_zero_copy());
|
||||
int out_fd = ::open("testfile_out", O_RDWR|O_CREAT|O_TRUNC, 0600);
|
||||
EXPECT_EQ(0, bl.write_fd_zero_copy(out_fd));
|
||||
struct stat st;
|
||||
memset(&st, 0, sizeof(st));
|
||||
EXPECT_EQ(0, ::stat("testfile_out", &st));
|
||||
EXPECT_EQ(len, st.st_size);
|
||||
char buf[len + 1];
|
||||
EXPECT_EQ(len, safe_read(out_fd, buf, len + 1));
|
||||
EXPECT_EQ(0, memcmp(buf, "ABC\n", len));
|
||||
::close(out_fd);
|
||||
::unlink("testfile_out");
|
||||
}
|
||||
#endif // CEPH_HAVE_SPLICE
|
||||
|
||||
//
|
||||
// +-----------+ +-----+
|
||||
// | | | |
|
||||
@ -1615,9 +1775,14 @@ TEST(BufferList, read_fd) {
|
||||
bufferlist bl;
|
||||
EXPECT_EQ(-EBADF, bl.read_fd(fd, len));
|
||||
fd = ::open("testfile", O_RDONLY);
|
||||
#ifdef CEPH_HAVE_SPLICE
|
||||
EXPECT_EQ(0, bl.read_fd(fd, len));
|
||||
EXPECT_EQ(0u, bl.buffers().front().unused_tail_length());
|
||||
#else
|
||||
EXPECT_EQ(len, (unsigned)bl.read_fd(fd, len));
|
||||
EXPECT_EQ(len, bl.length());
|
||||
EXPECT_EQ(CEPH_PAGE_SIZE - len, bl.buffers().front().unused_tail_length());
|
||||
#endif
|
||||
EXPECT_EQ(len, bl.length());
|
||||
::close(fd);
|
||||
::unlink("testfile");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user