libcephfs: Add test for lazyio via libcephfs

Signed-off-by: Sidharth Anupkrishnan <sanupkri@redhat.com>
Sidharth Anupkrishnan 2019-06-26 21:24:36 +05:30
parent e0722de187
commit 1184619128
7 changed files with 374 additions and 7 deletions


@@ -3,5 +3,6 @@
ceph_test_libcephfs
ceph_test_libcephfs_access
ceph_test_libcephfs_reclaim
+ceph_test_libcephfs_lazyio
exit 0


@@ -10484,10 +10484,10 @@ int Client::ll_lazyio(Fh *fh, int enable)
return _lazyio(fh, enable);
}
-int Client::lazyio_propogate(int fd, loff_t offset, size_t count)
+int Client::lazyio_propagate(int fd, loff_t offset, size_t count)
{
std::lock_guard l(client_lock);
-ldout(cct, 3) << "op: client->lazyio_propogate(" << fd
+ldout(cct, 3) << "op: client->lazyio_propagate(" << fd
<< ", " << offset << ", " << count << ")" << dendl;
Fh *f = get_filehandle(fd);


@@ -454,7 +454,7 @@ public:
// hpc lazyio
int lazyio(int fd, int enable);
-int lazyio_propogate(int fd, loff_t offset, size_t count);
+int lazyio_propagate(int fd, loff_t offset, size_t count);
int lazyio_synchronize(int fd, loff_t offset, size_t count);
// expose file layout


@@ -1119,11 +1119,11 @@ int ceph_lazyio(struct ceph_mount_info *cmount, int fd, int enable);
* @param offset the offset of the file range whose buffered writes should be propagated.
* @returns 0 on success or a negative error code on failure.
*/
-int ceph_lazyio_propogate(struct ceph_mount_info *cmount, int fd, int64_t offset, size_t count);
+int ceph_lazyio_propagate(struct ceph_mount_info *cmount, int fd, int64_t offset, size_t count);
/**
-* Flushes the write buffer for the file and invalidates the read cache. This allows a subsequent read operation to read and cache data directly from the file, so everyone's propogated writes become visible.
+* Flushes the write buffer for the file and invalidates the read cache. This allows a subsequent read operation to read and cache data directly from the file, so everyone's propagated writes become visible.
*
* @param cmount the ceph mount handle to use for performing the fsync.
* @param fd the file descriptor of the file to sync.

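The doxygen above describes the new call pair at the API level; the following is a minimal, hedged usage sketch (not part of this commit) showing how a writer and a reader would typically combine ceph_lazyio(), ceph_lazyio_propagate() and ceph_lazyio_synchronize(). The mount handle names, the path "/shared/lazy.dat" and the omission of error handling are assumptions made purely for illustration.

/* Illustrative only: writer/reader pattern for lazy I/O over libcephfs.
 * Assumes cmount_w and cmount_r are already created, configured and mounted. */
#include <cephfs/libcephfs.h>
#include <fcntl.h>

static void lazy_writer(struct ceph_mount_info *cmount_w)
{
  int fd = ceph_open(cmount_w, "/shared/lazy.dat", O_CREAT|O_RDWR, 0644);
  ceph_lazyio(cmount_w, fd, 1);                /* relax consistency on this fd */
  const char buf[] = "fooooooooo";
  ceph_write(cmount_w, fd, buf, sizeof(buf), 0);
  ceph_lazyio_propagate(cmount_w, fd, 0, 0);   /* make the buffered write visible to others */
  ceph_close(cmount_w, fd);
}

static void lazy_reader(struct ceph_mount_info *cmount_r)
{
  int fd = ceph_open(cmount_r, "/shared/lazy.dat", O_RDONLY, 0);
  ceph_lazyio(cmount_r, fd, 1);
  ceph_lazyio_synchronize(cmount_r, fd, 0, 0); /* drop the cached view, pick up propagated data */
  char in[16];
  ceph_read(cmount_r, fd, in, sizeof(in), 0);
  ceph_close(cmount_r, fd);
}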

@@ -1078,12 +1078,12 @@ extern "C" int ceph_lazyio(class ceph_mount_info *cmount,
return (cmount->get_client()->lazyio(fd, enable));
}
extern "C" int ceph_lazyio_propogate(class ceph_mount_info *cmount,
extern "C" int ceph_lazyio_propagate(class ceph_mount_info *cmount,
int fd, int64_t offset, size_t count)
{
if (!cmount->is_mounted())
return -ENOTCONN;
-return (cmount->get_client()->lazyio_propogate(fd, offset, count));
+return (cmount->get_client()->lazyio_propagate(fd, offset, count));
}
extern "C" int ceph_lazyio_synchronize(class ceph_mount_info *cmount,

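As the wrapper above shows, these entry points bail out with -ENOTCONN when the handle is not mounted. A small hedged sketch of that behaviour (illustrative, not from this change):

/* Illustrative only: lazyio calls on an unmounted handle fail with -ENOTCONN. */
#include <cephfs/libcephfs.h>
#include <assert.h>
#include <errno.h>

int main(void)
{
  struct ceph_mount_info *cmount;
  ceph_create(&cmount, NULL);
  /* No ceph_mount() yet, so the fd/offset/count values are irrelevant here. */
  assert(ceph_lazyio_propagate(cmount, 0, 0, 0) == -ENOTCONN);
  ceph_release(cmount);
  return 0;
}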

@@ -32,6 +32,19 @@ if(${WITH_CEPHFS})
  install(TARGETS ceph_test_libcephfs_reclaim
    DESTINATION ${CMAKE_INSTALL_BINDIR})
+  add_executable(ceph_test_libcephfs_lazyio
+    lazyio.cc
+    )
+  target_link_libraries(ceph_test_libcephfs_lazyio
+    cephfs
+    librados
+    ${UNITTEST_LIBS}
+    ${EXTRALIBS}
+    ${CMAKE_DL_LIBS}
+    )
+  install(TARGETS ceph_test_libcephfs_lazyio
+    DESTINATION ${CMAKE_INSTALL_BINDIR})
  add_executable(ceph_test_libcephfs_access
    test.cc
    access.cc


@@ -0,0 +1,353 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2019 Red Hat Ltd
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "gtest/gtest.h"
#include "include/cephfs/libcephfs.h"
#include "include/rados/librados.h"
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include <sys/xattr.h>
rados_t cluster;
TEST(LibCephFS, LazyIOOneWriterMultipleReaders) {
struct ceph_mount_info *ca, *cb;
ASSERT_EQ(ceph_create(&ca, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
ASSERT_EQ(ceph_mount(ca, NULL), 0);
ASSERT_EQ(ceph_create(&cb, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
ASSERT_EQ(ceph_mount(cb, NULL), 0);
char name[20];
snprintf(name, sizeof(name), "foo.%d", getpid());
int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
ASSERT_LE(0, fda);
int fdb = ceph_open(cb, name, O_RDONLY, 0644);
ASSERT_LE(0, fdb);
ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
char out_buf[] = "fooooooooo";
/* Client a issues a write and propagates/flushes the buffer */
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client a issues a write and propagates/flushes the buffer */
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 10));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
char in_buf[40];
/* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client b to fetch the propagated writes of client a in the subsequent read */
ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
/* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and fda holds the updated inode*/
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
ceph_close(ca, fda);
ceph_close(cb, fdb);
ceph_shutdown(ca);
ceph_shutdown(cb);
}
TEST(LibCephFS, LazyIOMultipleWritersMultipleReaders) {
struct ceph_mount_info *ca, *cb;
ASSERT_EQ(ceph_create(&ca, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
ASSERT_EQ(ceph_mount(ca, NULL), 0);
ASSERT_EQ(ceph_create(&cb, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
ASSERT_EQ(ceph_mount(cb, NULL), 0);
char name[20];
snprintf(name, sizeof(name), "foo2.%d", getpid());
int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
ASSERT_LE(0, fda);
int fdb = ceph_open(cb, name, O_RDWR, 0644);
ASSERT_LE(0, fdb);
ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
char out_buf[] = "fooooooooo";
/* Client a issues a write and propagates/flushes the buffer */
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client b issues a write and propagates/flushes the buffer*/
ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
char in_buf[40];
/* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
/* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
/* Client a issues a write */
char wait_out_buf[] = "foobarbars";
ASSERT_EQ((int)sizeof(wait_out_buf), ceph_write(ca, fda, wait_out_buf, sizeof(wait_out_buf), 20));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
/* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client b to fetch all the propagated writes, including client a's latest, in the subsequent read */
ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
ceph_close(ca, fda);
ceph_close(cb, fdb);
ceph_shutdown(ca);
ceph_shutdown(cb);
}
TEST(LibCephFS, LazyIOMultipleWritersOneReader) {
struct ceph_mount_info *ca, *cb;
ASSERT_EQ(ceph_create(&ca, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
ASSERT_EQ(ceph_mount(ca, NULL), 0);
ASSERT_EQ(ceph_create(&cb, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
ASSERT_EQ(ceph_mount(cb, NULL), 0);
char name[20];
snprintf(name, sizeof(name), "foo3.%d", getpid());
int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
ASSERT_LE(0, fda);
int fdb = ceph_open(cb, name, O_RDWR, 0644);
ASSERT_LE(0, fdb);
ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
char out_buf[] = "fooooooooo";
/* Client a issues a write and propagates/flushes the buffer */
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client b issues a write and propagates/flushes the buffer*/
ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
char in_buf[40];
/* Client a reads the file and verifies that it reads only its own propagated writes and not client b's */
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooo");
/* Client a reads the file again, this time with a preceding lazyio_synchronize, to check that the cache gets invalidated and the data is refetched, i.e. all the propagated writes are read */
ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
ceph_close(ca, fda);
ceph_close(cb, fdb);
ceph_shutdown(ca);
ceph_shutdown(cb);
}
TEST(LibCephFS, LazyIOSynchronizeFlush) {
/* Test to make sure lazyio_synchronize flushes dirty buffers */
struct ceph_mount_info *ca, *cb;
ASSERT_EQ(ceph_create(&ca, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
ASSERT_EQ(ceph_mount(ca, NULL), 0);
ASSERT_EQ(ceph_create(&cb, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
ASSERT_EQ(ceph_mount(cb, NULL), 0);
char name[20];
snprintf(name, sizeof(name), "foo4.%d", getpid());
int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
ASSERT_LE(0, fda);
int fdb = ceph_open(cb, name, O_RDWR, 0644);
ASSERT_LE(0, fdb);
ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
char out_buf[] = "fooooooooo";
/* Client a issues a write and propagates it*/
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client b issues writes without calling lazyio_propagate */
ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 20));
char in_buf[40];
/* Calling ceph_lazyio_synchronize here will first flush client b's pending buffered writes and then invalidate its cache, enabling client b to fetch all the propagated writes in the subsequent read */
ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
/* Client a must call ceph_lazyio_synchronize here since client b is the latest writer and client a is out of sync with the updated file */
ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
ceph_close(ca, fda);
ceph_close(cb, fdb);
ceph_shutdown(ca);
ceph_shutdown(cb);
}
TEST(LibCephFS, WithoutAndWithLazyIO) {
struct ceph_mount_info *ca, *cb;
ASSERT_EQ(ceph_create(&ca, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
ASSERT_EQ(ceph_mount(ca, NULL), 0);
ASSERT_EQ(ceph_create(&cb, NULL), 0);
ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
ASSERT_EQ(ceph_mount(cb, NULL), 0);
char name[20];
snprintf(name, sizeof(name), "foo5.%d", getpid());
int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
ASSERT_LE(0, fda);
int fdb = ceph_open(cb, name, O_RDWR, 0644);
ASSERT_LE(0, fdb);
char out_buf_w[] = "1234567890";
/* Do some non-lazyio writes and a read */
ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(ca, fda, out_buf_w, sizeof(out_buf_w), 0));
ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(cb, fdb, out_buf_w, sizeof(out_buf_w), 10));
char in_buf_w[30];
ASSERT_EQ(ceph_read(ca, fda, in_buf_w, sizeof(in_buf_w), 0), 2*strlen(out_buf_w)+1);
/* Enable lazyio*/
ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
char out_buf[] = "fooooooooo";
/* Client a issues a write and propagates/flushes the buffer*/
ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 20));
ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
/* Client b issues a write and propagates/flushes the buffer*/
ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 30));
ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
char in_buf[50];
/* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
/* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
ceph_close(ca, fda);
ceph_close(cb, fdb);
ceph_shutdown(ca);
ceph_shutdown(cb);
}
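/* Mount as admin with client_permissions disabled and open up the mode of the filesystem root so the test clients can create files under "/" */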
static int update_root_mode()
{
struct ceph_mount_info *admin;
int r = ceph_create(&admin, NULL);
if (r < 0)
return r;
ceph_conf_read_file(admin, NULL);
ceph_conf_parse_env(admin, NULL);
ceph_conf_set(admin, "client_permissions", "false");
r = ceph_mount(admin, "/");
if (r < 0)
goto out;
r = ceph_chmod(admin, "/", 0777);
out:
ceph_shutdown(admin);
return r;
}
int main(int argc, char **argv)
{
int r = update_root_mode();
if (r < 0)
exit(1);
::testing::InitGoogleTest(&argc, argv);
srand(getpid());
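/* Bring up a librados connection to the cluster before running the tests */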
r = rados_create(&cluster, NULL);
if (r < 0)
exit(1);
r = rados_conf_read_file(cluster, NULL);
if (r < 0)
exit(1);
rados_conf_parse_env(cluster, NULL);
r = rados_connect(cluster);
if (r < 0)
exit(1);
r = RUN_ALL_TESTS();
rados_shutdown(cluster);
return r;
}