rbd-replay: Convert prep-for-replay.py to rbd-replay-prep.cc

Signed-off-by: Adam Crume <adamcrume@gmail.com>
This commit is contained in:
Adam Crume 2014-08-07 13:38:19 -07:00 committed by Sage Weil
parent e18748ed99
commit 0f052f8cef
10 changed files with 1189 additions and 539 deletions

View File

@ -1,3 +0,0 @@
#!/bin/bash
PYTHONPATH=~/babeltrace/bindings/python/:~/babeltrace/bindings/python/.libs/ ../../src/rbd_replay/prep-for-replay.py traces/ust/uid/10002/64-bit replay.bin

View File

@ -0,0 +1,3 @@
#!/bin/bash
../../src/rbd-replay-prep traces/ust/uid/10002/64-bit replay.bin

1
src/.gitignore vendored
View File

@ -67,6 +67,7 @@ Makefile
/rbd
/rbd-fuse
/rbd-replay
/rbd-replay-prep
/rest-bench
/sample.fetch_config
/TAGS

View File

@ -4,7 +4,8 @@ librbd_replay_la_SOURCES = rbd_replay/actions.cc \
rbd_replay/ImageNameMap.cc \
rbd_replay/PendingIO.cc \
rbd_replay/rbd_loc.cc \
rbd_replay/Replayer.cc
rbd_replay/Replayer.cc \
rbd_replay/Ser.cc
librbd_replay_la_LIBADD = $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL)
@ -16,7 +17,9 @@ noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
rbd_replay/PendingIO.hpp \
rbd_replay/rbd_loc.hpp \
rbd_replay/rbd_replay_debug.hpp \
rbd_replay/Replayer.hpp
rbd_replay/Replayer.hpp \
rbd_replay/Ser.hpp
rbd_replay_SOURCES = rbd_replay/rbd-replay.cc
rbd_replay_LDADD = $(LIBRBD) \
@ -27,3 +30,14 @@ rbd_replay_LDADD = $(LIBRBD) \
if LINUX
bin_PROGRAMS += rbd-replay
endif #LINUX
# TODO: See if we need any new dependencies
rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
rbd_replay_prep_LDADD = $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
librbd_replay.la \
-lbabeltrace \
-lbabeltrace-ctf \
-lboost_date_time
bin_PROGRAMS += rbd-replay-prep

53
src/rbd_replay/Ser.cc Normal file
View File

@ -0,0 +1,53 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "Ser.hpp"
#include <arpa/inet.h>
#include <cstdlib>
#include <endian.h>
rbd_replay::Ser::Ser(std::ostream &out)
: m_out(out) {
}
void rbd_replay::Ser::write_uint8_t(uint8_t data) {
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint16_t(uint16_t data) {
data = htons(data);
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint32_t(uint32_t data) {
data = htonl(data);
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_uint64_t(uint64_t data) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
data = (static_cast<uint64_t>(htonl(data)) << 32 | htonl(data >> 32));
#endif
m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
}
void rbd_replay::Ser::write_string(const std::string& data) {
write_uint32_t(data.length());
m_out.write(data.data(), data.length());
}
void rbd_replay::Ser::write_bool(bool data) {
write_uint8_t(data ? 1 : 0);
}

45
src/rbd_replay/Ser.hpp Normal file
View File

@ -0,0 +1,45 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef _INCLUDED_RBD_REPLAY_SER_HPP
#define _INCLUDED_RBD_REPLAY_SER_HPP
#include <iostream>
#include <stdint.h>
namespace rbd_replay {
class Ser {
public:
Ser(std::ostream &out);
void write_uint8_t(uint8_t);
void write_uint16_t(uint16_t);
void write_uint32_t(uint32_t);
void write_uint64_t(uint64_t);
void write_string(const std::string&);
void write_bool(bool b);
private:
std::ostream &m_out;
};
}
#endif

View File

@ -56,21 +56,21 @@ Action::ptr Action::read_from(Deser &d) {
}
DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
switch (type) {
case 0:
case IO_START_THREAD:
return StartThreadAction::read_from(dummy, d);
case 1:
case IO_STOP_THREAD:
return StopThreadAction::read_from(dummy, d);
case 2:
case IO_READ:
return ReadAction::read_from(dummy, d);
case 3:
case IO_WRITE:
return WriteAction::read_from(dummy, d);
case 4:
case IO_ASYNC_READ:
return AioReadAction::read_from(dummy, d);
case 5:
case IO_ASYNC_WRITE:
return AioWriteAction::read_from(dummy, d);
case 6:
case IO_OPEN_IMAGE:
return OpenImageAction::read_from(dummy, d);
case 7:
case IO_CLOSE_IMAGE:
return CloseImageAction::read_from(dummy, d);
default:
cerr << "Invalid action type: " << type << std::endl;

View File

@ -40,6 +40,18 @@ struct dependency_d {
}
};
// These are written to files, so don't change existing assignments.
enum io_type {
IO_START_THREAD,
IO_STOP_THREAD,
IO_READ,
IO_WRITE,
IO_ASYNC_READ,
IO_ASYNC_WRITE,
IO_OPEN_IMAGE,
IO_CLOSE_IMAGE,
};
class PendingIO;

View File

@ -1,526 +0,0 @@
#!/usr/bin/python
# -*- mode:Python; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
# vim: ts=8 sw=2 smarttab
#
# Ceph - scalable distributed file system
#
# Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
#
# This is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1, as published by the Free Software
# Foundation. See file COPYING.
#
#
import argparse
from babeltrace import *
import datetime
import struct
import sys
class Extent(object):
def __init__(self, offset, length):
self.offset = offset
self.length = length
def __str__(self):
return str(self.offset) + "+" + str(self.length)
def __repr__(self):
return "Extent(" + str(self.offset) + "," + str(self.length) + ")"
class Thread(object):
def __init__(self, id, threads, window):
self.id = id
self.threads = threads
self.window = window
self.pendingIO = None
self.latestIO = None # may not be meaningful
self.latestCompletion = None # may not be meaningful
self.lastTS = None
def insertTS(self, ts):
if not self.lastTS or ts > self.lastTS:
self.lastTS = ts
def issuedIO(self, io):
latestIOs = []
for threadID in self.threads:
thread = self.threads[threadID]
if thread.latestIO and thread.latestIO.start_time > io.start_time - self.window:
latestIOs.append(thread.latestIO)
io.addDependencies(latestIOs)
self.latestIO = io
def completedIO(self, io):
self.latestCompletion = io
def batchUnreachableFrom(deps, base):
if not base:
return set()
if not deps:
return set()
unreachable = set()
searchingFor = set(deps)
discovered = set()
boundary = set(base)
boundaryHorizon = None
for io in boundary:
if not boundaryHorizon or io.start_time > boundaryHorizon:
boundaryHorizon = io.start_time
searchingHorizon = None
for io in searchingFor:
if not searchingHorizon or io.start_time < searchingHorizon:
searchingHorizon = io.start_time
tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
searchingFor.difference_update(tmp)
unreachable.update(tmp)
while boundary and searchingFor:
io = boundary.pop()
for dep in io.dependencies:
if dep in searchingFor:
searchingFor.remove(dep)
if dep.start_time == searchingHorizon:
searchingHorizon = None
for io in searchingFor:
if not searchingHorizon or io.start_time < searchingHorizon:
searchingHorizon = io.start_time
if not dep in discovered:
boundary.add(dep)
if io.start_time == boundaryHorizon:
boundaryHorizon = None
for io in boundary:
if not boundaryHorizon or io.start_time > boundaryHorizon:
boundaryHorizon = io.start_time
if boundaryHorizon:
tmp = [x for x in searchingFor if boundaryHorizon < x.start_time]
searchingFor.difference_update(tmp)
unreachable.update(tmp)
searchingHorizon = None
for io in searchingFor:
if not searchingHorizon or io.start_time < searchingHorizon:
searchingHorizon = io.start_time
unreachable.update(searchingFor)
return unreachable
class IO(object):
def __init__(self, ionum, start_time, thread, prev):
self.ionum = ionum
self.start_time = start_time
self.thread = thread
self.dependencies = set()
self.isCompletion = False
self.prev = prev
self.numSuccessors = 0
self.completion = None
def reachableFrom(self, ios):
if not ios:
return False
discovered = set()
boundary = set(ios)
horizon = None
for io in boundary:
if not horizon or io.start_time > horizon:
horizon = io.start_time
if horizon < self.start_time:
return False
while boundary:
io = boundary.pop()
for dep in io.dependencies:
if self == dep:
return True
if not dep in discovered:
boundary.add(dep)
if io.start_time == horizon:
horizon = None
for io in boundary:
if not horizon or io.start_time > horizon:
horizon = io.start_time
if horizon and horizon < self.start_time:
return False
return False
def addDependency(self, dep):
if not dep.reachableFrom(self.dependencies):
self.dependencies.add(dep)
def addDependencies(self, deps):
base = set(self.dependencies)
for dep in deps:
base.update(dep.dependencies)
unreachable = batchUnreachableFrom(deps, base)
self.dependencies.update(unreachable)
def depIDs(self):
ids = []
for dep in self.dependencies:
ids.append(dep.ionum)
return ids
def depMap(self):
deps = dict()
for dep in self.dependencies:
deps[dep.ionum] = self.start_time - dep.start_time
return deps
def addThreadCompletionDependencies(self, threads, recentCompletions):
self.addDependencies(recentCompletions)
def numCompletionSuccessors(self):
return self.completion.numSuccessors if self.completion else 0
def writeTo(self, f, iotype):
f.write(struct.pack("!BIQIII", iotype, self.ionum, self.thread.id, self.numSuccessors, self.numCompletionSuccessors(), len(self.dependencies)))
for dep in self.dependencies:
f.write(struct.pack("!IQ", dep.ionum, self.start_time - dep.start_time))
class StartThreadIO(IO):
def __init__(self, ionum, start_time, thread):
IO.__init__(self, ionum, start_time, thread, None)
def writeTo(self, f):
IO.writeTo(self, f, 0)
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": start thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class StopThreadIO(IO):
def __init__(self, ionum, start_time, thread):
IO.__init__(self, ionum, start_time, thread, None)
def writeTo(self, f):
IO.writeTo(self, f, 1)
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": stop thread, thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class ReadIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
self.extents = extents
def writeTo(self, f):
IO.writeTo(self, f, 2)
if len(self.extents) != 1:
raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
extent = self.extents[0]
f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class WriteIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
self.extents = extents
def writeTo(self, f):
IO.writeTo(self, f, 3)
if len(self.extents) != 1:
raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
extent = self.extents[0]
f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class AioReadIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
self.extents = extents
def writeTo(self, f):
IO.writeTo(self, f, 4)
if len(self.extents) != 1:
raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
extent = self.extents[0]
f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio read, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class AioWriteIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx, extents):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
self.extents = extents
def writeTo(self, f):
IO.writeTo(self, f, 5)
if len(self.extents) != 1:
raise ValueError("Expected read to have 1 extent, but it had " + str(len(self.extents)))
extent = self.extents[0]
f.write(struct.pack("!QQQ", self.imagectx, extent.offset, extent.length))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": aio write, extents = " + str(self.extents) + ", thread = " + str(self.thread.id) + ", deps = " + str(self.depMap()) + ", numSuccessors = " + str(self.numSuccessors) + ", numCompletionSuccessors = " + str(self.numCompletionSuccessors())
class OpenImageIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx, name, snap_name, readonly):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
self.name = name
self.snap_name = snap_name
self.readonly = readonly
def writeTo(self, f):
IO.writeTo(self, f, 6)
f.write(struct.pack("!QI", self.imagectx, len(self.name)))
f.write(self.name)
f.write(struct.pack("!I", len(self.snap_name)))
f.write(self.snap_name)
f.write(struct.pack("!b", self.readonly))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": open image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", name = " + self.name + ", snap_name = " + self.snap_name + ", readonly = " + str(self.readonly) + ", deps = " + str(self.depMap())
class CloseImageIO(IO):
def __init__(self, ionum, start_time, thread, prev, imagectx):
IO.__init__(self, ionum, start_time, thread, prev)
self.imagectx = imagectx
def writeTo(self, f):
IO.writeTo(self, f, 7)
f.write(struct.pack("!Q", self.imagectx))
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": close image, thread = " + str(self.thread.id) + ", imagectx = " + str(self.imagectx) + ", deps = " + str(self.depMap())
class CompletionIO(IO):
def __init__(self, start_time, thread, baseIO):
IO.__init__(self, baseIO.ionum + 1, start_time, thread, None)
self.baseIO = baseIO
self.isCompletion = True
self.addDependency(baseIO)
baseIO.completion = self
def writeTo(self, f):
pass
def __str__(self):
return str(self.ionum) + ": " + str(self.start_time * 1e-6) + ": completion, thread = " + str(self.thread.id) + ", baseIO = " + str(self.baseIO) + ", deps = " + str(self.depMap())
class Processor(object):
def __init__(self):
self.window = 1 * 1e9
self.threads = {}
self.ioCount = 0
self.recentCompletions = []
self.openImages = {}
self.threads = {}
self.ios = []
def nextID(self):
val = self.ioCount
self.ioCount = self.ioCount + 2
return val
def completed(self, io):
self.recentCompletions.append(io)
self.recentCompletions[:] = [x for x in self.recentCompletions if x.start_time > io.start_time - self.window]
def requireImage(self, ts, thread, imagectx, name, snap_name, readonly):
if imagectx in self.openImages:
return
ionum = self.nextID()
thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
self.ios.append(completionIO)
self.completed(completionIO)
self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx
if self.printOnRead:
print str(thread.pendingIO)
def run(self, raw_args):
parser = argparse.ArgumentParser(description='convert librbd trace output to an rbd-replay input file.')
parser.add_argument('--print-on-read', action="store_true", help='print events as they are read in (for debugging)')
parser.add_argument('--print-on-write', action="store_true", help='print events as they are written out (for debugging)')
parser.add_argument('--window', default=1, type=float, help='size of the window, in seconds. Larger values slow down processing, and smaller values may miss dependencies. Default: 1')
parser.add_argument('input', help='trace to read')
parser.add_argument('output', help='replay file to write')
args = parser.parse_args(raw_args)
self.window = 1e9 * args.window
inputFileName = args.input
outputFileName = args.output
pendingIOs = {}
limit = 100000000000
self.printOnRead = args.print_on_read
printOnWrite = args.print_on_write
traces = TraceCollection()
traces.add_trace(inputFileName, "ctf")
# Parse phase
trace_start = None
count = 0
for event in traces.events:
count = count + 1
if count > limit:
break
ts = event.timestamp
if not trace_start:
trace_start = ts
ts = ts - trace_start
threadID = event["pthread_id"]
if threadID in self.threads:
thread = self.threads[threadID]
else:
thread = Thread(threadID, self.threads, self.window)
self.threads[threadID] = thread
ionum = self.nextID()
io = StartThreadIO(ionum, ts, thread)
self.ios.append(io)
if self.printOnRead:
print str(io)
thread.insertTS(ts)
if event.name == "librbd:read_enter":
name = event["name"]
snap_name = event["snap_name"]
readonly = event["read_only"]
imagectx = event["imagectx"]
self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
ionum = self.nextID()
thread.pendingIO = ReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
elif event.name == "librbd:open_image_enter":
imagectx = event["imagectx"]
name = event["name"]
snap_name = event["snap_name"]
readid = event["id"]
readonly = event["read_only"]
ionum = self.nextID()
thread.pendingIO = OpenImageIO(ionum, ts, thread, thread.pendingIO, imagectx, name, snap_name, readonly)
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
elif event.name == "librbd:open_image_exit":
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
self.ios.append(completionIO)
self.completed(completionIO)
self.openImages[thread.pendingIO.imagectx] = thread.pendingIO.imagectx
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:close_image_enter":
imagectx = event["imagectx"]
ionum = self.nextID()
thread.pendingIO = CloseImageIO(ionum, ts, thread, thread.pendingIO, imagectx)
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
elif event.name == "librbd:close_image_exit":
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
self.ios.append(completionIO)
self.completed(completionIO)
del self.openImages[thread.pendingIO.imagectx]
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:read_extent":
offset = event["offset"]
length = event["length"]
thread.pendingIO.extents.append(Extent(offset, length))
elif event.name == "librbd:read_exit":
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
self.ios.append(completionIO)
completed(completionIO)
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:write_enter":
name = event["name"]
snap_name = event["snap_name"]
readonly = event["read_only"]
offset = event["off"]
length = event["buf_len"]
imagectx = event["imagectx"]
self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
ionum = self.nextID()
thread.pendingIO = WriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
elif event.name == "librbd:write_exit":
thread.pendingIO.end_time = ts
completionIO = CompletionIO(ts, thread, thread.pendingIO)
thread.completedIO(completionIO)
self.ios.append(completionIO)
completed(completionIO)
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:aio_read_enter":
name = event["name"]
snap_name = event["snap_name"]
readonly = event["read_only"]
completion = event["completion"]
imagectx = event["imagectx"]
self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
ionum = self.nextID()
thread.pendingIO = AioReadIO(ionum, ts, thread, thread.pendingIO, imagectx, [])
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
self.ios.append(thread.pendingIO)
thread.issuedIO(thread.pendingIO)
pendingIOs[completion] = thread.pendingIO
elif event.name == "librbd:aio_read_extent":
offset = event["offset"]
length = event["length"]
thread.pendingIO.extents.append(Extent(offset, length))
elif event.name == "librbd:aio_read_exit":
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:aio_write_enter":
name = event["name"]
snap_name = event["snap_name"]
readonly = event["read_only"]
offset = event["off"]
length = event["len"]
completion = event["completion"]
imagectx = event["imagectx"]
self.requireImage(ts, thread, imagectx, name, snap_name, readonly)
ionum = self.nextID()
thread.pendingIO = AioWriteIO(ionum, ts, thread, thread.pendingIO, imagectx, [Extent(offset, length)])
thread.pendingIO.addThreadCompletionDependencies(self.threads, self.recentCompletions)
thread.issuedIO(thread.pendingIO)
self.ios.append(thread.pendingIO)
pendingIOs[completion] = thread.pendingIO
if self.printOnRead:
print str(thread.pendingIO)
elif event.name == "librbd:aio_complete_enter":
completion = event["completion"]
retval = event["rval"]
if completion in pendingIOs:
completedIO = pendingIOs[completion]
del pendingIOs[completion]
completedIO.end_time = ts
completionIO = CompletionIO(ts, thread, completedIO)
completedIO.thread.completedIO(completionIO)
self.ios.append(completionIO)
self.completed(completionIO)
if self.printOnRead:
print str(completionIO)
# Insert-thread-stop phase
self.ios = sorted(self.ios, key = lambda io: io.start_time)
for threadID in self.threads:
thread = self.threads[threadID]
ionum = None
maxIONum = 0 # only valid if ionum is None
for io in self.ios:
if io.ionum > maxIONum:
maxIONum = io.ionum
if io.start_time > thread.lastTS:
ionum = io.ionum
if ionum % 2 == 1:
ionum = ionum + 1
break
if not ionum:
if maxIONum % 2 == 1:
maxIONum = maxIONum - 1
ionum = maxIONum + 2
for io in self.ios:
if io.ionum >= ionum:
io.ionum = io.ionum + 2
# TODO: Insert in the right place, don't append and re-sort
self.ios.append(StopThreadIO(ionum, thread.lastTS, thread))
self.ios = sorted(self.ios, key = lambda io: io.start_time)
for io in self.ios:
if io.prev and io.prev in io.dependencies:
io.dependencies.remove(io.prev)
if io.isCompletion:
io.dependencies.clear()
for dep in io.dependencies:
dep.numSuccessors = dep.numSuccessors + 1
if self.printOnRead and printOnWrite:
print
with open(outputFileName, "wb") as f:
for io in self.ios:
if printOnWrite and not io.isCompletion:
print str(io)
io.writeTo(f)
if __name__ == '__main__':
Processor().run(sys.argv[1:])

File diff suppressed because it is too large Load Diff