From 3ecdae8388d69123b937a40ce614a0b795a757f1 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 8 Oct 2015 13:21:29 -0400 Subject: [PATCH] rbd-replay: added version control to trace output file Signed-off-by: Jason Dillaman --- src/rbd_replay/ActionTypes.cc | 354 +++++++++++++++++++++++++++ src/rbd_replay/ActionTypes.h | 277 +++++++++++++++++++++ src/rbd_replay/BufferReader.cc | 34 +++ src/rbd_replay/BufferReader.h | 33 +++ src/rbd_replay/Deser.cc | 70 ------ src/rbd_replay/Deser.hpp | 52 ---- src/rbd_replay/Makefile.am | 49 ++-- src/rbd_replay/Replayer.cc | 71 +++++- src/rbd_replay/Replayer.hpp | 3 +- src/rbd_replay/Ser.cc | 57 ----- src/rbd_replay/Ser.hpp | 50 ---- src/rbd_replay/actions.cc | 387 ++++++++---------------------- src/rbd_replay/actions.hpp | 311 ++++++++++-------------- src/rbd_replay/ios.cc | 117 ++++----- src/rbd_replay/ios.hpp | 29 ++- src/rbd_replay/rbd-replay-prep.cc | 42 +++- 16 files changed, 1117 insertions(+), 819 deletions(-) create mode 100644 src/rbd_replay/ActionTypes.cc create mode 100644 src/rbd_replay/ActionTypes.h create mode 100644 src/rbd_replay/BufferReader.cc create mode 100644 src/rbd_replay/BufferReader.h delete mode 100644 src/rbd_replay/Deser.cc delete mode 100644 src/rbd_replay/Deser.hpp delete mode 100644 src/rbd_replay/Ser.cc delete mode 100644 src/rbd_replay/Ser.hpp diff --git a/src/rbd_replay/ActionTypes.cc b/src/rbd_replay/ActionTypes.cc new file mode 100644 index 00000000000..36ed3ca02c7 --- /dev/null +++ b/src/rbd_replay/ActionTypes.cc @@ -0,0 +1,354 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rbd_replay/ActionTypes.h" +#include "include/assert.h" +#include "include/byteorder.h" +#include "include/stringify.h" +#include "common/Formatter.h" +#include +#include + +namespace rbd_replay { +namespace action { + +namespace { + +bool byte_swap_required(__u8 version) { +#if defined(CEPH_LITTLE_ENDIAN) + return (version == 0); +#else + return false; +#endif +} + +void decode_big_endian_string(std::string &str, bufferlist::iterator &it) { +#if defined(CEPH_LITTLE_ENDIAN) + uint32_t length; + ::decode(length, it); + length = swab32(length); + str.clear(); + it.copy(length, str); +#else + assert(false); +#endif +} + +class EncodeVisitor : public boost::static_visitor { +public: + EncodeVisitor(bufferlist &bl) : m_bl(bl) { + } + + template + inline void operator()(const Action &action) const { + ::encode(static_cast(Action::ACTION_TYPE), m_bl); + action.encode(m_bl); + } +private: + bufferlist &m_bl; +}; + +class DecodeVisitor : public boost::static_visitor { +public: + DecodeVisitor(__u8 version, bufferlist::iterator &iter) + : m_version(version), m_iter(iter) { + } + + template + inline void operator()(Action &action) const { + action.decode(m_version, m_iter); + } +private: + __u8 m_version; + bufferlist::iterator &m_iter; +}; + +class DumpVisitor : public boost::static_visitor { +public: + DumpVisitor(Formatter *formatter) : m_formatter(formatter) {} + + template + inline void operator()(const Action &action) const { + ActionType action_type = Action::ACTION_TYPE; + m_formatter->dump_string("action_type", stringify(action_type)); + action.dump(m_formatter); + } +private: + ceph::Formatter *m_formatter; +}; + +} // anonymous namespace + +void Dependency::encode(bufferlist &bl) const { + ::encode(id, bl); + ::encode(time_delta, bl); +} + +void Dependency::decode(bufferlist::iterator &it) { + decode(1, it); +} + +void Dependency::decode(__u8 version, bufferlist::iterator &it) { + ::decode(id, it); + ::decode(time_delta, it); + if (byte_swap_required(version)) { + id = swab32(id); + time_delta = swab64(time_delta); + } +} + +void Dependency::dump(Formatter *f) const { + f->dump_unsigned("id", id); + f->dump_unsigned("time_delta", time_delta); +} + +void Dependency::generate_test_instances(std::list &o) { + o.push_back(new Dependency()); + o.push_back(new Dependency(1, 123456789)); +} + +void ActionBase::encode(bufferlist &bl) const { + ::encode(id, bl); + ::encode(thread_id, bl); + ::encode(dependencies, bl); +} + +void ActionBase::decode(__u8 version, bufferlist::iterator &it) { + ::decode(id, it); + ::decode(thread_id, it); + if (version == 0) { + uint32_t num_successors; + ::decode(num_successors, it); + + uint32_t num_completion_successors; + ::decode(num_completion_successors, it); + } + + if (byte_swap_required(version)) { + id = swab32(id); + thread_id = swab64(thread_id); + + uint32_t dep_count; + ::decode(dep_count, it); + dep_count = swab32(dep_count); + dependencies.resize(dep_count); + for (uint32_t i = 0; i < dep_count; ++i) { + dependencies[i].decode(0, it); + } + } else { + ::decode(dependencies, it); + } +} + +void ActionBase::dump(Formatter *f) const { + f->dump_unsigned("id", id); + f->dump_unsigned("thread_id", thread_id); + f->open_array_section("dependencies"); + for (size_t i = 0; i < dependencies.size(); ++i) { + f->open_object_section("dependency"); + dependencies[i].dump(f); + f->close_section(); + } + f->close_section(); +} + +void ImageActionBase::encode(bufferlist &bl) const { + ActionBase::encode(bl); + ::encode(imagectx_id, bl); +} + +void ImageActionBase::decode(__u8 version, bufferlist::iterator &it) { + ActionBase::decode(version, it); + ::decode(imagectx_id, it); + if (byte_swap_required(version)) { + imagectx_id = swab64(imagectx_id); + } +} + +void ImageActionBase::dump(Formatter *f) const { + ActionBase::dump(f); + f->dump_unsigned("imagectx_id", imagectx_id); +} + +void IoActionBase::encode(bufferlist &bl) const { + ImageActionBase::encode(bl); + ::encode(offset, bl); + ::encode(length, bl); +} + +void IoActionBase::decode(__u8 version, bufferlist::iterator &it) { + ImageActionBase::decode(version, it); + ::decode(offset, it); + ::decode(length, it); + if (byte_swap_required(version)) { + offset = swab64(offset); + length = swab64(length); + } +} + +void IoActionBase::dump(Formatter *f) const { + ImageActionBase::dump(f); + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); +} + +void OpenImageAction::encode(bufferlist &bl) const { + ImageActionBase::encode(bl); + ::encode(name, bl); + ::encode(snap_name, bl); + ::encode(read_only, bl); +} + +void OpenImageAction::decode(__u8 version, bufferlist::iterator &it) { + ImageActionBase::decode(version, it); + if (byte_swap_required(version)) { + decode_big_endian_string(name, it); + decode_big_endian_string(snap_name, it); + } else { + ::decode(name, it); + ::decode(snap_name, it); + } + ::decode(read_only, it); +} + +void OpenImageAction::dump(Formatter *f) const { + ImageActionBase::dump(f); + f->dump_string("name", name); + f->dump_string("snap_name", snap_name); + f->dump_bool("read_only", read_only); +} + +void UnknownAction::encode(bufferlist &bl) const { + assert(false); +} + +void UnknownAction::decode(__u8 version, bufferlist::iterator &it) { +} + +void UnknownAction::dump(Formatter *f) const { +} + +void ActionEntry::encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + boost::apply_visitor(EncodeVisitor(bl), action); + ENCODE_FINISH(bl); +} + +void ActionEntry::decode(bufferlist::iterator &it) { + DECODE_START(1, it); + decode(struct_v, it); + DECODE_FINISH(it); +} + +void ActionEntry::decode_unversioned(bufferlist::iterator &it) { + decode(0, it); +} + +void ActionEntry::decode(__u8 version, bufferlist::iterator &it) { + uint8_t action_type; + ::decode(action_type, it); + + // select the correct action variant based upon the action_type + switch (action_type) { + case ACTION_TYPE_START_THREAD: + action = StartThreadAction(); + break; + case ACTION_TYPE_STOP_THREAD: + action = StopThreadAction(); + break; + case ACTION_TYPE_READ: + action = ReadAction(); + break; + case ACTION_TYPE_WRITE: + action = WriteAction(); + break; + case ACTION_TYPE_AIO_READ: + action = AioReadAction(); + break; + case ACTION_TYPE_AIO_WRITE: + action = AioWriteAction(); + break; + case ACTION_TYPE_OPEN_IMAGE: + action = OpenImageAction(); + break; + case ACTION_TYPE_CLOSE_IMAGE: + action = CloseImageAction(); + break; + } + + boost::apply_visitor(DecodeVisitor(version, it), action); +} + +void ActionEntry::dump(Formatter *f) const { + boost::apply_visitor(DumpVisitor(f), action); +} + +void ActionEntry::generate_test_instances(std::list &o) { + Dependencies dependencies; + dependencies.push_back(Dependency(3, 123456789)); + dependencies.push_back(Dependency(4, 234567890)); + + o.push_back(new ActionEntry(StartThreadAction())); + o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies))); + o.push_back(new ActionEntry(StopThreadAction())); + o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies))); + + o.push_back(new ActionEntry(ReadAction())); + o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5))); + o.push_back(new ActionEntry(WriteAction())); + o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(AioReadAction())); + o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(AioWriteAction())); + o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4, + 5))); + + o.push_back(new ActionEntry(OpenImageAction())); + o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3, + "image_name", "snap_name", + true))); + o.push_back(new ActionEntry(CloseImageAction())); + o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3))); +} + +} // namespace action +} // namespace rbd_replay + +std::ostream &operator<<(std::ostream &out, + const rbd_replay::action::ActionType &type) { + using namespace rbd_replay::action; + + switch (type) { + case ACTION_TYPE_START_THREAD: + out << "StartThread"; + break; + case ACTION_TYPE_STOP_THREAD: + out << "StopThread"; + break; + case ACTION_TYPE_READ: + out << "Read"; + break; + case ACTION_TYPE_WRITE: + out << "Write"; + break; + case ACTION_TYPE_AIO_READ: + out << "AioRead"; + break; + case ACTION_TYPE_AIO_WRITE: + out << "AioWrite"; + break; + case ACTION_TYPE_OPEN_IMAGE: + out << "OpenImage"; + break; + case ACTION_TYPE_CLOSE_IMAGE: + out << "CloseImage"; + break; + default: + out << "Unknown (" << static_cast(type) << ")"; + break; + } + return out; +} + diff --git a/src/rbd_replay/ActionTypes.h b/src/rbd_replay/ActionTypes.h new file mode 100644 index 00000000000..63ef34e98f2 --- /dev/null +++ b/src/rbd_replay/ActionTypes.h @@ -0,0 +1,277 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H +#define CEPH_RBD_REPLAY_ACTION_TYPES_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include +#include +#include +#include +#include + +namespace ceph { class Formatter; } + +namespace rbd_replay { +namespace action { + +typedef uint64_t imagectx_id_t; +typedef uint64_t thread_id_t; + +/// Even IDs are normal actions, odd IDs are completions. +typedef uint32_t action_id_t; + +static const std::string BANNER("rbd-replay-trace"); + +/** + * Dependencies link actions to earlier actions or completions. + * If an action has a dependency \c d then it waits until \c d.time_delta + * nanoseconds after the action or completion with ID \c d.id has fired. + */ +struct Dependency { + /// ID of the action or completion to wait for. + action_id_t id; + + /// Nanoseconds of delay to wait until after the action or completion fires. + uint64_t time_delta; + + /** + * @param id ID of the action or completion to wait for. + * @param time_delta Nanoseconds of delay to wait after the action or + * completion fires. + */ + Dependency() : id(0), time_delta(0) { + } + Dependency(action_id_t id, uint64_t time_delta) + : id(id), time_delta(time_delta) { + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &it); + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list &o); +}; + +WRITE_CLASS_ENCODER(Dependency); + +typedef std::vector Dependencies; + +enum ActionType { + ACTION_TYPE_START_THREAD = 0, + ACTION_TYPE_STOP_THREAD = 1, + ACTION_TYPE_READ = 2, + ACTION_TYPE_WRITE = 3, + ACTION_TYPE_AIO_READ = 4, + ACTION_TYPE_AIO_WRITE = 5, + ACTION_TYPE_OPEN_IMAGE = 6, + ACTION_TYPE_CLOSE_IMAGE = 7 +}; + +struct ActionBase { + action_id_t id; + thread_id_t thread_id; + Dependencies dependencies; + + ActionBase() : id(0), thread_id(0) { + } + ActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : id(id), thread_id(thread_id), dependencies(dependencies) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; +}; + +struct StartThreadAction : public ActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD; + + StartThreadAction() { + } + StartThreadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : ActionBase(id, thread_id, dependencies) { + } +}; + +struct StopThreadAction : public ActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD; + + StopThreadAction() { + } + StopThreadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : ActionBase(id, thread_id, dependencies) { + } +}; + +struct ImageActionBase : public ActionBase { + imagectx_id_t imagectx_id; + + ImageActionBase() : imagectx_id(0) { + } + ImageActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id) + : ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; +}; + +struct IoActionBase : public ImageActionBase { + uint64_t offset; + uint64_t length; + + IoActionBase() : offset(0), length(0) { + } + IoActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : ImageActionBase(id, thread_id, dependencies, imagectx_id), + offset(offset), length(length) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; +}; + +struct ReadAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_READ; + + ReadAction() { + } + ReadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct WriteAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE; + + WriteAction() { + } + WriteAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct AioReadAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ; + + AioReadAction() { + } + AioReadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct AioWriteAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE; + + AioWriteAction() { + } + AioWriteAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct OpenImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE; + + std::string name; + std::string snap_name; + bool read_only; + + OpenImageAction() : read_only(false) { + } + OpenImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + const std::string &name, const std::string &snap_name, + bool read_only) + : ImageActionBase(id, thread_id, dependencies, imagectx_id), + name(name), snap_name(snap_name), read_only(read_only) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; +}; + +struct CloseImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE; + + CloseImageAction() { + } + CloseImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id) + : ImageActionBase(id, thread_id, dependencies, imagectx_id) { + } +}; + +struct UnknownAction { + static const ActionType ACTION_TYPE = static_cast(-1); + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &it); + void dump(Formatter *f) const; +}; + +typedef boost::variant Action; + +class ActionEntry { +public: + Action action; + + ActionEntry() : action(UnknownAction()) { + } + ActionEntry(const Action &action) : action(action) { + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &it); + void decode_unversioned(bufferlist::iterator &it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list &o); + +private: + void decode(__u8 version, bufferlist::iterator &it); +}; + +WRITE_CLASS_ENCODER(ActionEntry); + +} // namespace action +} // namespace rbd_replay + +std::ostream &operator<<(std::ostream &out, + const rbd_replay::action::ActionType &type); + +using rbd_replay::action::decode; +using rbd_replay::action::encode; + +#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H diff --git a/src/rbd_replay/BufferReader.cc b/src/rbd_replay/BufferReader.cc new file mode 100644 index 00000000000..f1327b7f3f8 --- /dev/null +++ b/src/rbd_replay/BufferReader.cc @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rbd_replay/BufferReader.h" +#include "include/assert.h" +#include "include/intarith.h" + +namespace rbd_replay { + +BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes) + : m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes), + m_bl_it(m_bl.begin()) { + assert(m_min_bytes <= m_max_bytes); +} + +int BufferReader::fetch(bufferlist::iterator **it) { + if (m_bl_it.get_remaining() < m_min_bytes) { + ssize_t bytes_to_read = ROUND_UP_TO(m_max_bytes - m_bl_it.get_remaining(), + CEPH_BUFFER_APPEND_SIZE); + while (bytes_to_read > 0) { + int r = m_bl.read_fd(m_fd, CEPH_BUFFER_APPEND_SIZE); + if (r < 0) { + return r; + } + assert(r <= bytes_to_read); + bytes_to_read -= r; + } + } + + *it = &m_bl_it; + return 0; +} + +} // namespace rbd_replay diff --git a/src/rbd_replay/BufferReader.h b/src/rbd_replay/BufferReader.h new file mode 100644 index 00000000000..95b1533ba59 --- /dev/null +++ b/src/rbd_replay/BufferReader.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H +#define CEPH_RBD_REPLAY_BUFFER_READER_H + +#include "include/int_types.h" +#include "include/buffer.h" + +namespace rbd_replay { + +class BufferReader { +public: + static const size_t DEFAULT_MIN_BYTES = 1<<20; + static const size_t DEFAULT_MAX_BYTES = 1<<22; + + BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES, + size_t max_bytes = DEFAULT_MAX_BYTES); + + int fetch(bufferlist::iterator **it); + +private: + int m_fd; + size_t m_min_bytes; + size_t m_max_bytes; + bufferlist m_bl; + bufferlist::iterator m_bl_it; + +}; + +} // namespace rbd_replay + +#endif // CEPH_RBD_REPLAY_BUFFER_READER_H diff --git a/src/rbd_replay/Deser.cc b/src/rbd_replay/Deser.cc deleted file mode 100644 index 972db9b3ef0..00000000000 --- a/src/rbd_replay/Deser.cc +++ /dev/null @@ -1,70 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 Adam Crume - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ -#include "acconfig.h" -#include "Deser.hpp" -#include -#include -#if defined(DARWIN) || defined(__FreeBSD__) -#include -#else -#include -#endif - -rbd_replay::Deser::Deser(std::istream &in) - : m_in(in) { -} - -uint8_t rbd_replay::Deser::read_uint8_t() { - uint8_t data; - m_in.read(reinterpret_cast(&data), sizeof(data)); - return data; -} - -uint16_t rbd_replay::Deser::read_uint16_t() { - uint16_t data; - m_in.read(reinterpret_cast(&data), sizeof(data)); - return ntohs(data); -} - -uint32_t rbd_replay::Deser::read_uint32_t() { - uint32_t data; - m_in.read(reinterpret_cast(&data), sizeof(data)); - return ntohl(data); -} - -uint64_t rbd_replay::Deser::read_uint64_t() { - uint64_t data; - m_in.read(reinterpret_cast(&data), sizeof(data)); -#if __BYTE_ORDER == __LITTLE_ENDIAN - data = (static_cast(ntohl(data)) << 32 | ntohl(data >> 32)); -#endif - return data; -} - -std::string rbd_replay::Deser::read_string() { - uint32_t length = read_uint32_t(); - char* data = reinterpret_cast(malloc(length)); - m_in.read(data, length); - std::string s(data, length); - free(data); - return s; -} - -bool rbd_replay::Deser::read_bool() { - return read_uint8_t() != 0; -} - -bool rbd_replay::Deser::eof() { - return m_in.eof(); -} diff --git a/src/rbd_replay/Deser.hpp b/src/rbd_replay/Deser.hpp deleted file mode 100644 index b466acec662..00000000000 --- a/src/rbd_replay/Deser.hpp +++ /dev/null @@ -1,52 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 Adam Crume - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP -#define _INCLUDED_RBD_REPLAY_DESER_HPP - -#include -#include - -namespace rbd_replay { - -/** - Helper for deserializing data in an architecture-indepdendent way. - Everything is read big-endian. - @see Ser -*/ -class Deser { -public: - Deser(std::istream &in); - - uint8_t read_uint8_t(); - - uint16_t read_uint16_t(); - - uint32_t read_uint32_t(); - - uint64_t read_uint64_t(); - - std::string read_string(); - - bool read_bool(); - - bool eof(); - -private: - std::istream &m_in; -}; - -} - -#endif diff --git a/src/rbd_replay/Makefile.am b/src/rbd_replay/Makefile.am index fa101b7423f..23a8e9152e0 100644 --- a/src/rbd_replay/Makefile.am +++ b/src/rbd_replay/Makefile.am @@ -2,35 +2,46 @@ if ENABLE_CLIENT if WITH_RADOS if WITH_RBD +librbd_replay_types_la_SOURCES = \ + rbd_replay/ActionTypes.cc +noinst_HEADERS += \ + rbd_replay/ActionTypes.h +noinst_LTLIBRARIES += librbd_replay_types.la +DENCODER_DEPS += librbd_replay_types.la + # librbd_replay_la exists only to help with unit tests -librbd_replay_la_SOURCES = rbd_replay/actions.cc \ - rbd_replay/Deser.cc \ +librbd_replay_la_SOURCES = \ + rbd_replay/actions.cc \ + rbd_replay/BufferReader.cc \ rbd_replay/ImageNameMap.cc \ rbd_replay/PendingIO.cc \ rbd_replay/rbd_loc.cc \ - rbd_replay/Replayer.cc \ - rbd_replay/Ser.cc -librbd_replay_la_LIBADD = $(LIBRBD) \ + rbd_replay/Replayer.cc +librbd_replay_la_LIBADD = \ + $(LIBRBD) \ $(LIBRADOS) \ $(CEPH_GLOBAL) noinst_LTLIBRARIES += librbd_replay.la -noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \ +noinst_HEADERS += \ rbd_replay/actions.hpp \ - rbd_replay/Deser.hpp \ + rbd_replay/BoundedBuffer.hpp \ + rbd_replay/BufferReader.h \ rbd_replay/ImageNameMap.hpp \ rbd_replay/ios.hpp \ rbd_replay/PendingIO.hpp \ rbd_replay/rbd_loc.hpp \ rbd_replay/rbd_replay_debug.hpp \ - rbd_replay/Replayer.hpp \ - rbd_replay/Ser.hpp + rbd_replay/Replayer.hpp - -rbd_replay_SOURCES = rbd_replay/rbd-replay.cc -rbd_replay_LDADD = $(LIBRBD) \ +rbd_replay_SOURCES = \ + rbd_replay/rbd-replay.cc +rbd_replay_LDADD = \ + librbd_replay.la \ + librbd_replay_types.la \ + $(LIBRBD) \ $(LIBRADOS) \ $(CEPH_GLOBAL) \ - librbd_replay.la + $(LIBCOMMON) if LINUX bin_PROGRAMS += rbd-replay @@ -43,12 +54,16 @@ librbd_replay_ios_la_LIBADD = $(LIBRBD) \ librbd_replay.la noinst_LTLIBRARIES += librbd_replay_ios.la -rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc -rbd_replay_prep_LDADD = $(LIBRBD) \ - $(LIBRADOS) \ - $(CEPH_GLOBAL) \ +rbd_replay_prep_SOURCES = \ + rbd_replay/rbd-replay-prep.cc +rbd_replay_prep_LDADD = \ librbd_replay.la \ librbd_replay_ios.la \ + librbd_replay_types.la \ + $(LIBRBD) \ + $(LIBRADOS) \ + $(CEPH_GLOBAL) \ + $(LIBCOMMON) \ -lbabeltrace \ -lbabeltrace-ctf \ -lboost_date_time diff --git a/src/rbd_replay/Replayer.cc b/src/rbd_replay/Replayer.cc index 53d60c5da1b..b37f226dbe4 100644 --- a/src/rbd_replay/Replayer.cc +++ b/src/rbd_replay/Replayer.cc @@ -14,8 +14,11 @@ #include "Replayer.hpp" #include "common/errno.h" +#include "rbd_replay/ActionTypes.h" +#include "rbd_replay/BufferReader.h" #include #include +#include #include #include "global/global_context.h" #include "rbd_replay_debug.hpp" @@ -24,6 +27,29 @@ using namespace std; using namespace rbd_replay; +namespace { + +bool is_versioned_replay(BufferReader &buffer_reader) { + bufferlist::iterator *it; + int r = buffer_reader.fetch(&it); + if (r < 0) { + return false; + } + + if (it->get_remaining() < action::BANNER.size()) { + return false; + } + + std::string banner; + it->copy(action::BANNER.size(), banner); + bool versioned = (banner == action::BANNER); + if (!versioned) { + it->seek(0); + } + return versioned; +} + +} // anonymous namespace Worker::Worker(Replayer &replayer) : m_replayer(replayer), @@ -176,18 +202,45 @@ void Replayer::run(const std::string& replay_file) { m_rbd = new librbd::RBD(); map workers; - ifstream input(replay_file.c_str(), ios::in | ios::binary); - if (!input.is_open()) { - cerr << "Failed to open " << replay_file << std::endl; - exit(1); + int fd = open(replay_file.c_str(), O_RDONLY); + if (fd < 0) { + std::cerr << "Failed to open " << replay_file << ": " + << cpp_strerror(errno) << std::endl; + exit(1); } + BOOST_SCOPE_EXIT( (fd) ) { + close(fd); + } BOOST_SCOPE_EXIT_END; - Deser deser(input); + BufferReader buffer_reader(fd); + bool versioned = is_versioned_replay(buffer_reader); while (true) { - Action::ptr action = Action::read_from(deser); + action::ActionEntry action_entry; + try { + bufferlist::iterator *it; + int r = buffer_reader.fetch(&it); + if (r < 0) { + std::cerr << "Failed to read from trace file: " << cpp_strerror(r) + << std::endl; + exit(-r); + } + + if (versioned) { + action_entry.decode(*it); + } else { + action_entry.decode_unversioned(*it); + } + } catch (const buffer::error &err) { + std::cerr << "Failed to decode trace action" << std::endl; + exit(1); + } + + Action::ptr action = Action::construct(action_entry); if (!action) { - break; + // unknown / unsupported action + continue; } + if (action->is_start_thread()) { Worker *worker = new Worker(*this); workers[action->thread_id()] = worker; @@ -261,9 +314,9 @@ bool Replayer::is_action_complete(action_id_t id) { return tracker.actions.count(id) > 0; } -void Replayer::wait_for_actions(const vector &deps) { +void Replayer::wait_for_actions(const action::Dependencies &deps) { boost::posix_time::ptime release_time(boost::posix_time::neg_infin); - BOOST_FOREACH(const dependency_d &dep, deps) { + BOOST_FOREACH(const action::Dependency &dep, deps) { dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl; boost::system_time start_time(boost::get_system_time()); action_tracker_d &tracker = tracker_for(dep.id); diff --git a/src/rbd_replay/Replayer.hpp b/src/rbd_replay/Replayer.hpp index 538e7fddd4d..acad7258fcc 100644 --- a/src/rbd_replay/Replayer.hpp +++ b/src/rbd_replay/Replayer.hpp @@ -17,6 +17,7 @@ #include #include +#include "rbd_replay/ActionTypes.h" #include "BoundedBuffer.hpp" #include "ImageNameMap.hpp" #include "PendingIO.hpp" @@ -100,7 +101,7 @@ public: bool is_action_complete(action_id_t id); - void wait_for_actions(const std::vector &deps); + void wait_for_actions(const action::Dependencies &deps); std::string pool_name() const; diff --git a/src/rbd_replay/Ser.cc b/src/rbd_replay/Ser.cc deleted file mode 100644 index 4b721ab7205..00000000000 --- a/src/rbd_replay/Ser.cc +++ /dev/null @@ -1,57 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 Adam Crume - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "acconfig.h" -#include "Ser.hpp" -#include -#include -#if defined(DARWIN) || defined(__FreeBSD__) -#include -#else -#include -#endif - -rbd_replay::Ser::Ser(std::ostream &out) - : m_out(out) { -} - -void rbd_replay::Ser::write_uint8_t(uint8_t data) { - m_out.write(reinterpret_cast(&data), sizeof(data)); -} - -void rbd_replay::Ser::write_uint16_t(uint16_t data) { - data = htons(data); - m_out.write(reinterpret_cast(&data), sizeof(data)); -} - -void rbd_replay::Ser::write_uint32_t(uint32_t data) { - data = htonl(data); - m_out.write(reinterpret_cast(&data), sizeof(data)); -} - -void rbd_replay::Ser::write_uint64_t(uint64_t data) { -#if __BYTE_ORDER == __LITTLE_ENDIAN - data = (static_cast(htonl(data)) << 32 | htonl(data >> 32)); -#endif - m_out.write(reinterpret_cast(&data), sizeof(data)); -} - -void rbd_replay::Ser::write_string(const std::string& data) { - write_uint32_t(data.length()); - m_out.write(data.data(), data.length()); -} - -void rbd_replay::Ser::write_bool(bool data) { - write_uint8_t(data ? 1 : 0); -} diff --git a/src/rbd_replay/Ser.hpp b/src/rbd_replay/Ser.hpp deleted file mode 100644 index 130465dc439..00000000000 --- a/src/rbd_replay/Ser.hpp +++ /dev/null @@ -1,50 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 Adam Crume - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef _INCLUDED_RBD_REPLAY_SER_HPP -#define _INCLUDED_RBD_REPLAY_SER_HPP - -#include -#include - -namespace rbd_replay { - -/** - Helper for serializing data in an architecture-indepdendent way. - Everything is written big-endian. - @see Deser -*/ -class Ser { -public: - Ser(std::ostream &out); - - void write_uint8_t(uint8_t); - - void write_uint16_t(uint16_t); - - void write_uint32_t(uint32_t); - - void write_uint64_t(uint64_t); - - void write_string(const std::string&); - - void write_bool(bool b); - -private: - std::ostream &m_out; -}; - -} - -#endif diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc index 0c327b653ed..7726d08706d 100644 --- a/src/rbd_replay/actions.cc +++ b/src/rbd_replay/actions.cc @@ -22,213 +22,9 @@ using namespace rbd_replay; using namespace std; +namespace { -Action::Action(action_id_t id, - thread_id_t thread_id, - std::vector &predecessors) - : m_id(id), - m_thread_id(thread_id), - m_predecessors(predecessors) { - } - -Action::~Action() { -} - -Action::ptr Action::read_from(Deser &d) { - uint8_t type = d.read_uint8_t(); - if (d.eof()) { - return Action::ptr(); - } - uint32_t ionum = d.read_uint32_t(); - uint64_t thread_id = d.read_uint64_t(); - d.read_uint32_t(); // unused - d.read_uint32_t(); // unused - uint32_t num_dependencies = d.read_uint32_t(); - vector deps; - for (unsigned int i = 0; i < num_dependencies; i++) { - uint32_t dep_id = d.read_uint32_t(); - uint64_t time_delta = d.read_uint64_t(); - deps.push_back(dependency_d(dep_id, time_delta)); - } - DummyAction dummy(ionum, thread_id, deps); - switch (type) { - case IO_START_THREAD: - return StartThreadAction::read_from(dummy, d); - case IO_STOP_THREAD: - return StopThreadAction::read_from(dummy, d); - case IO_READ: - return ReadAction::read_from(dummy, d); - case IO_WRITE: - return WriteAction::read_from(dummy, d); - case IO_ASYNC_READ: - return AioReadAction::read_from(dummy, d); - case IO_ASYNC_WRITE: - return AioWriteAction::read_from(dummy, d); - case IO_OPEN_IMAGE: - return OpenImageAction::read_from(dummy, d); - case IO_CLOSE_IMAGE: - return CloseImageAction::read_from(dummy, d); - default: - cerr << "Invalid action type: " << type << std::endl; - exit(1); - } -} - -std::ostream& Action::dump_action_fields(std::ostream& o) const { - o << "id=" << m_id << ", thread_id=" << m_thread_id << ", predecessors=["; - bool first = true; - BOOST_FOREACH(const dependency_d &d, m_predecessors) { - if (!first) { - o << ","; - } - o << d.id; - first = false; - } - return o << "]"; -} - -std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) { - return a.dump(o); -} - - -std::ostream& DummyAction::dump(std::ostream& o) const { - o << "DummyAction["; - dump_action_fields(o); - return o << "]"; -} - - -StartThreadAction::StartThreadAction(Action &src) - : Action(src) { -} - -void StartThreadAction::perform(ActionCtx &ctx) { - cerr << "StartThreadAction should never actually be performed" << std::endl; - exit(1); -} - -bool StartThreadAction::is_start_thread() { - return true; -} - -Action::ptr StartThreadAction::read_from(Action &src, Deser &d) { - return Action::ptr(new StartThreadAction(src)); -} - -std::ostream& StartThreadAction::dump(std::ostream& o) const { - o << "StartThreadAction["; - dump_action_fields(o); - return o << "]"; -} - - -StopThreadAction::StopThreadAction(Action &src) - : Action(src) { -} - -void StopThreadAction::perform(ActionCtx &ctx) { - dout(ACTION_LEVEL) << "Performing " << *this << dendl; - ctx.stop(); -} - -Action::ptr StopThreadAction::read_from(Action &src, Deser &d) { - return Action::ptr(new StopThreadAction(src)); -} - -std::ostream& StopThreadAction::dump(std::ostream& o) const { - o << "StopThreadAction["; - dump_action_fields(o); - return o << "]"; -} - - -AioReadAction::AioReadAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length) - : Action(src), - m_imagectx_id(imagectx_id), - m_offset(offset), - m_length(length) { - } - -Action::ptr AioReadAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - uint64_t offset = d.read_uint64_t(); - uint64_t length = d.read_uint64_t(); - return Action::ptr(new AioReadAction(src, imagectx_id, offset, length)); -} - -void AioReadAction::perform(ActionCtx &worker) { - dout(ACTION_LEVEL) << "Performing " << *this << dendl; - librbd::Image *image = worker.get_image(m_imagectx_id); - assert(image); - PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); - worker.add_pending(io); - int r = image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion()); - assertf(r >= 0, "id = %d, r = %d", id(), r); -} - -std::ostream& AioReadAction::dump(std::ostream& o) const { - o << "AioReadAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]"; -} - - -ReadAction::ReadAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length) - : Action(src), - m_imagectx_id(imagectx_id), - m_offset(offset), - m_length(length) { - } - -Action::ptr ReadAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - uint64_t offset = d.read_uint64_t(); - uint64_t length = d.read_uint64_t(); - return Action::ptr(new ReadAction(src, imagectx_id, offset, length)); -} - -void ReadAction::perform(ActionCtx &worker) { - dout(ACTION_LEVEL) << "Performing " << *this << dendl; - librbd::Image *image = worker.get_image(m_imagectx_id); - PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); - worker.add_pending(io); - ssize_t r = image->read(m_offset, m_length, io->bufferlist()); - assertf(r >= 0, "id = %d, r = %d", id(), r); - worker.remove_pending(io); -} - -std::ostream& ReadAction::dump(std::ostream& o) const { - o << "ReadAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]"; -} - - -AioWriteAction::AioWriteAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length) - : Action(src), - m_imagectx_id(imagectx_id), - m_offset(offset), - m_length(length) { - } - -Action::ptr AioWriteAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - uint64_t offset = d.read_uint64_t(); - uint64_t length = d.read_uint64_t(); - return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length)); -} - -static std::string create_fake_data() { +std::string create_fake_data() { char data[1 << 20]; // 1 MB for (unsigned int i = 0; i < sizeof(data); i++) { data[i] = (char) i; @@ -236,12 +32,91 @@ static std::string create_fake_data() { return std::string(data, sizeof(data)); } +struct ConstructVisitor : public boost::static_visitor { + inline Action::ptr operator()(const action::StartThreadAction &action) const { + return Action::ptr(new StartThreadAction(action)); + } + + inline Action::ptr operator()(const action::StopThreadAction &action) const{ + return Action::ptr(new StopThreadAction(action)); + } + + inline Action::ptr operator()(const action::ReadAction &action) const { + return Action::ptr(new ReadAction(action)); + } + + inline Action::ptr operator()(const action::AioReadAction &action) const { + return Action::ptr(new AioReadAction(action)); + } + + inline Action::ptr operator()(const action::WriteAction &action) const { + return Action::ptr(new WriteAction(action)); + } + + inline Action::ptr operator()(const action::AioWriteAction &action) const { + return Action::ptr(new AioWriteAction(action)); + } + + inline Action::ptr operator()(const action::OpenImageAction &action) const { + return Action::ptr(new OpenImageAction(action)); + } + + inline Action::ptr operator()(const action::CloseImageAction &action) const { + return Action::ptr(new CloseImageAction(action)); + } + + inline Action::ptr operator()(const action::UnknownAction &action) const { + return Action::ptr(); + } +}; + +} // anonymous namespace + +std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) { + return a.dump(o); +} + +Action::ptr Action::construct(const action::ActionEntry &action_entry) { + return boost::apply_visitor(ConstructVisitor(), action_entry.action); +} + +void StartThreadAction::perform(ActionCtx &ctx) { + cerr << "StartThreadAction should never actually be performed" << std::endl; + exit(1); +} + +void StopThreadAction::perform(ActionCtx &ctx) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + ctx.stop(); +} + +void AioReadAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + assert(image); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion()); + assertf(r >= 0, "id = %d, r = %d", id(), r); +} + +void ReadAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist()); + assertf(r >= 0, "id = %d, r = %d", id(), r); + worker.remove_pending(io); +} + + void AioWriteAction::perform(ActionCtx &worker) { static const std::string fake_data(create_fake_data()); dout(ACTION_LEVEL) << "Performing " << *this << dendl; - librbd::Image *image = worker.get_image(m_imagectx_id); + librbd::Image *image = worker.get_image(m_action.imagectx_id); PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); - uint64_t remaining = m_length; + uint64_t remaining = m_action.length; while (remaining > 0) { uint64_t n = std::min(remaining, (uint64_t)fake_data.length()); io->bufferlist().append(fake_data.data(), n); @@ -251,126 +126,52 @@ void AioWriteAction::perform(ActionCtx &worker) { if (worker.readonly()) { worker.remove_pending(io); } else { - int r = image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion()); + int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion()); assertf(r >= 0, "id = %d, r = %d", id(), r); } } -std::ostream& AioWriteAction::dump(std::ostream& o) const { - o << "AioWriteAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]"; -} - - -WriteAction::WriteAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length) - : Action(src), - m_imagectx_id(imagectx_id), - m_offset(offset), - m_length(length) { - } - -Action::ptr WriteAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - uint64_t offset = d.read_uint64_t(); - uint64_t length = d.read_uint64_t(); - return Action::ptr(new WriteAction(src, imagectx_id, offset, length)); -} - void WriteAction::perform(ActionCtx &worker) { dout(ACTION_LEVEL) << "Performing " << *this << dendl; - librbd::Image *image = worker.get_image(m_imagectx_id); + librbd::Image *image = worker.get_image(m_action.imagectx_id); PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); worker.add_pending(io); - io->bufferlist().append_zero(m_length); + io->bufferlist().append_zero(m_action.length); if (!worker.readonly()) { - ssize_t r = image->write(m_offset, m_length, io->bufferlist()); + ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist()); assertf(r >= 0, "id = %d, r = %d", id(), r); } worker.remove_pending(io); } -std::ostream& WriteAction::dump(std::ostream& o) const { - o << "WriteAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]"; -} - - -OpenImageAction::OpenImageAction(Action &src, - imagectx_id_t imagectx_id, - string name, - string snap_name, - bool readonly) - : Action(src), - m_imagectx_id(imagectx_id), - m_name(name), - m_snap_name(snap_name), - m_readonly(readonly) { - } - -Action::ptr OpenImageAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - string name = d.read_string(); - string snap_name = d.read_string(); - bool readonly = d.read_bool(); - return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly)); -} - void OpenImageAction::perform(ActionCtx &worker) { dout(ACTION_LEVEL) << "Performing " << *this << dendl; PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); worker.add_pending(io); librbd::Image *image = new librbd::Image(); librbd::RBD *rbd = worker.rbd(); - rbd_loc name(worker.map_image_name(m_name, m_snap_name)); + rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name)); int r; - if (m_readonly || worker.readonly()) { + if (m_action.read_only || worker.readonly()) { r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); } else { r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); } if (r) { - cerr << "Unable to open image '" << m_name - << "' with snap '" << m_snap_name + cerr << "Unable to open image '" << m_action.name + << "' with snap '" << m_action.snap_name << "' (mapped to '" << name.str() - << "') and readonly " << m_readonly + << "') and readonly " << m_action.read_only << ": (" << -r << ") " << strerror(-r) << std::endl; exit(1); } - worker.put_image(m_imagectx_id, image); + worker.put_image(m_action.imagectx_id, image); worker.remove_pending(io); } -std::ostream& OpenImageAction::dump(std::ostream& o) const { - o << "OpenImageAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly << "]"; -} - - -CloseImageAction::CloseImageAction(Action &src, - imagectx_id_t imagectx_id) - : Action(src), - m_imagectx_id(imagectx_id) { - } - -Action::ptr CloseImageAction::read_from(Action &src, Deser &d) { - imagectx_id_t imagectx_id = d.read_uint64_t(); - return Action::ptr(new CloseImageAction(src, imagectx_id)); -} - void CloseImageAction::perform(ActionCtx &worker) { dout(ACTION_LEVEL) << "Performing " << *this << dendl; - worker.erase_image(m_imagectx_id); + worker.erase_image(m_action.imagectx_id); worker.set_action_complete(pending_io_id()); } -std::ostream& CloseImageAction::dump(std::ostream& o) const { - o << "CloseImageAction["; - dump_action_fields(o); - return o << ", imagectx_id=" << m_imagectx_id << "]"; -} diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp index e9522dbf99f..ea46a883362 100644 --- a/src/rbd_replay/actions.hpp +++ b/src/rbd_replay/actions.hpp @@ -17,8 +17,10 @@ #include #include "include/rbd/librbd.hpp" -#include "Deser.hpp" +#include "common/Formatter.h" +#include "rbd_replay/ActionTypes.h" #include "rbd_loc.hpp" +#include // Stupid Doxygen requires this or else the typedef docs don't appear anywhere. /// @file rbd_replay/actions.hpp @@ -31,44 +33,8 @@ typedef uint64_t thread_id_t; /// Even IDs are normal actions, odd IDs are completions. typedef uint32_t action_id_t; -/** - Dependencies link actions to earlier actions or completions. - If an action has a dependency \c d then it waits until \c d.time_delta nanoseconds after the action or completion with ID \c d.id has fired. -*/ -struct dependency_d { - /// ID of the action or completion to wait for. - action_id_t id; - - /// Nanoseconds of delay to wait until after the action or completion fires. - uint64_t time_delta; - - /** - @param id ID of the action or completion to wait for. - @param time_delta Nanoseconds of delay to wait after the action or completion fires. - */ - dependency_d(action_id_t id, - uint64_t time_delta) - : id(id), - time_delta(time_delta) { - } -}; - -// These are written to files, so don't change existing assignments. -enum io_type { - IO_START_THREAD, - IO_STOP_THREAD, - IO_READ, - IO_WRITE, - IO_ASYNC_READ, - IO_ASYNC_WRITE, - IO_OPEN_IMAGE, - IO_CLOSE_IMAGE, -}; - - class PendingIO; - /** %Context through which an Action interacts with its environment. */ @@ -131,17 +97,14 @@ class Action { public: typedef boost::shared_ptr ptr; - Action(action_id_t id, - thread_id_t thread_id, - std::vector &predecessors); - - virtual ~Action(); + virtual ~Action() { + } virtual void perform(ActionCtx &ctx) = 0; /// Returns the ID of the completion corresponding to this action. action_id_t pending_io_id() { - return m_id + 1; + return id() + 1; } // There's probably a better way to do this, but oh well. @@ -149,202 +112,172 @@ public: return false; } - action_id_t id() const { - return m_id; - } - - thread_id_t thread_id() const { - return m_thread_id; - } - - const std::vector& predecessors() const { - return m_predecessors; - } - - /// Reads and constructs an action from the replay file. - static ptr read_from(Deser &d); - -protected: - std::ostream& dump_action_fields(std::ostream& o) const; - -private: - friend std::ostream& operator<<(std::ostream&, const Action&); + virtual action_id_t id() const = 0; + virtual thread_id_t thread_id() const = 0; + virtual const action::Dependencies& predecessors() const = 0; virtual std::ostream& dump(std::ostream& o) const = 0; - const action_id_t m_id; - const thread_id_t m_thread_id; - const std::vector m_predecessors; + static ptr construct(const action::ActionEntry &action_entry); +}; + +template +class TypedAction : public Action { +public: + TypedAction(const ActionType &action) : m_action(action) { + } + + virtual action_id_t id() const { + return m_action.id; + } + + virtual thread_id_t thread_id() const { + return m_action.thread_id; + } + + virtual const action::Dependencies& predecessors() const { + return m_action.dependencies; + } + + virtual std::ostream& dump(std::ostream& o) const { + o << get_action_name() << ": "; + ceph::JSONFormatter formatter(false); + formatter.open_object_section(""); + m_action.dump(&formatter); + formatter.close_section(); + formatter.flush(o); + return o; + } + +protected: + const ActionType m_action; + + virtual const char *get_action_name() const = 0; }; /// Writes human-readable debug information about the action to the stream. /// @related Action std::ostream& operator<<(std::ostream& o, const Action& a); - -/** - Placeholder for partially-constructed actions. - Does nothing, and does not appear in the replay file. - */ -class DummyAction : public Action { +class StartThreadAction : public TypedAction { public: - DummyAction(action_id_t id, - thread_id_t thread_id, - std::vector &predecessors) - : Action(id, thread_id, predecessors) { + explicit StartThreadAction(const action::StartThreadAction &action) + : TypedAction(action) { } - void perform(ActionCtx &ctx) { + virtual bool is_start_thread() { + return true; + } + virtual void perform(ActionCtx &ctx); + +protected: + virtual const char *get_action_name() const { + return "StartThreadAction"; + } +}; + +class StopThreadAction : public TypedAction { +public: + explicit StopThreadAction(const action::StopThreadAction &action) + : TypedAction(action) { } -private: - std::ostream& dump(std::ostream& o) const; + virtual void perform(ActionCtx &ctx); + +protected: + virtual const char *get_action_name() const { + return "StartThreadAction"; + } }; -class StopThreadAction : public Action { +class AioReadAction : public TypedAction { public: - explicit StopThreadAction(Action &src); + AioReadAction(const action::AioReadAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; +protected: + virtual const char *get_action_name() const { + return "AioReadAction"; + } }; -class AioReadAction : public Action { +class ReadAction : public TypedAction { public: - AioReadAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length); + ReadAction(const action::ReadAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; - uint64_t m_offset; - uint64_t m_length; +protected: + virtual const char *get_action_name() const { + return "ReadAction"; + } }; -class ReadAction : public Action { +class AioWriteAction : public TypedAction { public: - ReadAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length); + AioWriteAction(const action::AioWriteAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; - uint64_t m_offset; - uint64_t m_length; +protected: + virtual const char *get_action_name() const { + return "AioWriteAction"; + } }; -class AioWriteAction : public Action { +class WriteAction : public TypedAction { public: - AioWriteAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length); + WriteAction(const action::WriteAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; - uint64_t m_offset; - uint64_t m_length; +protected: + virtual const char *get_action_name() const { + return "WriteAction"; + } }; -class WriteAction : public Action { +class OpenImageAction : public TypedAction { public: - WriteAction(const Action &src, - imagectx_id_t imagectx_id, - uint64_t offset, - uint64_t length); + OpenImageAction(const action::OpenImageAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; - uint64_t m_offset; - uint64_t m_length; +protected: + virtual const char *get_action_name() const { + return "OpenImageAction"; + } }; -class OpenImageAction : public Action { +class CloseImageAction : public TypedAction { public: - OpenImageAction(Action &src, - imagectx_id_t imagectx_id, - std::string name, - std::string snap_name, - bool readonly); + CloseImageAction(const action::CloseImageAction &action) + : TypedAction(action) { + } - void perform(ActionCtx &ctx); + virtual void perform(ActionCtx &ctx); - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; - std::string m_name; - std::string m_snap_name; - bool m_readonly; -}; - - -class CloseImageAction : public Action { -public: - CloseImageAction(Action &src, - imagectx_id_t imagectx_id); - - void perform(ActionCtx &ctx); - - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; - - imagectx_id_t m_imagectx_id; -}; - - -class StartThreadAction : public Action { -public: - explicit StartThreadAction(Action &src); - - void perform(ActionCtx &ctx); - - bool is_start_thread(); - - static Action::ptr read_from(Action &src, Deser &d); - -private: - std::ostream& dump(std::ostream& o) const; +protected: + virtual const char *get_action_name() const { + return "CloseImageAction"; + } }; } diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc index 21a68019ece..7437bed8897 100644 --- a/src/rbd_replay/ios.cc +++ b/src/rbd_replay/ios.cc @@ -16,14 +16,37 @@ // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. #include "ios.hpp" +#include "rbd_replay/ActionTypes.h" using namespace std; using namespace rbd_replay; -bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) { - return p1->start_time() < p2->start_time(); +namespace { + +bool compare_dependencies_by_start_time(const action::Dependency &lhs, + const action::Dependency &rhs) { + return lhs.time_delta < rhs.time_delta; } +action::Dependencies convert_dependencies(uint64_t start_time, + const io_set_t &deps) { + action::Dependencies action_deps; + action_deps.reserve(deps.size()); + for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) { + boost::shared_ptr io = *it; + uint64_t time_delta = 0; + if (start_time >= io->start_time()) { + time_delta = start_time - io->start_time(); + } + action_deps.push_back(action::Dependency(io->ionum(), time_delta)); + } + std::sort(action_deps.begin(), action_deps.end(), + compare_dependencies_by_start_time); + return action_deps; +} + +} // anonymous namespace + void IO::write_debug_base(ostream& out, string type) const { out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; bool first = true; @@ -39,51 +62,36 @@ void IO::write_debug_base(ostream& out, string type) const { } -void IO::write_to(Ser& out, io_type iotype) const { - // TODO break compatibility now to add version (and yank unused fields)? - out.write_uint8_t(iotype); - out.write_uint32_t(m_ionum); - out.write_uint64_t(m_thread_id); - out.write_uint32_t(0); - out.write_uint32_t(0); - out.write_uint32_t(m_dependencies.size()); - vector deps; - for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { - deps.push_back(*itr); - } - sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time); - for (vector::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) { - out.write_uint32_t((*itr)->m_ionum); - out.write_uint64_t(m_start_time - (*itr)->m_start_time); - } -} - ostream& operator<<(ostream& out, IO::ptr io) { io->write_debug(out); return out; } -void StartThreadIO::write_to(Ser& out) const { - IO::write_to(out, IO_START_THREAD); +void StartThreadIO::encode(bufferlist &bl) const { + action::Action action((action::StartThreadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies())))); + ::encode(action, bl); } void StartThreadIO::write_debug(std::ostream& out) const { write_debug_base(out, "start thread"); } -void StopThreadIO::write_to(Ser& out) const { - IO::write_to(out, IO_STOP_THREAD); +void StopThreadIO::encode(bufferlist &bl) const { + action::Action action((action::StopThreadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies())))); + ::encode(action, bl); } void StopThreadIO::write_debug(std::ostream& out) const { write_debug_base(out, "stop thread"); } -void ReadIO::write_to(Ser& out) const { - IO::write_to(out, IO_READ); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); +void ReadIO::encode(bufferlist &bl) const { + action::Action action((action::ReadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + ::encode(action, bl); } void ReadIO::write_debug(std::ostream& out) const { @@ -91,11 +99,11 @@ void ReadIO::write_debug(std::ostream& out) const { out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; } -void WriteIO::write_to(Ser& out) const { - IO::write_to(out, IO_WRITE); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); +void WriteIO::encode(bufferlist &bl) const { + action::Action action((action::WriteAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + ::encode(action, bl); } void WriteIO::write_debug(std::ostream& out) const { @@ -103,11 +111,11 @@ void WriteIO::write_debug(std::ostream& out) const { out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; } -void AioReadIO::write_to(Ser& out) const { - IO::write_to(out, IO_ASYNC_READ); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); +void AioReadIO::encode(bufferlist &bl) const { + action::Action action((action::AioReadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + ::encode(action, bl); } void AioReadIO::write_debug(std::ostream& out) const { @@ -115,11 +123,11 @@ void AioReadIO::write_debug(std::ostream& out) const { out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; } -void AioWriteIO::write_to(Ser& out) const { - IO::write_to(out, IO_ASYNC_WRITE); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); +void AioWriteIO::encode(bufferlist &bl) const { + action::Action action((action::AioWriteAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + ::encode(action, bl); } void AioWriteIO::write_debug(std::ostream& out) const { @@ -127,12 +135,11 @@ void AioWriteIO::write_debug(std::ostream& out) const { out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; } -void OpenImageIO::write_to(Ser& out) const { - IO::write_to(out, IO_OPEN_IMAGE); - out.write_uint64_t(m_imagectx); - out.write_string(m_name); - out.write_string(m_snap_name); - out.write_bool(m_readonly); +void OpenImageIO::encode(bufferlist &bl) const { + action::Action action((action::OpenImageAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_name, m_snap_name, m_readonly))); + ::encode(action, bl); } void OpenImageIO::write_debug(std::ostream& out) const { @@ -140,9 +147,11 @@ void OpenImageIO::write_debug(std::ostream& out) const { out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; } -void CloseImageIO::write_to(Ser& out) const { - IO::write_to(out, IO_CLOSE_IMAGE); - out.write_uint64_t(m_imagectx); +void CloseImageIO::encode(bufferlist &bl) const { + action::Action action((action::CloseImageAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx))); + ::encode(action, bl); } void CloseImageIO::write_debug(std::ostream& out) const { diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp index 73e6f767459..17559331c7a 100644 --- a/src/rbd_replay/ios.hpp +++ b/src/rbd_replay/ios.hpp @@ -18,6 +18,7 @@ // This code assumes that IO IDs and timestamps are related monotonically. // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. +#include "include/buffer.h" #include #include #include @@ -25,7 +26,6 @@ #include #include #include "actions.hpp" -#include "Ser.hpp" namespace rbd_replay { @@ -77,7 +77,7 @@ public: return m_dependencies; } - virtual void write_to(Ser& out) const = 0; + virtual void encode(bufferlist &bl) const = 0; void set_ionum(action_id_t ionum) { m_ionum = ionum; @@ -87,11 +87,13 @@ public: return m_ionum; } + thread_id_t thread_id() const { + return m_thread_id; + } + virtual void write_debug(std::ostream& out) const = 0; protected: - void write_to(Ser& out, io_type iotype) const; - void write_debug_base(std::ostream& out, std::string iotype) const; private: @@ -115,7 +117,7 @@ public: : IO(ionum, start_time, thread_id, io_set_t()) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; }; @@ -129,7 +131,7 @@ public: : IO(ionum, start_time, thread_id, deps) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; }; @@ -149,7 +151,7 @@ public: m_length(length) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; @@ -174,7 +176,7 @@ public: m_length(length) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; @@ -199,7 +201,7 @@ public: m_length(length) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; @@ -224,7 +226,7 @@ public: m_length(length) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; void write_debug(std::ostream& out) const; @@ -251,7 +253,7 @@ public: m_readonly(readonly) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; imagectx_id_t imagectx() const { return m_imagectx; @@ -277,7 +279,7 @@ public: m_imagectx(imagectx) { } - void write_to(Ser& out) const; + virtual void encode(bufferlist &bl) const; imagectx_id_t imagectx() const { return m_imagectx; @@ -289,9 +291,6 @@ private: imagectx_id_t m_imagectx; }; -/// @related IO -bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2); - } #endif diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc index 94042a5191e..61cff592cf2 100644 --- a/src/rbd_replay/rbd-replay-prep.cc +++ b/src/rbd_replay/rbd-replay-prep.cc @@ -15,15 +15,20 @@ // This code assumes that IO IDs and timestamps are related monotonically. // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. +#include "common/errno.h" +#include "rbd_replay/ActionTypes.h" #include #include #include +#include +#include #include #include #include #include #include #include +#include #include "ios.hpp" using namespace std; @@ -193,12 +198,14 @@ public: struct bt_iter *bt_itr = bt_ctf_get_iter(itr); - ofstream myfile; - myfile.open(output_file_name.c_str(), ios::out | ios::binary | ios::trunc); - ASSERT_EXIT(!myfile.fail(), "Error opening output file " << - output_file_name); + int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644); + ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name << + ": " << cpp_strerror(errno)); + BOOST_SCOPE_EXIT( (fd) ) { + close(fd); + } BOOST_SCOPE_EXIT_END; - Ser ser(myfile); + write_banner(fd); uint64_t trace_start = 0; bool first = true; @@ -219,7 +226,7 @@ public: IO::ptrs ptrs; process_event(ts, evt, &ptrs); - serialize_events(ser, ptrs); + serialize_events(fd, ptrs); int r = bt_iter_next(bt_itr); ASSERT_EXIT(r == 0, "Error advancing event iterator"); @@ -227,15 +234,26 @@ public: bt_ctf_iter_destroy(itr); - insert_thread_stops(ser); - myfile.close(); + insert_thread_stops(fd); } private: - void serialize_events(Ser &ser, const IO::ptrs &ptrs) { + void write_banner(int fd) { + bufferlist bl; + bl.append(rbd_replay::action::BANNER); + int r = bl.write_fd(fd); + ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r)); + } + + void serialize_events(int fd, const IO::ptrs &ptrs) { for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) { IO::ptr io(*it); - io->write_to(ser); + + bufferlist bl; + io->encode(bl); + + int r = bl.write_fd(fd); + ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r)); if (m_verbose) { io->write_debug(std::cout); @@ -244,7 +262,7 @@ private: } } - void insert_thread_stops(Ser &ser) { + void insert_thread_stops(int fd) { IO::ptrs ios; for (map::const_iterator itr = m_threads.begin(), end = m_threads.end(); itr != end; ++itr) { @@ -253,7 +271,7 @@ private: thread->id(), m_recent_completions))); } - serialize_events(ser, ios); + serialize_events(fd, ios); } void process_event(uint64_t ts, struct bt_ctf_event *evt,