rbd-replay: added version control to trace output file

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2015-10-08 13:21:29 -04:00
parent 646e50a771
commit 3ecdae8388
16 changed files with 1117 additions and 819 deletions

View File

@ -0,0 +1,354 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "rbd_replay/ActionTypes.h"
#include "include/assert.h"
#include "include/byteorder.h"
#include "include/stringify.h"
#include "common/Formatter.h"
#include <iostream>
#include <boost/variant.hpp>
namespace rbd_replay {
namespace action {
namespace {
bool byte_swap_required(__u8 version) {
#if defined(CEPH_LITTLE_ENDIAN)
return (version == 0);
#else
return false;
#endif
}
void decode_big_endian_string(std::string &str, bufferlist::iterator &it) {
#if defined(CEPH_LITTLE_ENDIAN)
uint32_t length;
::decode(length, it);
length = swab32(length);
str.clear();
it.copy(length, str);
#else
assert(false);
#endif
}
class EncodeVisitor : public boost::static_visitor<void> {
public:
EncodeVisitor(bufferlist &bl) : m_bl(bl) {
}
template <typename Action>
inline void operator()(const Action &action) const {
::encode(static_cast<uint8_t>(Action::ACTION_TYPE), m_bl);
action.encode(m_bl);
}
private:
bufferlist &m_bl;
};
class DecodeVisitor : public boost::static_visitor<void> {
public:
DecodeVisitor(__u8 version, bufferlist::iterator &iter)
: m_version(version), m_iter(iter) {
}
template <typename Action>
inline void operator()(Action &action) const {
action.decode(m_version, m_iter);
}
private:
__u8 m_version;
bufferlist::iterator &m_iter;
};
class DumpVisitor : public boost::static_visitor<void> {
public:
DumpVisitor(Formatter *formatter) : m_formatter(formatter) {}
template <typename Action>
inline void operator()(const Action &action) const {
ActionType action_type = Action::ACTION_TYPE;
m_formatter->dump_string("action_type", stringify(action_type));
action.dump(m_formatter);
}
private:
ceph::Formatter *m_formatter;
};
} // anonymous namespace
void Dependency::encode(bufferlist &bl) const {
::encode(id, bl);
::encode(time_delta, bl);
}
void Dependency::decode(bufferlist::iterator &it) {
decode(1, it);
}
void Dependency::decode(__u8 version, bufferlist::iterator &it) {
::decode(id, it);
::decode(time_delta, it);
if (byte_swap_required(version)) {
id = swab32(id);
time_delta = swab64(time_delta);
}
}
void Dependency::dump(Formatter *f) const {
f->dump_unsigned("id", id);
f->dump_unsigned("time_delta", time_delta);
}
void Dependency::generate_test_instances(std::list<Dependency *> &o) {
o.push_back(new Dependency());
o.push_back(new Dependency(1, 123456789));
}
void ActionBase::encode(bufferlist &bl) const {
::encode(id, bl);
::encode(thread_id, bl);
::encode(dependencies, bl);
}
void ActionBase::decode(__u8 version, bufferlist::iterator &it) {
::decode(id, it);
::decode(thread_id, it);
if (version == 0) {
uint32_t num_successors;
::decode(num_successors, it);
uint32_t num_completion_successors;
::decode(num_completion_successors, it);
}
if (byte_swap_required(version)) {
id = swab32(id);
thread_id = swab64(thread_id);
uint32_t dep_count;
::decode(dep_count, it);
dep_count = swab32(dep_count);
dependencies.resize(dep_count);
for (uint32_t i = 0; i < dep_count; ++i) {
dependencies[i].decode(0, it);
}
} else {
::decode(dependencies, it);
}
}
void ActionBase::dump(Formatter *f) const {
f->dump_unsigned("id", id);
f->dump_unsigned("thread_id", thread_id);
f->open_array_section("dependencies");
for (size_t i = 0; i < dependencies.size(); ++i) {
f->open_object_section("dependency");
dependencies[i].dump(f);
f->close_section();
}
f->close_section();
}
void ImageActionBase::encode(bufferlist &bl) const {
ActionBase::encode(bl);
::encode(imagectx_id, bl);
}
void ImageActionBase::decode(__u8 version, bufferlist::iterator &it) {
ActionBase::decode(version, it);
::decode(imagectx_id, it);
if (byte_swap_required(version)) {
imagectx_id = swab64(imagectx_id);
}
}
void ImageActionBase::dump(Formatter *f) const {
ActionBase::dump(f);
f->dump_unsigned("imagectx_id", imagectx_id);
}
void IoActionBase::encode(bufferlist &bl) const {
ImageActionBase::encode(bl);
::encode(offset, bl);
::encode(length, bl);
}
void IoActionBase::decode(__u8 version, bufferlist::iterator &it) {
ImageActionBase::decode(version, it);
::decode(offset, it);
::decode(length, it);
if (byte_swap_required(version)) {
offset = swab64(offset);
length = swab64(length);
}
}
void IoActionBase::dump(Formatter *f) const {
ImageActionBase::dump(f);
f->dump_unsigned("offset", offset);
f->dump_unsigned("length", length);
}
void OpenImageAction::encode(bufferlist &bl) const {
ImageActionBase::encode(bl);
::encode(name, bl);
::encode(snap_name, bl);
::encode(read_only, bl);
}
void OpenImageAction::decode(__u8 version, bufferlist::iterator &it) {
ImageActionBase::decode(version, it);
if (byte_swap_required(version)) {
decode_big_endian_string(name, it);
decode_big_endian_string(snap_name, it);
} else {
::decode(name, it);
::decode(snap_name, it);
}
::decode(read_only, it);
}
void OpenImageAction::dump(Formatter *f) const {
ImageActionBase::dump(f);
f->dump_string("name", name);
f->dump_string("snap_name", snap_name);
f->dump_bool("read_only", read_only);
}
void UnknownAction::encode(bufferlist &bl) const {
assert(false);
}
void UnknownAction::decode(__u8 version, bufferlist::iterator &it) {
}
void UnknownAction::dump(Formatter *f) const {
}
void ActionEntry::encode(bufferlist &bl) const {
ENCODE_START(1, 1, bl);
boost::apply_visitor(EncodeVisitor(bl), action);
ENCODE_FINISH(bl);
}
void ActionEntry::decode(bufferlist::iterator &it) {
DECODE_START(1, it);
decode(struct_v, it);
DECODE_FINISH(it);
}
void ActionEntry::decode_unversioned(bufferlist::iterator &it) {
decode(0, it);
}
void ActionEntry::decode(__u8 version, bufferlist::iterator &it) {
uint8_t action_type;
::decode(action_type, it);
// select the correct action variant based upon the action_type
switch (action_type) {
case ACTION_TYPE_START_THREAD:
action = StartThreadAction();
break;
case ACTION_TYPE_STOP_THREAD:
action = StopThreadAction();
break;
case ACTION_TYPE_READ:
action = ReadAction();
break;
case ACTION_TYPE_WRITE:
action = WriteAction();
break;
case ACTION_TYPE_AIO_READ:
action = AioReadAction();
break;
case ACTION_TYPE_AIO_WRITE:
action = AioWriteAction();
break;
case ACTION_TYPE_OPEN_IMAGE:
action = OpenImageAction();
break;
case ACTION_TYPE_CLOSE_IMAGE:
action = CloseImageAction();
break;
}
boost::apply_visitor(DecodeVisitor(version, it), action);
}
void ActionEntry::dump(Formatter *f) const {
boost::apply_visitor(DumpVisitor(f), action);
}
void ActionEntry::generate_test_instances(std::list<ActionEntry *> &o) {
Dependencies dependencies;
dependencies.push_back(Dependency(3, 123456789));
dependencies.push_back(Dependency(4, 234567890));
o.push_back(new ActionEntry(StartThreadAction()));
o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies)));
o.push_back(new ActionEntry(StopThreadAction()));
o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies)));
o.push_back(new ActionEntry(ReadAction()));
o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5)));
o.push_back(new ActionEntry(WriteAction()));
o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4,
5)));
o.push_back(new ActionEntry(AioReadAction()));
o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4,
5)));
o.push_back(new ActionEntry(AioWriteAction()));
o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4,
5)));
o.push_back(new ActionEntry(OpenImageAction()));
o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3,
"image_name", "snap_name",
true)));
o.push_back(new ActionEntry(CloseImageAction()));
o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3)));
}
} // namespace action
} // namespace rbd_replay
std::ostream &operator<<(std::ostream &out,
const rbd_replay::action::ActionType &type) {
using namespace rbd_replay::action;
switch (type) {
case ACTION_TYPE_START_THREAD:
out << "StartThread";
break;
case ACTION_TYPE_STOP_THREAD:
out << "StopThread";
break;
case ACTION_TYPE_READ:
out << "Read";
break;
case ACTION_TYPE_WRITE:
out << "Write";
break;
case ACTION_TYPE_AIO_READ:
out << "AioRead";
break;
case ACTION_TYPE_AIO_WRITE:
out << "AioWrite";
break;
case ACTION_TYPE_OPEN_IMAGE:
out << "OpenImage";
break;
case ACTION_TYPE_CLOSE_IMAGE:
out << "CloseImage";
break;
default:
out << "Unknown (" << static_cast<uint32_t>(type) << ")";
break;
}
return out;
}

View File

@ -0,0 +1,277 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H
#define CEPH_RBD_REPLAY_ACTION_TYPES_H
#include "include/int_types.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include <iosfwd>
#include <list>
#include <string>
#include <vector>
#include <boost/variant/variant.hpp>
namespace ceph { class Formatter; }
namespace rbd_replay {
namespace action {
typedef uint64_t imagectx_id_t;
typedef uint64_t thread_id_t;
/// Even IDs are normal actions, odd IDs are completions.
typedef uint32_t action_id_t;
static const std::string BANNER("rbd-replay-trace");
/**
* Dependencies link actions to earlier actions or completions.
* If an action has a dependency \c d then it waits until \c d.time_delta
* nanoseconds after the action or completion with ID \c d.id has fired.
*/
struct Dependency {
/// ID of the action or completion to wait for.
action_id_t id;
/// Nanoseconds of delay to wait until after the action or completion fires.
uint64_t time_delta;
/**
* @param id ID of the action or completion to wait for.
* @param time_delta Nanoseconds of delay to wait after the action or
* completion fires.
*/
Dependency() : id(0), time_delta(0) {
}
Dependency(action_id_t id, uint64_t time_delta)
: id(id), time_delta(time_delta) {
}
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &it);
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
static void generate_test_instances(std::list<Dependency *> &o);
};
WRITE_CLASS_ENCODER(Dependency);
typedef std::vector<Dependency> Dependencies;
enum ActionType {
ACTION_TYPE_START_THREAD = 0,
ACTION_TYPE_STOP_THREAD = 1,
ACTION_TYPE_READ = 2,
ACTION_TYPE_WRITE = 3,
ACTION_TYPE_AIO_READ = 4,
ACTION_TYPE_AIO_WRITE = 5,
ACTION_TYPE_OPEN_IMAGE = 6,
ACTION_TYPE_CLOSE_IMAGE = 7
};
struct ActionBase {
action_id_t id;
thread_id_t thread_id;
Dependencies dependencies;
ActionBase() : id(0), thread_id(0) {
}
ActionBase(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies)
: id(id), thread_id(thread_id), dependencies(dependencies) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
};
struct StartThreadAction : public ActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD;
StartThreadAction() {
}
StartThreadAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies)
: ActionBase(id, thread_id, dependencies) {
}
};
struct StopThreadAction : public ActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD;
StopThreadAction() {
}
StopThreadAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies)
: ActionBase(id, thread_id, dependencies) {
}
};
struct ImageActionBase : public ActionBase {
imagectx_id_t imagectx_id;
ImageActionBase() : imagectx_id(0) {
}
ImageActionBase(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id)
: ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
};
struct IoActionBase : public ImageActionBase {
uint64_t offset;
uint64_t length;
IoActionBase() : offset(0), length(0) {
}
IoActionBase(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
uint64_t offset, uint64_t length)
: ImageActionBase(id, thread_id, dependencies, imagectx_id),
offset(offset), length(length) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
};
struct ReadAction : public IoActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_READ;
ReadAction() {
}
ReadAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
uint64_t offset, uint64_t length)
: IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
}
};
struct WriteAction : public IoActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE;
WriteAction() {
}
WriteAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
uint64_t offset, uint64_t length)
: IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
}
};
struct AioReadAction : public IoActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ;
AioReadAction() {
}
AioReadAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
uint64_t offset, uint64_t length)
: IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
}
};
struct AioWriteAction : public IoActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE;
AioWriteAction() {
}
AioWriteAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
uint64_t offset, uint64_t length)
: IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
}
};
struct OpenImageAction : public ImageActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE;
std::string name;
std::string snap_name;
bool read_only;
OpenImageAction() : read_only(false) {
}
OpenImageAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id,
const std::string &name, const std::string &snap_name,
bool read_only)
: ImageActionBase(id, thread_id, dependencies, imagectx_id),
name(name), snap_name(snap_name), read_only(read_only) {
}
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
};
struct CloseImageAction : public ImageActionBase {
static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE;
CloseImageAction() {
}
CloseImageAction(action_id_t id, thread_id_t thread_id,
const Dependencies &dependencies, imagectx_id_t imagectx_id)
: ImageActionBase(id, thread_id, dependencies, imagectx_id) {
}
};
struct UnknownAction {
static const ActionType ACTION_TYPE = static_cast<ActionType>(-1);
void encode(bufferlist &bl) const;
void decode(__u8 version, bufferlist::iterator &it);
void dump(Formatter *f) const;
};
typedef boost::variant<StartThreadAction,
StopThreadAction,
ReadAction,
WriteAction,
AioReadAction,
AioWriteAction,
OpenImageAction,
CloseImageAction,
UnknownAction> Action;
class ActionEntry {
public:
Action action;
ActionEntry() : action(UnknownAction()) {
}
ActionEntry(const Action &action) : action(action) {
}
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &it);
void decode_unversioned(bufferlist::iterator &it);
void dump(Formatter *f) const;
static void generate_test_instances(std::list<ActionEntry *> &o);
private:
void decode(__u8 version, bufferlist::iterator &it);
};
WRITE_CLASS_ENCODER(ActionEntry);
} // namespace action
} // namespace rbd_replay
std::ostream &operator<<(std::ostream &out,
const rbd_replay::action::ActionType &type);
using rbd_replay::action::decode;
using rbd_replay::action::encode;
#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H

View File

@ -0,0 +1,34 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "rbd_replay/BufferReader.h"
#include "include/assert.h"
#include "include/intarith.h"
namespace rbd_replay {
BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes)
: m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes),
m_bl_it(m_bl.begin()) {
assert(m_min_bytes <= m_max_bytes);
}
int BufferReader::fetch(bufferlist::iterator **it) {
if (m_bl_it.get_remaining() < m_min_bytes) {
ssize_t bytes_to_read = ROUND_UP_TO(m_max_bytes - m_bl_it.get_remaining(),
CEPH_BUFFER_APPEND_SIZE);
while (bytes_to_read > 0) {
int r = m_bl.read_fd(m_fd, CEPH_BUFFER_APPEND_SIZE);
if (r < 0) {
return r;
}
assert(r <= bytes_to_read);
bytes_to_read -= r;
}
}
*it = &m_bl_it;
return 0;
}
} // namespace rbd_replay

View File

@ -0,0 +1,33 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H
#define CEPH_RBD_REPLAY_BUFFER_READER_H
#include "include/int_types.h"
#include "include/buffer.h"
namespace rbd_replay {
class BufferReader {
public:
static const size_t DEFAULT_MIN_BYTES = 1<<20;
static const size_t DEFAULT_MAX_BYTES = 1<<22;
BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES,
size_t max_bytes = DEFAULT_MAX_BYTES);
int fetch(bufferlist::iterator **it);
private:
int m_fd;
size_t m_min_bytes;
size_t m_max_bytes;
bufferlist m_bl;
bufferlist::iterator m_bl_it;
};
} // namespace rbd_replay
#endif // CEPH_RBD_REPLAY_BUFFER_READER_H

View File

@ -1,70 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "acconfig.h"
#include "Deser.hpp"
#include <arpa/inet.h>
#include <cstdlib>
#if defined(DARWIN) || defined(__FreeBSD__)
#include <machine/endian.h>
#else
#include <endian.h>
#endif
rbd_replay::Deser::Deser(std::istream &in)
: m_in(in) {
}
uint8_t rbd_replay::Deser::read_uint8_t() {
uint8_t data;
m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
return data;
}
uint16_t rbd_replay::Deser::read_uint16_t() {
uint16_t data;
m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
return ntohs(data);
}
uint32_t rbd_replay::Deser::read_uint32_t() {
uint32_t data;
m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
return ntohl(data);
}
uint64_t rbd_replay::Deser::read_uint64_t() {
uint64_t data;
m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
#if __BYTE_ORDER == __LITTLE_ENDIAN
data = (static_cast<uint64_t>(ntohl(data)) << 32 | ntohl(data >> 32));
#endif
return data;
}
std::string rbd_replay::Deser::read_string() {
uint32_t length = read_uint32_t();
char* data = reinterpret_cast<char*>(malloc(length));
m_in.read(data, length);
std::string s(data, length);
free(data);
return s;
}
bool rbd_replay::Deser::read_bool() {
return read_uint8_t() != 0;
}
bool rbd_replay::Deser::eof() {
return m_in.eof();
}

View File

@ -1,52 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP
#define _INCLUDED_RBD_REPLAY_DESER_HPP
#include <iostream>
#include <stdint.h>
namespace rbd_replay {
/**
Helper for deserializing data in an architecture-indepdendent way.
Everything is read big-endian.
@see Ser
*/
class Deser {
public:
Deser(std::istream &in);
uint8_t read_uint8_t();
uint16_t read_uint16_t();
uint32_t read_uint32_t();
uint64_t read_uint64_t();
std::string read_string();
bool read_bool();
bool eof();
private:
std::istream &m_in;
};
}
#endif

View File

@ -2,35 +2,46 @@ if ENABLE_CLIENT
if WITH_RADOS
if WITH_RBD
librbd_replay_types_la_SOURCES = \
rbd_replay/ActionTypes.cc
noinst_HEADERS += \
rbd_replay/ActionTypes.h
noinst_LTLIBRARIES += librbd_replay_types.la
DENCODER_DEPS += librbd_replay_types.la
# librbd_replay_la exists only to help with unit tests
librbd_replay_la_SOURCES = rbd_replay/actions.cc \
rbd_replay/Deser.cc \
librbd_replay_la_SOURCES = \
rbd_replay/actions.cc \
rbd_replay/BufferReader.cc \
rbd_replay/ImageNameMap.cc \
rbd_replay/PendingIO.cc \
rbd_replay/rbd_loc.cc \
rbd_replay/Replayer.cc \
rbd_replay/Ser.cc
librbd_replay_la_LIBADD = $(LIBRBD) \
rbd_replay/Replayer.cc
librbd_replay_la_LIBADD = \
$(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL)
noinst_LTLIBRARIES += librbd_replay.la
noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
noinst_HEADERS += \
rbd_replay/actions.hpp \
rbd_replay/Deser.hpp \
rbd_replay/BoundedBuffer.hpp \
rbd_replay/BufferReader.h \
rbd_replay/ImageNameMap.hpp \
rbd_replay/ios.hpp \
rbd_replay/PendingIO.hpp \
rbd_replay/rbd_loc.hpp \
rbd_replay/rbd_replay_debug.hpp \
rbd_replay/Replayer.hpp \
rbd_replay/Ser.hpp
rbd_replay/Replayer.hpp
rbd_replay_SOURCES = rbd_replay/rbd-replay.cc
rbd_replay_LDADD = $(LIBRBD) \
rbd_replay_SOURCES = \
rbd_replay/rbd-replay.cc
rbd_replay_LDADD = \
librbd_replay.la \
librbd_replay_types.la \
$(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
librbd_replay.la
$(LIBCOMMON)
if LINUX
bin_PROGRAMS += rbd-replay
@ -43,12 +54,16 @@ librbd_replay_ios_la_LIBADD = $(LIBRBD) \
librbd_replay.la
noinst_LTLIBRARIES += librbd_replay_ios.la
rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
rbd_replay_prep_LDADD = $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
rbd_replay_prep_SOURCES = \
rbd_replay/rbd-replay-prep.cc
rbd_replay_prep_LDADD = \
librbd_replay.la \
librbd_replay_ios.la \
librbd_replay_types.la \
$(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
$(LIBCOMMON) \
-lbabeltrace \
-lbabeltrace-ctf \
-lboost_date_time

View File

@ -14,8 +14,11 @@
#include "Replayer.hpp"
#include "common/errno.h"
#include "rbd_replay/ActionTypes.h"
#include "rbd_replay/BufferReader.h"
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
#include <boost/scope_exit.hpp>
#include <fstream>
#include "global/global_context.h"
#include "rbd_replay_debug.hpp"
@ -24,6 +27,29 @@
using namespace std;
using namespace rbd_replay;
namespace {
bool is_versioned_replay(BufferReader &buffer_reader) {
bufferlist::iterator *it;
int r = buffer_reader.fetch(&it);
if (r < 0) {
return false;
}
if (it->get_remaining() < action::BANNER.size()) {
return false;
}
std::string banner;
it->copy(action::BANNER.size(), banner);
bool versioned = (banner == action::BANNER);
if (!versioned) {
it->seek(0);
}
return versioned;
}
} // anonymous namespace
Worker::Worker(Replayer &replayer)
: m_replayer(replayer),
@ -176,18 +202,45 @@ void Replayer::run(const std::string& replay_file) {
m_rbd = new librbd::RBD();
map<thread_id_t, Worker*> workers;
ifstream input(replay_file.c_str(), ios::in | ios::binary);
if (!input.is_open()) {
cerr << "Failed to open " << replay_file << std::endl;
exit(1);
int fd = open(replay_file.c_str(), O_RDONLY);
if (fd < 0) {
std::cerr << "Failed to open " << replay_file << ": "
<< cpp_strerror(errno) << std::endl;
exit(1);
}
BOOST_SCOPE_EXIT( (fd) ) {
close(fd);
} BOOST_SCOPE_EXIT_END;
Deser deser(input);
BufferReader buffer_reader(fd);
bool versioned = is_versioned_replay(buffer_reader);
while (true) {
Action::ptr action = Action::read_from(deser);
action::ActionEntry action_entry;
try {
bufferlist::iterator *it;
int r = buffer_reader.fetch(&it);
if (r < 0) {
std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
<< std::endl;
exit(-r);
}
if (versioned) {
action_entry.decode(*it);
} else {
action_entry.decode_unversioned(*it);
}
} catch (const buffer::error &err) {
std::cerr << "Failed to decode trace action" << std::endl;
exit(1);
}
Action::ptr action = Action::construct(action_entry);
if (!action) {
break;
// unknown / unsupported action
continue;
}
if (action->is_start_thread()) {
Worker *worker = new Worker(*this);
workers[action->thread_id()] = worker;
@ -261,9 +314,9 @@ bool Replayer::is_action_complete(action_id_t id) {
return tracker.actions.count(id) > 0;
}
void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
void Replayer::wait_for_actions(const action::Dependencies &deps) {
boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
BOOST_FOREACH(const dependency_d &dep, deps) {
BOOST_FOREACH(const action::Dependency &dep, deps) {
dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
action_tracker_d &tracker = tracker_for(dep.id);

View File

@ -17,6 +17,7 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>
#include "rbd_replay/ActionTypes.h"
#include "BoundedBuffer.hpp"
#include "ImageNameMap.hpp"
#include "PendingIO.hpp"
@ -100,7 +101,7 @@ public:
bool is_action_complete(action_id_t id);
void wait_for_actions(const std::vector<dependency_d> &deps);
void wait_for_actions(const action::Dependencies &deps);
std::string pool_name() const;

View File

@ -1,57 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "acconfig.h"
#include "Ser.hpp"
#include <arpa/inet.h>
#include <cstdlib>
#if defined(DARWIN) || defined(__FreeBSD__)
#include <machine/endian.h>
#else
#include <endian.h>
#endif
rbd_replay::Ser::Ser(std::ostream &out)
: m_out(out) {
}
void rbd_replay::Ser::write_uint8_t(uint8_t data) {
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint16_t(uint16_t data) {
data = htons(data);
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint32_t(uint32_t data) {
data = htonl(data);
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint64_t(uint64_t data) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
data = (static_cast<uint64_t>(htonl(data)) << 32 | htonl(data >> 32));
#endif
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_string(const std::string& data) {
write_uint32_t(data.length());
m_out.write(data.data(), data.length());
}
void rbd_replay::Ser::write_bool(bool data) {
write_uint8_t(data ? 1 : 0);
}

View File

@ -1,50 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef _INCLUDED_RBD_REPLAY_SER_HPP
#define _INCLUDED_RBD_REPLAY_SER_HPP
#include <iostream>
#include <stdint.h>
namespace rbd_replay {
/**
Helper for serializing data in an architecture-indepdendent way.
Everything is written big-endian.
@see Deser
*/
class Ser {
public:
Ser(std::ostream &out);
void write_uint8_t(uint8_t);
void write_uint16_t(uint16_t);
void write_uint32_t(uint32_t);
void write_uint64_t(uint64_t);
void write_string(const std::string&);
void write_bool(bool b);
private:
std::ostream &m_out;
};
}
#endif

View File

@ -22,213 +22,9 @@
using namespace rbd_replay;
using namespace std;
namespace {
Action::Action(action_id_t id,
thread_id_t thread_id,
std::vector<dependency_d> &predecessors)
: m_id(id),
m_thread_id(thread_id),
m_predecessors(predecessors) {
}
Action::~Action() {
}
Action::ptr Action::read_from(Deser &d) {
uint8_t type = d.read_uint8_t();
if (d.eof()) {
return Action::ptr();
}
uint32_t ionum = d.read_uint32_t();
uint64_t thread_id = d.read_uint64_t();
d.read_uint32_t(); // unused
d.read_uint32_t(); // unused
uint32_t num_dependencies = d.read_uint32_t();
vector<dependency_d> deps;
for (unsigned int i = 0; i < num_dependencies; i++) {
uint32_t dep_id = d.read_uint32_t();
uint64_t time_delta = d.read_uint64_t();
deps.push_back(dependency_d(dep_id, time_delta));
}
DummyAction dummy(ionum, thread_id, deps);
switch (type) {
case IO_START_THREAD:
return StartThreadAction::read_from(dummy, d);
case IO_STOP_THREAD:
return StopThreadAction::read_from(dummy, d);
case IO_READ:
return ReadAction::read_from(dummy, d);
case IO_WRITE:
return WriteAction::read_from(dummy, d);
case IO_ASYNC_READ:
return AioReadAction::read_from(dummy, d);
case IO_ASYNC_WRITE:
return AioWriteAction::read_from(dummy, d);
case IO_OPEN_IMAGE:
return OpenImageAction::read_from(dummy, d);
case IO_CLOSE_IMAGE:
return CloseImageAction::read_from(dummy, d);
default:
cerr << "Invalid action type: " << type << std::endl;
exit(1);
}
}
std::ostream& Action::dump_action_fields(std::ostream& o) const {
o << "id=" << m_id << ", thread_id=" << m_thread_id << ", predecessors=[";
bool first = true;
BOOST_FOREACH(const dependency_d &d, m_predecessors) {
if (!first) {
o << ",";
}
o << d.id;
first = false;
}
return o << "]";
}
std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
return a.dump(o);
}
std::ostream& DummyAction::dump(std::ostream& o) const {
o << "DummyAction[";
dump_action_fields(o);
return o << "]";
}
StartThreadAction::StartThreadAction(Action &src)
: Action(src) {
}
void StartThreadAction::perform(ActionCtx &ctx) {
cerr << "StartThreadAction should never actually be performed" << std::endl;
exit(1);
}
bool StartThreadAction::is_start_thread() {
return true;
}
Action::ptr StartThreadAction::read_from(Action &src, Deser &d) {
return Action::ptr(new StartThreadAction(src));
}
std::ostream& StartThreadAction::dump(std::ostream& o) const {
o << "StartThreadAction[";
dump_action_fields(o);
return o << "]";
}
StopThreadAction::StopThreadAction(Action &src)
: Action(src) {
}
void StopThreadAction::perform(ActionCtx &ctx) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
ctx.stop();
}
Action::ptr StopThreadAction::read_from(Action &src, Deser &d) {
return Action::ptr(new StopThreadAction(src));
}
std::ostream& StopThreadAction::dump(std::ostream& o) const {
o << "StopThreadAction[";
dump_action_fields(o);
return o << "]";
}
AioReadAction::AioReadAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length)
: Action(src),
m_imagectx_id(imagectx_id),
m_offset(offset),
m_length(length) {
}
Action::ptr AioReadAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
uint64_t offset = d.read_uint64_t();
uint64_t length = d.read_uint64_t();
return Action::ptr(new AioReadAction(src, imagectx_id, offset, length));
}
void AioReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
assert(image);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
int r = image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
std::ostream& AioReadAction::dump(std::ostream& o) const {
o << "AioReadAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
}
ReadAction::ReadAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length)
: Action(src),
m_imagectx_id(imagectx_id),
m_offset(offset),
m_length(length) {
}
Action::ptr ReadAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
uint64_t offset = d.read_uint64_t();
uint64_t length = d.read_uint64_t();
return Action::ptr(new ReadAction(src, imagectx_id, offset, length));
}
void ReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
ssize_t r = image->read(m_offset, m_length, io->bufferlist());
assertf(r >= 0, "id = %d, r = %d", id(), r);
worker.remove_pending(io);
}
std::ostream& ReadAction::dump(std::ostream& o) const {
o << "ReadAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
}
AioWriteAction::AioWriteAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length)
: Action(src),
m_imagectx_id(imagectx_id),
m_offset(offset),
m_length(length) {
}
Action::ptr AioWriteAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
uint64_t offset = d.read_uint64_t();
uint64_t length = d.read_uint64_t();
return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length));
}
static std::string create_fake_data() {
std::string create_fake_data() {
char data[1 << 20]; // 1 MB
for (unsigned int i = 0; i < sizeof(data); i++) {
data[i] = (char) i;
@ -236,12 +32,91 @@ static std::string create_fake_data() {
return std::string(data, sizeof(data));
}
struct ConstructVisitor : public boost::static_visitor<Action::ptr> {
inline Action::ptr operator()(const action::StartThreadAction &action) const {
return Action::ptr(new StartThreadAction(action));
}
inline Action::ptr operator()(const action::StopThreadAction &action) const{
return Action::ptr(new StopThreadAction(action));
}
inline Action::ptr operator()(const action::ReadAction &action) const {
return Action::ptr(new ReadAction(action));
}
inline Action::ptr operator()(const action::AioReadAction &action) const {
return Action::ptr(new AioReadAction(action));
}
inline Action::ptr operator()(const action::WriteAction &action) const {
return Action::ptr(new WriteAction(action));
}
inline Action::ptr operator()(const action::AioWriteAction &action) const {
return Action::ptr(new AioWriteAction(action));
}
inline Action::ptr operator()(const action::OpenImageAction &action) const {
return Action::ptr(new OpenImageAction(action));
}
inline Action::ptr operator()(const action::CloseImageAction &action) const {
return Action::ptr(new CloseImageAction(action));
}
inline Action::ptr operator()(const action::UnknownAction &action) const {
return Action::ptr();
}
};
} // anonymous namespace
std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
return a.dump(o);
}
Action::ptr Action::construct(const action::ActionEntry &action_entry) {
return boost::apply_visitor(ConstructVisitor(), action_entry.action);
}
void StartThreadAction::perform(ActionCtx &ctx) {
cerr << "StartThreadAction should never actually be performed" << std::endl;
exit(1);
}
void StopThreadAction::perform(ActionCtx &ctx) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
ctx.stop();
}
void AioReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_action.imagectx_id);
assert(image);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
void ReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist());
assertf(r >= 0, "id = %d, r = %d", id(), r);
worker.remove_pending(io);
}
void AioWriteAction::perform(ActionCtx &worker) {
static const std::string fake_data(create_fake_data());
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
uint64_t remaining = m_length;
uint64_t remaining = m_action.length;
while (remaining > 0) {
uint64_t n = std::min(remaining, (uint64_t)fake_data.length());
io->bufferlist().append(fake_data.data(), n);
@ -251,126 +126,52 @@ void AioWriteAction::perform(ActionCtx &worker) {
if (worker.readonly()) {
worker.remove_pending(io);
} else {
int r = image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion());
int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
}
std::ostream& AioWriteAction::dump(std::ostream& o) const {
o << "AioWriteAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
}
WriteAction::WriteAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length)
: Action(src),
m_imagectx_id(imagectx_id),
m_offset(offset),
m_length(length) {
}
Action::ptr WriteAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
uint64_t offset = d.read_uint64_t();
uint64_t length = d.read_uint64_t();
return Action::ptr(new WriteAction(src, imagectx_id, offset, length));
}
void WriteAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
io->bufferlist().append_zero(m_length);
io->bufferlist().append_zero(m_action.length);
if (!worker.readonly()) {
ssize_t r = image->write(m_offset, m_length, io->bufferlist());
ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
worker.remove_pending(io);
}
std::ostream& WriteAction::dump(std::ostream& o) const {
o << "WriteAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
}
OpenImageAction::OpenImageAction(Action &src,
imagectx_id_t imagectx_id,
string name,
string snap_name,
bool readonly)
: Action(src),
m_imagectx_id(imagectx_id),
m_name(name),
m_snap_name(snap_name),
m_readonly(readonly) {
}
Action::ptr OpenImageAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
string name = d.read_string();
string snap_name = d.read_string();
bool readonly = d.read_bool();
return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly));
}
void OpenImageAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
librbd::Image *image = new librbd::Image();
librbd::RBD *rbd = worker.rbd();
rbd_loc name(worker.map_image_name(m_name, m_snap_name));
rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name));
int r;
if (m_readonly || worker.readonly()) {
if (m_action.read_only || worker.readonly()) {
r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
} else {
r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
}
if (r) {
cerr << "Unable to open image '" << m_name
<< "' with snap '" << m_snap_name
cerr << "Unable to open image '" << m_action.name
<< "' with snap '" << m_action.snap_name
<< "' (mapped to '" << name.str()
<< "') and readonly " << m_readonly
<< "') and readonly " << m_action.read_only
<< ": (" << -r << ") " << strerror(-r) << std::endl;
exit(1);
}
worker.put_image(m_imagectx_id, image);
worker.put_image(m_action.imagectx_id, image);
worker.remove_pending(io);
}
std::ostream& OpenImageAction::dump(std::ostream& o) const {
o << "OpenImageAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly << "]";
}
CloseImageAction::CloseImageAction(Action &src,
imagectx_id_t imagectx_id)
: Action(src),
m_imagectx_id(imagectx_id) {
}
Action::ptr CloseImageAction::read_from(Action &src, Deser &d) {
imagectx_id_t imagectx_id = d.read_uint64_t();
return Action::ptr(new CloseImageAction(src, imagectx_id));
}
void CloseImageAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
worker.erase_image(m_imagectx_id);
worker.erase_image(m_action.imagectx_id);
worker.set_action_complete(pending_io_id());
}
std::ostream& CloseImageAction::dump(std::ostream& o) const {
o << "CloseImageAction[";
dump_action_fields(o);
return o << ", imagectx_id=" << m_imagectx_id << "]";
}

View File

@ -17,8 +17,10 @@
#include <boost/shared_ptr.hpp>
#include "include/rbd/librbd.hpp"
#include "Deser.hpp"
#include "common/Formatter.h"
#include "rbd_replay/ActionTypes.h"
#include "rbd_loc.hpp"
#include <iostream>
// Stupid Doxygen requires this or else the typedef docs don't appear anywhere.
/// @file rbd_replay/actions.hpp
@ -31,44 +33,8 @@ typedef uint64_t thread_id_t;
/// Even IDs are normal actions, odd IDs are completions.
typedef uint32_t action_id_t;
/**
Dependencies link actions to earlier actions or completions.
If an action has a dependency \c d then it waits until \c d.time_delta nanoseconds after the action or completion with ID \c d.id has fired.
*/
struct dependency_d {
/// ID of the action or completion to wait for.
action_id_t id;
/// Nanoseconds of delay to wait until after the action or completion fires.
uint64_t time_delta;
/**
@param id ID of the action or completion to wait for.
@param time_delta Nanoseconds of delay to wait after the action or completion fires.
*/
dependency_d(action_id_t id,
uint64_t time_delta)
: id(id),
time_delta(time_delta) {
}
};
// These are written to files, so don't change existing assignments.
enum io_type {
IO_START_THREAD,
IO_STOP_THREAD,
IO_READ,
IO_WRITE,
IO_ASYNC_READ,
IO_ASYNC_WRITE,
IO_OPEN_IMAGE,
IO_CLOSE_IMAGE,
};
class PendingIO;
/**
%Context through which an Action interacts with its environment.
*/
@ -131,17 +97,14 @@ class Action {
public:
typedef boost::shared_ptr<Action> ptr;
Action(action_id_t id,
thread_id_t thread_id,
std::vector<dependency_d> &predecessors);
virtual ~Action();
virtual ~Action() {
}
virtual void perform(ActionCtx &ctx) = 0;
/// Returns the ID of the completion corresponding to this action.
action_id_t pending_io_id() {
return m_id + 1;
return id() + 1;
}
// There's probably a better way to do this, but oh well.
@ -149,202 +112,172 @@ public:
return false;
}
action_id_t id() const {
return m_id;
}
thread_id_t thread_id() const {
return m_thread_id;
}
const std::vector<dependency_d>& predecessors() const {
return m_predecessors;
}
/// Reads and constructs an action from the replay file.
static ptr read_from(Deser &d);
protected:
std::ostream& dump_action_fields(std::ostream& o) const;
private:
friend std::ostream& operator<<(std::ostream&, const Action&);
virtual action_id_t id() const = 0;
virtual thread_id_t thread_id() const = 0;
virtual const action::Dependencies& predecessors() const = 0;
virtual std::ostream& dump(std::ostream& o) const = 0;
const action_id_t m_id;
const thread_id_t m_thread_id;
const std::vector<dependency_d> m_predecessors;
static ptr construct(const action::ActionEntry &action_entry);
};
template <typename ActionType>
class TypedAction : public Action {
public:
TypedAction(const ActionType &action) : m_action(action) {
}
virtual action_id_t id() const {
return m_action.id;
}
virtual thread_id_t thread_id() const {
return m_action.thread_id;
}
virtual const action::Dependencies& predecessors() const {
return m_action.dependencies;
}
virtual std::ostream& dump(std::ostream& o) const {
o << get_action_name() << ": ";
ceph::JSONFormatter formatter(false);
formatter.open_object_section("");
m_action.dump(&formatter);
formatter.close_section();
formatter.flush(o);
return o;
}
protected:
const ActionType m_action;
virtual const char *get_action_name() const = 0;
};
/// Writes human-readable debug information about the action to the stream.
/// @related Action
std::ostream& operator<<(std::ostream& o, const Action& a);
/**
Placeholder for partially-constructed actions.
Does nothing, and does not appear in the replay file.
*/
class DummyAction : public Action {
class StartThreadAction : public TypedAction<action::StartThreadAction> {
public:
DummyAction(action_id_t id,
thread_id_t thread_id,
std::vector<dependency_d> &predecessors)
: Action(id, thread_id, predecessors) {
explicit StartThreadAction(const action::StartThreadAction &action)
: TypedAction<action::StartThreadAction>(action) {
}
void perform(ActionCtx &ctx) {
virtual bool is_start_thread() {
return true;
}
virtual void perform(ActionCtx &ctx);
protected:
virtual const char *get_action_name() const {
return "StartThreadAction";
}
};
class StopThreadAction : public TypedAction<action::StopThreadAction> {
public:
explicit StopThreadAction(const action::StopThreadAction &action)
: TypedAction<action::StopThreadAction>(action) {
}
private:
std::ostream& dump(std::ostream& o) const;
virtual void perform(ActionCtx &ctx);
protected:
virtual const char *get_action_name() const {
return "StartThreadAction";
}
};
class StopThreadAction : public Action {
class AioReadAction : public TypedAction<action::AioReadAction> {
public:
explicit StopThreadAction(Action &src);
AioReadAction(const action::AioReadAction &action)
: TypedAction<action::AioReadAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
protected:
virtual const char *get_action_name() const {
return "AioReadAction";
}
};
class AioReadAction : public Action {
class ReadAction : public TypedAction<action::ReadAction> {
public:
AioReadAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length);
ReadAction(const action::ReadAction &action)
: TypedAction<action::ReadAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
uint64_t m_offset;
uint64_t m_length;
protected:
virtual const char *get_action_name() const {
return "ReadAction";
}
};
class ReadAction : public Action {
class AioWriteAction : public TypedAction<action::AioWriteAction> {
public:
ReadAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length);
AioWriteAction(const action::AioWriteAction &action)
: TypedAction<action::AioWriteAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
uint64_t m_offset;
uint64_t m_length;
protected:
virtual const char *get_action_name() const {
return "AioWriteAction";
}
};
class AioWriteAction : public Action {
class WriteAction : public TypedAction<action::WriteAction> {
public:
AioWriteAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length);
WriteAction(const action::WriteAction &action)
: TypedAction<action::WriteAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
uint64_t m_offset;
uint64_t m_length;
protected:
virtual const char *get_action_name() const {
return "WriteAction";
}
};
class WriteAction : public Action {
class OpenImageAction : public TypedAction<action::OpenImageAction> {
public:
WriteAction(const Action &src,
imagectx_id_t imagectx_id,
uint64_t offset,
uint64_t length);
OpenImageAction(const action::OpenImageAction &action)
: TypedAction<action::OpenImageAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
uint64_t m_offset;
uint64_t m_length;
protected:
virtual const char *get_action_name() const {
return "OpenImageAction";
}
};
class OpenImageAction : public Action {
class CloseImageAction : public TypedAction<action::CloseImageAction> {
public:
OpenImageAction(Action &src,
imagectx_id_t imagectx_id,
std::string name,
std::string snap_name,
bool readonly);
CloseImageAction(const action::CloseImageAction &action)
: TypedAction<action::CloseImageAction>(action) {
}
void perform(ActionCtx &ctx);
virtual void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
std::string m_name;
std::string m_snap_name;
bool m_readonly;
};
class CloseImageAction : public Action {
public:
CloseImageAction(Action &src,
imagectx_id_t imagectx_id);
void perform(ActionCtx &ctx);
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
imagectx_id_t m_imagectx_id;
};
class StartThreadAction : public Action {
public:
explicit StartThreadAction(Action &src);
void perform(ActionCtx &ctx);
bool is_start_thread();
static Action::ptr read_from(Action &src, Deser &d);
private:
std::ostream& dump(std::ostream& o) const;
protected:
virtual const char *get_action_name() const {
return "CloseImageAction";
}
};
}

View File

@ -16,14 +16,37 @@
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
#include "ios.hpp"
#include "rbd_replay/ActionTypes.h"
using namespace std;
using namespace rbd_replay;
bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
return p1->start_time() < p2->start_time();
namespace {
bool compare_dependencies_by_start_time(const action::Dependency &lhs,
const action::Dependency &rhs) {
return lhs.time_delta < rhs.time_delta;
}
action::Dependencies convert_dependencies(uint64_t start_time,
const io_set_t &deps) {
action::Dependencies action_deps;
action_deps.reserve(deps.size());
for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) {
boost::shared_ptr<IO> io = *it;
uint64_t time_delta = 0;
if (start_time >= io->start_time()) {
time_delta = start_time - io->start_time();
}
action_deps.push_back(action::Dependency(io->ionum(), time_delta));
}
std::sort(action_deps.begin(), action_deps.end(),
compare_dependencies_by_start_time);
return action_deps;
}
} // anonymous namespace
void IO::write_debug_base(ostream& out, string type) const {
out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
bool first = true;
@ -39,51 +62,36 @@ void IO::write_debug_base(ostream& out, string type) const {
}
void IO::write_to(Ser& out, io_type iotype) const {
// TODO break compatibility now to add version (and yank unused fields)?
out.write_uint8_t(iotype);
out.write_uint32_t(m_ionum);
out.write_uint64_t(m_thread_id);
out.write_uint32_t(0);
out.write_uint32_t(0);
out.write_uint32_t(m_dependencies.size());
vector<IO::ptr> deps;
for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
deps.push_back(*itr);
}
sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
out.write_uint32_t((*itr)->m_ionum);
out.write_uint64_t(m_start_time - (*itr)->m_start_time);
}
}
ostream& operator<<(ostream& out, IO::ptr io) {
io->write_debug(out);
return out;
}
void StartThreadIO::write_to(Ser& out) const {
IO::write_to(out, IO_START_THREAD);
void StartThreadIO::encode(bufferlist &bl) const {
action::Action action((action::StartThreadAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
::encode(action, bl);
}
void StartThreadIO::write_debug(std::ostream& out) const {
write_debug_base(out, "start thread");
}
void StopThreadIO::write_to(Ser& out) const {
IO::write_to(out, IO_STOP_THREAD);
void StopThreadIO::encode(bufferlist &bl) const {
action::Action action((action::StopThreadAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
::encode(action, bl);
}
void StopThreadIO::write_debug(std::ostream& out) const {
write_debug_base(out, "stop thread");
}
void ReadIO::write_to(Ser& out) const {
IO::write_to(out, IO_READ);
out.write_uint64_t(m_imagectx);
out.write_uint64_t(m_offset);
out.write_uint64_t(m_length);
void ReadIO::encode(bufferlist &bl) const {
action::Action action((action::ReadAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx, m_offset, m_length)));
::encode(action, bl);
}
void ReadIO::write_debug(std::ostream& out) const {
@ -91,11 +99,11 @@ void ReadIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
void WriteIO::write_to(Ser& out) const {
IO::write_to(out, IO_WRITE);
out.write_uint64_t(m_imagectx);
out.write_uint64_t(m_offset);
out.write_uint64_t(m_length);
void WriteIO::encode(bufferlist &bl) const {
action::Action action((action::WriteAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx, m_offset, m_length)));
::encode(action, bl);
}
void WriteIO::write_debug(std::ostream& out) const {
@ -103,11 +111,11 @@ void WriteIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
void AioReadIO::write_to(Ser& out) const {
IO::write_to(out, IO_ASYNC_READ);
out.write_uint64_t(m_imagectx);
out.write_uint64_t(m_offset);
out.write_uint64_t(m_length);
void AioReadIO::encode(bufferlist &bl) const {
action::Action action((action::AioReadAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx, m_offset, m_length)));
::encode(action, bl);
}
void AioReadIO::write_debug(std::ostream& out) const {
@ -115,11 +123,11 @@ void AioReadIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
void AioWriteIO::write_to(Ser& out) const {
IO::write_to(out, IO_ASYNC_WRITE);
out.write_uint64_t(m_imagectx);
out.write_uint64_t(m_offset);
out.write_uint64_t(m_length);
void AioWriteIO::encode(bufferlist &bl) const {
action::Action action((action::AioWriteAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx, m_offset, m_length)));
::encode(action, bl);
}
void AioWriteIO::write_debug(std::ostream& out) const {
@ -127,12 +135,11 @@ void AioWriteIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
void OpenImageIO::write_to(Ser& out) const {
IO::write_to(out, IO_OPEN_IMAGE);
out.write_uint64_t(m_imagectx);
out.write_string(m_name);
out.write_string(m_snap_name);
out.write_bool(m_readonly);
void OpenImageIO::encode(bufferlist &bl) const {
action::Action action((action::OpenImageAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx, m_name, m_snap_name, m_readonly)));
::encode(action, bl);
}
void OpenImageIO::write_debug(std::ostream& out) const {
@ -140,9 +147,11 @@ void OpenImageIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
}
void CloseImageIO::write_to(Ser& out) const {
IO::write_to(out, IO_CLOSE_IMAGE);
out.write_uint64_t(m_imagectx);
void CloseImageIO::encode(bufferlist &bl) const {
action::Action action((action::CloseImageAction(
ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
m_imagectx)));
::encode(action, bl);
}
void CloseImageIO::write_debug(std::ostream& out) const {

View File

@ -18,6 +18,7 @@
// This code assumes that IO IDs and timestamps are related monotonically.
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
#include "include/buffer.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <iostream>
@ -25,7 +26,6 @@
#include <set>
#include <vector>
#include "actions.hpp"
#include "Ser.hpp"
namespace rbd_replay {
@ -77,7 +77,7 @@ public:
return m_dependencies;
}
virtual void write_to(Ser& out) const = 0;
virtual void encode(bufferlist &bl) const = 0;
void set_ionum(action_id_t ionum) {
m_ionum = ionum;
@ -87,11 +87,13 @@ public:
return m_ionum;
}
thread_id_t thread_id() const {
return m_thread_id;
}
virtual void write_debug(std::ostream& out) const = 0;
protected:
void write_to(Ser& out, io_type iotype) const;
void write_debug_base(std::ostream& out, std::string iotype) const;
private:
@ -115,7 +117,7 @@ public:
: IO(ionum, start_time, thread_id, io_set_t()) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
};
@ -129,7 +131,7 @@ public:
: IO(ionum, start_time, thread_id, deps) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
};
@ -149,7 +151,7 @@ public:
m_length(length) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
@ -174,7 +176,7 @@ public:
m_length(length) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
@ -199,7 +201,7 @@ public:
m_length(length) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
@ -224,7 +226,7 @@ public:
m_length(length) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
@ -251,7 +253,7 @@ public:
m_readonly(readonly) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
imagectx_id_t imagectx() const {
return m_imagectx;
@ -277,7 +279,7 @@ public:
m_imagectx(imagectx) {
}
void write_to(Ser& out) const;
virtual void encode(bufferlist &bl) const;
imagectx_id_t imagectx() const {
return m_imagectx;
@ -289,9 +291,6 @@ private:
imagectx_id_t m_imagectx;
};
/// @related IO
bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
}
#endif

View File

@ -15,15 +15,20 @@
// This code assumes that IO IDs and timestamps are related monotonically.
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
#include "common/errno.h"
#include "rbd_replay/ActionTypes.h"
#include <babeltrace/babeltrace.h>
#include <babeltrace/ctf/events.h>
#include <babeltrace/ctf/iterator.h>
#include <sys/types.h>
#include <fcntl.h>
#include <cstdlib>
#include <string>
#include <assert.h>
#include <fstream>
#include <set>
#include <boost/thread/thread.hpp>
#include <boost/scope_exit.hpp>
#include "ios.hpp"
using namespace std;
@ -193,12 +198,14 @@ public:
struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
ofstream myfile;
myfile.open(output_file_name.c_str(), ios::out | ios::binary | ios::trunc);
ASSERT_EXIT(!myfile.fail(), "Error opening output file " <<
output_file_name);
int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
": " << cpp_strerror(errno));
BOOST_SCOPE_EXIT( (fd) ) {
close(fd);
} BOOST_SCOPE_EXIT_END;
Ser ser(myfile);
write_banner(fd);
uint64_t trace_start = 0;
bool first = true;
@ -219,7 +226,7 @@ public:
IO::ptrs ptrs;
process_event(ts, evt, &ptrs);
serialize_events(ser, ptrs);
serialize_events(fd, ptrs);
int r = bt_iter_next(bt_itr);
ASSERT_EXIT(r == 0, "Error advancing event iterator");
@ -227,15 +234,26 @@ public:
bt_ctf_iter_destroy(itr);
insert_thread_stops(ser);
myfile.close();
insert_thread_stops(fd);
}
private:
void serialize_events(Ser &ser, const IO::ptrs &ptrs) {
void write_banner(int fd) {
bufferlist bl;
bl.append(rbd_replay::action::BANNER);
int r = bl.write_fd(fd);
ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
}
void serialize_events(int fd, const IO::ptrs &ptrs) {
for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
IO::ptr io(*it);
io->write_to(ser);
bufferlist bl;
io->encode(bl);
int r = bl.write_fd(fd);
ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
if (m_verbose) {
io->write_debug(std::cout);
@ -244,7 +262,7 @@ private:
}
}
void insert_thread_stops(Ser &ser) {
void insert_thread_stops(int fd) {
IO::ptrs ios;
for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
end = m_threads.end(); itr != end; ++itr) {
@ -253,7 +271,7 @@ private:
thread->id(),
m_recent_completions)));
}
serialize_events(ser, ios);
serialize_events(fd, ios);
}
void process_event(uint64_t ts, struct bt_ctf_event *evt,