From 0f052f8cefaae643a99bce9ab83b752194c20b98 Mon Sep 17 00:00:00 2001 From: Adam Crume Date: Thu, 7 Aug 2014 13:38:19 -0700 Subject: [PATCH] rbd-replay: Convert prep-for-replay.py to rbd-replay-prep.cc Signed-off-by: Adam Crume --- examples/rbd-replay/run-prep-for-replay | 3 - examples/rbd-replay/run-rbd-replay-prep | 3 + src/.gitignore | 1 + src/rbd_replay/Makefile.am | 18 +- src/rbd_replay/Ser.cc | 53 ++ src/rbd_replay/Ser.hpp | 45 + src/rbd_replay/actions.cc | 16 +- src/rbd_replay/actions.hpp | 12 + src/rbd_replay/prep-for-replay.py | 526 ------------ src/rbd_replay/rbd-replay-prep.cc | 1051 +++++++++++++++++++++++ 10 files changed, 1189 insertions(+), 539 deletions(-) delete mode 100755 examples/rbd-replay/run-prep-for-replay create mode 100755 examples/rbd-replay/run-rbd-replay-prep create mode 100644 src/rbd_replay/Ser.cc create mode 100644 src/rbd_replay/Ser.hpp delete mode 100755 src/rbd_replay/prep-for-replay.py create mode 100644 src/rbd_replay/rbd-replay-prep.cc diff --git a/examples/rbd-replay/run-prep-for-replay b/examples/rbd-replay/run-prep-for-replay deleted file mode 100755 index 394caf89cf3..00000000000 --- a/examples/rbd-replay/run-prep-for-replay +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit replay.bin diff --git a/examples/rbd-replay/run-rbd-replay-prep b/examples/rbd-replay/run-rbd-replay-prep new file mode 100755 index 00000000000..28f4876008a --- /dev/null +++ b/examples/rbd-replay/run-rbd-replay-prep @@ -0,0 +1,3 @@ +#!/bin/bash + +../../src/rbd-replay-prep traces/ust/uid/10002/64-bit replay.bin diff --git a/src/.gitignore b/src/.gitignore index e7c09457d5f..2482e1a8a28 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -67,6 +67,7 @@ Makefile /rbd /rbd-fuse /rbd-replay +/rbd-replay-prep /rest-bench /sample.fetch_config /TAGS diff --git a/src/rbd_replay/Makefile.am b/src/rbd_replay/Makefile.am index 6e85b40537b..cd004b67392 100644 --- a/src/rbd_replay/Makefile.am +++ b/src/rbd_replay/Makefile.am @@ -4,7 +4,8 @@ librbd_replay_la_SOURCES = rbd_replay/actions.cc \ rbd_replay/ImageNameMap.cc \ rbd_replay/PendingIO.cc \ rbd_replay/rbd_loc.cc \ - rbd_replay/Replayer.cc + rbd_replay/Replayer.cc \ + rbd_replay/Ser.cc librbd_replay_la_LIBADD = $(LIBRBD) \ $(LIBRADOS) \ $(CEPH_GLOBAL) @@ -16,7 +17,9 @@ noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \ rbd_replay/PendingIO.hpp \ rbd_replay/rbd_loc.hpp \ rbd_replay/rbd_replay_debug.hpp \ - rbd_replay/Replayer.hpp + rbd_replay/Replayer.hpp \ + rbd_replay/Ser.hpp + rbd_replay_SOURCES = rbd_replay/rbd-replay.cc rbd_replay_LDADD = $(LIBRBD) \ @@ -27,3 +30,14 @@ rbd_replay_LDADD = $(LIBRBD) \ if LINUX bin_PROGRAMS += rbd-replay endif #LINUX + +# TODO: See if we need any new dependencies +rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc +rbd_replay_prep_LDADD = $(LIBRBD) \ + $(LIBRADOS) \ + $(CEPH_GLOBAL) \ + librbd_replay.la \ + -lbabeltrace \ + -lbabeltrace-ctf \ + -lboost_date_time +bin_PROGRAMS += rbd-replay-prep diff --git a/src/rbd_replay/Ser.cc b/src/rbd_replay/Ser.cc new file mode 100644 index 00000000000..97a63cdcd5d --- /dev/null +++ b/src/rbd_replay/Ser.cc @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Ser.hpp" +#include +#include +#include + + +rbd_replay::Ser::Ser(std::ostream &out) + : m_out(out) { +} + +void rbd_replay::Ser::write_uint8_t(uint8_t data) { + m_out.write(reinterpret_cast(&data), sizeof(data)); +} + +void rbd_replay::Ser::write_uint16_t(uint16_t data) { + data = htons(data); + m_out.write(reinterpret_cast(&data), sizeof(data)); +} + +void rbd_replay::Ser::write_uint32_t(uint32_t data) { + data = htonl(data); + m_out.write(reinterpret_cast(&data), sizeof(data)); +} + +void rbd_replay::Ser::write_uint64_t(uint64_t data) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + data = (static_cast(htonl(data)) << 32 | htonl(data >> 32)); +#endif + m_out.write(reinterpret_cast(&data), sizeof(data)); +} + +void rbd_replay::Ser::write_string(const std::string& data) { + write_uint32_t(data.length()); + m_out.write(data.data(), data.length()); +} + +void rbd_replay::Ser::write_bool(bool data) { + write_uint8_t(data ? 1 : 0); +} diff --git a/src/rbd_replay/Ser.hpp b/src/rbd_replay/Ser.hpp new file mode 100644 index 00000000000..2bada8f70bc --- /dev/null +++ b/src/rbd_replay/Ser.hpp @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_SER_HPP +#define _INCLUDED_RBD_REPLAY_SER_HPP + +#include +#include + +namespace rbd_replay { + +class Ser { +public: + Ser(std::ostream &out); + + void write_uint8_t(uint8_t); + + void write_uint16_t(uint16_t); + + void write_uint32_t(uint32_t); + + void write_uint64_t(uint64_t); + + void write_string(const std::string&); + + void write_bool(bool b); + +private: + std::ostream &m_out; +}; + +} + +#endif diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc index d9e143dcf88..2e9a4cb27a6 100644 --- a/src/rbd_replay/actions.cc +++ b/src/rbd_replay/actions.cc @@ -56,21 +56,21 @@ Action::ptr Action::read_from(Deser &d) { } DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps); switch (type) { - case 0: + case IO_START_THREAD: return StartThreadAction::read_from(dummy, d); - case 1: + case IO_STOP_THREAD: return StopThreadAction::read_from(dummy, d); - case 2: + case IO_READ: return ReadAction::read_from(dummy, d); - case 3: + case IO_WRITE: return WriteAction::read_from(dummy, d); - case 4: + case IO_ASYNC_READ: return AioReadAction::read_from(dummy, d); - case 5: + case IO_ASYNC_WRITE: return AioWriteAction::read_from(dummy, d); - case 6: + case IO_OPEN_IMAGE: return OpenImageAction::read_from(dummy, d); - case 7: + case IO_CLOSE_IMAGE: return CloseImageAction::read_from(dummy, d); default: cerr << "Invalid action type: " << type << std::endl; diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp index fdc9eaa5174..5d3f4dc4cd4 100644 --- a/src/rbd_replay/actions.hpp +++ b/src/rbd_replay/actions.hpp @@ -40,6 +40,18 @@ struct dependency_d { } }; +// These are written to files, so don't change existing assignments. +enum io_type { + IO_START_THREAD, + IO_STOP_THREAD, + IO_READ, + IO_WRITE, + IO_ASYNC_READ, + IO_ASYNC_WRITE, + IO_OPEN_IMAGE, + IO_CLOSE_IMAGE, +}; + class PendingIO; diff --git a/src/rbd_replay/prep-for-replay.py b/src/rbd_replay/prep-for-replay.py deleted file mode 100755 index a6300750ffd..00000000000 --- a/src/rbd_replay/prep-for-replay.py +++ /dev/null @@ -1,526 +0,0 @@ -#!/usr/bin/python -# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -# vim: ts=8 sw=2 smarttab -# -# Ceph - scalable distributed file system -# -# Copyright (C) 2014 Adam Crume -# -# This is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1, as published by the Free Software -# Foundation. See file COPYING. -# -# - -import argparse -from babeltrace import * -import datetime -import struct -import sys - - -class Extent(object): - def __init__(self, offset, length): - self.offset = offset - self.length = length - def __str__(self): - return str(self.offset) + "+" + str(self.length) - def __repr__(self): - return "Extent(" + str(self.offset) + "," + str(self.length) + ")" - -class Thread(object): - def __init__(self, id, threads, window): - self.id = id - self.threads = threads - self.window = window - self.pendingIO = None - self.latestIO = None # may not be meaningful - self.latestCompletion = None # may not be meaningful - self.lastTS = None - def insertTS(self, ts): - if not self.lastTS or ts > self.lastTS: - self.lastTS = ts - def issuedIO(self, io): - latestIOs = [] - for threadID in self.threads: - thread = self.threads[threadID] - if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window: - latestIOs.append(thread.latestIO) - io.addDependencies(latestIOs) - self.latestIO = io - def completedIO(self, io): - self.latestCompletion = io - -def batchUnreachableFrom(deps, base): - if not base: - return set() - if not deps: - return set() - unreachable = set() - searchingFor = set(deps) - discovered = set() - boundary = set(base) - boundaryHorizon = None - for io in boundary: - if not boundaryHorizon or io.start_time > boundaryHorizon: - boundaryHorizon = io.start_time - searchingHorizon = None - for io in searchingFor: - if not searchingHorizon or io.start_time < searchingHorizon: - searchingHorizon = io.start_time - tmp = [x for x in searchingFor if boundaryHorizon < x.start_time] - searchingFor.difference_update(tmp) - unreachable.update(tmp) - while boundary and searchingFor: - io = boundary.pop() - for dep in io.dependencies: - if dep in searchingFor: - searchingFor.remove(dep) - if dep.start_time == searchingHorizon: - searchingHorizon = None - for io in searchingFor: - if not searchingHorizon or io.start_time < searchingHorizon: - searchingHorizon = io.start_time - if not dep in discovered: - boundary.add(dep) - if io.start_time == boundaryHorizon: - boundaryHorizon = None - for io in boundary: - if not boundaryHorizon or io.start_time > boundaryHorizon: - boundaryHorizon = io.start_time - if boundaryHorizon: - tmp = [x for x in searchingFor if boundaryHorizon < x.start_time] - searchingFor.difference_update(tmp) - unreachable.update(tmp) - searchingHorizon = None - for io in searchingFor: - if not searchingHorizon or io.start_time < searchingHorizon: - searchingHorizon = io.start_time - unreachable.update(searchingFor) - return unreachable - -class IO(object): - def __init__(self, ionum, start_time, thread, prev): - self.ionum = ionum - self.start_time = start_time - self.thread = thread - self.dependencies = set() - self.isCompletion = False - self.prev = prev - self.numSuccessors = 0 - self.completion = None - def reachableFrom(self, ios): - if not ios: - return False - discovered = set() - boundary = set(ios) - horizon = None - for io in boundary: - if not horizon or io.start_time > horizon: - horizon = io.start_time - if horizon < self.start_time: - return False - while boundary: - io = boundary.pop() - for dep in io.dependencies: - if self == dep: - return True - if not dep in discovered: - boundary.add(dep) - if io.start_time == horizon: - horizon = None - for io in boundary: - if not horizon or io.start_time > horizon: - horizon = io.start_time - if horizon and horizon < self.start_time: - return False - return False - def addDependency(self, dep): - if not dep.reachableFrom(self.dependencies): - self.dependencies.add(dep) - def addDependencies(self, deps): - base = set(self.dependencies) - for dep in deps: - base.update(dep.dependencies) - unreachable = batchUnreachableFrom(deps, base) - self.dependencies.update(unreachable) - def depIDs(self): - ids = [] - for dep in self.dependencies: - ids.append(dep.ionum) - return ids - def depMap(self): - deps = dict() - for dep in self.dependencies: - deps[dep.ionum] = self.start_time - dep.start_time - return deps - def addThreadCompletionDependencies(self, threads, recentCompletions): - self.addDependencies(recentCompletions) - def numCompletionSuccessors(self): - return self.completion.numSuccessors if self.completion else 0 - def writeTo(self, f, iotype): - f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies))) - for dep in self.dependencies: - f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time)) - -class StartThreadIO(IO): - def __init__(self, ionum, start_time, thread): - IO.__init__(self, ionum, start_time, thread, None) - def writeTo(self, f): - IO.writeTo(self, f, 0) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class StopThreadIO(IO): - def __init__(self, ionum, start_time, thread): - IO.__init__(self, ionum, start_time, thread, None) - def writeTo(self, f): - IO.writeTo(self, f, 1) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class ReadIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx, extents): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - self.extents = extents - def writeTo(self, f): - IO.writeTo(self, f, 2) - if len(self.extents) != 1: - raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) - extent = self.extents[0] - f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class WriteIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx, extents): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - self.extents = extents - def writeTo(self, f): - IO.writeTo(self, f, 3) - if len(self.extents) != 1: - raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) - extent = self.extents[0] - f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class AioReadIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx, extents): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - self.extents = extents - def writeTo(self, f): - IO.writeTo(self, f, 4) - if len(self.extents) != 1: - raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) - extent = self.extents[0] - f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class AioWriteIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx, extents): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - self.extents = extents - def writeTo(self, f): - IO.writeTo(self, f, 5) - if len(self.extents) != 1: - raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents))) - extent = self.extents[0] - f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors()) - -class OpenImageIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - self.name = name - self.snap_name = snap_name - self.readonly = readonly - def writeTo(self, f): - IO.writeTo(self, f, 6) - f.write(struct.pack("!QI", self.imagectx, len(self.name))) - f.write(self.name) - f.write(struct.pack("!I", len(self.snap_name))) - f.write(self.snap_name) - f.write(struct.pack("!b", self.readonly)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap()) - -class CloseImageIO(IO): - def __init__(self, ionum, start_time, thread, prev, imagectx): - IO.__init__(self, ionum, start_time, thread, prev) - self.imagectx = imagectx - def writeTo(self, f): - IO.writeTo(self, f, 7) - f.write(struct.pack("!Q", self.imagectx)) - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap()) - -class CompletionIO(IO): - def __init__(self, start_time, thread, baseIO): - IO.__init__(self, baseIO.ionum + 1, start_time, thread, None) - self.baseIO = baseIO - self.isCompletion = True - self.addDependency(baseIO) - baseIO.completion = self - def writeTo(self, f): - pass - def __str__(self): - return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap()) - - -class Processor(object): - def __init__(self): - self.window = 1 * 1e9 - self.threads = {} - self.ioCount = 0 - self.recentCompletions = [] - self.openImages = {} - self.threads = {} - self.ios = [] - def nextID(self): - val = self.ioCount - self.ioCount = self.ioCount + 2 - return val - def completed(self, io): - self.recentCompletions.append(io) - self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window] - def requireImage(self, ts, thread, imagectx, name, snap_name, readonly): - if imagectx in self.openImages: - return - ionum = self.nextID() - thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - self.ios.append(completionIO) - self.completed(completionIO) - self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx - if self.printOnRead: - print str(thread.pendingIO) - def run(self, raw_args): - parser = argparse.ArgumentParser(description='convert librbd trace output to an rbd-replay input file.') - parser.add_argument('--print-on-read', action="store_true", help='print events as they are read in (for debugging)') - parser.add_argument('--print-on-write', action="store_true", help='print events as they are written out (for debugging)') - parser.add_argument('--window', default=1, type=float, help='size of the window, in seconds. Larger values slow down processing, and smaller values may miss dependencies. Default: 1') - parser.add_argument('input', help='trace to read') - parser.add_argument('output', help='replay file to write') - args = parser.parse_args(raw_args) - self.window = 1e9 * args.window - inputFileName = args.input - outputFileName = args.output - pendingIOs = {} - limit = 100000000000 - self.printOnRead = args.print_on_read - printOnWrite = args.print_on_write - traces = TraceCollection() - traces.add_trace(inputFileName, "ctf") - - # Parse phase - trace_start = None - count = 0 - for event in traces.events: - count = count + 1 - if count > limit: - break - ts = event.timestamp - if not trace_start: - trace_start = ts - ts = ts - trace_start - threadID = event["pthread_id"] - if threadID in self.threads: - thread = self.threads[threadID] - else: - thread = Thread(threadID, self.threads, self.window) - self.threads[threadID] = thread - ionum = self.nextID() - io = StartThreadIO(ionum, ts, thread) - self.ios.append(io) - if self.printOnRead: - print str(io) - thread.insertTS(ts) - if event.name == "librbd:read_enter": - name = event["name"] - snap_name = event["snap_name"] - readonly = event["read_only"] - imagectx = event["imagectx"] - self.requireImage(ts, thread, imagectx, name, snap_name, readonly) - ionum = self.nextID() - thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - elif event.name == "librbd:open_image_enter": - imagectx = event["imagectx"] - name = event["name"] - snap_name = event["snap_name"] - readid = event["id"] - readonly = event["read_only"] - ionum = self.nextID() - thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - elif event.name == "librbd:open_image_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - self.ios.append(completionIO) - self.completed(completionIO) - self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:close_image_enter": - imagectx = event["imagectx"] - ionum = self.nextID() - thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - elif event.name == "librbd:close_image_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - self.ios.append(completionIO) - self.completed(completionIO) - del self.openImages[thread.pendingIO.imagectx] - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:read_extent": - offset = event["offset"] - length = event["length"] - thread.pendingIO.extents.append(Extent(offset, length)) - elif event.name == "librbd:read_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - self.ios.append(completionIO) - completed(completionIO) - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:write_enter": - name = event["name"] - snap_name = event["snap_name"] - readonly = event["read_only"] - offset = event["off"] - length = event["buf_len"] - imagectx = event["imagectx"] - self.requireImage(ts, thread, imagectx, name, snap_name, readonly) - ionum = self.nextID() - thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - elif event.name == "librbd:write_exit": - thread.pendingIO.end_time = ts - completionIO = CompletionIO(ts, thread, thread.pendingIO) - thread.completedIO(completionIO) - self.ios.append(completionIO) - completed(completionIO) - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:aio_read_enter": - name = event["name"] - snap_name = event["snap_name"] - readonly = event["read_only"] - completion = event["completion"] - imagectx = event["imagectx"] - self.requireImage(ts, thread, imagectx, name, snap_name, readonly) - ionum = self.nextID() - thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, []) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - self.ios.append(thread.pendingIO) - thread.issuedIO(thread.pendingIO) - pendingIOs[completion] = thread.pendingIO - elif event.name == "librbd:aio_read_extent": - offset = event["offset"] - length = event["length"] - thread.pendingIO.extents.append(Extent(offset, length)) - elif event.name == "librbd:aio_read_exit": - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:aio_write_enter": - name = event["name"] - snap_name = event["snap_name"] - readonly = event["read_only"] - offset = event["off"] - length = event["len"] - completion = event["completion"] - imagectx = event["imagectx"] - self.requireImage(ts, thread, imagectx, name, snap_name, readonly) - ionum = self.nextID() - thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)]) - thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions) - thread.issuedIO(thread.pendingIO) - self.ios.append(thread.pendingIO) - pendingIOs[completion] = thread.pendingIO - if self.printOnRead: - print str(thread.pendingIO) - elif event.name == "librbd:aio_complete_enter": - completion = event["completion"] - retval = event["rval"] - if completion in pendingIOs: - completedIO = pendingIOs[completion] - del pendingIOs[completion] - completedIO.end_time = ts - completionIO = CompletionIO(ts, thread, completedIO) - completedIO.thread.completedIO(completionIO) - self.ios.append(completionIO) - self.completed(completionIO) - if self.printOnRead: - print str(completionIO) - - # Insert-thread-stop phase - self.ios = sorted(self.ios, key = lambda io: io.start_time) - for threadID in self.threads: - thread = self.threads[threadID] - ionum = None - maxIONum = 0 # only valid if ionum is None - for io in self.ios: - if io.ionum > maxIONum: - maxIONum = io.ionum - if io.start_time > thread.lastTS: - ionum = io.ionum - if ionum % 2 == 1: - ionum = ionum + 1 - break - if not ionum: - if maxIONum % 2 == 1: - maxIONum = maxIONum - 1 - ionum = maxIONum + 2 - for io in self.ios: - if io.ionum >= ionum: - io.ionum = io.ionum + 2 - # TODO: Insert in the right place, don't append and re-sort - self.ios.append(StopThreadIO(ionum, thread.lastTS, thread)) - self.ios = sorted(self.ios, key = lambda io: io.start_time) - - for io in self.ios: - if io.prev and io.prev in io.dependencies: - io.dependencies.remove(io.prev) - if io.isCompletion: - io.dependencies.clear() - for dep in io.dependencies: - dep.numSuccessors = dep.numSuccessors + 1 - - if self.printOnRead and printOnWrite: - print - - with open(outputFileName, "wb") as f: - for io in self.ios: - if printOnWrite and not io.isCompletion: - print str(io) - io.writeTo(f) - -if __name__ == '__main__': - Processor().run(sys.argv[1:]) diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc new file mode 100644 index 00000000000..96adfc18e1c --- /dev/null +++ b/src/rbd_replay/rbd-replay-prep.cc @@ -0,0 +1,1051 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "actions.hpp" +#include "Ser.hpp" + +using namespace std; +using namespace rbd_replay; + +// Allows us to easily expose all the functions to make debugging easier. +#define STATIC static + +struct extent { + extent() : offset(0), length(0) { + } + extent(uint64_t offset, uint64_t length) : offset(offset), length(length) { + } + uint64_t offset; + uint64_t length; +}; + +class IO; + +typedef set > io_set_t; + +typedef map > io_map_t; + +STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable); + +class IO : public boost::enable_shared_from_this { +public: + typedef boost::shared_ptr ptr; + + typedef boost::weak_ptr weak_ptr; + + IO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + ptr prev) + : m_ionum(ionum), + m_start_time(start_time), + m_dependencies(io_set_t()), + m_completion(weak_ptr()), + m_num_successors(0), + m_thread_id(thread_id), + m_prev(prev) { + } + + virtual ~IO() { + } + + uint64_t start_time() const { + return m_start_time; + } + + io_set_t& dependencies() { + return m_dependencies; + } + + const io_set_t& dependencies() const { + return m_dependencies; + } + + void add_dependencies(const io_set_t& deps) { + io_set_t base(m_dependencies); + for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { + ptr dep(*itr); + for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) { + base.insert(*itr2); + } + } + batch_unreachable_from(deps, base, &m_dependencies); + } + + uint64_t num_completion_successors() const { + ptr c(m_completion.lock()); + return c ? c->m_num_successors : 0; + } + + void write_to(Ser& out, io_type iotype) const; + + virtual void write_to(Ser& out) const = 0; + + virtual bool is_completion() const { + return false; + } + + void set_ionum(action_id_t ionum) { + m_ionum = ionum; + } + + action_id_t ionum() const { + return m_ionum; + } + + ptr prev() const { + return m_prev; + } + + void set_num_successors(uint32_t n) { + m_num_successors = n; + } + + uint32_t num_successors() const { + return m_num_successors; + } + + void write_debug_base(ostream& out, string iotype); + + virtual void write_debug(ostream& out) = 0; + + // The result must be stored somewhere, or else m_completion will expire + ptr create_completion(uint64_t start_time, thread_id_t thread_id); + +private: + action_id_t m_ionum; + uint64_t m_start_time; + io_set_t m_dependencies; + boost::weak_ptr m_completion; + uint32_t m_num_successors; + thread_id_t m_thread_id; + ptr m_prev; +}; + +ostream& operator<<(ostream& out, IO::ptr io) { + io->write_debug(out); + return out; +} + +class Thread { +public: + typedef boost::shared_ptr ptr; + + Thread(thread_id_t id, + uint64_t window) + : m_id(id), + m_window(window), + m_pending_io(IO::ptr()), + m_latest_io(IO::ptr()), + m_max_ts(0) { + } + + void insert_ts(uint64_t ts) { + if (m_max_ts == 0 || ts > m_max_ts) { + m_max_ts = ts; + } + } + + uint64_t max_ts() const { + return m_max_ts; + } + + void issued_io(IO::ptr io, const map& threads) { + assert(io); + io_set_t latest_ios; + for (map::const_iterator itr = threads.begin(), end = threads.end(); itr != end; ++itr) { + assertf(itr->second, "id = %ld", itr->first); + ptr thread(itr->second); + if (thread->m_latest_io) { + if (thread->m_latest_io->start_time() + m_window > io->start_time()) { + latest_ios.insert(thread->m_latest_io); + } + } + } + io->add_dependencies(latest_ios); + m_latest_io = io; + m_pending_io = io; + } + + thread_id_t id() const { + return m_id; + } + + IO::ptr pending_io() { + return m_pending_io; + } + +private: + thread_id_t m_id; + uint64_t m_window; + IO::ptr m_pending_io; + IO::ptr m_latest_io; + uint64_t m_max_ts; +}; + +void IO::write_debug_base(ostream& out, string type) { + out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; + bool first = true; + for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { + if (first) { + first = false; + } else { + out << ", "; + } + out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time; + } + out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors(); +} + +class StartThreadIO : public IO { +public: + StartThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_START_THREAD); + } + + void write_debug(ostream& out) { + write_debug_base(out, "start thread"); + } +}; + +class StopThreadIO : public IO { +public: + StopThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_STOP_THREAD); + } + + void write_debug(ostream& out) { + write_debug_base(out, "stop thread"); + } +}; + +class ReadIO : public IO { +public: + ReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_extents(vector()) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_READ); + // TODO: figure out how to handle empty IO, i.e. reads/writes with no extents. + // These happen if the trace cuts off mid-IO. We should just toss it, but it + // might mess up the dependency graph. + assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size()); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_extents[0].offset); + out.write_uint64_t(m_extents[0].length); + } + + void add_extent(const extent& e) { + m_extents.push_back(e); + } + + void write_debug(ostream& out) { + write_debug_base(out, "read"); + out << ", imagectx=" << m_imagectx << ", extents=["; + for (int i = 0, n = m_extents.size(); i < n; i++) { + if (i > 0) { + out << ", "; + } + out << m_extents[i].offset << "+" << m_extents[i].length; + } + out << "]"; + } + +private: + imagectx_id_t m_imagectx; + vector m_extents; +}; + +class WriteIO : public IO { +public: + WriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + const vector& extents) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_extents(extents) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_WRITE); + assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size()); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_extents[0].offset); + out.write_uint64_t(m_extents[0].length); + } + + void write_debug(ostream& out) { + write_debug_base(out, "write"); + out << ", imagectx=" << m_imagectx << ", extents=["; + for (int i = 0, n = m_extents.size(); i < n; i++) { + if (i > 0) { + out << ", "; + } + out << m_extents[i].offset << "+" << m_extents[i].length; + } + out << "]"; + } + +private: + imagectx_id_t m_imagectx; + vector m_extents; +}; + +class AioReadIO : public IO { +public: + AioReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_extents(vector()) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_ASYNC_READ); + assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size()); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_extents[0].offset); + out.write_uint64_t(m_extents[0].length); + } + + void add_extent(const extent& e) { + m_extents.push_back(e); + } + + + void write_debug(ostream& out) { + write_debug_base(out, "aio read"); + out << ", imagectx=" << m_imagectx << ", extents=["; + for (int i = 0, n = m_extents.size(); i < n; i++) { + if (i > 0) { + out << ", "; + } + out << m_extents[i].offset << "+" << m_extents[i].length; + } + out << "]"; + } +private: + imagectx_id_t m_imagectx; + vector m_extents; +}; + +class AioWriteIO : public IO { +public: + AioWriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + const vector& extents) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_extents(extents) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_ASYNC_WRITE); + assertf(m_extents.size() == 1, "m_extents.size() = %d", m_extents.size()); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_extents[0].offset); + out.write_uint64_t(m_extents[0].length); + } + + void write_debug(ostream& out) { + write_debug_base(out, "aio write"); + out << ", imagectx=" << m_imagectx << ", extents=["; + for (int i = 0, n = m_extents.size(); i < n; i++) { + if (i > 0) { + out << ", "; + } + out << m_extents[i].offset << "+" << m_extents[i].length; + } + out << "]"; + } + +private: + imagectx_id_t m_imagectx; + vector m_extents; +}; + +class OpenImageIO : public IO { +public: + OpenImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + const string& name, + const string& snap_name, + bool readonly) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_name(name), + m_snap_name(snap_name), + m_readonly(readonly) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_OPEN_IMAGE); + out.write_uint64_t(m_imagectx); + out.write_string(m_name); + out.write_string(m_snap_name); + out.write_bool(m_readonly); + } + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(ostream& out) { + write_debug_base(out, "open image"); + out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; + } + +private: + imagectx_id_t m_imagectx; + string m_name; + string m_snap_name; + bool m_readonly; +}; + +class CloseImageIO : public IO { +public: + CloseImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx) { + } + + void write_to(Ser& out) const { + IO::write_to(out, IO_CLOSE_IMAGE); + out.write_uint64_t(m_imagectx); + } + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(ostream& out) { + write_debug_base(out, "close image"); + out << ", imagectx=" << m_imagectx; + } + +private: + imagectx_id_t m_imagectx; +}; + +class CompletionIO : public IO { +public: + CompletionIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const { + } + + bool is_completion() const { + return true; + } + + void write_debug(ostream& out) { + write_debug_base(out, "completion"); + } +}; + +STATIC bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) { + return p1->start_time() < p2->start_time(); +} + +void IO::write_to(Ser& out, io_type iotype) const { + out.write_uint8_t(iotype); + out.write_uint32_t(m_ionum); + out.write_uint64_t(m_thread_id); + out.write_uint32_t(m_num_successors); + out.write_uint32_t(num_completion_successors()); + out.write_uint32_t(m_dependencies.size()); + vector deps; + for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { + deps.push_back(*itr); + } + sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time); + for (vector::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) { + out.write_uint32_t((*itr)->m_ionum); + out.write_uint64_t(m_start_time - (*itr)->m_start_time); + } +} + +IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) { + assert(!m_completion.lock()); + IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id)); + m_completion = completion; + completion->m_dependencies.insert(shared_from_this()); + return completion; +} + +STATIC uint64_t min_time(const map& s) { + if (s.empty()) { + return 0; + } + return s.begin()->second->start_time(); +} + +STATIC uint64_t max_time(const map& s) { + if (s.empty()) { + return 0; + } + map::const_iterator itr(s.end()); + --itr; + return itr->second->start_time(); +} + +// TODO: Add unit tests +// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable' +STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) { + if (deps.empty()) { + return; + } + + map searching_for; + for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { + searching_for[(*itr)->ionum()] = *itr; + } + + map boundary; + for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) { + boundary[(*itr)->ionum()] = *itr; + } + + // The boundary horizon is the maximum timestamp of IOs in the boundary. + // This monotonically decreases, because dependencies (which are added to the set) + // have earlier timestamp than the dependent IOs (which were just removed from the set). + uint64_t boundary_horizon = max_time(boundary); + + for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { + if (boundary_horizon >= itr->second->start_time()) { + break; + } + unreachable->insert(itr->second); + searching_for.erase(itr++); + } + if (searching_for.empty()) { + return; + } + + // The searching horizon is the minimum timestamp of IOs in the searching set. + // This monotonically increases, because elements are only removed from the set. + uint64_t searching_horizon = min_time(searching_for); + + while (!boundary.empty()) { + // Take an IO from the end, which has the highest timestamp. + // This reduces the boundary horizon as early as possible, + // which means we can short cut as soon as possible. + map >::iterator b_itr(boundary.end()); + --b_itr; + boost::shared_ptr io(b_itr->second); + boundary.erase(b_itr); + + for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) { + IO::ptr dep(*itr); + assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum()); + io_map_t::iterator p = searching_for.find(dep->ionum()); + if (p != searching_for.end()) { + searching_for.erase(p); + if (dep->start_time() == searching_horizon) { + searching_horizon = min_time(searching_for); + if (searching_horizon == 0) { + return; + } + } + } + boundary[dep->ionum()] = dep; + } + + boundary_horizon = max_time(boundary); + if (boundary_horizon != 0) { + // Anything we're searching for that has a timestamp greater than the + // boundary horizon will never be found, since the boundary horizon + // falls monotonically. + for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { + if (boundary_horizon >= itr->second->start_time()) { + break; + } + unreachable->insert(itr->second); + searching_for.erase(itr++); + } + searching_horizon = min_time(searching_for); + if (searching_horizon == 0) { + return; + } + } + } + + // Anything we're still searching for has not been found. + for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) { + unreachable->insert(itr->second); + } +} + +STATIC void usage(string prog) { + cout << "Usage: " << prog << " [ --window ] " << endl; +} + +__attribute__((noreturn)) STATIC void usage_exit(string prog, string msg) { + cerr << msg << endl; + usage(prog); + exit(1); +} + +class Processor { +public: + Processor() + : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second + m_threads(), + m_io_count(0), + m_recent_completions(io_set_t()), + m_open_images(set()), + m_ios(vector()), + m_pending_ios(map()) { + } + + void run(vector args) { + string input_file_name; + string output_file_name; + bool got_input = false; + bool got_output = false; + for (int i = 1, nargs = args.size(); i < nargs; i++) { + const string& arg(args[i]); + if (arg == "--window") { + if (i == nargs - 1) { + usage_exit(args[0], "--window requires an argument"); + } + m_window = (uint64_t)(1e9 * atof(args[++i].c_str())); + } else if (arg.find("--window=") == 0) { + // TODO: test + printf("Arg: '%s'\n", arg.c_str() + sizeof("--window=")); + m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window="))); + } else if (arg == "-h" || arg == "--help") { + usage(args[0]); + exit(0); + } else if (arg.find("-") == 0) { + usage_exit(args[0], "Unrecognized argument: " + arg); + } else if (!got_input) { + input_file_name = arg; + got_input = true; + } else if (!got_output) { + output_file_name = arg; + got_output = true; + } else { + usage_exit(args[0], "Too many arguments"); + } + } + if (!got_output) { + usage_exit(args[0], "Not enough arguments"); + } + + struct bt_context *ctx = bt_context_create(); + int trace_handle = bt_context_add_trace(ctx, + input_file_name.c_str(), // path + "ctf", // format + NULL, // packet_seek + NULL, // stream_list + NULL); // metadata + assertf(trace_handle >= 0, "trace_handle = %d", trace_handle); + + uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL); + assert(start_time_ns != -1ULL); + + struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx, + NULL, // begin_pos + NULL); // end_pos + assert(itr); + + struct bt_iter *bt_itr = bt_ctf_get_iter(itr); + + uint64_t trace_start = 0; + struct bt_ctf_event *evt; + bool first = true; + while(true) { + evt = bt_ctf_iter_read_event(itr); + if(!evt) { + break; + } + uint64_t ts = bt_ctf_get_timestamp(evt); + assert(ts != -1ULL); + + if (first) { + trace_start = ts; + first = false; + } + ts -= trace_start; + ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is. + + process_event(ts, evt); + + int r = bt_iter_next(bt_itr); + assert(!r); + } + + bt_ctf_iter_destroy(itr); + + insert_thread_stops(); + + for (vector::const_iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) { + IO::ptr io(*itr); + IO::ptr prev(io->prev()); + if (prev) { + // TODO: explain when prev is and isn't a dep + io_set_t::iterator depitr = io->dependencies().find(prev); + if (depitr != io->dependencies().end()) { + io->dependencies().erase(depitr); + } + } + if (io->is_completion()) { + io->dependencies().clear(); + } + for (io_set_t::const_iterator depitr = io->dependencies().begin(); depitr != io->dependencies().end(); ++depitr) { + IO::ptr dep(*depitr); + dep->set_num_successors(dep->num_successors() + 1); + } + } + + ofstream myfile; + myfile.open(output_file_name.c_str(), ios::out | ios::binary); + Ser ser(myfile); + for (vector::iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) { + (*itr)->write_to(ser); + } + myfile.close(); + } + +private: + void insert_thread_stops() { + sort(m_ios.begin(), m_ios.end(), compare_io_ptrs_by_start_time); + for (map::const_iterator itr = m_threads.begin(), end = m_threads.end(); itr != end; ++itr) { + Thread::ptr thread(itr->second); + const action_id_t none = -1; + action_id_t ionum = none; + action_id_t maxIONum = 0; // only valid if ionum is none + for (vector::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) { + IO::ptr io(*itr2); + if (io->ionum() > maxIONum) { + maxIONum = io->ionum(); + } + if (io->start_time() > thread->max_ts()) { + ionum = io->ionum(); + if (ionum & 1) { + ionum++; + } + break; + } + } + if (ionum == none) { + if (maxIONum & 1) { + maxIONum--; + } + ionum = maxIONum + 2; + } + for (vector::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) { + IO::ptr io(*itr2); + if (io->ionum() >= ionum) { + io->set_ionum(io->ionum() + 2); + } + } + IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), thread->id())); + vector::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time); + m_ios.insert(insertion_point, stop_thread_io); + } + } + + void process_event(uint64_t ts, struct bt_ctf_event *evt) { + const char *event_name = bt_ctf_event_name(evt); + const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt, + BT_STREAM_EVENT_CONTEXT); + assert(scope_context); + const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt, + BT_EVENT_FIELDS); + assert(scope_fields); + + const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id"); + assert(pthread_id_field); + thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field); + Thread::ptr &thread(m_threads[threadID]); + if (!thread) { + thread.reset(new Thread(threadID, m_window)); + IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID)); + m_ios.push_back(io); + } + thread->insert_ts(ts); + + class FieldLookup { + public: + FieldLookup(struct bt_ctf_event *evt, + const struct bt_definition *scope) + : m_evt(evt), + m_scope(scope) { + } + + const char* string(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + assertf(field, "field name = '%s'", name); + const char* c = bt_ctf_get_string(field); + int err = bt_ctf_field_get_error(); + assertf(c && err == 0, "field name = '%s', err = %d", name, err); + return c; + } + + int64_t int64(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + assertf(field, "field name = '%s'", name); + int64_t val = bt_ctf_get_int64(field); + int err = bt_ctf_field_get_error(); + assertf(err == 0, "field name = '%s', err = %d", name, err); + return val; + } + + uint64_t uint64(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + assertf(field, "field name = '%s'", name); + uint64_t val = bt_ctf_get_uint64(field); + int err = bt_ctf_field_get_error(); + assertf(err == 0, "field name = '%s', err = %d", name, err); + return val; + } + + private: + struct bt_ctf_event *m_evt; + const struct bt_definition *m_scope; + } fields(evt, scope_fields); + + if (strcmp(event_name, "librbd:read_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly); + action_id_t ionum = next_id(); + IO::ptr io(new ReadIO(ionum, ts, threadID, thread->pending_io(), imagectx)); + io->add_dependencies(m_recent_completions); + thread->issued_io(io, m_threads); + m_ios.push_back(io); + } else if (strcmp(event_name, "librbd:open_image_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + action_id_t ionum = next_id(); + IO::ptr io(new OpenImageIO(ionum, ts, threadID, thread->pending_io(), imagectx, name, snap_name, readonly)); + io->add_dependencies(m_recent_completions); + thread->issued_io(io, m_threads); + m_ios.push_back(io); + } else if (strcmp(event_name, "librbd:open_image_exit") == 0) { + IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); + m_ios.push_back(completionIO); + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->pending_io())); + assert(io); + m_open_images.insert(io->imagectx()); + } else if (strcmp(event_name, "librbd:close_image_enter") == 0) { + imagectx_id_t imagectx = fields.uint64("imagectx"); + action_id_t ionum = next_id(); + IO::ptr io(new CloseImageIO(ionum, ts, threadID, thread->pending_io(), imagectx)); + io->add_dependencies(m_recent_completions); + thread->issued_io(thread->pending_io(), m_threads); + m_ios.push_back(thread->pending_io()); + } else if (strcmp(event_name, "librbd:close_image_exit") == 0) { + IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); + m_ios.push_back(completionIO); + completed(completionIO); + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->pending_io())); + assert(io); + m_open_images.erase(io->imagectx()); + } else if (strcmp(event_name, "librbd:read_extent") == 0) { + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->pending_io())); + assert(io); + uint64_t offset = fields.uint64("offset"); + uint64_t length = fields.uint64("length"); + io->add_extent(extent(offset, length)); + } else if (strcmp(event_name, "librbd:read_exit") == 0) { + IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); + m_ios.push_back(completionIO); + completed(completionIO); + } else if (strcmp(event_name, "librbd:write_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("buf_len"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly); + action_id_t ionum = next_id(); + vector extents; + extents.push_back(extent(offset, length)); + IO::ptr io(new WriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, extents)); + io->add_dependencies(m_recent_completions); + thread->issued_io(io, m_threads); + m_ios.push_back(io); + } else if (strcmp(event_name, "librbd:write_exit") == 0) { + IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); + m_ios.push_back(completionIO); + completed(completionIO); + } else if (strcmp(event_name, "librbd:aio_read_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t completion = fields.uint64("completion"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly); + action_id_t ionum = next_id(); + IO::ptr io(new AioReadIO(ionum, ts, threadID, thread->pending_io(), imagectx)); + io->add_dependencies(m_recent_completions); + m_ios.push_back(io); + thread->issued_io(io, m_threads); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_read_extent") == 0) { + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->pending_io())); + assert(io); + uint64_t offset = fields.uint64("offset"); + uint64_t length = fields.uint64("length"); + io->add_extent(extent(offset, length)); + } else if (strcmp(event_name, "librbd:aio_write_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("len"); + uint64_t completion = fields.uint64("completion"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly); + action_id_t ionum = next_id(); + vector extents; + extents.push_back(extent(offset, length)); + IO::ptr io(new AioWriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, extents)); + io->add_dependencies(m_recent_completions); + thread->issued_io(io, m_threads); + m_ios.push_back(io); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) { + uint64_t completion = fields.uint64("completion"); + map::iterator itr = m_pending_ios.find(completion); + if (itr != m_pending_ios.end()) { + IO::ptr completedIO(itr->second); + m_pending_ios.erase(itr); + IO::ptr completionIO(completedIO->create_completion(ts, threadID)); + m_ios.push_back(completionIO); + completed(completionIO); + } + } + + // cout << ts << "\t" << event_name << "\tthreadID = " << threadID << endl; + } + + action_id_t next_id() { + action_id_t id = m_io_count; + m_io_count += 2; + return id; + } + + void completed(IO::ptr io) { + uint64_t limit = io->start_time() < m_window ? 0 : io->start_time() - m_window; + for (io_set_t::iterator itr = m_recent_completions.begin(); itr != m_recent_completions.end(); ) { + if ((*itr)->start_time() < limit) { + m_recent_completions.erase(itr++); + } else { + ++itr; + } + } + m_recent_completions.insert(io); + } + + void require_image(uint64_t ts, + Thread::ptr thread, + imagectx_id_t imagectx, + const string& name, + const string& snap_name, + bool readonly) { + assert(thread); + if (m_open_images.count(imagectx) > 0) { + return; + } + action_id_t ionum = next_id(); + IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), thread->pending_io(), imagectx, name, snap_name, readonly)); + io->add_dependencies(m_recent_completions); + thread->issued_io(io, m_threads); + m_ios.push_back(io); + IO::ptr completionIO(io->create_completion(ts - 1, thread->id())); + m_ios.push_back(completionIO); + completed(completionIO); + m_open_images.insert(imagectx); + } + + uint64_t m_window; + map m_threads; + uint32_t m_io_count; + io_set_t m_recent_completions; + set m_open_images; + vector m_ios; + + // keyed by completion + map m_pending_ios; +}; + +int main(int argc, char** argv) { + vector args; + for (int i = 0; i < argc; i++) { + args.push_back(string(argv[i])); + } + + Processor p; + p.run(args); +}