Merge pull request #4863 from ceph/wip-9964-nosharding

rados import/export (minus pgls sharding)

Reviewed-by: David Zafman <dzafman@redhat.com>
Reviewed-by: John Spray <john.spray@redhat.com>
This commit is contained in:
David Zafman 2015-06-10 16:57:15 -07:00
commit 2671775437
18 changed files with 1353 additions and 2517 deletions

View File

@ -49,6 +49,10 @@ public:
return key;
}
void set_key(const std::string &key_) {
key = key_;
}
string to_str() const;
uint32_t get_hash() const {

View File

@ -1103,36 +1103,46 @@ def main(argv):
ERRORS += verify(DATADIR, EC_POOL, EC_NAME)
if EXP_ERRORS == 0:
NEWPOOL = "import-rados-pool"
NEWPOOL = "rados-import-pool"
cmd = "./rados mkpool {pool}".format(pool=NEWPOOL)
logging.debug(cmd)
ret = call(cmd, shell=True, stdout=nullfd, stderr=nullfd)
print "Test import-rados"
print "Test rados import"
first = True
for osd in [f for f in os.listdir(OSDDIR) if os.path.isdir(os.path.join(OSDDIR, f)) and string.find(f, "osd") == 0]:
dir = os.path.join(TESTDIR, osd)
for pg in [f for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f))]:
if string.find(pg, "{id}.".format(id=REPID)) != 0:
continue
file = os.path.join(dir, pg)
# This should do nothing
cmd = "./ceph-objectstore-tool --dry-run import-rados {pool} {file}".format(pool=NEWPOOL, file=file)
if first:
first = False
# This should do nothing
cmd = "./rados import -p {pool} --dry-run {file}".format(pool=NEWPOOL, file=file)
logging.debug(cmd)
ret = call(cmd, shell=True, stdout=nullfd)
if ret != 0:
logging.error("Rados import --dry-run failed from {file} with {ret}".format(file=file, ret=ret))
ERRORS += 1
cmd = "./rados -p {pool} ls".format(pool=NEWPOOL)
logging.debug(cmd)
data = check_output(cmd, shell=True)
if data:
logging.error("'{data}'".format(data=data))
logging.error("Found objects after dry-run")
ERRORS += 1
cmd = "./rados import -p {pool} {file}".format(pool=NEWPOOL, file=file)
logging.debug(cmd)
ret = call(cmd, shell=True, stdout=nullfd)
if ret != 0:
logging.error("Import-rados failed from {file} with {ret}".format(file=file, ret=ret))
logging.error("Rados import failed from {file} with {ret}".format(file=file, ret=ret))
ERRORS += 1
cmd = "./ceph-objectstore-tool import-rados {pool} {file}".format(pool=NEWPOOL, file=file)
cmd = "./rados import -p {pool} --no-overwrite {file}".format(pool=NEWPOOL, file=file)
logging.debug(cmd)
ret = call(cmd, shell=True, stdout=nullfd)
if ret != 0:
logging.error("Import-rados failed from {file} with {ret}".format(file=file, ret=ret))
ERRORS += 1
cmd = "./ceph-objectstore-tool --no-overwrite import-rados {pool} {file}".format(pool=NEWPOOL, file=file)
logging.debug(cmd)
ret = call(cmd, shell=True, stdout=nullfd)
if ret != 0:
logging.error("Import-rados failed from {file} with {ret}".format(file=file, ret=ret))
logging.error("Rados import --no-overwrite failed from {file} with {ret}".format(file=file, ret=ret))
ERRORS += 1
ERRORS += verify(DATADIR, NEWPOOL, REP_NAME)

View File

@ -14,9 +14,9 @@ bin_DEBUGPROGRAMS += ceph_radosacl
rados_SOURCES = \
tools/rados/rados.cc \
tools/rados/rados_import.cc \
tools/rados/rados_export.cc \
tools/rados/rados_sync.cc
tools/RadosDump.cc \
tools/rados/RadosImport.cc \
tools/rados/PoolDump.cc
rados_SOURCES += common/obj_bencher.cc # needs cleanup so it can go in libcommon.la
rados_LDADD = libcls_lock_client.la $(LIBRADOS) $(CEPH_GLOBAL)
bin_PROGRAMS += rados

View File

@ -14,8 +14,8 @@ bin_DEBUGPROGRAMS += ceph-kvstore-tool
if WITH_OSD
ceph_objectstore_tool_SOURCES = tools/ceph_objectstore_tool.cc
ceph_objectstore_tool_LDADD = $(LIBOSD) $(LIBOS) $(CEPH_GLOBAL) $(BOOST_PROGRAM_OPTIONS_LIBS) $(LIBRADOS)
ceph_objectstore_tool_SOURCES = tools/ceph_objectstore_tool.cc tools/RadosDump.cc
ceph_objectstore_tool_LDADD = $(LIBOSD) $(LIBOS) $(CEPH_GLOBAL) $(BOOST_PROGRAM_OPTIONS_LIBS)
if LINUX
ceph_objectstore_tool_LDADD += -ldl
endif # LINUX

View File

@ -54,4 +54,7 @@ noinst_HEADERS += \
tools/cephfs/Dumper.h \
tools/cephfs/TableTool.h \
tools/cephfs/MDSUtility.h \
tools/rados/rados_sync.h
tools/RadosDump.h \
tools/rados/RadosImport.h \
tools/ceph_objectstore_tool.h \
tools/rados/PoolDump.h

168
src/tools/RadosDump.cc Normal file
View File

@ -0,0 +1,168 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "RadosDump.h"
int RadosDump::read_super()
{
bufferlist ebl;
bufferlist::iterator ebliter = ebl.begin();
ssize_t bytes;
bytes = ebl.read_fd(file_fd, super_header::FIXED_LENGTH);
if ((size_t)bytes != super_header::FIXED_LENGTH) {
cerr << "Unexpected EOF" << std::endl;
return -EFAULT;
}
sh.decode(ebliter);
return 0;
}
/**
 * Read one section header of sh.header_size bytes from file_fd and
 * decode it into *h.
 *
 * @param h destination for the decoded header; must not be NULL
 * @returns 0 on success, -EFAULT when the read did not yield exactly
 *          sh.header_size bytes
 */
int RadosDump::get_header(header *h)
{
  assert (h != NULL);

  bufferlist hdr_bl;
  bufferlist::iterator hdr_it = hdr_bl.begin();

  const ssize_t nread = hdr_bl.read_fd(file_fd, sh.header_size);
  if ((size_t)nread != sh.header_size) {
    cerr << "Unexpected EOF" << std::endl;
    return -EFAULT;
  }

  h->decode(hdr_it);
  return 0;
}
/**
 * Read one section footer of sh.footer_size bytes from file_fd, decode
 * it into *f and verify its magic value.
 *
 * @param f destination for the decoded footer; must not be NULL
 * @returns 0 on success, -EFAULT on a short read or a magic mismatch
 */
int RadosDump::get_footer(footer *f)
{
  assert(f != NULL);

  bufferlist ebl;
  bufferlist::iterator ebliter = ebl.begin();
  ssize_t bytes;

  bytes = ebl.read_fd(file_fd, sh.footer_size);
  if ((size_t)bytes != sh.footer_size) {
    cerr << "Unexpected EOF" << std::endl;
    // Negative errno, consistent with every other error path in this class.
    return -EFAULT;
  }

  f->decode(ebliter);

  if (f->magic != endmagic) {
    cerr << "Bad footer magic" << std::endl;
    return -EFAULT;
  }

  return 0;
}
/**
 * Read the next section from file_fd: header, payload and (for
 * non-empty payloads) the trailing footer.
 *
 * @param type receives the section type taken from the header
 * @param bl   cleared, then filled with the hdr.size payload bytes
 * @returns 0 on success, otherwise the error from get_header() /
 *          get_footer(), or -EFAULT on an invalid size or short read
 */
int RadosDump::read_section(sectiontype_t *type, bufferlist *bl)
{
  header hdr;
  ssize_t bytes;

  int ret = get_header(&hdr);
  if (ret)
    return ret;

  *type = hdr.type;

  bl->clear();

  // hdr.size is a signed 64-bit value decoded from the stream; a negative
  // value (corrupt input) must not reach read_fd, where it would be
  // converted to an enormous unsigned length.
  if (hdr.size < 0) {
    cerr << "Invalid section size " << hdr.size << std::endl;
    return -EFAULT;
  }

  bytes = bl->read_fd(file_fd, hdr.size);
  if (bytes != hdr.size) {
    cerr << "Unexpected EOF" << std::endl;
    return -EFAULT;
  }

  // Zero-length sections are written by write_simple() without a footer.
  if (hdr.size > 0) {
    footer ft;
    ret = get_footer(&ft);
    if (ret)
      return ret;
  }

  return 0;
}
/**
 * Consume sections from the stream up to and including the next
 * TYPE_OBJECT_END, discarding their contents.
 *
 * @param bl section buffer of the object being skipped; only used to
 *           seed an iterator here
 * @returns 0 once TYPE_OBJECT_END was read, the read_section() error if
 *          reading fails, or -EFAULT on a section type that is not part
 *          of an object
 */
int RadosDump::skip_object(bufferlist &bl)
{
  bufferlist::iterator section_it = bl.begin();
  bufferlist section_bl;

  for (;;) {
    sectiontype_t type;
    const int ret = read_section(&type, &section_bl);
    if (ret)
      return ret;

    section_it = section_bl.begin();

    if (type >= END_OF_TYPES) {
      cout << "Skipping unknown object section type" << std::endl;
      continue;
    }

    switch(type) {
    case TYPE_OBJECT_END:
      // End of the object being skipped.
      return 0;
    case TYPE_DATA:
    case TYPE_ATTRS:
    case TYPE_OMAP_HDR:
    case TYPE_OMAP:
#ifdef DIAGNOSTIC
      cerr << "Skip type " << (int)type << std::endl;
#endif
      break;
    default:
      cerr << "Can't skip unknown type: " << type << std::endl;
      return -EFAULT;
    }
  }
}
//Write super_header with its fixed 16 byte length
void RadosDump::write_super()
{
if (dry_run) {
return;
}
bufferlist superbl;
super_header sh;
footer ft;
header hdr(TYPE_NONE, 0);
hdr.encode(superbl);
sh.magic = super_header::super_magic;
sh.version = super_header::super_ver;
sh.header_size = superbl.length();
superbl.clear();
ft.encode(superbl);
sh.footer_size = superbl.length();
superbl.clear();
sh.encode(superbl);
assert(super_header::FIXED_LENGTH == superbl.length());
superbl.write_fd(file_fd);
}

396
src/tools/RadosDump.h Normal file
View File

@ -0,0 +1,396 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef RADOS_DUMP_H_
#define RADOS_DUMP_H_
#include <stdint.h>
#include "include/buffer.h"
#include "include/encoding.h"
#include "osd/osd_types.h"
#include "osd/OSDMap.h"
// Scalar types used by the export stream format.
typedef uint8_t sectiontype_t;
typedef uint32_t mymagic_t;
typedef int64_t mysize_t;

// Section type codes stored in each section header.
// Values are part of the stream format: only append new entries
// immediately before END_OF_TYPES.
enum {
  TYPE_NONE = 0,
  TYPE_PG_BEGIN,
  TYPE_PG_END,
  TYPE_OBJECT_BEGIN,
  TYPE_OBJECT_END,
  TYPE_DATA,
  TYPE_ATTRS,
  TYPE_OMAP_HDR,
  TYPE_OMAP,
  TYPE_PG_METADATA,
  TYPE_POOL_BEGIN,
  TYPE_POOL_END,
  END_OF_TYPES,	//Keep at the end
};

const uint16_t shortmagic = 0xffce;	//goes into stream as "ceff"
//endmagic goes into stream as "ceff ffec"
// The shift is done on an unsigned value: 0xecff << 16 does not fit in a
// signed 32-bit int.
const mymagic_t endmagic = (static_cast<mymagic_t>(0xecff) << 16) | shortmagic;
//The first FIXED_LENGTH bytes are a fixed
//portion of the export output.  This includes the overall
//version number, and size of header and footer.
//THIS STRUCTURE CAN ONLY BE APPENDED TO.  If it needs to expand,
//the version can be bumped and then anything
//can be added to the export format.
struct super_header {
  // shortmagic is widened before shifting: as a uint16_t it would be
  // promoted to signed int and 0xffce << 16 does not fit in one.
  static const uint32_t super_magic =
    (static_cast<uint32_t>(shortmagic) << 16) | shortmagic;
  // ver = 1, Initial version
  // ver = 2, Add OSDSuperblock to pg_begin
  static const uint32_t super_ver = 2;
  // Encoded size: four uint32_t fields
  static const uint32_t FIXED_LENGTH = 16;
  uint32_t magic;
  uint32_t version;
  uint32_t header_size;	// encoded size of a section header
  uint32_t footer_size;	// encoded size of a section footer

  super_header() : magic(0), version(0), header_size(0), footer_size(0) { }

  // Fields are encoded bare (no ENCODE_START envelope), in declaration order.
  void encode(bufferlist& bl) const {
    ::encode(magic, bl);
    ::encode(version, bl);
    ::encode(header_size, bl);
    ::encode(footer_size, bl);
  }
  void decode(bufferlist::iterator& bl) {
    ::decode(magic, bl);
    ::decode(version, bl);
    ::decode(header_size, bl);
    ::decode(footer_size, bl);
  }
};
// Per-section header: the section type code and the byte length of the
// payload that follows it in the stream.
struct header {
  sectiontype_t type;
  mysize_t size;

  header() : type(0), size(0) { }
  header(sectiontype_t type, mysize_t size)
    : type(type), size(size) { }

  void encode(bufferlist& bl) const {
    // The type code is placed in both of the two high bytes; the low
    // 16 bits carry shortmagic.
    const uint32_t debug_type = (type << 24) | (type << 16) | shortmagic;
    ENCODE_START(1, 1, bl);
    ::encode(debug_type, bl);
    ::encode(size, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    uint32_t debug_type;
    DECODE_START(1, bl);
    ::decode(debug_type, bl);
    // Only the top byte is used to recover the type.
    type = debug_type >> 24;
    ::decode(size, bl);
    DECODE_FINISH(bl);
  }
};
// Per-section footer: carries a magic value, initialized to endmagic.
struct footer {
  mymagic_t magic;

  footer()
    : magic(endmagic)
  { }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(magic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(magic, bl);
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_PG_BEGIN section: the exported pg id and the
// OSD superblock captured with it.
struct pg_begin {
  spg_t pgid;
  OSDSuperblock superblock;

  pg_begin() { }
  pg_begin(spg_t pg, const OSDSuperblock& sb)
    : pgid(pg), superblock(sb)
  { }

  void encode(bufferlist& bl) const {
    // If superblock doesn't include CEPH_FS_FEATURE_INCOMPAT_SHARDS then
    // shard will be NO_SHARD for a replicated pool.  This means
    // that we allow the decode by struct_v 2.
    ENCODE_START(3, 2, bl);
    ::encode(pgid.pgid, bl);
    ::encode(superblock, bl);
    ::encode(pgid.shard, bl);
    ENCODE_FINISH(bl);
  }

  // NOTE: New super_ver prevents decode from ver 1
  void decode(bufferlist::iterator& bl) {
    DECODE_START(3, bl);
    ::decode(pgid.pgid, bl);
    // superblock was added in struct_v 2
    if (struct_v >= 2) {
      ::decode(superblock, bl);
    }
    // shard was added in struct_v 3
    if (struct_v >= 3) {
      ::decode(pgid.shard, bl);
    } else {
      pgid.shard = shard_id_t::NO_SHARD;
    }
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_OBJECT_BEGIN section: identifies the object whose
// data/attr/omap sections follow.
struct object_begin {
  ghobject_t hoid;

  // Duplicate what is in the OI_ATTR so we have it at the start
  // of object processing.
  object_info_t oi;

  object_begin() { }
  object_begin(const ghobject_t &hoid)
    : hoid(hoid)
  { }

  // If superblock doesn't include CEPH_FS_FEATURE_INCOMPAT_SHARDS then
  // generation will be NO_GEN, shard_id will be NO_SHARD for a replicated
  // pool.  This means we will allow the decode by struct_v 1.
  void encode(bufferlist& bl) const {
    ENCODE_START(3, 1, bl);
    ::encode(hoid.hobj, bl);
    ::encode(hoid.generation, bl);
    ::encode(hoid.shard_id, bl);
    ::encode(oi, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(3, bl);
    ::decode(hoid.hobj, bl);
    // generation and shard_id were added in struct_v 2
    if (struct_v >= 2) {
      ::decode(hoid.generation, bl);
      ::decode(hoid.shard_id, bl);
    } else {
      hoid.generation = ghobject_t::NO_GEN;
      hoid.shard_id = shard_id_t::NO_SHARD;
    }
    // oi was added in struct_v 3; older streams leave it default-constructed
    if (struct_v >= 3) {
      ::decode(oi, bl);
    }
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_DATA section: one chunk of object data together
// with the offset and length it was read at.
struct data_section {
  uint64_t offset;
  uint64_t len;
  bufferlist databl;

  data_section()
    : offset(0), len(0)
  { }
  data_section(uint64_t offset, uint64_t len, bufferlist bl)
    : offset(offset), len(len), databl(bl)
  { }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(offset, bl);
    ::encode(len, bl);
    ::encode(databl, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(offset, bl);
    ::decode(len, bl);
    ::decode(databl, bl);
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_ATTRS section: attribute name -> value map.
struct attr_section {
  map<string,bufferlist> data;

  attr_section() { }

  attr_section(const map<string,bufferlist> &data)
    : data(data)
  { }

  // Build from a map of bufferptr values by wrapping each value in a
  // single-element bufferlist.
  attr_section(map<string, bufferptr> &data_)
  {
    std::map<std::string, bufferptr>::iterator src = data_.begin();
    while (src != data_.end()) {
      bufferlist wrapped;
      wrapped.push_front(src->second);
      data[src->first] = wrapped;
      ++src;
    }
  }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(data, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(data, bl);
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_OMAP_HDR section: the object's omap header blob.
struct omap_hdr_section {
  bufferlist hdr;

  omap_hdr_section() { }
  omap_hdr_section(bufferlist hdr)
    : hdr(hdr)
  { }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(hdr, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(hdr, bl);
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_OMAP section: a batch of omap key -> value pairs.
struct omap_section {
  map<string, bufferlist> omap;

  omap_section() { }
  omap_section(const map<string, bufferlist> &omap)
    : omap(omap)
  { }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    ::encode(omap, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::iterator& bl) {
    DECODE_START(1, bl);
    ::decode(omap, bl);
    DECODE_FINISH(bl);
  }
};
// Payload of a TYPE_PG_METADATA section: pg info, log, past intervals,
// an OSDMap and divergent priors.
//
// Note the encode/decode asymmetry: encode() appends the raw bytes held in
// osdmap_bl, while decode() decodes into osdmap and does not fill
// osdmap_bl.
struct metadata_section {
// struct_ver is the on-disk version of original pg
__u8 struct_ver; // for reference
epoch_t map_epoch;
pg_info_t info;
pg_log_t log;
map<epoch_t,pg_interval_t> past_intervals;
OSDMap osdmap;
bufferlist osdmap_bl; // Used in lieu of encoding osdmap due to crc checking
map<eversion_t, hobject_t> divergent_priors;
// This constructor leaves osdmap and osdmap_bl default-constructed.
metadata_section(__u8 struct_ver, epoch_t map_epoch, const pg_info_t &info,
const pg_log_t &log, map<epoch_t,pg_interval_t> &past_intervals,
map<eversion_t, hobject_t> &divergent_priors)
: struct_ver(struct_ver),
map_epoch(map_epoch),
info(info),
log(log),
past_intervals(past_intervals),
divergent_priors(divergent_priors) { }
metadata_section()
: struct_ver(0),
map_epoch(0) { }
// Encodes as struct version 4 (compat 1).
void encode(bufferlist& bl) const {
ENCODE_START(4, 1, bl);
::encode(struct_ver, bl);
::encode(map_epoch, bl);
::encode(info, bl);
::encode(log, bl);
::encode(past_intervals, bl);
// Equivalent to osdmap.encode(bl, features); but
// preserving exact layout for CRC checking.
bl.append(osdmap_bl);
::encode(divergent_priors, bl);
ENCODE_FINISH(bl);
}
// Fields added after version 1 are decoded only when the stream's
// struct_v is new enough; otherwise they keep their current values.
void decode(bufferlist::iterator& bl) {
DECODE_START(4, bl);
::decode(struct_ver, bl);
::decode(map_epoch, bl);
::decode(info, bl);
::decode(log, bl);
// past_intervals: struct_v >= 2
if (struct_v > 1) {
::decode(past_intervals, bl);
} else {
cout << "NOTICE: Older export without past_intervals" << std::endl;
}
// OSDMap: struct_v >= 3
if (struct_v > 2) {
osdmap.decode(bl);
} else {
cout << "WARNING: Older export without OSDMap information" << std::endl;
}
// divergent_priors: struct_v >= 4
if (struct_v > 3) {
::decode(divergent_priors, bl);
}
DECODE_FINISH(bl);
}
};
/**
* Superclass for classes that will need to handle a serialized RADOS
* dump. Requires that the serialized dump be opened with a known FD.
*/
class RadosDump
{
protected:
int file_fd;
super_header sh;
bool dry_run;
public:
RadosDump(int file_fd_, bool dry_run_)
: file_fd(file_fd_), dry_run(dry_run_)
{}
int read_super();
int get_header(header *h);
int get_footer(footer *f);
int read_section(sectiontype_t *type, bufferlist *bl);
int skip_object(bufferlist &bl);
void write_super();
// Define this in .h because it's templated
template <typename T>
int write_section(sectiontype_t type, const T& obj, int fd) {
if (dry_run)
return 0;
bufferlist blhdr, bl, blftr;
obj.encode(bl);
header hdr(type, bl.length());
hdr.encode(blhdr);
footer ft;
ft.encode(blftr);
int ret = blhdr.write_fd(fd);
if (ret) return ret;
ret = bl.write_fd(fd);
if (ret) return ret;
ret = blftr.write_fd(fd);
return ret;
}
int write_simple(sectiontype_t type, int fd)
{
if (dry_run)
return 0;
bufferlist hbl;
header hdr(type, 0);
hdr.encode(hbl);
return hbl.write_fd(fd);
}
};
#endif

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,40 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2013 Inktank
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_OBJECTSTORE_TOOL_H_
#define CEPH_OBJECTSTORE_TOOL_H_
#include "RadosDump.h"
/**
 * RadosDump specialization declaring the import/export entry points
 * that operate on an ObjectStore.  The member functions are defined
 * outside this header.
 */
class ObjectStoreTool : public RadosDump
{
  public:
    ObjectStoreTool(int file_fd, bool dry_run)
      : RadosDump(file_fd, dry_run)
    {}

    int do_import(ObjectStore *store, OSDSuperblock& sb, bool force,
		  std::string pgidstr);

    int do_export(ObjectStore *fs, coll_t coll, spg_t pgid,
		  pg_info_t &info, epoch_t map_epoch, __u8 struct_ver,
		  const OSDSuperblock& superblock,
		  map<epoch_t,pg_interval_t> &past_intervals);

    int get_object(ObjectStore *store, coll_t coll,
		   bufferlist &bl, OSDMap &curmap, bool *skipped_objects);

    int export_file(ObjectStore *store, coll_t cid, ghobject_t &obj);

    int export_files(ObjectStore *store, coll_t coll);
};
#endif // CEPH_OBJECTSTORE_TOOL_H_

157
src/tools/rados/PoolDump.cc Normal file
View File

@ -0,0 +1,157 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "include/rados/librados.hpp"
#include "common/errno.h"
#include "PoolDump.h"
using namespace librados;
#define dout_subsys ceph_subsys_rados
/**
 * Export RADOS objects from a live cluster
 * to a serialized format via a file descriptor.
 *
 * For every object listed by the IoCtx this writes, in order:
 * OBJECT_BEGIN, zero or more DATA chunks, ATTRS, OMAP_HDR, zero or more
 * OMAP batches and OBJECT_END.  The whole stream is wrapped in
 * POOL_BEGIN / POOL_END after the super header.
 *
 * @param io_ctx pool context to export from; must not be NULL
 * @returns 0 on success, else error code
 */
int PoolDump::dump(IoCtx *io_ctx)
{
  assert(io_ctx != NULL);

  int r = 0;
  write_super();

  r = write_simple(TYPE_POOL_BEGIN, file_fd);
  if (r != 0) {
    return r;
  }

  // NOTE(review): reads below use io_ctx's current namespace/locator,
  // not the ones reported by the iterator -- confirm this is intended
  // for objects outside the default namespace.
  librados::NObjectIterator i = io_ctx->nobjects_begin();
  librados::NObjectIterator i_end = io_ctx->nobjects_end();
  for (; i != i_end; ++i) {
    const std::string oid = i->get_oid();
    dout(10) << "OID '" << oid << "'" << dendl;

    // Compose OBJECT_BEGIN
    // ====================
    object_begin obj_begin;
    obj_begin.hoid.hobj.oid = i->get_oid();
    obj_begin.hoid.hobj.nspace = i->get_nspace();
    obj_begin.hoid.hobj.set_key(i->get_locator());

    // Only output head, RadosImport only wants that
    obj_begin.hoid.hobj.snap = CEPH_NOSNAP;

    // Skip setting object_begin.oi, RadosImport doesn't care

    r = write_section(TYPE_OBJECT_BEGIN, obj_begin, file_fd);
    if (r != 0) {
      return r;
    }

    // Compose TYPE_DATA chunks
    // ========================
    const uint32_t op_size = 4096 * 1024;
    uint64_t offset = 0;
    while (true) {
      bufferlist outdata;
      r = io_ctx->read(oid, outdata, op_size, offset);
      if (r < 0) {
	// Do not silently emit a truncated object
	cerr << "error reading " << oid << ": " << cpp_strerror(r)
	     << std::endl;
	return r;
      }
      if (r == 0) {
	// No (more) data
	break;
      }

      r = write_section(TYPE_DATA,
	  data_section(offset, outdata.length(), outdata), file_fd);
      if (r != 0) {
	// Output stream error
	return r;
      }

      if (outdata.length() < op_size) {
	// No more data
	r = 0;
	break;
      }
      offset += outdata.length();
    }

    // Compose TYPE_ATTRS chunk
    // ========================
    std::map<std::string, bufferlist> raw_xattrs;
    r = io_ctx->getxattrs(oid, raw_xattrs);
    if (r < 0) {
      cerr << "error getting xattr set " << oid << ": " << cpp_strerror(r)
	   << std::endl;
      return r;
    }

    // RadosImport strips the first character of every attribute key
    // before calling setxattr (and ignores the bare "_" key), so keys
    // are written here with a "_" prefix to round-trip unchanged.
    std::map<std::string, bufferlist> xattrs;
    for (std::map<std::string, bufferlist>::iterator x = raw_xattrs.begin();
	 x != raw_xattrs.end(); ++x) {
      xattrs[std::string("_") + x->first] = x->second;
    }

    r = write_section(TYPE_ATTRS, attr_section(xattrs), file_fd);
    if (r != 0) {
      return r;
    }

    // Compose TYPE_OMAP_HDR section
    // =============================
    bufferlist omap_header;
    r = io_ctx->omap_get_header(oid, &omap_header);
    if (r < 0) {
      cerr << "error getting omap header " << oid
	   << ": " << cpp_strerror(r) << std::endl;
      return r;
    }
    r = write_section(TYPE_OMAP_HDR, omap_hdr_section(omap_header), file_fd);
    if (r != 0) {
      return r;
    }

    // Compose TYPE_OMAP
    // =================
    // Fetch in batches of MAX_READ keys, resuming after the last key seen;
    // a short batch means the omap is exhausted.
    const int MAX_READ = 512;
    string last_read = "";
    do {
      map<string, bufferlist> values;
      r = io_ctx->omap_get_vals(oid, last_read, MAX_READ, &values);
      if (r < 0) {
	cerr << "error getting omap keys " << oid << ": "
	     << cpp_strerror(r) << std::endl;
	return r;
      }
      if (values.size()) {
	last_read = values.rbegin()->first;
      } else {
	break;
      }

      r = write_section(TYPE_OMAP, omap_section(values), file_fd);
      if (r != 0) {
	return r;
      }
      r = values.size();
    } while (r == MAX_READ);

    // Close object
    // =============
    r = write_simple(TYPE_OBJECT_END, file_fd);
    if (r != 0) {
      return r;
    }
  }

  r = write_simple(TYPE_POOL_END, file_fd);
  return r;
}

View File

@ -0,0 +1,32 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef POOL_DUMP_H_
#define POOL_DUMP_H_
#include "tools/RadosDump.h"
namespace librados {
class IoCtx;
}
/**
 * RadosDump specialization that serializes the contents of a pool to
 * the given file descriptor.  Always constructed with dry_run disabled.
 */
class PoolDump : public RadosDump
{
  public:
    PoolDump(int file_fd_)
      : RadosDump(file_fd_, false)
    {}

    int dump(librados::IoCtx *io_ctx);
};
#endif // POOL_DUMP_H_

View File

@ -0,0 +1,368 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "common/errno.h"
#include "osd/PGLog.h"
#include "RadosImport.h"
#define dout_subsys ceph_subsys_rados
int RadosImport::import(std::string pool, bool no_overwrite)
{
librados::IoCtx ioctx;
librados::Rados cluster;
char *id = getenv("CEPH_CLIENT_ID");
if (id) cerr << "Client id is: " << id << std::endl;
int ret = cluster.init(id);
if (ret) {
cerr << "Error " << ret << " in cluster.init" << std::endl;
return ret;
}
ret = cluster.conf_read_file(NULL);
if (ret) {
cerr << "Error " << ret << " in cluster.conf_read_file" << std::endl;
return ret;
}
ret = cluster.conf_parse_env(NULL);
if (ret) {
cerr << "Error " << ret << " in cluster.conf_read_env" << std::endl;
return ret;
}
cluster.connect();
ret = cluster.ioctx_create(pool.c_str(), ioctx);
if (ret < 0) {
cerr << "ioctx_create " << pool << " failed with " << ret << std::endl;
return ret;
}
return import(ioctx, no_overwrite);
}
/**
 * Import a dump stream (read from file_fd) into the given pool context.
 *
 * Accepts both pool exports (stream starts with TYPE_POOL_BEGIN) and
 * replicated pg exports (stream starts with TYPE_PG_BEGIN).  Erasure
 * coded shards are rejected.
 *
 * @param io_ctx       destination pool context
 * @param no_overwrite when true, objects that already exist are skipped
 * @returns 0 on success; -EFAULT on a malformed stream, -EINVAL on an
 *          unsupported format version, -EOPNOTSUPP for an EC shard, or
 *          the error from reading a section / writing an object
 */
int RadosImport::import(librados::IoCtx &io_ctx, bool no_overwrite)
{
  bufferlist ebl;

  int ret = read_super();
  if (ret)
    return ret;

  if (sh.magic != super_header::super_magic) {
    cerr << "Invalid magic number: 0x"
      << std::hex << sh.magic << " vs. 0x" << super_header::super_magic
      << std::dec << std::endl;
    return -EFAULT;
  }

  if (sh.version > super_header::super_ver) {
    cerr << "Can't handle export format version=" << sh.version << std::endl;
    return -EINVAL;
  }

  //First section must be TYPE_POOL_BEGIN or TYPE_PG_BEGIN
  sectiontype_t type;
  ret = read_section(&type, &ebl);
  if (ret)
    return ret;

  bool pool_mode = false;
  if (type == TYPE_POOL_BEGIN) {
    pool_mode = true;
    cout << "Importing pool" << std::endl;
  } else if (type == TYPE_PG_BEGIN) {
    bufferlist::iterator ebliter = ebl.begin();
    pg_begin pgb;
    pgb.decode(ebliter);
    spg_t pgid = pgb.pgid;
    if (!pgid.is_no_shard()) {
      cerr << "Importing Erasure Coded shard is not supported" << std::endl;
      return -EOPNOTSUPP;
    }
    dout(10) << "Exported features: " << pgb.superblock.compat_features << dendl;
    cout << "Importing from pgid " << pgid << std::endl;
  } else {
    // sectiontype_t is a uint8_t: print it as a number, not a raw character
    cerr << "Invalid initial section code " << (int)type << std::endl;
    return -EFAULT;
  }

  // XXX: How to check export features?
#if 0
  if (sb.compat_features.compare(pgb.superblock.compat_features) == -1) {
    cerr << "Export has incompatible features set "
      << pgb.superblock.compat_features << std::endl;
    return -EINVAL;
  }
#endif

  bool done = false;
  bool found_metadata = false;
  while(!done) {
    ret = read_section(&type, &ebl);
    if (ret)
      return ret;

    //cout << "do_import: Section type " << hex << type << dec << std::endl;
    if (type >= END_OF_TYPES) {
      cout << "Skipping unknown section type" << std::endl;
      continue;
    }
    switch(type) {
    case TYPE_OBJECT_BEGIN:
      ret = get_object_rados(io_ctx, ebl, no_overwrite);
      if (ret) {
        cerr << "Error inserting object: " << ret << std::endl;
        return ret;
      }
      break;
    case TYPE_PG_METADATA:
      dout(10) << "Don't care about the old metadata" << dendl;
      found_metadata = true;
      break;
    case TYPE_PG_END:
      done = true;
      break;
    case TYPE_POOL_END:
      done = true;
      break;
    default:
      cerr << "Unexpected section type " << (int)type << std::endl;
      return -EFAULT;
    }
  }

  // A pg export without a metadata section is reported but not treated
  // as a failure.
  if (!(pool_mode || found_metadata)) {
    cerr << "Missing metadata section!" << std::endl;
  }

  return 0;
}
/**
 * Import a single object: decode its OBJECT_BEGIN section from bl, then
 * consume the following sections up to TYPE_OBJECT_END, writing data,
 * xattrs, omap header and omap entries to the pool.
 *
 * Objects in the hit-set namespace and non-head objects are skipped.
 * In dry-run mode, or when an existing object is skipped because of
 * no_overwrite, the sections are still consumed but nothing is written.
 *
 * @param ioctx        destination pool context; its namespace is set to
 *                     the object's namespace
 * @param bl           payload of the TYPE_OBJECT_BEGIN section
 * @param no_overwrite when true, an already existing object is left alone
 * @returns 0 on success, -EFAULT on a malformed stream, otherwise the
 *          error from the failing read or librados call
 */
int RadosImport::get_object_rados(librados::IoCtx &ioctx, bufferlist &bl, bool no_overwrite)
{
  bufferlist::iterator ebliter = bl.begin();
  object_begin ob;
  ob.decode(ebliter);
  bool skipping;

  data_section ds;
  attr_section as;
  omap_hdr_section oh;
  omap_section os;

  assert(g_ceph_context);
  if (ob.hoid.hobj.nspace == g_ceph_context->_conf->osd_hit_set_namespace) {
    cout << "Skipping internal object " << ob.hoid << std::endl;
    // Propagate stream errors instead of reporting success
    return skip_object(bl);
  }

  if (!ob.hoid.hobj.is_head()) {
    cout << "Skipping non-head for " << ob.hoid << std::endl;
    return skip_object(bl);
  }

  ioctx.set_namespace(ob.hoid.hobj.get_namespace());

  string msg("Write");
  skipping = false;
  if (dry_run) {
    uint64_t psize;
    time_t pmtime;
    int ret = ioctx.stat(ob.hoid.hobj.oid.name, &psize, &pmtime);
    if (ret == 0) {
      if (no_overwrite)
        // Could set skipping, but dry-run doesn't change anything either
        msg = "Skipping existing";
      else
        msg = "***Overwrite***";
    }
  } else {
    // Exclusive create: -EEXIST tells us the object is already there
    int ret = ioctx.create(ob.hoid.hobj.oid.name, true);
    if (ret && ret != -EEXIST) {
      cerr << "create failed: " << cpp_strerror(ret) << std::endl;
      return ret;
    }
    if (ret == -EEXIST) {
      if (no_overwrite) {
        msg = "Skipping existing";
        skipping = true;
      } else {
        msg = "***Overwrite***";
        ret = ioctx.remove(ob.hoid.hobj.oid.name);
        if (ret < 0) {
          cerr << "remove failed: " << cpp_strerror(ret) << std::endl;
          return ret;
        }
        ret = ioctx.create(ob.hoid.hobj.oid.name, true);
        // If object re-appeared after removal, let's just skip it
        if (ret == -EEXIST) {
          skipping = true;
          msg = "Skipping in-use object";
          ret = 0;
        }
        if (ret < 0) {
          cerr << "create failed: " << cpp_strerror(ret) << std::endl;
          return ret;
        }
      }
    }
  }

  cout << msg << " " << ob.hoid << std::endl;

  // An explicit alignment given at construction wins over the pool's own
  bool need_align = false;
  uint64_t alignment = 0;
  if (align) {
    need_align = true;
    alignment = align;
  } else {
    if ((need_align = ioctx.pool_requires_alignment()))
      alignment = ioctx.pool_required_alignment();
  }

  if (need_align) {
    dout(10) << "alignment = " << alignment << dendl;
  }

  bufferlist ebl, databl;
  uint64_t in_offset = 0, out_offset = 0;
  bool done = false;
  while(!done) {
    sectiontype_t type;
    int ret = read_section(&type, &ebl);
    if (ret) {
      cerr << "Error reading section: " << ret << std::endl;
      return ret;
    }

    ebliter = ebl.begin();
    //cout << "\tdo_object: Section type " << hex << type << dec << std::endl;
    //cout << "\t\tsection size " << ebl.length() << std::endl;
    if (type >= END_OF_TYPES) {
      cout << "Skipping unknown object section type" << std::endl;
      continue;
    }
    switch(type) {
    case TYPE_DATA:
      ds.decode(ebliter);
      dout(10) << "\tdata: offset " << ds.offset << " len " << ds.len << dendl;
      if (need_align) {
        // Aligned pools: accumulate incoming data and only write whole
        // multiples of the alignment; the remainder is carried over.
        if (ds.offset != in_offset) {
          cerr << "Discontiguous object data in export" << std::endl;
          return -EFAULT;
        }
        assert(ds.databl.length() == ds.len);
        databl.claim_append(ds.databl);
        in_offset += ds.len;
        if (databl.length() >= alignment) {
          uint64_t rndlen = uint64_t(databl.length() / alignment) * alignment;
          dout(10) << "write offset=" << out_offset << " len=" << rndlen << dendl;
          if (!dry_run && !skipping) {
            ret = ioctx.write(ob.hoid.hobj.oid.name, databl, rndlen, out_offset);
            if (ret) {
              cerr << "write failed: " << cpp_strerror(ret) << std::endl;
              return ret;
            }
          }
          out_offset += rndlen;
          bufferlist n;
          if (databl.length() > rndlen) {
            assert(databl.length() - rndlen < alignment);
            n.substr_of(databl, rndlen, databl.length() - rndlen);
          }
          databl = n;
        }
        break;
      }
      if (!dry_run && !skipping) {
        ret = ioctx.write(ob.hoid.hobj.oid.name, ds.databl, ds.len, ds.offset);
        if (ret) {
          cerr << "write failed: " << cpp_strerror(ret) << std::endl;
          return ret;
        }
      }
      break;
    case TYPE_ATTRS:
      as.decode(ebliter);

      dout(10) << "\tattrs: len " << as.data.size() << dendl;
      if (dry_run || skipping)
        break;
      for (std::map<string,bufferlist>::iterator i = as.data.begin();
          i != as.data.end(); ++i) {
        // Keys carry a one-character prefix that is stripped below.
        // An empty key has nothing to strip (substr(1) would throw);
        // "_" and "snapset" are not set as xattrs.
        if (i->first.empty() || i->first == "_" || i->first == "snapset")
          continue;
        ret = ioctx.setxattr(ob.hoid.hobj.oid.name, i->first.substr(1).c_str(), i->second);
        if (ret) {
          cerr << "setxattr failed: " << cpp_strerror(ret) << std::endl;
          if (ret != -EOPNOTSUPP)
            return ret;
        }
      }
      break;
    case TYPE_OMAP_HDR:
      oh.decode(ebliter);

      dout(10) << "\tomap header: " << string(oh.hdr.c_str(), oh.hdr.length())
        << dendl;
      if (dry_run || skipping)
        break;
      ret = ioctx.omap_set_header(ob.hoid.hobj.oid.name, oh.hdr);
      if (ret) {
        cerr << "omap_set_header failed: " << cpp_strerror(ret) << std::endl;
        // -EOPNOTSUPP is reported but does not abort the import
        if (ret != -EOPNOTSUPP)
          return ret;
      }
      break;
    case TYPE_OMAP:
      os.decode(ebliter);

      dout(10) << "\tomap: size " << os.omap.size() << dendl;
      if (dry_run || skipping)
        break;
      ret = ioctx.omap_set(ob.hoid.hobj.oid.name, os.omap);
      if (ret) {
        cerr << "omap_set failed: " << cpp_strerror(ret) << std::endl;
        if (ret != -EOPNOTSUPP)
          return ret;
      }
      break;
    case TYPE_OBJECT_END:
      done = true;
      // Flush whatever is left over from aligned writes
      if (need_align && databl.length() > 0) {
        assert(databl.length() < alignment);
        dout(10) << "END write offset=" << out_offset << " len=" << databl.length() << dendl;
        if (dry_run || skipping)
          break;
        ret = ioctx.write(ob.hoid.hobj.oid.name, databl, databl.length(), out_offset);
        if (ret) {
          cerr << "write failed: " << cpp_strerror(ret) << std::endl;
          return ret;
        }
      }
      break;
    default:
      // sectiontype_t is a uint8_t: print it as a number
      cerr << "Unexpected section type " << (int)type << std::endl;
      return -EFAULT;
    }
  }
  return 0;
}

View File

@ -0,0 +1,45 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef RADOS_IMPORT_H_
#define RADOS_IMPORT_H_
#include <string>
#include "include/rados/librados.hpp"
#include "include/buffer.h"
#include "tools/RadosDump.h"
/**
 * Specialization of RadosDump that adds
 * methods for importing objects from a stream
 * to a live cluster.
 *
 * align is the write alignment to enforce; when it is 0 the
 * alignment required by the destination pool (if any) is used instead.
 */
class RadosImport : public RadosDump
{
  protected:
    uint64_t align;

    int get_object_rados(librados::IoCtx &ioctx, bufferlist &bl,
			 bool no_overwrite);

  public:
    RadosImport(int file_fd_, uint64_t align_, bool dry_run_)
      : RadosDump(file_fd_, dry_run_),
	align(align_)
    {}

    // Import into the pool with the given name
    int import(std::string pool, bool no_overwrite);
    // Import into an already opened pool context
    int import(librados::IoCtx &io_ctx, bool no_overwrite);
};
#endif // RADOS_IMPORT_H_

View File

@ -16,8 +16,6 @@
#include "include/rados/librados.hpp"
#include "include/rados/rados_types.hpp"
#include "rados_sync.h"
using namespace librados;
#include "common/config.h"
#include "common/ceph_argparse.h"
@ -45,9 +43,14 @@ using namespace librados;
#include "include/compat.h"
#include "common/hobject.h"
#include "PoolDump.h"
#include "RadosImport.h"
int rados_tool_sync(const std::map < std::string, std::string > &opts,
std::vector<const char*> &args);
using namespace librados;
// two steps seem to be necessary to do this right
#define STR(x) _STR(x)
#define _STR(x) #x
@ -113,17 +116,10 @@ void usage(ostream& out)
" set allocation hint for an object\n"
"\n"
"IMPORT AND EXPORT\n"
" import [options] <local-directory> <rados-pool>\n"
" Upload <local-directory> to <rados-pool>\n"
" export [options] <rados-pool> <local-directory>\n"
" Download <rados-pool> to <local-directory>\n"
" options:\n"
" -f / --force Copy everything, even if it hasn't changed.\n"
" -d / --delete-after After synchronizing, delete unreferenced\n"
" files or objects from the target bucket\n"
" or directory.\n"
" --workers Number of worker threads to spawn \n"
" (default " STR(DEFAULT_NUM_RADOS_WORKER_THREADS) ")\n"
" export [filename]\n"
" Serialize pool contents to a file or standard out.\n"
" import [--dry-run] [--no-overwrite] < filename | - >\n"
" Load pool contents from a file or standard in\n"
"\n"
"ADVISORY LOCKS\n"
" lock list <obj-name>\n"
@ -1434,7 +1430,7 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
vec.push_back(pool_name);
}
map<string,pool_stat_t> stats;
map<string,librados::pool_stat_t> stats;
ret = rados.get_pool_stats(vec, stats);
if (ret < 0) {
cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
@ -1452,11 +1448,11 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
formatter->open_object_section("stats");
formatter->open_array_section("pools");
}
for (map<string,pool_stat_t>::iterator i = stats.begin();
for (map<string,librados::pool_stat_t>::iterator i = stats.begin();
i != stats.end();
++i) {
const char *pool_name = i->first.c_str();
pool_stat_t& s = i->second;
librados::pool_stat_t& s = i->second;
if (!formatter) {
printf("%-15s "
"%12lld %12lld %12lld %12lld"
@ -2627,6 +2623,76 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
<< cpp_strerror(ret) << std::endl;
goto out;
}
} else if (strcmp(nargs[0], "export") == 0) {
// export [filename]
if (!pool_name || nargs.size() > 2) {
usage_exit();
}
int file_fd;
if (nargs.size() < 2 || std::string(nargs[1]) == "-") {
file_fd = STDOUT_FILENO;
} else {
file_fd = open(nargs[1], O_WRONLY|O_CREAT|O_TRUNC, 0666);
if (file_fd < 0) {
cerr << "Error opening '" << nargs[1] << "': "
<< cpp_strerror(file_fd) << std::endl;
ret = file_fd;
goto out;
}
}
ret = PoolDump(file_fd).dump(&io_ctx);
if (ret < 0) {
cerr << "error from export: "
<< cpp_strerror(ret) << std::endl;
goto out;
}
} else if (strcmp(nargs[0], "import") == 0) {
// import [--no-overwrite] [--dry-run] <filename | - >
if (!pool_name || nargs.size() > 4 || nargs.size() < 2) {
usage_exit();
}
// Last arg is the filename
std::string const filename = nargs[nargs.size() - 1];
// All other args may be flags
bool dry_run = false;
bool no_overwrite = false;
for (unsigned i = 1; i < nargs.size() - 1; ++i) {
std::string arg(nargs[i]);
if (arg == std::string("--no-overwrite")) {
no_overwrite = true;
} else if (arg == std::string("--dry-run")) {
dry_run = true;
} else {
std::cerr << "Invalid argument '" << arg << "'" << std::endl;
ret = -EINVAL;
goto out;
}
}
int file_fd;
if (filename == "-") {
file_fd = STDIN_FILENO;
} else {
file_fd = open(filename.c_str(), O_RDONLY);
if (file_fd < 0) {
cerr << "Error opening '" << filename << "': "
<< cpp_strerror(file_fd) << std::endl;
ret = file_fd;
goto out;
}
}
ret = RadosImport(file_fd, 0, dry_run).import(io_ctx, no_overwrite);
if (ret < 0) {
cerr << "error from import: "
<< cpp_strerror(ret) << std::endl;
goto out;
}
} else {
cerr << "unrecognized command " << nargs[0] << "; -h or --help for usage" << std::endl;
ret = -EINVAL;
@ -2749,11 +2815,6 @@ int main(int argc, const char **argv)
cerr << "rados: you must give an action. Try --help" << std::endl;
return 1;
}
if ((strcmp(args[0], "import") == 0) || (strcmp(args[0], "export") == 0)) {
cout << "The import and export operations are not available" << std::endl;
exit(1);
//return rados_tool_sync(opts, args);
} else {
return rados_tool_common(opts, args);
}
return rados_tool_common(opts, args);
}

View File

@ -1,229 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2011 New Dream Network
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "include/int_types.h"
#include "rados_sync.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "include/rados/librados.hpp"
#include <dirent.h>
#include <errno.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "include/compat.h"
#include "common/xattr.h"
using namespace librados;
/**
 * Work queue whose items are RADOS object names; every item is exported
 * into the local export directory.
 *
 * For each object the worker downloads the contents when required and
 * then reconciles the user xattrs of the local file with those of the
 * RADOS object.  Any failure terminates the process through _exit().
 */
class ExportLocalFileWQ : public RadosSyncWQ {
public:
  /**
   * @param io_ctx_dist  provider of the IoCtx used by the worker
   * @param ti           timeout forwarded to RadosSyncWQ
   * @param tp           thread pool forwarded to RadosSyncWQ
   * @param export_dir   directory the objects are written into
   * @param force        when true, contents are always downloaded
   */
  ExportLocalFileWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
                    ThreadPool *tp, ExportDir *export_dir, bool force)
    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
      m_export_dir(export_dir),
      m_force(force)
  {
  }
private:
  /// Export the object named *s.
  void _process(std::string *s) {
    IoCtx &io_ctx(m_io_ctx_dist->get_ioctx());
    int flags = 0;
    auto_ptr <BackedUpObject> sobj;
    auto_ptr <BackedUpObject> dobj;
    const std::string &rados_name(*s);
    std::list < std::string > only_in_a;
    std::list < std::string > only_in_b;
    std::list < std::string > diff;
    int ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), sobj);
    if (ret) {
      cerr << ERR_PREFIX << "couldn't get '" << rados_name << "' from rados: error "
           << ret << std::endl;
      _exit(ret);
    }
    std::string obj_path(sobj->get_fs_path(m_export_dir));
    if (m_force) {
      flags |= (CHANGED_CONTENTS | CHANGED_XATTRS);
    }
    else {
      // from_path reports failures as positive errno values.
      ret = BackedUpObject::from_path(obj_path.c_str(), dobj);
      if (ret == ENOENT) {
        // No local copy yet: everything has to be created.
        sobj->get_xattrs(only_in_a);
        flags |= CHANGED_CONTENTS;
      }
      else if (ret) {
        cerr << ERR_PREFIX << "BackedUpObject::from_path returned "
             << ret << std::endl;
        _exit(ret);
      }
      else {
        sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff);
        // The contents need to be copied when size or mtime DIFFER.
        // (The test used to be inverted and re-downloaded only the
        // objects that looked unchanged.)
        if ((sobj->get_rados_size() != dobj->get_rados_size()) ||
            (sobj->get_mtime() != dobj->get_mtime())) {
          flags |= CHANGED_CONTENTS;
        }
      }
    }
    if (flags & CHANGED_CONTENTS) {
      ret = sobj->download(io_ctx, obj_path.c_str());
      if (ret) {
        cerr << ERR_PREFIX << "download error: " << ret << std::endl;
        _exit(ret);
      }
    }
    // Attributes missing locally and attributes with different values are
    // both handled by (re)writing them.
    diff.splice(diff.begin(), only_in_a);
    for (std::list < std::string >::const_iterator x = diff.begin();
         x != diff.end(); ++x) {
      flags |= CHANGED_XATTRS;
      const Xattr *xattr = sobj->get_xattr(*x);
      if (xattr == NULL) {
        cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl;
        // ret is 0 at this point; exit with a failure status instead.
        _exit(1);
      }
      std::string xattr_fs_name(USER_XATTR_PREFIX);
      xattr_fs_name += x->c_str();
      ret = ceph_os_setxattr(obj_path.c_str(), xattr_fs_name.c_str(),
                             xattr->data, xattr->len);
      if (ret) {
        ret = errno;
        cerr << ERR_PREFIX << "setxattr error: " << cpp_strerror(ret) << std::endl;
        _exit(ret);
      }
    }
    for (std::list < std::string >::const_iterator x = only_in_b.begin();
         x != only_in_b.end(); ++x) {
      flags |= CHANGED_XATTRS;
      // xattr_diff compares these names directly with the RADOS xattr
      // names, so the on-disk attribute carries the same prefix that the
      // setxattr loop above prepends.
      std::string xattr_fs_name(USER_XATTR_PREFIX);
      xattr_fs_name += x->c_str();
      ret = ceph_os_removexattr(obj_path.c_str(), xattr_fs_name.c_str());
      if (ret) {
        ret = errno;
        cerr << ERR_PREFIX << "removexattr error: " << cpp_strerror(ret) << std::endl;
        _exit(ret);
      }
    }
    if (m_force) {
      cout << "[force] " << rados_name << std::endl;
    }
    else if (flags & CHANGED_CONTENTS) {
      cout << "[exported] " << rados_name << std::endl;
    }
    else if (flags & CHANGED_XATTRS) {
      cout << "[xattr] " << rados_name << std::endl;
    }
  }
  ExportDir *m_export_dir;  ///< destination directory (not owned)
  bool m_force;             ///< download even when nothing seems changed
};
/**
 * Work queue whose items are file names inside the export directory.
 * A local file whose RADOS object no longer exists is unlinked.
 * Any failure terminates the process through _exit().
 */
class ExportValidateExistingWQ : public RadosSyncWQ {
public:
  /**
   * @param io_ctx_dist  provider of the IoCtx used by the worker
   * @param ti           timeout forwarded to RadosSyncWQ
   * @param tp           thread pool forwarded to RadosSyncWQ
   * @param dir_name     export directory; the pointer is stored, not copied
   */
  ExportValidateExistingWQ(IoCtxDistributor *io_ctx_dist, time_t ti,
                           ThreadPool *tp, const char *dir_name)
    : RadosSyncWQ(io_ctx_dist, ti, 0, tp),
      m_dir_name(dir_name)
  {
  }
private:
  /// Check the local file named *s against the pool.
  void _process(std::string *s) {
    IoCtx &io_ctx(m_io_ctx_dist->get_ioctx());
    auto_ptr <BackedUpObject> lobj;
    const std::string &local_name(*s);
    int ret = BackedUpObject::from_file(local_name.c_str(), m_dir_name, lobj);
    if (ret) {
      // Diagnostics go to cerr like every other error in this tool.
      cerr << ERR_PREFIX << "BackedUpObject::from_file: delete loop: "
           << "got error " << ret << std::endl;
      _exit(ret);
    }
    auto_ptr <BackedUpObject> robj;
    ret = BackedUpObject::from_rados(io_ctx, lobj->get_rados_name(), robj);
    if (ret == -ENOENT) {
      // The entry doesn't exist on the remote server; delete it locally.
      // std::string replaces the former variable-length array, which is
      // not standard C++.
      std::string path(m_dir_name);
      path += "/";
      path += local_name.c_str();
      if (unlink(path.c_str())) {
        ret = errno;
        cerr << ERR_PREFIX << "error unlinking '" << path << "': "
             << cpp_strerror(ret) << std::endl;
        _exit(ret);
      }
      cout << "[deleted] " << "removed '" << local_name << "'" << std::endl;
    }
    else if (ret) {
      cerr << ERR_PREFIX << "BackedUpObject::from_rados: delete loop: "
           << "got error " << ret << std::endl;
      _exit(ret);
    }
  }
  const char *m_dir_name;  ///< export directory (not owned)
};
/**
 * Export every object reachable through io_ctx into dir_name.
 *
 * When delete_after is set, the directory is scanned afterwards:
 * leftover temporary files are unlinked and every other entry is handed
 * to an ExportValidateExistingWQ.
 *
 * @return 0 on success, -EIO when the export directory is unusable, or
 *         the errno value of a failed opendir()/unlink().
 */
int do_rados_export(ThreadPool *tp, IoCtx& io_ctx,
                    IoCtxDistributor *io_ctx_dist, const char *dir_name,
                    bool create, bool force, bool delete_after)
{
  librados::NObjectIterator obj_it = io_ctx.nobjects_begin();
  librados::NObjectIterator obj_end = io_ctx.nobjects_end();
  auto_ptr <ExportDir> target;
  target.reset(ExportDir::create_for_writing(dir_name, 1, create));
  if (target.get() == NULL)
    return -EIO;

  // Phase 1: queue one export job per object and wait for completion.
  ExportLocalFileWQ exporter(io_ctx_dist, time(NULL),
                             tp, target.get(), force);
  while (obj_it != obj_end) {
    exporter.queue(new std::string((*obj_it).get_oid()));
    ++obj_it;
  }
  exporter.drain();

  // Phase 2 (optional): clean up the local directory.
  if (delete_after) {
    ExportValidateExistingWQ validator(io_ctx_dist, time(NULL),
                                       tp, dir_name);
    DirHolder holder;
    int open_err = holder.opendir(dir_name);
    if (open_err) {
      cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: "
           << cpp_strerror(open_err) << std::endl;
      return open_err;
    }
    for (struct dirent *entry = readdir(holder.dp); entry;
         entry = readdir(holder.dp)) {
      const bool is_dot = (strcmp(entry->d_name, ".") == 0);
      const bool is_dotdot = (strcmp(entry->d_name, "..") == 0);
      if (is_dot || is_dotdot)
        continue;
      if (!is_suffix(entry->d_name, RADOS_SYNC_TMP_SUFFIX)) {
        validator.queue(new std::string(entry->d_name));
        continue;
      }
      // Temporary file left behind by an interrupted download.
      char tmp_file[strlen(dir_name) + strlen(entry->d_name) + 2];
      snprintf(tmp_file, sizeof(tmp_file), "%s/%s", dir_name, entry->d_name);
      if (unlink(tmp_file)) {
        int unlink_err = errno;
        cerr << ERR_PREFIX << "error unlinking temporary file '" << tmp_file << "': "
             << cpp_strerror(unlink_err) << std::endl;
        return unlink_err;
      }
      cout << "[deleted] " << "removed temporary file '" << entry->d_name << "'" << std::endl;
    }
    validator.drain();
  }
  cout << "[done]" << std::endl;
  return 0;
}

View File

@ -1,239 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2011 New Dream Network
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "include/int_types.h"
#include <dirent.h>
#include <errno.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "rados_sync.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "include/rados/librados.hpp"
using namespace librados;
using std::auto_ptr;
class ImportLocalFileWQ : public RadosSyncWQ {
public:
ImportLocalFileWQ(const char *dir_name, bool force,
IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
: RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_dir_name(dir_name),
m_force(force)
{
}
private:
void _process(std::string *s) {
IoCtx &io_ctx(m_io_ctx_dist->get_ioctx());
const std::string &local_name(*s);
auto_ptr <BackedUpObject> sobj;
auto_ptr <BackedUpObject> dobj;
std::list < std::string > only_in_a;
std::list < std::string > only_in_b;
std::list < std::string > diff;
int flags = 0;
int ret = BackedUpObject::from_file(local_name.c_str(),
m_dir_name.c_str(), sobj);
if (ret) {
cerr << ERR_PREFIX << "BackedUpObject::from_file: got error "
<< ret << std::endl;
_exit(ret);
}
const char *rados_name(sobj->get_rados_name());
if (m_force) {
flags |= (CHANGED_CONTENTS | CHANGED_XATTRS);
}
else {
ret = BackedUpObject::from_rados(io_ctx, rados_name, dobj);
if (ret == -ENOENT) {
flags |= CHANGED_CONTENTS;
sobj->get_xattrs(only_in_a);
}
else if (ret) {
cerr << ERR_PREFIX << "BackedUpObject::from_rados returned "
<< ret << std::endl;
_exit(ret);
}
else {
sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff);
if ((sobj->get_rados_size() == dobj->get_rados_size()) &&
(sobj->get_mtime() == dobj->get_mtime())) {
flags |= CHANGED_CONTENTS;
}
}
}
if (flags & CHANGED_CONTENTS) {
ret = sobj->upload(io_ctx, local_name.c_str(), m_dir_name.c_str());
if (ret) {
cerr << ERR_PREFIX << "upload error: " << ret << std::endl;
_exit(ret);
}
}
for (std::list < std::string >::const_iterator x = only_in_a.begin();
x != only_in_a.end(); ++x) {
flags |= CHANGED_XATTRS;
const Xattr *xattr = sobj->get_xattr(*x);
if (xattr == NULL) {
cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl;
_exit(ret);
}
bufferlist bl;
bl.append(xattr->data, xattr->len);
ret = io_ctx.setxattr(rados_name, x->c_str(), bl);
if (ret < 0) {
ret = errno;
cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name
<< "', xattr_name='" << x->c_str() << "'): " << cpp_strerror(ret)
<< std::endl;
_exit(ret);
}
}
for (std::list < std::string >::const_iterator x = diff.begin();
x != diff.end(); ++x) {
flags |= CHANGED_XATTRS;
const Xattr *xattr = sobj->get_xattr(*x);
if (xattr == NULL) {
cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl;
_exit(ret);
}
bufferlist bl;
bl.append(xattr->data, xattr->len);
ret = io_ctx.rmxattr(rados_name, x->c_str());
if (ret < 0) {
cerr << ERR_PREFIX << "io_ctx.rmxattr error2: " << cpp_strerror(ret)
<< std::endl;
_exit(ret);
}
ret = io_ctx.setxattr(rados_name, x->c_str(), bl);
if (ret < 0) {
ret = errno;
cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name
<< "', xattr='" << x->c_str() << "'): " << cpp_strerror(ret) << std::endl;
_exit(ret);
}
}
for (std::list < std::string >::const_iterator x = only_in_b.begin();
x != only_in_b.end(); ++x) {
flags |= CHANGED_XATTRS;
ret = io_ctx.rmxattr(rados_name, x->c_str());
if (ret < 0) {
ret = errno;
cerr << ERR_PREFIX << "rmxattr error3: " << cpp_strerror(ret) << std::endl;
_exit(ret);
}
}
if (m_force) {
cout << "[force] " << rados_name << std::endl;
}
else if (flags & CHANGED_CONTENTS) {
cout << "[imported] " << rados_name << std::endl;
}
else if (flags & CHANGED_XATTRS) {
cout << "[xattr] " << rados_name << std::endl;
}
}
std::string m_dir_name;
bool m_force;
};
class ImportValidateExistingWQ : public RadosSyncWQ {
public:
ImportValidateExistingWQ(ExportDir *export_dir,
IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp)
: RadosSyncWQ(io_ctx_dist, ti, 0, tp),
m_export_dir(export_dir)
{
}
private:
void _process(std::string *s) {
IoCtx &io_ctx(m_io_ctx_dist->get_ioctx());
const std::string &rados_name(*s);
auto_ptr <BackedUpObject> robj;
int ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), robj);
if (ret) {
cerr << ERR_PREFIX << "BackedUpObject::from_rados in delete loop "
<< "returned " << ret << std::endl;
_exit(ret);
}
std::string obj_path(robj->get_fs_path(m_export_dir));
auto_ptr <BackedUpObject> lobj;
ret = BackedUpObject::from_path(obj_path.c_str(), lobj);
if (ret == ENOENT) {
ret = io_ctx.remove(rados_name);
if (ret && ret != -ENOENT) {
cerr << ERR_PREFIX << "io_ctx.remove(" << obj_path << ") failed "
<< "with error " << ret << std::endl;
_exit(ret);
}
cout << "[deleted] " << "removed '" << rados_name << "'" << std::endl;
}
else if (ret) {
cerr << ERR_PREFIX << "BackedUpObject::from_path in delete loop "
<< "returned " << ret << std::endl;
_exit(ret);
}
}
ExportDir *m_export_dir;
};
/**
 * Upload every file found in dir_name through an ImportLocalFileWQ.
 *
 * When delete_after is set, every object in the pool is afterwards
 * handed to an ImportValidateExistingWQ.
 *
 * @return 0 on success, -EIO when dir_name is not a usable export
 *         directory, or the value returned by a failed opendir().
 */
int do_rados_import(ThreadPool *tp, IoCtx &io_ctx, IoCtxDistributor* io_ctx_dist,
                    const char *dir_name, bool force, bool delete_after)
{
  auto_ptr <ExportDir> source;
  source.reset(ExportDir::from_file_system(dir_name));
  if (source.get() == NULL)
    return -EIO;

  DirHolder holder;
  int open_err = holder.opendir(dir_name);
  if (open_err) {
    cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: "
         << cpp_strerror(open_err) << std::endl;
    return open_err;
  }

  // Phase 1: queue one upload job per regular directory entry.
  ImportLocalFileWQ uploader(dir_name, force,
                             io_ctx_dist, time(NULL), tp);
  for (struct dirent *entry = readdir(holder.dp); entry;
       entry = readdir(holder.dp)) {
    const char *name = entry->d_name;
    const bool skip = (strcmp(name, ".") == 0) ||
                      (strcmp(name, "..") == 0) ||
                      is_suffix(name, RADOS_SYNC_TMP_SUFFIX);
    if (!skip)
      uploader.queue(new std::string(name));
  }
  uploader.drain();

  // Phase 2 (optional): validate every object in the pool.
  if (delete_after) {
    ImportValidateExistingWQ validator(source.get(), io_ctx_dist,
                                       time(NULL), tp);
    librados::NObjectIterator obj_it = io_ctx.nobjects_begin();
    librados::NObjectIterator obj_end = io_ctx.nobjects_end();
    while (obj_it != obj_end) {
      validator.queue(new std::string((*obj_it).get_oid()));
      ++obj_it;
    }
    validator.drain();
  }
  cout << "[done]" << std::endl;
  return 0;
}

View File

@ -1,903 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2011 New Dream Network
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "include/int_types.h"
#include "common/ceph_argparse.h"
#include "common/config.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "global/global_context.h"
#include "global/global_init.h"
#include "include/rados/librados.hpp"
#include "rados_sync.h"
#include "include/compat.h"
#include "common/xattr.h"
#include <dirent.h>
#include <errno.h>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
using namespace librados;
using std::auto_ptr;
// Name of the xattr holding the export format version on an export directory.
static const char * const XATTR_RADOS_SYNC_VER = "user.rados_sync_ver";
// Name of the xattr holding the full RADOS object name on an exported file.
static const char * const XATTR_FULLNAME = "user.rados_full_name";
// Prefix put in front of RADOS xattr names when stored on local files.
const char USER_XATTR_PREFIX[] = "user.rados.";
// Length of the prefix without the terminating NUL.
static const size_t USER_XATTR_PREFIX_LEN = sizeof(USER_XATTR_PREFIX) - 1;
/* It's important that RADOS_SYNC_TMP_SUFFIX contain at least one character
 * that we wouldn't normally allow in a file name -- in this case, $ */
const char RADOS_SYNC_TMP_SUFFIX[] = "$tmp";
// Length of the suffix without the terminating NUL.
static const size_t RADOS_SYNC_TMP_SUFFIX_LEN = sizeof(RADOS_SYNC_TMP_SUFFIX) - 1;
/**
 * Strip USER_XATTR_PREFIX from a file-system xattr name.
 *
 * @return the remainder of the name, or an empty string when the name
 *         does not start with the prefix.
 */
std::string get_user_xattr_name(const char *fs_xattr_name)
{
  const bool has_prefix =
    (strncmp(fs_xattr_name, USER_XATTR_PREFIX, USER_XATTR_PREFIX_LEN) == 0);
  if (has_prefix)
    return std::string(fs_xattr_name + USER_XATTR_PREFIX_LEN);
  return std::string();
}
/**
 * Tell whether the NUL-terminated string `str` ends with `suffix`.
 * An empty suffix matches every string.
 */
bool is_suffix(const char *str, const char *suffix)
{
  const size_t total_len = strlen(str);
  const size_t tail_len = strlen(suffix);
  if (tail_len > total_len)
    return false;
  const char *tail = str + (total_len - tail_len);
  return strcmp(tail, suffix) == 0;
}
/**
 * Obtain an ExportDir that can be written to.
 *
 * An already accessible (readable and writable) path is delegated to
 * from_file_system().  Otherwise, when `create` is set, the directory is
 * created with mode 0700 and tagged with the given version through the
 * XATTR_RADOS_SYNC_VER xattr.
 *
 * @return a newly allocated ExportDir, or NULL on failure (a message has
 *         been printed to cerr).
 */
ExportDir* ExportDir::create_for_writing(const std::string &path, int version,
                                         bool create)
{
  const char *dir = path.c_str();
  if (access(dir, R_OK | W_OK) == 0) {
    return ExportDir::from_file_system(path);
  }
  if (!create) {
    cerr << ERR_PREFIX << "ExportDir: directory '"
         << path << "' does not exist. Use --create to create it."
         << std::endl;
    return NULL;
  }
  if (mkdir(dir, 0700) < 0) {
    const int mkdir_err = errno;
    // A directory that already exists is acceptable.
    if (mkdir_err != EEXIST) {
      cerr << ERR_PREFIX << "ExportDir: mkdir error: "
           << cpp_strerror(mkdir_err) << std::endl;
      return NULL;
    }
  }
  char version_str[32];
  snprintf(version_str, sizeof(version_str), "%d", version);
  // The terminating NUL is stored as part of the attribute value.
  const int rc = ceph_os_setxattr(dir, XATTR_RADOS_SYNC_VER,
                                  version_str, strlen(version_str) + 1);
  if (rc < 0) {
    const int xattr_err = errno;
    cerr << ERR_PREFIX << "ExportDir: setxattr error :"
         << cpp_strerror(xattr_err) << std::endl;
    return NULL;
  }
  return new ExportDir(path);
}
/**
 * Open an existing export directory.
 *
 * The directory must be readable and carry an XATTR_RADOS_SYNC_VER xattr
 * whose value parses as the decimal number 1.
 *
 * @return a newly allocated ExportDir, or NULL on failure (a message has
 *         been printed to cerr).
 */
ExportDir* ExportDir::from_file_system(const std::string &path)
{
  if (access(path.c_str(), R_OK) != 0) {
    cerr << "ExportDir: source directory '" << path
         << "' appears to be inaccessible." << std::endl;
    return NULL;
  }

  // Zero-filled and read with one byte spare, so the value is always
  // NUL-terminated.
  char version_str[32];
  memset(version_str, 0, sizeof(version_str));
  int rc = ceph_os_getxattr(path.c_str(), XATTR_RADOS_SYNC_VER,
                            version_str, sizeof(version_str) - 1);
  if (rc < 0) {
    rc = errno;
    if (rc != ENODATA) {
      cerr << ERR_PREFIX << "ExportDir: getxattr error :"
           << cpp_strerror(rc) << std::endl;
      return NULL;
    }
    cerr << ERR_PREFIX << "ExportDir: directory '" << path
         << "' does not appear to have been created by a rados "
         << "export operation." << std::endl;
    return NULL;
  }

  std::string parse_err;
  rc = strict_strtol(version_str, 10, &parse_err);
  if (!parse_err.empty()) {
    cerr << ERR_PREFIX << "ExportDir: invalid value for "
         << XATTR_RADOS_SYNC_VER << ": " << version_str << ". parse error: "
         << parse_err << std::endl;
    return NULL;
  }
  if (rc != 1) {
    cerr << ERR_PREFIX << "ExportDir: can't handle any naming "
         << "convention besides version 1. You must upgrade this program to "
         << "handle the data in the new format." << std::endl;
    return NULL;
  }
  return new ExportDir(path);
}
/**
 * Map a RADOS object name to the path of its file in this export
 * directory.
 *
 * At most the first 200 bytes of the name are used.  Control characters,
 * bytes with the high bit set, DEL, '/', '\\' and '$' become '@'; a space
 * becomes '_'.  Whenever the name was truncated or altered, an underscore
 * and a 16-digit hexadecimal hash of the complete original name are
 * appended so that distinct names stay distinguishable.
 *
 * @param rados_name  object name (only the part before the first NUL byte
 *                    is considered)
 * @return "<path>/<sanitized name>[_<hash>]"
 */
std::string ExportDir::get_fs_path(const std::string &rados_name) const
{
  static const int HASH_LENGTH = 17;           // '_' + 16 hex digits
  static const size_t MAX_NAME_LENGTH = 200;   // bytes of the name kept
  const size_t strlen_rados_name = strlen(rados_name.c_str());
  bool need_hash = false;
  size_t sz = strlen_rados_name;
  if (sz > MAX_NAME_LENGTH) {
    sz = MAX_NAME_LENGTH;
    need_hash = true;
  }
  std::string fs_name;
  fs_name.reserve(sz + HASH_LENGTH);
  for (size_t i = 0; i < sz; ++i) {
    // Just replace anything that looks funny with an 'at' sign.
    // Unicode also gets turned into 'at' signs.
    signed char c = rados_name[i];
    if (c < 0x20 || c == 0x7f || c == '/' || c == '\\' || c == '$') {
      // Since c is signed, c < 0x20 also catches bytes with the high bit set.
      c = '@';
      need_hash = true;
    }
    else if (c == ' ') {
      c = '_';
      need_hash = true;
    }
    fs_name.push_back(c);
  }
  if (need_hash) {
    uint64_t hash = 17;
    for (size_t i = 0; i < strlen_rados_name; ++i) {
      hash += (rados_name[i] * 33);
    }
    // The suffix is appended right after the (possibly truncated) name.
    // The previous code wrote it at the offset of the FULL name length,
    // which overran the buffer for names longer than 200 bytes.
    // The extra byte of length is because snprintf always NUL-terminates.
    char suffix[HASH_LENGTH + 1];
    snprintf(suffix, sizeof(suffix), "_%016" PRIx64, hash);
    fs_name += suffix;
  }
  ostringstream oss;
  oss << path << "/" << fs_name;
  return oss.str();
}
// Construct an ExportDir for the directory at path_; the path is copied
// into the `path` member and nothing else is done here.
ExportDir::ExportDir(const std::string &path_)
: path(path_)
{
}
// Start without an open directory stream.
DirHolder::DirHolder()
  : dp(NULL)
{
}

// Close the directory stream, if one is open; a failing closedir() is
// reported on cerr.
DirHolder::~DirHolder() {
  if (dp != NULL) {
    if (closedir(dp) != 0) {
      int close_err = errno;
      cerr << ERR_PREFIX << "closedir failed: " << cpp_strerror(close_err) << std::endl;
    }
    dp = NULL;
  }
}

// Open dir_name and keep the stream in `dp`.
// Returns 0 on success, otherwise the errno value set by ::opendir().
int DirHolder::opendir(const char *dir_name) {
  dp = ::opendir(dir_name);
  return dp ? 0 : errno;
}
// Per-thread index into the distributor's IoCtx vector; -1 until assigned.
static __thread int t_iod_idx = -1;
// Serializes creation of the singleton.
static pthread_mutex_t io_ctx_distributor_lock = PTHREAD_MUTEX_INITIALIZER;

// Return the process-wide IoCtxDistributor, creating it on first use.
IoCtxDistributor* IoCtxDistributor::instance() {
  pthread_mutex_lock(&io_ctx_distributor_lock);
  if (!s_instance)
    s_instance = new IoCtxDistributor();
  IoCtxDistributor *singleton = s_instance;
  pthread_mutex_unlock(&io_ctx_distributor_lock);
  return singleton;
}
// Create num_ioctxes IoCtx objects for pool_name on the given cluster.
// Returns 0 on success, or the first non-zero value returned by
// ioctx_create(), in which case the remaining contexts are left untouched.
int IoCtxDistributor::init(Rados &cluster, const char *pool_name,
                           int num_ioctxes) {
  m_io_ctxes.resize(num_ioctxes);
  for (size_t idx = 0; idx < m_io_ctxes.size(); ++idx) {
    const int rc = cluster.ioctx_create(pool_name, m_io_ctxes[idx]);
    if (rc)
      return rc;
  }
  m_highest_iod_idx.set(0);
  return 0;
}
// Close and drop every IoCtx and reset the index counter.
void IoCtxDistributor::clear() {
  for (size_t idx = 0; idx < m_io_ctxes.size(); ++idx)
    m_io_ctxes[idx].close();
  m_io_ctxes.clear();
  m_highest_iod_idx.set(0);
}
// Return the IoCtx belonging to the calling thread.  The first call from a
// thread takes the next free index; running out of contexts terminates the
// process.
IoCtx& IoCtxDistributor::get_ioctx() {
  if (t_iod_idx == -1)
    t_iod_idx = m_highest_iod_idx.inc() - 1;
  const bool out_of_range = ((unsigned int)t_iod_idx >= m_io_ctxes.size());
  if (out_of_range) {
    cerr << ERR_PREFIX << "IoCtxDistributor: logic error on line "
         << __LINE__ << std::endl;
    _exit(1);
  }
  return m_io_ctxes[t_iod_idx];
}
// Singleton pointer; assigned in instance().
IoCtxDistributor *IoCtxDistributor::s_instance = NULL;
// Both the constructor and the destructor simply call clear().
IoCtxDistributor::IoCtxDistributor() {
clear();
}
IoCtxDistributor::~IoCtxDistributor() {
clear();
}
// Forward the timeouts and the thread pool to the ThreadPool::WorkQueue
// base class and remember the IoCtx distributor.
// NOTE(review): the queue name "FileStore::OpWQ" looks copied from another
// work queue -- confirm whether a rados-specific name was intended.
RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
: ThreadPool::WorkQueue<std::string>("FileStore::OpWQ", timeout, suicide_timeout, tp),
m_io_ctx_dist(io_ctx_dist)
{
}
// Append an item to the back of the queue; always succeeds.
bool RadosSyncWQ::_enqueue(std::string *s) {
  m_items.push_back(s);
  return true;
}

// Removing a specific item is not supported.
void RadosSyncWQ::_dequeue(std::string *o) {
  assert(0);
}

// True when no item is waiting.
bool RadosSyncWQ::_empty() {
  return m_items.empty();
}

// Take the oldest item off the queue, or return NULL when it is empty.
std::string *RadosSyncWQ::_dequeue() {
  std::string *next = NULL;
  if (!m_items.empty()) {
    next = m_items.front();
    m_items.pop_front();
  }
  return next;
}

// The queue owns its items: free each one once it has been processed.
void RadosSyncWQ::_process_finish(std::string *s) {
  delete s;
}

// Free every item still waiting and empty the queue.
void RadosSyncWQ::_clear() {
  while (!m_items.empty()) {
    delete m_items.front();
    m_items.pop_front();
  }
}
// Wrap a buffer of len_ bytes; the destructor releases it with free().
Xattr::Xattr(char *data_, ssize_t len_)
  : data(data_), len(len_)
{
}

Xattr::~Xattr() {
  free(data);
}

// Two attributes are equal when they have the same length and bytes.
bool Xattr::operator==(const class Xattr &rhs) const {
  return (len == rhs.len) && (memcmp(data, rhs.data, len) == 0);
}

bool Xattr::operator!=(const class Xattr &rhs) const {
  if ((*this) == rhs)
    return false;
  return true;
}
// Build "<dir_name>/<file_name>" and load the object through from_path().
// Returns whatever from_path() returns.
int BackedUpObject::from_file(const char *file_name, const char *dir_name,
                              std::auto_ptr<BackedUpObject> &obj)
{
  std::string full_path(dir_name);
  full_path += "/";
  full_path += file_name;
  return BackedUpObject::from_path(full_path.c_str(), obj);
}
/**
 * Load the description of an exported object from the local file `path`.
 *
 * The size and mtime come from fstat(), the RADOS name from the
 * XATTR_FULLNAME xattr, and the remaining xattrs are read through
 * read_xattrs_from_file().
 *
 * @param path  file to inspect
 * @param obj   receives the new object on success
 * @return 0 on success, otherwise a positive errno value.  A missing file
 *         yields ENOENT without printing a message.
 */
int BackedUpObject::from_path(const char *path, std::auto_ptr<BackedUpObject> &obj)
{
  int ret;
  FILE *fp = fopen(path, "r");
  if (!fp) {
    ret = errno;
    if (ret != ENOENT) {
      cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying to "
           << "open '" << path << "': " << cpp_strerror(ret) << std::endl;
    }
    return ret;
  }
  const int fd = fileno(fp);
  struct stat st_buf;
  memset(&st_buf, 0, sizeof(st_buf));
  ret = fstat(fd, &st_buf);
  if (ret) {
    ret = errno;
    fclose(fp);
    cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying "
         << "to stat '" << path << "': " << cpp_strerror(ret) << std::endl;
    return ret;
  }
  // get fullname: first query the size of the attribute
  ssize_t res = ceph_os_fgetxattr(fd, XATTR_FULLNAME, NULL, 0);
  if (res <= 0) {
    // Save errno BEFORE fclose(), which is allowed to overwrite it.
    ret = errno;
    fclose(fp);
    if (res == 0) {
      cerr << ERR_PREFIX << "BackedUpObject::from_path: found empty "
           << XATTR_FULLNAME << " attribute on '" << path
           << "'" << std::endl;
      ret = ENODATA;
    } else if (ret == ENODATA) {
      cerr << ERR_PREFIX << "BackedUpObject::from_path: there was no "
           << XATTR_FULLNAME << " attribute found on '" << path
           << "'" << std::endl;
    } else {
      cerr << ERR_PREFIX << "getxattr error: " << cpp_strerror(ret) << std::endl;
    }
    return ret;
  }
  // Zero-filled buffer with one spare byte so the name is NUL-terminated.
  // (std::string replaces the former variable-length array, which is not
  // standard C++.)
  std::string name_buf(res + 1, '\0');
  res = ceph_os_fgetxattr(fd, XATTR_FULLNAME, &name_buf[0], res);
  if (res < 0) {
    ret = errno;
    fclose(fp);
    cerr << ERR_PREFIX << "BackedUpObject::getxattr(" << XATTR_FULLNAME
         << ") error: " << cpp_strerror(ret) << std::endl;
    return ret;
  }
  // Plain `new` throws on allocation failure, so no null check is needed.
  BackedUpObject *o = new BackedUpObject(name_buf.c_str(),
                                         st_buf.st_size, st_buf.st_mtime);
  ret = o->read_xattrs_from_file(fd);
  fclose(fp);
  if (ret) {
    cerr << ERR_PREFIX << "BackedUpObject::from_path(path = '"
         << path << "): read_xattrs_from_file returned " << ret << std::endl;
    delete o;
    return ret;
  }
  obj.reset(o);
  return 0;
}
/**
 * Load the description of the object rados_name_ from the pool.
 *
 * @return 0 on success; the value returned by io_ctx.stat() when it is
 *         negative (-ENOENT is passed on silently); or the non-zero value
 *         returned by read_xattrs_from_rados().
 */
int BackedUpObject::from_rados(IoCtx& io_ctx, const char *rados_name_,
                               auto_ptr<BackedUpObject> &obj)
{
  uint64_t obj_size = 0;
  time_t obj_mtime = 0;
  int rc = io_ctx.stat(rados_name_, &obj_size, &obj_mtime);
  if (rc < 0) {
    // don't complain here about ENOENT
    if (rc != -ENOENT) {
      cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '"
           << rados_name_ << "'): stat failed with error " << rc << std::endl;
    }
    return rc;
  }
  BackedUpObject *loaded = new BackedUpObject(rados_name_, obj_size, obj_mtime);
  rc = loaded->read_xattrs_from_rados(io_ctx);
  if (rc == 0) {
    obj.reset(loaded);
    return 0;
  }
  cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '"
       << rados_name_ << "'): read_xattrs_from_rados returned "
       << rc << std::endl;
  delete loaded;
  return rc;
}
// Release every owned Xattr and the strdup()ed object name.
BackedUpObject::~BackedUpObject()
{
  typedef std::map < std::string, Xattr* >::iterator xattr_iter;
  xattr_iter it = xattrs.begin();
  while (it != xattrs.end()) {
    delete it->second;
    it->second = NULL;
    ++it;
  }
  free(rados_name);
}
// Path of this object's file inside export_dir; delegates to
// ExportDir::get_fs_path() with this object's RADOS name.
std::string BackedUpObject::get_fs_path(const ExportDir *export_dir) const
{
return export_dir->get_fs_path(rados_name);
}
std::string BackedUpObject::xattrs_to_str() const
{
ostringstream oss;
std::string prefix;
for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin();
x != xattrs.end(); ++x)
{
char buf[x->second->len + 1];
memcpy(buf, x->second->data, x->second->len);
buf[x->second->len] = '\0';
oss << prefix << "{" << x->first << ":" << buf << "}";
prefix = ", ";
}
return oss.str();
}
void BackedUpObject::xattr_diff(const BackedUpObject *rhs,
std::list < std::string > &only_in_a,
std::list < std::string > &only_in_b,
std::list < std::string > &diff) const
{
only_in_a.clear();
only_in_b.clear();
diff.clear();
for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin();
x != xattrs.end(); ++x)
{
std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.find(x->first);
if (r == rhs->xattrs.end()) {
only_in_a.push_back(x->first);
}
else {
const Xattr &r_obj(*r->second);
const Xattr &x_obj(*x->second);
if (r_obj != x_obj)
diff.push_back(x->first);
}
}
for (std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.begin();
r != rhs->xattrs.end(); ++r)
{
std::map < std::string, Xattr* >::const_iterator x = xattrs.find(r->first);
if (x == xattrs.end()) {
only_in_b.push_back(r->first);
}
}
}
void BackedUpObject::get_xattrs(std::list < std::string > &xattrs_) const
{
for (std::map < std::string, Xattr* >::const_iterator r = xattrs.begin();
r != xattrs.end(); ++r)
{
xattrs_.push_back(r->first);
}
}
// Look up an xattr by name; returns NULL when there is none.
const Xattr* BackedUpObject::get_xattr(const std::string &name) const
{
  std::map < std::string, Xattr* >::const_iterator found = xattrs.find(name);
  return (found != xattrs.end()) ? found->second : NULL;
}
// Name of the object; the pointer stays owned by this BackedUpObject.
const char *BackedUpObject::get_rados_name() const {
return rados_name;
}
// Size recorded when the object was loaded.
uint64_t BackedUpObject::get_rados_size() const {
return rados_size;
}
// Modification time recorded when the object was loaded.
time_t BackedUpObject::get_mtime() const {
return rados_time;
}
/**
 * Copy the contents of this object from the pool into the file `path`.
 *
 * The data is written to "<path>" + RADOS_SYNC_TMP_SUFFIX first; the
 * object name is stored on that file in the XATTR_FULLNAME xattr and the
 * file is then renamed to `path`.
 *
 * @return 0 on success, the negative value returned by io_ctx.read(), or
 *         the positive errno value of a failed local operation.
 */
int BackedUpObject::download(IoCtx &io_ctx, const char *path)
{
  char tmp_path[strlen(path) + RADOS_SYNC_TMP_SUFFIX_LEN + 1];
  snprintf(tmp_path, sizeof(tmp_path), "%s%s", path, RADOS_SYNC_TMP_SUFFIX);
  FILE *out = fopen(tmp_path, "w");
  if (out == NULL) {
    int open_err = errno;
    cerr << ERR_PREFIX << "download: error opening '" << tmp_path << "':"
         << cpp_strerror(open_err) << std::endl;
    return open_err;
  }
  const int out_fd = fileno(out);

  static const int CHUNK_SZ = 32765;
  uint64_t offset = 0;
  // A read shorter than CHUNK_SZ marks the last chunk.
  bool more = true;
  while (more) {
    bufferlist chunk;
    int got = io_ctx.read(rados_name, chunk, CHUNK_SZ, offset);
    if (got < 0) {
      cerr << ERR_PREFIX << "download: io_ctx.read(" << rados_name << ") returned "
           << got << std::endl;
      fclose(out);
      return got;
    }
    more = (got >= CHUNK_SZ);
    size_t written = fwrite(chunk.c_str(), 1, got, out);
    if (written != (size_t)got) {
      int write_err = errno;
      cerr << ERR_PREFIX << "download: fwrite(" << tmp_path << ") error: "
           << cpp_strerror(write_err) << std::endl;
      fclose(out);
      return write_err;
    }
    offset += got;
  }

  // Record the full object name (including the terminating NUL).
  size_t name_sz = strlen(rados_name) + 1;
  if (ceph_os_fsetxattr(out_fd, XATTR_FULLNAME, rados_name, name_sz)) {
    int xattr_err = errno;
    cerr << ERR_PREFIX << "download: fsetxattr(" << tmp_path << ") error: "
         << cpp_strerror(xattr_err) << std::endl;
    fclose(out);
    return xattr_err;
  }
  if (fclose(out)) {
    int close_err = errno;
    cerr << ERR_PREFIX << "download: fclose(" << tmp_path << ") error: "
         << cpp_strerror(close_err) << std::endl;
    return close_err;
  }
  if (rename(tmp_path, path)) {
    int rename_err = errno;
    cerr << ERR_PREFIX << "download: rename(" << tmp_path << ", "
         << path << ") error: " << cpp_strerror(rename_err) << std::endl;
    return rename_err;
  }
  return 0;
}
/**
 * Copy the contents of "<dir_name>/<file_name>" into this object.
 *
 * The object is truncated to zero length first and then written in
 * chunks of CHUNK_SZ bytes.
 *
 * @return 0 on success, a negative value returned by librados, or the
 *         positive errno value of a failed local operation.
 */
int BackedUpObject::upload(IoCtx &io_ctx, const char *file_name, const char *dir_name)
{
  std::string path(dir_name);
  path += "/";
  path += file_name;
  FILE *fp = fopen(path.c_str(), "r");
  if (!fp) {
    int err = errno;
    cerr << ERR_PREFIX << "upload: error opening '" << path << "': "
         << cpp_strerror(err) << std::endl;
    return err;
  }
  // Need to truncate RADOS object to size 0, in case there is
  // already something there.
  int ret = io_ctx.trunc(rados_name, 0);
  if (ret) {
    cerr << ERR_PREFIX << "upload: trunc failed with error " << ret << std::endl;
    fclose(fp);
    return ret;
  }
  uint64_t off = 0;
  static const size_t CHUNK_SZ = 32765;
  char buf[CHUNK_SZ];
  while (true) {
    // fread() returns size_t and never a negative value; a read error is
    // only visible through ferror().
    const size_t flen = fread(buf, 1, CHUNK_SZ, fp);
    if (flen < CHUNK_SZ && ferror(fp)) {
      int err = errno ? errno : EIO;
      cerr << ERR_PREFIX << "upload: fread(" << file_name << ") error: "
           << cpp_strerror(err) << std::endl;
      fclose(fp);
      return err;
    }
    if ((flen == 0) && (off != 0)) {
      break;
    }
    // There must be a zero-copy way to do this?
    bufferlist bl;
    bl.append(buf, flen);
    const int rlen = io_ctx.write(rados_name, bl, flen, off);
    if (rlen < 0) {
      fclose(fp);
      cerr << ERR_PREFIX << "upload: rados_write error: " << rlen << std::endl;
      return rlen;
    }
    // write() reports success as 0; treating 0 as a byte count flagged
    // every non-empty chunk as a short write.  A positive value is still
    // checked in case a byte count is returned.
    if (rlen > 0 && (size_t)rlen != flen) {
      fclose(fp);
      cerr << ERR_PREFIX << "upload: rados_write error: short write" << std::endl;
      return -EIO;
    }
    off += flen;
    if (flen < CHUNK_SZ) {
      // Short read without error: end of file.
      break;
    }
  }
  fclose(fp);
  return 0;
}
/* Builds a BackedUpObject from an object name, size and modification time.
 * The name is duplicated with strdup(), so the caller keeps ownership of
 * rados_name_. The strdup() result is not checked here. */
BackedUpObject::BackedUpObject(const char *rados_name_,
                               uint64_t rados_size_,
                               time_t rados_time_)
  : rados_name(strdup(rados_name_))
  , rados_size(rados_size_)
  , rados_time(rados_time_)
{
}
int BackedUpObject::read_xattrs_from_file(int fd)
{
ssize_t blen = ceph_os_flistxattr(fd, NULL, 0);
if (blen > 0x1000000) {
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: unwilling "
<< "to allocate a buffer of size " << blen << " on the stack for "
<< "flistxattr." << std::endl;
return ENOBUFS;
}
char buf[blen + 1];
memset(buf, 0, sizeof(buf));
ssize_t blen2 = ceph_os_flistxattr(fd, buf, blen);
if (blen != blen2) {
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattrs changed while "
<< "we were trying to "
<< "list them? First length was " << blen << ", but now it's " << blen2
<< std::endl;
return EDOM;
}
const char *b = buf;
while (*b) {
size_t bs = strlen(b);
std::string xattr_name = get_user_xattr_name(b);
if (!xattr_name.empty()) {
ssize_t attr_len = ceph_os_fgetxattr(fd, b, NULL, 0);
if (attr_len < 0) {
int err = errno;
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: "
<< "fgetxattr(rados_name = '" << rados_name << "', xattr_name='"
<< xattr_name << "') failed: " << cpp_strerror(err) << std::endl;
return EDOM;
}
char *attr = (char*)malloc(attr_len);
if (!attr) {
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: "
<< "malloc(" << attr_len << ") failed for xattr_name='"
<< xattr_name << "'" << std::endl;
return ENOBUFS;
}
ssize_t attr_len2 = ceph_os_fgetxattr(fd, b, attr, attr_len);
if (attr_len2 < 0) {
int err = errno;
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: "
<< "fgetxattr(rados_name = '" << rados_name << "', "
<< "xattr_name='" << xattr_name << "') failed: "
<< cpp_strerror(err) << std::endl;
free(attr);
return EDOM;
}
if (attr_len2 != attr_len) {
cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattr "
<< "changed while we were trying to get it? "
<< "fgetxattr(rados_name = '"<< rados_name
<< "', xattr_name='" << xattr_name << "') returned a different length "
<< "than when we first called it! old_len = " << attr_len
<< "new_len = " << attr_len2 << std::endl;
free(attr);
return EDOM;
}
xattrs[xattr_name] = new Xattr(attr, attr_len);
}
b += (bs + 1);
}
return 0;
}
/* Load every xattr of this object from RADOS into the xattrs map.
 *
 * The values returned by io_ctx.getxattrs() are copied into malloc()ed
 * buffers, each of which is handed to a new Xattr stored under the
 * attribute's name.
 *
 * Returns 0 on success, the non-zero value returned by getxattrs() if that
 * call fails, or ENOBUFS if a value buffer cannot be allocated.
 */
int BackedUpObject::read_xattrs_from_rados(IoCtx &io_ctx)
{
  std::map<std::string, bufferlist> attrset;
  int ret = io_ctx.getxattrs(rados_name, attrset);
  if (ret) {
    cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_rados: "
         << "getxattrs failed with error code " << ret << std::endl;
    return ret;
  }
  for (std::map<std::string, bufferlist>::iterator i = attrset.begin();
       i != attrset.end(); ++i) {
    bufferlist& bl(i->second);
    const size_t len = bl.length();
    // malloc(0) may legally return NULL; request at least one byte so an
    // empty attribute value is not reported as an allocation failure.
    char *data = (char*)malloc(len ? len : 1);
    if (!data)
      return ENOBUFS;
    if (len)
      memcpy(data, bl.c_str(), len);
    // Plain new reports failure by throwing, so no NULL check is needed.
    // The malloc()ed buffer is handed over to the Xattr object.
    xattrs[i->first] = new Xattr(data, len);
  }
  return 0;
}
/* Runs the "import" or "export" action of the rados tool.
 *
 * opts: the keys "force", "delete-after" and "create" are boolean switches
 *       (present or absent); "workers" optionally gives the number of worker
 *       threads (1..9000, default DEFAULT_NUM_RADOS_WORKER_THREADS).
 * args: <action> <source> <destination>, where action is "import" or
 *       "export". For an import the destination is the pool and the source is
 *       passed to do_rados_import; for an export the source is the pool and
 *       the destination is passed to do_rados_export.
 *
 * Returns 1 when the "workers" value is invalid, otherwise the result of
 * do_rados_import / do_rados_export. Missing arguments, cluster or pool
 * errors terminate the process via exit() / _exit().
 */
int rados_tool_sync(const std::map < std::string, std::string > &opts,
                    std::vector<const char*> &args)
{
  int ret;
  const bool force = (opts.count("force") != 0);
  const bool delete_after = (opts.count("delete-after") != 0);
  const bool create = (opts.count("create") != 0);

  // Number of worker threads: the default unless "workers" was supplied.
  int num_threads = DEFAULT_NUM_RADOS_WORKER_THREADS;
  std::map < std::string, std::string >::const_iterator workers =
    opts.find("workers");
  if (workers != opts.end()) {
    std::string parse_err;
    num_threads = strict_strtol(workers->second.c_str(), 10, &parse_err);
    if (!parse_err.empty()) {
      cerr << "rados: can't parse number of worker threads given: "
           << parse_err << std::endl;
      return 1;
    }
    if ((num_threads < 1) || (num_threads > 9000)) {
      cerr << "rados: unreasonable value given for num_threads: "
           << num_threads << std::endl;
      return 1;
    }
  }

  // Positional arguments: action, source, destination.
  std::vector<const char*>::iterator arg = args.begin();
  const bool have_action = (arg != args.end()) &&
    ((strcmp(*arg, "import") == 0) || (strcmp(*arg, "export") == 0));
  if (!have_action) {
    cerr << "rados" << ": You must specify either 'import' or 'export'.\n";
    cerr << "Use --help to show help.\n";
    exit(1);
  }
  const std::string action(*arg);
  ++arg;

  if (arg == args.end()) {
    cerr << "rados" << ": You must give a source.\n";
    cerr << "Use --help to show help.\n";
    exit(1);
  }
  const std::string src(*arg);
  ++arg;

  if (arg == args.end()) {
    cerr << "rados" << ": You must give a destination.\n";
    cerr << "Use --help to show help.\n";
    exit(1);
  }
  const std::string dst(*arg);
  ++arg;

  const bool importing = (action == "import");

  // open rados
  Rados rados;
  if (rados.init_with_context(g_ceph_context) < 0) {
    cerr << "rados" << ": failed to initialize Rados!" << std::endl;
    exit(1);
  }
  if (rados.connect() < 0) {
    cerr << "rados" << ": failed to connect to Rados cluster!" << std::endl;
    exit(1);
  }

  IoCtx io_ctx;
  const std::string pool_name = importing ? dst : src;
  ret = rados.ioctx_create(pool_name.c_str(), io_ctx);
  if ((ret == -ENOENT) && importing) {
    // A missing pool is only created for an import, and only on request.
    if (!create) {
      cerr << "rados" << ": pool '" << pool_name << "' does not exist. Use "
           << "--create to try to create it." << std::endl;
      exit(ENOENT);
    }
    ret = rados.pool_create(pool_name.c_str());
    if (ret) {
      cerr << "rados" << ": pool_create failed with error " << ret
           << std::endl;
      exit(ret);
    }
    ret = rados.ioctx_create(pool_name.c_str(), io_ctx);
  }
  if (ret < 0) {
    cerr << "rados" << ": error opening pool " << pool_name << ": "
         << cpp_strerror(ret) << std::endl;
    exit(ret);
  }

  IoCtxDistributor *io_ctx_dist = IoCtxDistributor::instance();
  ret = io_ctx_dist->init(rados, pool_name.c_str(), num_threads);
  if (ret) {
    cerr << ERR_PREFIX << "failed to initialize Rados io contexts."
         << std::endl;
    _exit(ret);
  }

  ThreadPool thread_pool(g_ceph_context, "rados_sync_threadpool", num_threads);
  thread_pool.start();
  if (importing) {
    ret = do_rados_import(&thread_pool, io_ctx, io_ctx_dist, src.c_str(),
                          force, delete_after);
  }
  else {
    ret = do_rados_export(&thread_pool, io_ctx, io_ctx_dist, dst.c_str(),
                          create, force, delete_after);
  }
  thread_pool.stop();
  return ret;
}

View File

@ -1,216 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_RADOS_SYNC_H
#define CEPH_RADOS_SYNC_H
#include <stddef.h>
#include "include/atomic.h"
#include "common/WorkQueue.h"
#include <string>
#include <sys/types.h>
/* Forward declarations of the librados classes that the declarations in this
 * header refer to by reference; no librados header is included here. */
namespace librados {
class IoCtx;
class Rados;
}
/* Prefix carried by rados user xattrs (see the get_user_xattr_name
 * documentation). Only declared here; the definition lives elsewhere. */
extern const char USER_XATTR_PREFIX[];
/* Suffix of temporary files: BackedUpObject::download() writes to
 * <path> + RADOS_SYNC_TMP_SUFFIX and then renames that file to <path>.
 * Only declared here; the definition lives elsewhere. */
extern const char RADOS_SYNC_TMP_SUFFIX[];
/* Prepended to the error messages that the rados sync code writes to cerr. */
#define ERR_PREFIX "[ERROR] "
/* Number of worker threads rados_tool_sync uses when no "workers" option is
 * given. */
#define DEFAULT_NUM_RADOS_WORKER_THREADS 5
/* Linux seems to use ENODATA instead of ENOATTR when an extended attribute
 * is missing */
#ifndef ENOATTR
#define ENOATTR ENODATA
#endif
/* Flag values; each one is a distinct bit, so they can be combined with |.
 * NOTE(review): presumably they report which parts of an object differ
 * between the backup and rados -- the code that uses them is not visible
 * here; confirm. */
enum {
CHANGED_XATTRS = 0x1,
CHANGED_CONTENTS = 0x2,
};
/** Given the name of an extended attribute from a file in the filesystem,
 * returns an empty string if the extended attribute does not represent a rados
 * user extended attribute. Otherwise, returns the name of the rados extended
 * attribute.
 *
 * Rados user xattrs are prefixed with USER_XATTR_PREFIX.
 *
 * BackedUpObject::read_xattrs_from_file() skips every attribute for which this
 * returns an empty string and stores the others under the returned name.
 *
 * @param fs_xattr_name  NUL-terminated attribute name as listed on the file
 * @return the rados xattr name, or "" if this is not a rados user xattr
 */
std::string get_user_xattr_name(const char *fs_xattr_name);
/* Returns true if 'suffix' is a suffix of str
 *
 * NOTE(review): both arguments are presumably NUL-terminated C strings; the
 * behaviour for NULL arguments is not visible here. */
bool is_suffix(const char *str, const char *suffix);
/** Represents a directory in the filesystem that we export rados objects to (or
 * import them from.)
 *
 * The constructor is private; instances are obtained through the two static
 * functions below.
 */
class ExportDir
{
public:
/* Returns an ExportDir for 'path'.
 * NOTE(review): presumably prepares the directory for an export; the meaning
 * of 'version' and 'create', and what is returned on failure, are defined in
 * the implementation, which is not visible here -- confirm. */
static ExportDir* create_for_writing(const std::string &path, int version,
bool create);
/* Returns an ExportDir for 'path'.
 * NOTE(review): presumably for an already existing export directory; the
 * failure behaviour is not visible here -- confirm. */
static ExportDir* from_file_system(const std::string &path);
/* Given a rados object name, return something which looks kind of like the
 * first part of the name.
 *
 * The actual file name that the backed-up object is stored in is irrelevant
 * to rados_sync. The only reason to make it human-readable at all is to make
 * things easier on sysadmins. The XATTR_FULLNAME extended attribute has the
 * real, full object name.
 *
 * This function turns unicode into a bunch of 'at' signs. This could be
 * fixed. If you try, be sure to handle all the multibyte characters
 * correctly.
 * I guess a better hash would be nice too.
 */
std::string get_fs_path(const std::string &rados_name) const;
private:
explicit ExportDir(const std::string &path_);
/* Path of the directory (presumably the path_ given to the constructor). */
std::string path;
};
/** Smart pointer wrapper for a DIR*
 *
 * NOTE(review): presumably the destructor closes dp when it is open -- the
 * implementation is not visible here; confirm.
 */
class DirHolder {
public:
DirHolder();
~DirHolder();
/* Opens the directory dir_name.
 * NOTE(review): presumably stores the handle in dp and returns 0 on success
 * and an error code otherwise -- confirm against the implementation. */
int opendir(const char *dir_name);
/* The wrapped directory handle; public, so callers use it directly. */
DIR *dp;
};
/** IoCtxDistributor is a singleton that distributes out IoCtx instances to
 * different threads.
 *
 * Constructor and destructor are private; the object is reached through
 * instance().
 */
class IoCtxDistributor
{
public:
/* Returns the singleton. */
static IoCtxDistributor* instance();
/* Sets up the distributor for pool 'pool_name' on 'cluster'.
 * rados_tool_sync passes its worker thread count as num_ioctxes and treats a
 * non-zero return value as failure. */
int init(librados::Rados &cluster, const char *pool_name, int num_ioctxes);
/* NOTE(review): presumably releases the IoCtx instances held in m_io_ctxes --
 * implementation not visible here. */
void clear();
/* Returns one of the IoCtx instances.
 * NOTE(review): how an instance is chosen for the calling thread (likely via
 * m_highest_iod_idx) is not visible here -- confirm. */
librados::IoCtx& get_ioctx();
private:
static IoCtxDistributor *s_instance;
IoCtxDistributor();
~IoCtxDistributor();
ceph::atomic_t m_highest_iod_idx;
/* NB: there might be some false sharing here that we could optimize
 * away in the future */
std::vector<librados::IoCtx> m_io_ctxes;
};
/* Work queue of std::string items, derived from ThreadPool::WorkQueue.
 * The queued items are kept as pointers in m_items, and the IoCtxDistributor
 * is available to derived classes through m_io_ctx_dist.
 * NOTE(review): what the strings denote (object names? file names?) and how an
 * item is processed are decided outside this declaration -- confirm. */
class RadosSyncWQ : public ThreadPool::WorkQueue<std::string> {
public:
RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t timeout, time_t suicide_timeout, ThreadPool *tp);
protected:
IoCtxDistributor *m_io_ctx_dist;
private:
/* Queue maintenance functions.
 * NOTE(review): presumably these implement the ThreadPool::WorkQueue
 * interface on top of m_items -- the base class is not visible here. */
bool _enqueue(std::string *s);
void _dequeue(std::string *o);
bool _empty();
std::string *_dequeue();
void _process_finish(std::string *s);
void _clear();
std::deque<std::string*> m_items;
};
/* Stores a length and a chunk of malloc()ed data
 *
 * BackedUpObject::read_xattrs_from_file() and read_xattrs_from_rados() pass a
 * buffer obtained from malloc() together with its length.
 * NOTE(review): presumably the destructor free()s 'data', i.e. the Xattr takes
 * ownership of the buffer -- implementation not visible here; confirm. */
class Xattr {
public:
Xattr(char *data_, ssize_t len_);
~Xattr();
/* NOTE(review): presumably compare both length and contents -- confirm. */
bool operator==(const class Xattr &rhs) const;
bool operator!=(const class Xattr &rhs) const;
/* The attribute value; not necessarily NUL-terminated. */
char *data;
/* Number of bytes in 'data'. */
ssize_t len;
};
/* Represents an object that we are backing up
 *
 * A BackedUpObject holds the rados name, size and modification time of an
 * object together with its extended attributes. The constructor is private
 * and copying is disabled; instances are created by the static from_*()
 * functions.
 */
class BackedUpObject
{
public:
  /* Creation functions: build a BackedUpObject from a file in a directory,
   * from a file path, or from an object in rados.
   * NOTE(review): presumably they return 0 on success and store the new
   * object in 'obj' -- the definitions are not visible here; confirm. */
  static int from_file(const char *file_name, const char *dir_name,
                       std::auto_ptr<BackedUpObject> &obj);
  static int from_path(const char *path, std::auto_ptr<BackedUpObject> &obj);
  // std::auto_ptr is spelled out like in the two declarations above, so this
  // header does not depend on a using-directive from another header.
  static int from_rados(librados::IoCtx& io_ctx, const char *rados_name_,
                        std::auto_ptr<BackedUpObject> &obj);
  ~BackedUpObject();
  /* Get the mangled name for this rados object. */
  std::string get_fs_path(const ExportDir *export_dir) const;
  /* Convert the xattrs on this BackedUpObject to a kind of JSON-like string.
   * This is only used for debugging.
   * Note that we're assuming we can just treat the xattr data as a
   * null-terminated string, which isn't true. Again, this is just for debugging,
   * so it doesn't matter.
   */
  std::string xattrs_to_str() const;
  /* Diff the extended attributes on this BackedUpObject with those found on a
   * different BackedUpObject
   */
  void xattr_diff(const BackedUpObject *rhs,
                  std::list < std::string > &only_in_a,
                  std::list < std::string > &only_in_b,
                  std::list < std::string > &diff) const;
  void get_xattrs(std::list < std::string > &xattrs_) const;
  const Xattr* get_xattr(const std::string &name) const;
  /* Accessors for the values given to the constructor. */
  const char *get_rados_name() const;
  uint64_t get_rados_size() const;
  time_t get_mtime() const;
  /* Copy the object's data from rados into the local file 'path'; returns 0
   * on success and a non-zero error code otherwise. */
  int download(librados::IoCtx &io_ctx, const char *path);
  /* Copy the local file dir_name/file_name into the rados object; returns 0
   * on success and a non-zero error code otherwise. */
  int upload(librados::IoCtx &io_ctx, const char *file_name, const char *dir_name);
private:
  BackedUpObject(const char *rados_name_, uint64_t rados_size_, time_t rados_time_);
  /* Fill 'xattrs' from the extended attributes of an open file, or from the
   * xattrs of the object in rados; both return 0 on success. */
  int read_xattrs_from_file(int fd);
  int read_xattrs_from_rados(librados::IoCtx &io_ctx);
  // don't allow copying
  BackedUpObject &operator=(const BackedUpObject &rhs);
  BackedUpObject(const BackedUpObject &rhs);
  /* strdup()ed copy of the name passed to the constructor. */
  char *rados_name;
  uint64_t rados_size;
  /* NOTE(review): stored as uint64_t although the constructor parameter and
   * get_mtime() both use time_t. */
  uint64_t rados_time;
  /* Extended attributes by name; the Xattr objects are allocated with new. */
  std::map < std::string, Xattr* > xattrs;
};
/* Entry points of the two sync directions, defined in other source files.
 * rados_tool_sync calls one of them with a started thread pool, an IoCtx for
 * the pool, the initialized IoCtxDistributor and the local directory name, and
 * returns their result as its own return value.
 *
 * do_rados_import: dir_name is the source given on the command line.
 * do_rados_export: dir_name is the destination given on the command line.
 * NOTE(review): the exact effect of 'create', 'force' and 'delete_after' is
 * implemented in those files and is not visible here. */
extern int do_rados_import(ThreadPool *tp, librados::IoCtx &io_ctx,
IoCtxDistributor* io_ctx_dist, const char *dir_name,
bool force, bool delete_after);
extern int do_rados_export(ThreadPool *tp, librados::IoCtx& io_ctx,
IoCtxDistributor *io_ctx_dist, const char *dir_name,
bool create, bool force, bool delete_after);
#endif