From 275d828adc46b0dba4a6d22b427cee9e130f6dca Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Thu, 31 Jan 2019 14:39:44 +0800 Subject: [PATCH] tools: format code and clean up Signed-off-by: Yuan Zhou --- .../test_ObjectCacheFile.cc | 120 ------------------ .../immutable_object_cache/CacheClient.cc | 54 ++++---- .../immutable_object_cache/CacheServer.cc | 10 +- .../immutable_object_cache/CacheSession.cc | 13 +- .../ObjectCacheStore.cc | 12 +- .../immutable_object_cache/SimplePolicy.cc | 2 +- .../immutable_object_cache/SocketCommon.h | 7 +- 7 files changed, 44 insertions(+), 174 deletions(-) delete mode 100644 src/test/immutable_object_cache/test_ObjectCacheFile.cc diff --git a/src/test/immutable_object_cache/test_ObjectCacheFile.cc b/src/test/immutable_object_cache/test_ObjectCacheFile.cc deleted file mode 100644 index 1ddbfa2e473..00000000000 --- a/src/test/immutable_object_cache/test_ObjectCacheFile.cc +++ /dev/null @@ -1,120 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "gtest/gtest.h" -#include "include/Context.h" -#include "include/buffer_fwd.h" -#include "common/Mutex.h" -#include "common/Cond.h" -#include "global/global_init.h" -#include "common/ceph_argparse.h" -#include "global/global_context.h" -#include - -#include "tools/immutable_object_cache/ObjectCacheFile.h" - -using namespace ceph::immutable_obj_cache; -namespace efs = std::experimental::filesystem; - -class TestObjectCacheFile :public ::testing::Test { -public: - std::string m_cache_root_dir; - - TestObjectCacheFile(){} - ~TestObjectCacheFile(){} - static void SetUpTestCase() {} - static void TearDownTestCase() {} - - void SetUp() override { - m_cache_root_dir = g_ceph_context->_conf.get_val("immutable_object_cache_path") - + "/ceph_immutable_obj_cache/"; - - if (efs::exists(m_cache_root_dir)) { - efs::remove_all(m_cache_root_dir); - } - efs::create_directories(m_cache_root_dir); - } - - void TearDown() override { - efs::remove_all(m_cache_root_dir); - } - -}; - -TEST_F(TestObjectCacheFile, test_write_object_to_file) { - ObjectCacheFile* m_cache_file_1 = new ObjectCacheFile(g_ceph_context, "test_sync_file_1"); - ObjectCacheFile* m_cache_file_2 = new ObjectCacheFile(g_ceph_context, "test_sync_file_2"); - ObjectCacheFile* m_cache_file_3 = new ObjectCacheFile(g_ceph_context, "test_sync_file_3"); - ASSERT_TRUE(m_cache_file_1->get_file_size() == -1); - ASSERT_TRUE(m_cache_file_2->get_file_size() == -1); - ASSERT_TRUE(m_cache_file_3->get_file_size() == -1); - - bufferlist* buf_1 = new ceph::bufferlist(); - bufferlist* buf_2 = new ceph::bufferlist(); - bufferlist* buf_3 = new ceph::bufferlist(); - buf_1->append(std::string(1024, '0')); - buf_2->append(std::string(4096, '0')); - buf_3->append(std::string(0, '0')); - - ASSERT_TRUE(m_cache_file_1->write_object_to_file(*buf_1, 1024) == 1024); - ASSERT_TRUE(m_cache_file_2->write_object_to_file(*buf_2, 4096) == 4096); - ASSERT_TRUE(m_cache_file_3->write_object_to_file(*buf_3, 0) == 0); - ASSERT_TRUE(m_cache_file_1->get_file_size() == 1024); - ASSERT_TRUE(m_cache_file_2->get_file_size() == 4096); - ASSERT_TRUE(m_cache_file_3->get_file_size() == 0); - - delete m_cache_file_1; - delete m_cache_file_2; - delete m_cache_file_3; - delete buf_1; - delete buf_2; - delete buf_3; -} - -TEST_F(TestObjectCacheFile, test_read_object_from_file) { - ObjectCacheFile* m_cache_file_1 = new ObjectCacheFile(g_ceph_context, "test_sync_file_1"); - ObjectCacheFile* m_cache_file_2 = new ObjectCacheFile(g_ceph_context, "test_sync_file_2"); - bufferlist* buf_1 = new ceph::bufferlist(); - bufferlist* buf_2 = new ceph::bufferlist(); - - ASSERT_TRUE(m_cache_file_1->get_file_size() == -1); - ASSERT_TRUE(m_cache_file_2->get_file_size() == -1); - ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_1, 0, 1024), -1); - ASSERT_EQ(m_cache_file_2->read_object_from_file(buf_2, 0, 1024), -1); - - buf_1->append(std::string("helloworld")); - ASSERT_TRUE(m_cache_file_1->write_object_to_file(*buf_1, 10) == 10); - ASSERT_TRUE(m_cache_file_1->get_file_size() == 10); - - bufferlist* buf_3 = new ceph::bufferlist(); - bufferlist* buf_4 = new ceph::bufferlist(); - bufferlist* buf_5 = new ceph::bufferlist(); - bufferlist* buf_6 = new ceph::bufferlist(); - - ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_3, 0, 10), 10); - ASSERT_EQ(10, buf_3->length()); - ASSERT_EQ(0, (strncmp(buf_1->c_str(), buf_3->c_str(), 10))); - - ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_4, 0, 4096), 10); - ASSERT_EQ(10, buf_4->length()); - ASSERT_EQ(0, (strncmp(buf_1->c_str(), buf_4->c_str(), 10))); - - ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_5, 2, 4), 4); - ASSERT_EQ(4, buf_5->length()); - bufferlist sub_bl; - sub_bl.substr_of(*buf_1, 2, 4); - ASSERT_EQ(0, (strncmp(sub_bl.c_str(), buf_5->c_str(), 4))); - - ASSERT_EQ(m_cache_file_1->read_object_from_file(buf_6, 12, 4), 0); - ASSERT_EQ(0, buf_6->length()); - - - delete m_cache_file_1; - delete m_cache_file_2; - delete buf_1; - delete buf_2; - delete buf_3; - delete buf_4; - delete buf_5; - delete buf_6; -} diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc index 37c42bd68e1..4dca9188ad2 100644 --- a/src/tools/immutable_object_cache/CacheClient.cc +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -23,7 +23,7 @@ namespace immutable_obj_cache { // TODO : configure it. m_use_dedicated_worker = true; m_worker_thread_num = 2; - if(m_use_dedicated_worker) { + if (m_use_dedicated_worker) { m_worker = new boost::asio::io_service(); m_worker_io_service_work = new boost::asio::io_service::work(*m_worker); for(uint64_t i = 0; i < m_worker_thread_num; i++) { @@ -50,10 +50,10 @@ namespace immutable_obj_cache { m_session_work.store(false); m_io_service.stop(); - if(m_io_thread != nullptr) { + if (m_io_thread != nullptr) { m_io_thread->join(); } - if(m_use_dedicated_worker) { + if (m_use_dedicated_worker) { m_worker->stop(); for(auto thd : m_worker_threads) { thd->join(); @@ -69,7 +69,7 @@ namespace immutable_obj_cache { m_session_work.store(false); boost::system::error_code close_ec; m_dm_socket.close(close_ec); - if(close_ec) { + if (close_ec) { ldout(cct, 20) << "close: " << close_ec.message() << dendl; } } @@ -77,7 +77,7 @@ namespace immutable_obj_cache { int CacheClient::connect() { boost::system::error_code ec; m_dm_socket.connect(m_ep, ec); - if(ec) { + if (ec) { fault(ASIO_ERROR_CONNECT, ec); return -1; } @@ -121,7 +121,7 @@ namespace immutable_obj_cache { } void CacheClient::try_send() { - if(!m_writing.load()) { + if (!m_writing.load()) { m_writing.store(true); send_message(); } @@ -148,7 +148,7 @@ namespace immutable_obj_cache { { Mutex::Locker locker(m_lock); - if(m_outcoming_bl.length() == 0) { + if (m_outcoming_bl.length() == 0) { m_writing.store(false); return; } @@ -161,7 +161,7 @@ namespace immutable_obj_cache { } void CacheClient::try_receive() { - if(!m_reading.load()) { + if (!m_reading.load()) { m_reading.store(true); receive_message(); } @@ -174,9 +174,6 @@ namespace immutable_obj_cache { void CacheClient::read_reply_header() { - /* one head buffer for all arrived reply. */ - // bufferptr bp_head(buffer::create_static(sizeof(ObjectCacheMsgHeader), m_header_buffer)); - /* create new head buffer for every reply */ bufferptr bp_head(buffer::create(sizeof(ObjectCacheMsgHeader))); auto raw_ptr = bp_head.c_str(); @@ -193,7 +190,7 @@ namespace immutable_obj_cache { void CacheClient::handle_reply_header(bufferptr bp_head, const boost::system::error_code& ec, size_t bytes_transferred) { - if(ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { + if (ec || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { fault(ASIO_ERROR_READ, ec); return; } @@ -246,12 +243,12 @@ namespace immutable_obj_cache { { Mutex::Locker locker(m_lock); - if(m_seq_to_req.size() == 0 && m_outcoming_bl.length()) { + if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) { m_reading.store(false); return; } } - if(is_session_work()) { + if (is_session_work()) { receive_message(); } @@ -269,7 +266,7 @@ namespace immutable_obj_cache { ceph_assert(current_request != nullptr); auto process_reply = new FunctionContext([this, current_request, reply] (bool dedicated) { - if(dedicated) { + if (dedicated) { // dedicated thrad to execute this context. } current_request->m_process_msg->complete(reply); @@ -277,7 +274,7 @@ namespace immutable_obj_cache { delete reply; }); - if(m_use_dedicated_worker) { + if (m_use_dedicated_worker) { m_worker->post([process_reply]() { process_reply->complete(true); }); @@ -290,9 +287,9 @@ namespace immutable_obj_cache { void CacheClient::fault(const int err_type, const boost::system::error_code& ec) { ldout(cct, 20) << "fault." << ec.message() << dendl; - if(err_type == ASIO_ERROR_CONNECT) { + if (err_type == ASIO_ERROR_CONNECT) { ceph_assert(!m_session_work.load()); - if(ec == boost::asio::error::connection_refused) { + if (ec == boost::asio::error::connection_refused) { ldout(cct, 20) << "Connecting RO daenmon fails : "<< ec.message() << ". Immutable-object-cache daemon is down ? " << "Data will be read from ceph cluster " << dendl; @@ -300,20 +297,20 @@ namespace immutable_obj_cache { ldout(cct, 20) << "Connecting RO daemon fails : " << ec.message() << dendl; } - if(m_dm_socket.is_open()) { + if (m_dm_socket.is_open()) { // Set to indicate what error occurred, if any. // Note that, even if the function indicates an error, // the underlying descriptor is closed. boost::system::error_code close_ec; m_dm_socket.close(close_ec); - if(close_ec) { + if (close_ec) { ldout(cct, 20) << "close: " << close_ec.message() << dendl; } } return; } - if(!m_session_work.load()) { + if (!m_session_work.load()) { return; } @@ -324,16 +321,16 @@ namespace immutable_obj_cache { // make sure just have one thread to modify execute below code. m_session_work.store(false); - if(err_type == ASIO_ERROR_MSG_INCOMPLETE) { + if (err_type == ASIO_ERROR_MSG_INCOMPLETE) { ldout(cct, 20) << "ASIO In-complete message." << ec.message() << dendl; ceph_assert(0); } - if(err_type == ASIO_ERROR_READ) { + if (err_type == ASIO_ERROR_READ) { ldout(cct, 20) << "ASIO async read fails : " << ec.message() << dendl; } - if(err_type == ASIO_ERROR_WRITE) { + if (err_type == ASIO_ERROR_WRITE) { ldout(cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl; // CacheClient should not occur this error. ceph_assert(0); @@ -357,9 +354,6 @@ namespace immutable_obj_cache { << ec.message() << dendl; } - - // TODO : use async + wait_event - // TODO : accept one parameter : ObjectCacheRequest int CacheClient::register_client(Context* on_finish) { ObjectCacheRequest* message = new ObjectCacheRequest(); message->m_head.version = 0; @@ -378,14 +372,14 @@ namespace immutable_obj_cache { ret = boost::asio::write(m_dm_socket, boost::asio::buffer(bl.c_str(), bl.length()), ec); - if(ec || ret != bl.length()) { + if (ec || ret != bl.length()) { fault(ASIO_ERROR_WRITE, ec); return -1; } ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_header_buffer, sizeof(ObjectCacheMsgHeader)), ec); - if(ec || ret != sizeof(ObjectCacheMsgHeader)) { + if (ec || ret != sizeof(ObjectCacheMsgHeader)) { fault(ASIO_ERROR_READ, ec); return -1; } @@ -395,7 +389,7 @@ namespace immutable_obj_cache { bufferptr bp_data(buffer::create(data_len)); ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), data_len), ec); - if(ec || ret != data_len) { + if (ec || ret != data_len) { fault(ASIO_ERROR_READ, ec); return -1; } diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc index e2a1e4549aa..e10db19c44a 100644 --- a/src/tools/immutable_object_cache/CacheServer.cc +++ b/src/tools/immutable_object_cache/CacheServer.cc @@ -29,13 +29,13 @@ int CacheServer::run() { ldout(cct, 20) << dendl; int ret = start_accept(); - if(ret != 0) { + if (ret != 0) { return ret; } boost::system::error_code ec; ret = m_io_service.run(ec); - if(ec) { + if (ec) { ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl; return -1; } @@ -52,19 +52,19 @@ int CacheServer::start_accept() { boost::system::error_code ec; m_acceptor.open(m_local_path.protocol(), ec); - if(ec) { + if (ec) { ldout(cct, 1) << "m_acceptor open fails: " << ec.message() << dendl; return -1; } m_acceptor.bind(m_local_path, ec); - if(ec) { + if (ec) { ldout(cct, 1) << "m_acceptor bind fails: " << ec.message() << dendl; return -1; } m_acceptor.listen(boost::asio::socket_base::max_connections, ec); - if(ec) { + if (ec) { ldout(cct, 1) << "m_acceptor listen fails: " << ec.message() << dendl; return -1; } diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc index f58bd8163c3..87528ac48f5 100644 --- a/src/tools/immutable_object_cache/CacheSession.cc +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -32,10 +32,10 @@ stream_protocol::socket& CacheSession::socket() { } void CacheSession::close() { - if(m_dm_socket.is_open()) { + if (m_dm_socket.is_open()) { boost::system::error_code close_ec; m_dm_socket.close(close_ec); - if(close_ec) { + if (close_ec) { ldout(cct, 20) << "close: " << close_ec.message() << dendl; } } @@ -59,7 +59,7 @@ void CacheSession::read_request_header() { void CacheSession::handle_request_header(const boost::system::error_code& err, size_t bytes_transferred) { ldout(cct, 20) << dendl; - if(err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { + if (err || bytes_transferred != sizeof(ObjectCacheMsgHeader)) { fault(); return; } @@ -67,8 +67,7 @@ void CacheSession::handle_request_header(const boost::system::error_code& err, ObjectCacheMsgHeader* head = (ObjectCacheMsgHeader*)(m_head_buffer); ceph_assert(head->version == 0); ceph_assert(head->reserved == 0); - ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ || - head->type == RBDSC_LOOKUP); + ceph_assert(head->type == RBDSC_REGISTER || head->type == RBDSC_READ); read_request_data(head->data_len); } @@ -89,7 +88,7 @@ void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, const boost::system::error_code& err, size_t bytes_transferred) { ldout(cct, 20) << dendl; - if(err || bytes_transferred != data_len) { + if (err || bytes_transferred != data_len) { fault(); return; } @@ -120,7 +119,7 @@ void CacheSession::send(ObjectCacheRequest* reply) { boost::asio::buffer(bl.c_str(), bl.length()), boost::asio::transfer_exactly(bl.length()), [this, bl, reply](const boost::system::error_code& err, size_t bytes_transferred) { - if(err || bytes_transferred != bl.length()) { + if (err || bytes_transferred != bl.length()) { fault(); return; } diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc index 5ddec8403ea..d862d0cac9b 100644 --- a/src/tools/immutable_object_cache/ObjectCacheStore.cc +++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc @@ -38,13 +38,13 @@ int ObjectCacheStore::init(bool reset) { ldout(m_cct, 20) << dendl; int ret = m_rados->init_with_context(m_cct); - if(ret < 0) { + if (ret < 0) { lderr(m_cct) << "fail to init Ceph context" << dendl; return ret; } ret = m_rados->connect(); - if(ret < 0 ) { + if (ret < 0 ) { lderr(m_cct) << "fail to connect to cluster" << dendl; return ret; } @@ -128,7 +128,7 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl; // rados read error - if(ret != -ENOENT && ret < 0) { + if (ret != -ENOENT && ret < 0) { lderr(m_cct) << "fail to read from rados" << dendl; m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); @@ -141,8 +141,6 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, ret = 0; } - uint32_t file_size = ret; - std::string cache_file_path = std::move(generate_cache_file_path(cache_file_name)); ret = read_buf->write_file(cache_file_path.c_str()); @@ -208,7 +206,7 @@ int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, librados::AioCompletion* read_completion = create_rados_callback(on_finish); // issue a zero-sized read req to get full obj int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0); - if(ret < 0) { + if (ret < 0) { lderr(m_cct) << "failed to read from rados" << dendl; } read_completion->release(); @@ -260,7 +258,7 @@ std::string ObjectCacheStore::generate_cache_file_path(std::string cache_file_na std::string cache_file_dir = ""; - if(m_dir_num > 0) { + if (m_dir_num > 0) { auto const pos = cache_file_name.find_last_of('.'); cache_file_dir = std::to_string(stoul(cache_file_name.substr(pos+1)) % m_dir_num); } diff --git a/src/tools/immutable_object_cache/SimplePolicy.cc b/src/tools/immutable_object_cache/SimplePolicy.cc index 0131ed013a1..ab02def4fe0 100644 --- a/src/tools/immutable_object_cache/SimplePolicy.cc +++ b/src/tools/immutable_object_cache/SimplePolicy.cc @@ -163,7 +163,7 @@ cache_status_t SimplePolicy::get_status(std::string file_name) { RWLock::RLocker locker(m_cache_map_lock); auto entry_it = m_cache_map.find(file_name); - if(entry_it == m_cache_map.end()) { + if (entry_it == m_cache_map.end()) { return OBJ_CACHE_NONE; } diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h index 1831e2715a5..db78e427912 100644 --- a/src/tools/immutable_object_cache/SocketCommon.h +++ b/src/tools/immutable_object_cache/SocketCommon.h @@ -9,10 +9,9 @@ namespace immutable_obj_cache { static const int RBDSC_REGISTER = 0X11; static const int RBDSC_READ = 0X12; -static const int RBDSC_LOOKUP = 0X13; -static const int RBDSC_REGISTER_REPLY = 0X14; -static const int RBDSC_READ_REPLY = 0X15; -static const int RBDSC_READ_RADOS = 0X16; +static const int RBDSC_REGISTER_REPLY = 0X13; +static const int RBDSC_READ_REPLY = 0X14; +static const int RBDSC_READ_RADOS = 0X15; static const int ASIO_ERROR_READ = 0X01; static const int ASIO_ERROR_WRITE = 0X02;