Merge pull request #38068 from dillaman/wip-librbd-migration-4

librbd: S3-based migration source 

Reviewed-by: Mykola Golub <mgolub@suse.com>
This commit is contained in:
Mykola Golub 2020-11-25 17:30:54 +02:00 committed by GitHub
commit 8dbb87cf5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 778 additions and 201 deletions

View File

@ -133,6 +133,7 @@ set(librbd_internal_srcs
migration/NativeFormat.cc
migration/OpenSourceImageRequest.cc
migration/RawFormat.cc
migration/S3Stream.cc
migration/SourceSpecBuilder.cc
migration/Utils.cc
mirror/DemoteRequest.cc

View File

@ -269,6 +269,7 @@ int open_images(librados::IoCtx& io_ctx, const std::string &image_name,
ldout(cct, 10) << "re-opening the destination image" << dendl;
r = image_ctx->state->open(0);
if (r < 0) {
image_ctx = nullptr;
lderr(cct) << "failed to re-open destination image: " << cpp_strerror(r)
<< dendl;
return r;

View File

@ -6,7 +6,10 @@
#include "common/errno.h"
#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ReadResult.h"
#include "librbd/migration/Utils.h"
#include <boost/asio/buffer.hpp>
#include <boost/asio/post.hpp>
@ -14,7 +17,6 @@
#include <boost/asio/read.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/lexical_cast.hpp>
#include <deque>
@ -108,7 +110,7 @@ public:
ldout(cct, 20) << "work=" << work.get() << dendl;
++m_in_flight_requests;
(*work)(this, derived().stream());
(*work)(derived().stream());
}
void handle_issue(boost::system::error_code ec,
@ -216,8 +218,7 @@ private:
std::deque<std::shared_ptr<Work>> m_receive_queue;
boost::beast::flat_buffer m_buffer;
std::optional<boost::beast::http::parser<
false, boost::beast::http::empty_body>> m_header_parser;
std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser;
std::optional<boost::beast::http::parser<false, StringBody>> m_parser;
D& derived() {
@ -455,14 +456,33 @@ private:
return;
}
boost::beast::http::response<StringBody> response;
Response response;
if (work->header_only()) {
m_parser.emplace(std::move(*m_header_parser));
}
response = std::move(m_parser->release());
// basic response code handling in a common location
int r = 0;
auto result = response.result();
if (result == boost::beast::http::status::not_found) {
lderr(cct) << "requested resource does not exist" << dendl;
r = -ENOENT;
} else if (result == boost::beast::http::status::forbidden) {
lderr(cct) << "permission denied attempting to access resource" << dendl;
r = -EACCES;
} else if (boost::beast::http::to_status_class(result) !=
boost::beast::http::status_class::successful) {
lderr(cct) << "failed to retrieve size: HTTP " << result << dendl;
r = -EIO;
}
bool need_eof = response.need_eof();
work->complete(0, std::move(response));
if (r < 0) {
work->complete(r, {});
} else {
work->complete(0, std::move(response));
}
if (need_eof) {
ldout(cct, 20) << "reset required for non-pipelined response: "
@ -534,8 +554,7 @@ private:
ceph_assert(false);
}
void complete_work(std::shared_ptr<Work> work, int r,
boost::beast::http::response<StringBody>&& response) {
void complete_work(std::shared_ptr<Work> work, int r, Response&& response) {
auto cct = m_http_client->m_cct;
ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl;
@ -744,8 +763,8 @@ private:
template <typename I>
HttpClient<I>::HttpClient(I* image_ctx, const std::string& url)
: m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine), m_url(url),
m_strand(*m_asio_engine),
: m_cct(image_ctx->cct), m_image_ctx(image_ctx),
m_asio_engine(image_ctx->asio_engine), m_url(url), m_strand(*m_asio_engine),
m_ssl_context(boost::asio::ssl::context::sslv23_client) {
m_ssl_context.set_default_verify_paths();
}
@ -776,37 +795,24 @@ template <typename I>
void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) {
ldout(m_cct, 10) << dendl;
boost::beast::http::request<boost::beast::http::empty_body> req;
Request req;
req.method(boost::beast::http::verb::head);
issue(
std::move(req), [this, size, on_finish]
(int r, boost::beast::http::response<StringBody>&& response) {
std::move(req), [this, size, on_finish](int r, Response&& response) {
handle_get_size(r, std::move(response), size, on_finish);
});
}
template <typename I>
void HttpClient<I>::handle_get_size(
int r, boost::beast::http::response<StringBody>&& response,
uint64_t* size, Context* on_finish) {
void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size,
Context* on_finish) {
ldout(m_cct, 10) << "r=" << r << dendl;
if (r < 0) {
lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
} else if (response.result() == boost::beast::http::status::not_found) {
lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(-ENOENT)
<< dendl;
on_finish->complete(-ENOENT);
return;
} else if (boost::beast::http::to_status_class(response.result()) !=
boost::beast::http::status_class::successful) {
lderr(m_cct) << "failed to retrieve size: HTTP " << response.result()
<< dendl;
on_finish->complete(-EIO);
return;
} else if (!response.has_content_length()) {
lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl;
on_finish->complete(-EINVAL);
@ -825,6 +831,76 @@ void HttpClient<I>::handle_get_size(
on_finish->complete(0);
}
template <typename I>
void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data,
Context* on_finish) {
ldout(m_cct, 20) << dendl;
auto aio_comp = io::AioCompletion::create_and_start(
on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
aio_comp->set_request_count(byte_extents.size());
// utilize ReadResult to assemble multiple byte extents into a single bl
// since boost::beast doesn't support multipart responses out-of-the-box
io::ReadResult read_result{data};
aio_comp->read_result = std::move(read_result);
aio_comp->read_result.set_image_extents(byte_extents);
// issue a range get request for each extent
uint64_t buffer_offset = 0;
for (auto [byte_offset, byte_length] : byte_extents) {
auto ctx = new io::ReadResult::C_ImageReadRequest(
aio_comp, buffer_offset, {{byte_offset, byte_length}});
buffer_offset += byte_length;
Request req;
req.method(boost::beast::http::verb::get);
std::stringstream range;
ceph_assert(byte_length > 0);
range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1);
req.set(boost::beast::http::field::range, range.str());
issue(
std::move(req),
[this, byte_offset=byte_offset, byte_length=byte_length, ctx]
(int r, Response&& response) {
handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl,
ctx);
});
}
}
template <typename I>
void HttpClient<I>::handle_read(int r, Response&& response,
uint64_t byte_offset, uint64_t byte_length,
bufferlist* data, Context* on_finish) {
ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
<< "r=" << r << dendl;
if (r < 0) {
lderr(m_cct) << "failed to read requested byte range: "
<< cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
} else if (response.result() != boost::beast::http::status::partial_content) {
lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
<< response.result() << dendl;
on_finish->complete(-EIO);
return;
} else if (byte_length != response.body().size()) {
lderr(m_cct) << "unexpected short range read: "
<< "wanted=" << byte_length << ", "
<< "received=" << response.body().size() << dendl;
on_finish->complete(-EINVAL);
return;
}
data->clear();
data->append(response.body());
on_finish->complete(data->length());
}
template <typename I>
void HttpClient<I>::issue(std::shared_ptr<Work>&& work) {
boost::asio::post(m_strand, [this, work=std::move(work)]() mutable {

View File

@ -6,16 +6,20 @@
#include "include/common_fwd.h"
#include "include/int_types.h"
#include "librbd/io/Types.h"
#include "librbd/migration/HttpProcessorInterface.h"
#include "librbd/migration/Types.h"
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/version.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <functional>
#include <memory>
#include <string>
#include <utility>
@ -32,6 +36,13 @@ namespace migration {
template <typename ImageCtxT>
class HttpClient {
public:
using EmptyBody = boost::beast::http::empty_body;
using StringBody = boost::beast::http::string_body;
using Request = boost::beast::http::request<EmptyBody>;
using Response = boost::beast::http::response<StringBody>;
using RequestPreprocessor = std::function<void(Request&)>;
static HttpClient* create(ImageCtxT* image_ctx, const std::string& url) {
return new HttpClient(image_ctx, url);
}
@ -45,22 +56,30 @@ public:
void get_size(uint64_t* size, Context* on_finish);
void read(io::Extents&& byte_extents, bufferlist* data,
Context* on_finish);
void set_ignore_self_signed_cert(bool ignore) {
m_ignore_self_signed_cert = ignore;
}
using StringBody = boost::beast::http::string_body;
void set_http_processor(HttpProcessorInterface* http_processor) {
m_http_processor = http_processor;
}
template <class Body, typename Completion>
void issue(boost::beast::http::request<Body>&& request,
Completion&& completion) {
struct WorkImpl : Work {
HttpClient* http_client;
boost::beast::http::request<Body> request;
Completion completion;
WorkImpl(boost::beast::http::request<Body>&& request,
WorkImpl(HttpClient* http_client,
boost::beast::http::request<Body>&& request,
Completion&& completion)
: request(std::move(request)), completion(std::move(completion)) {
: http_client(http_client), request(std::move(request)),
completion(std::move(completion)) {
}
WorkImpl(const WorkImpl&) = delete;
WorkImpl& operator=(const WorkImpl&) = delete;
@ -73,36 +92,44 @@ public:
return (request.method() == boost::beast::http::verb::head);
}
void complete(
int r, boost::beast::http::response<StringBody>&& response) override {
void complete(int r, Response&& response) override {
completion(r, std::move(response));
}
void operator()(
HttpSessionInterface* http_session,
boost::beast::tcp_stream& stream) override {
void operator()(boost::beast::tcp_stream& stream) override {
preprocess_request();
boost::beast::http::async_write(
stream, request,
[http_session, work=this->shared_from_this()]
[http_session=http_client->m_http_session.get(),
work=this->shared_from_this()]
(boost::beast::error_code ec, std::size_t) mutable {
http_session->handle_issue(ec, std::move(work));
});
}
void operator()(
HttpSessionInterface* http_session,
boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override {
preprocess_request();
boost::beast::http::async_write(
stream, request,
[http_session, work=this->shared_from_this()]
[http_session=http_client->m_http_session.get(),
work=this->shared_from_this()]
(boost::beast::error_code ec, std::size_t) mutable {
http_session->handle_issue(ec, std::move(work));
});
}
void preprocess_request() {
if (http_client->m_http_processor) {
http_client->m_http_processor->process_request(request);
}
}
};
initialize_default_fields(request);
issue(std::make_shared<WorkImpl>(std::move(request),
issue(std::make_shared<WorkImpl>(this, std::move(request),
std::move(completion)));
}
@ -123,13 +150,9 @@ private:
virtual ~Work() {}
virtual bool need_eof() const = 0;
virtual bool header_only() const = 0;
virtual void complete(
int r, boost::beast::http::response<StringBody>&&) = 0;
virtual void complete(int r, Response&&) = 0;
virtual void operator()(boost::beast::tcp_stream& stream) = 0;
virtual void operator()(
HttpSessionInterface* http_session,
boost::beast::tcp_stream& stream) = 0;
virtual void operator()(
HttpSessionInterface* http_session,
boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0;
};
@ -139,6 +162,7 @@ private:
struct SslHttpSession;
CephContext* m_cct;
ImageCtxT* m_image_ctx;
std::shared_ptr<AsioEngine> m_asio_engine;
std::string m_url;
@ -146,6 +170,8 @@ private:
bool m_ignore_self_signed_cert = false;
HttpProcessorInterface* m_http_processor = nullptr;
boost::asio::io_context::strand m_strand;
boost::asio::ssl::context m_ssl_context;
@ -159,9 +185,11 @@ private:
BOOST_BEAST_VERSION_STRING);
}
void handle_get_size(
int r, boost::beast::http::response<StringBody>&& response,
uint64_t* size, Context* on_finish);
void handle_get_size(int r, Response&& response, uint64_t* size,
Context* on_finish);
void handle_read(int r, Response&& response, uint64_t byte_offset,
uint64_t byte_length, bufferlist* data, Context* on_finish);
void issue(std::shared_ptr<Work>&& work);

View File

@ -0,0 +1,27 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H
#define CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/message.hpp>
namespace librbd {
namespace migration {
struct HttpProcessorInterface {
using EmptyBody = boost::beast::http::empty_body;
using EmptyRequest = boost::beast::http::request<EmptyBody>;
virtual ~HttpProcessorInterface() {
}
virtual void process_request(EmptyRequest& request) = 0;
};
} // namespace migration
} // namespace librbd
#endif // CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H

View File

@ -6,10 +6,7 @@
#include "common/errno.h"
#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ReadResult.h"
#include "librbd/migration/HttpClient.h"
#include <boost/beast/http.hpp>
@ -74,72 +71,9 @@ void HttpStream<I>::get_size(uint64_t* size, Context* on_finish) {
template <typename I>
void HttpStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
Context* on_finish) {
using HttpRequest = boost::beast::http::request<
boost::beast::http::empty_body>;
ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl;
ldout(m_cct, 20) << dendl;
auto aio_comp = io::AioCompletion::create_and_start(
on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
aio_comp->set_request_count(byte_extents.size());
// utilize ReadResult to assemble multiple byte extents into a single bl
// since boost::beast doesn't support multipart responses out-of-the-box
io::ReadResult read_result{data};
aio_comp->read_result = std::move(read_result);
aio_comp->read_result.set_image_extents(byte_extents);
// issue a range get request for each extent
uint64_t buffer_offset = 0;
for (auto [byte_offset, byte_length] : byte_extents) {
auto ctx = new io::ReadResult::C_ImageReadRequest(
aio_comp, buffer_offset, {{byte_offset, byte_length}});
buffer_offset += byte_length;
HttpRequest req;
req.method(boost::beast::http::verb::get);
std::stringstream range;
ceph_assert(byte_length > 0);
range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1);
req.set(boost::beast::http::field::range, range.str());
m_http_client->issue(std::move(req),
[this, byte_offset=byte_offset, byte_length=byte_length, ctx](int r, HttpResponse&& response) {
handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl,
ctx);
});
}
}
template <typename I>
void HttpStream<I>::handle_read(int r, HttpResponse&& response,
uint64_t byte_offset, uint64_t byte_length,
bufferlist* data, Context* on_finish) {
ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
<< "r=" << r << dendl;
if (r < 0) {
lderr(m_cct) << "failed to read requested byte range: "
<< cpp_strerror(r) << dendl;
on_finish->complete(r);
return;
} else if (response.result() != boost::beast::http::status::partial_content) {
lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
<< response.result() << dendl;
on_finish->complete(-EIO);
return;
} else if (byte_length != response.body().size()) {
lderr(m_cct) << "unexpected short range read: "
<< "wanted=" << byte_length << ", "
<< "received=" << response.body().size() << dendl;
on_finish->complete(-EINVAL);
return;
}
data->clear();
data->append(response.body());
on_finish->complete(data->length());
m_http_client->read(std::move(byte_extents), data, on_finish);
}
} // namespace migration

View File

@ -58,8 +58,6 @@ private:
std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
void handle_read(int r, HttpResponse&& response, uint64_t byte_offset,
uint64_t byte_length, bufferlist* data, Context* on_finish);
};
} // namespace migration

View File

@ -0,0 +1,178 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "librbd/migration/S3Stream.h"
#include "common/armor.h"
#include "common/ceph_crypto.h"
#include "common/ceph_time.h"
#include "common/dout.h"
#include "common/errno.h"
#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ReadResult.h"
#include "librbd/migration/HttpClient.h"
#include "librbd/migration/HttpProcessorInterface.h"
#include <boost/beast/http.hpp>
#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/chrono.h>
#include <fmt/format.h>
#include <time.h>
namespace librbd {
namespace migration {
using HttpRequest = boost::beast::http::request<boost::beast::http::empty_body>;
namespace {
const std::string URL_KEY {"url"};
const std::string ACCESS_KEY {"access_key"};
const std::string SECRET_KEY {"secret_key"};
} // anonymous namespace
template <typename I>
struct S3Stream<I>::HttpProcessor : public HttpProcessorInterface {
S3Stream* s3stream;
HttpProcessor(S3Stream* s3stream) : s3stream(s3stream) {
}
void process_request(EmptyRequest& request) override {
s3stream->process_request(request);
}
};
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::migration::S3Stream: " << this \
<< " " << __func__ << ": "
template <typename I>
S3Stream<I>::S3Stream(I* image_ctx, const json_spirit::mObject& json_object)
: m_image_ctx(image_ctx), m_cct(image_ctx->cct),
m_asio_engine(image_ctx->asio_engine), m_json_object(json_object),
m_http_processor(std::make_unique<HttpProcessor>(this)) {
}
template <typename I>
S3Stream<I>::~S3Stream() {
}
template <typename I>
void S3Stream<I>::open(Context* on_finish) {
auto& url_value = m_json_object[URL_KEY];
if (url_value.type() != json_spirit::str_type) {
lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl;
on_finish->complete(-EINVAL);
return;
}
auto& access_key = m_json_object[ACCESS_KEY];
if (access_key.type() != json_spirit::str_type) {
lderr(m_cct) << "failed to locate '" << ACCESS_KEY << "' key" << dendl;
on_finish->complete(-EINVAL);
return;
}
auto& secret_key = m_json_object[SECRET_KEY];
if (secret_key.type() != json_spirit::str_type) {
lderr(m_cct) << "failed to locate '" << SECRET_KEY << "' key" << dendl;
on_finish->complete(-EINVAL);
return;
}
m_url = url_value.get_str();
m_access_key = access_key.get_str();
m_secret_key = secret_key.get_str();
ldout(m_cct, 10) << "url=" << m_url << ", "
<< "access_key=" << m_access_key << dendl;
m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url));
m_http_client->set_http_processor(m_http_processor.get());
m_http_client->open(on_finish);
}
template <typename I>
void S3Stream<I>::close(Context* on_finish) {
ldout(m_cct, 10) << dendl;
if (!m_http_client) {
on_finish->complete(0);
}
m_http_client->close(on_finish);
}
template <typename I>
void S3Stream<I>::get_size(uint64_t* size, Context* on_finish) {
ldout(m_cct, 10) << dendl;
m_http_client->get_size(size, on_finish);
}
template <typename I>
void S3Stream<I>::read(io::Extents&& byte_extents, bufferlist* data,
Context* on_finish) {
ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl;
m_http_client->read(std::move(byte_extents), data, on_finish);
}
template <typename I>
void S3Stream<I>::process_request(HttpRequest& http_request) {
ldout(m_cct, 20) << dendl;
// format RFC 1123 date/time
auto time = ceph::real_clock::to_time_t(ceph::real_clock::now());
struct tm timeInfo;
gmtime_r(&time, &timeInfo);
std::string date = fmt::format("{:%a, %d %b %Y %H:%M:%S %z}", timeInfo);
http_request.set(boost::beast::http::field::date, date);
// note: we don't support S3 subresources
std::string canonicalized_resource = std::string(http_request.target());
std::string string_to_sign = fmt::format(
"{}\n\n\n{}\n{}",
std::string(boost::beast::http::to_string(http_request.method())),
date, canonicalized_resource);
// create HMAC-SHA1 signature from secret key + string-to-sign
sha1_digest_t digest;
crypto::HMACSHA1 hmac(
reinterpret_cast<const unsigned char*>(m_secret_key.data()),
m_secret_key.size());
hmac.Update(reinterpret_cast<const unsigned char*>(string_to_sign.data()),
string_to_sign.size());
hmac.Final(reinterpret_cast<unsigned char*>(digest.v));
// base64 encode the result
char buf[64];
int r = ceph_armor(std::begin(buf), std::begin(buf) + sizeof(buf),
reinterpret_cast<const char *>(digest.v),
reinterpret_cast<const char *>(digest.v + digest.SIZE));
if (r < 0) {
ceph_abort("ceph_armor failed");
}
// store the access-key + signature in the HTTP authorization header
std::string signature = std::string(std::begin(buf), std::begin(buf) + r);
std::string authorization = fmt::format("AWS {}:{}", m_access_key, signature);
http_request.set(boost::beast::http::field::authorization, authorization);
ldout(m_cct, 20) << "string_to_sign=" << string_to_sign << ", "
<< "authorization=" << authorization << dendl;
}
} // namespace migration
} // namespace librbd
template class librbd::migration::S3Stream<librbd::ImageCtx>;

View File

@ -0,0 +1,78 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_MIGRATION_S3_STREAM_H
#define CEPH_LIBRBD_MIGRATION_S3_STREAM_H
#include "include/int_types.h"
#include "librbd/migration/StreamInterface.h"
#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <json_spirit/json_spirit.h>
#include <memory>
#include <string>
struct Context;
namespace librbd {
struct AsioEngine;
struct ImageCtx;
namespace migration {
template <typename> class HttpClient;
template <typename ImageCtxT>
class S3Stream : public StreamInterface {
public:
static S3Stream* create(ImageCtxT* image_ctx,
const json_spirit::mObject& json_object) {
return new S3Stream(image_ctx, json_object);
}
S3Stream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
~S3Stream() override;
S3Stream(const S3Stream&) = delete;
S3Stream& operator=(const S3Stream&) = delete;
void open(Context* on_finish) override;
void close(Context* on_finish) override;
void get_size(uint64_t* size, Context* on_finish) override;
void read(io::Extents&& byte_extents, bufferlist* data,
Context* on_finish) override;
private:
using HttpRequest = boost::beast::http::request<
boost::beast::http::empty_body>;
using HttpResponse = boost::beast::http::response<
boost::beast::http::string_body>;
struct HttpProcessor;
ImageCtxT* m_image_ctx;
CephContext* m_cct;
std::shared_ptr<AsioEngine> m_asio_engine;
json_spirit::mObject m_json_object;
std::string m_url;
std::string m_access_key;
std::string m_secret_key;
std::unique_ptr<HttpProcessor> m_http_processor;
std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
void process_request(HttpRequest& http_request);
};
} // namespace migration
} // namespace librbd
extern template class librbd::migration::S3Stream<librbd::ImageCtx>;
#endif // CEPH_LIBRBD_MIGRATION_S3_STREAM_H

View File

@ -6,6 +6,7 @@
#include "librbd/ImageCtx.h"
#include "librbd/migration/FileStream.h"
#include "librbd/migration/HttpStream.h"
#include "librbd/migration/S3Stream.h"
#include "librbd/migration/NativeFormat.h"
#include "librbd/migration/RawFormat.h"
@ -99,6 +100,8 @@ int SourceSpecBuilder<I>::build_stream(
stream->reset(FileStream<I>::create(m_image_ctx, stream_obj));
} else if (type == "http") {
stream->reset(HttpStream<I>::create(m_image_ctx, stream_obj));
} else if (type == "s3") {
stream->reset(S3Stream<I>::create(m_image_ctx, stream_obj));
} else {
lderr(cct) << "unknown or unsupported stream type '" << type << "'"
<< dendl;

View File

@ -93,6 +93,7 @@ set(unittest_librbd_srcs
migration/test_mock_HttpClient.cc
migration/test_mock_HttpStream.cc
migration/test_mock_RawFormat.cc
migration/test_mock_S3Stream.cc
migration/test_mock_Utils.cc
mirror/snapshot/test_mock_CreateNonPrimaryRequest.cc
mirror/snapshot/test_mock_CreatePrimaryRequest.cc

View File

@ -21,14 +21,20 @@ struct MockTestImageCtx : public MockImageCtx {
};
} // anonymous namespace
namespace util {
inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) {
return image_ctx->image_ctx;
}
} // namespace util
} // namespace librbd
#include "librbd/migration/HttpClient.cc"
using EmptyHttpRequest = boost::beast::http::request<
boost::beast::http::empty_body>;
using HttpRequest = boost::beast::http::request<
boost::beast::http::string_body>;
using HttpResponse = boost::beast::http::response<
boost::beast::http::string_body>;
@ -449,11 +455,11 @@ TEST_F(TestMockMigrationHttpClient, IssueSendFailed) {
client_accept(&socket, false, &on_connect_ctx2);
// send request via closed connection
HttpRequest req;
EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
@ -486,11 +492,11 @@ TEST_F(TestMockMigrationHttpClient, IssueReceiveFailed) {
ASSERT_EQ(0, ctx1.wait());
// send request via closed connection
HttpRequest req;
EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
@ -536,17 +542,17 @@ TEST_F(TestMockMigrationHttpClient, IssueResetFailed) {
ASSERT_EQ(0, ctx1.wait());
// send requests then close connection
HttpRequest req;
EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
C_SaferCond ctx3;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx3](int r, HttpResponse&&) mutable {
ctx3.complete(r);
});
@ -569,7 +575,7 @@ TEST_F(TestMockMigrationHttpClient, IssueResetFailed) {
client_accept(&socket, false, &on_connect_ctx2);
C_SaferCond ctx4;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx4](int r, HttpResponse&&) mutable {
ctx4.complete(r);
});
@ -602,23 +608,23 @@ TEST_F(TestMockMigrationHttpClient, IssuePipelined) {
ASSERT_EQ(0, ctx1.wait());
// issue two pipelined (concurrent) get requests
HttpRequest req1;
EmptyHttpRequest req1;
req1.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
HttpResponse res1;
http_client.issue(HttpRequest{req1},
http_client.issue(EmptyHttpRequest{req1},
[&ctx2, &res1](int r, HttpResponse&& response) mutable {
res1 = std::move(response);
ctx2.complete(r);
});
HttpRequest req2;
EmptyHttpRequest req2;
req2.method(boost::beast::http::verb::get);
C_SaferCond ctx3;
HttpResponse res2;
http_client.issue(HttpRequest{req2},
http_client.issue(EmptyHttpRequest{req2},
[&ctx3, &res2](int r, HttpResponse&& response) mutable {
res2 = std::move(response);
ctx3.complete(r);
@ -660,7 +666,7 @@ TEST_F(TestMockMigrationHttpClient, IssuePipelinedRestart) {
ASSERT_EQ(0, ctx1.wait());
// issue two pipelined (concurrent) get requests
HttpRequest req1;
EmptyHttpRequest req1;
req1.keep_alive(false);
req1.method(boost::beast::http::verb::get);
@ -669,18 +675,18 @@ TEST_F(TestMockMigrationHttpClient, IssuePipelinedRestart) {
C_SaferCond ctx2;
HttpResponse res1;
http_client.issue(HttpRequest{req1},
http_client.issue(EmptyHttpRequest{req1},
[&ctx2, &res1](int r, HttpResponse&& response) mutable {
res1 = std::move(response);
ctx2.complete(r);
});
HttpRequest req2;
EmptyHttpRequest req2;
req2.method(boost::beast::http::verb::get);
C_SaferCond ctx3;
HttpResponse res2;
http_client.issue(HttpRequest{req2},
http_client.issue(EmptyHttpRequest{req2},
[&ctx3, &res2](int r, HttpResponse&& response) mutable {
res2 = std::move(response);
ctx3.complete(r);
@ -726,11 +732,11 @@ TEST_F(TestMockMigrationHttpClient, ShutdownInFlight) {
ASSERT_EQ(0, on_connect_ctx.wait());
ASSERT_EQ(0, ctx1.wait());
HttpRequest req;
EmptyHttpRequest req;
req.method(boost::beast::http::verb::get);
C_SaferCond ctx2;
http_client.issue(HttpRequest{req},
http_client.issue(EmptyHttpRequest{req},
[&ctx2](int r, HttpResponse&&) mutable {
ctx2.complete(r);
});
@ -761,7 +767,7 @@ TEST_F(TestMockMigrationHttpClient, GetSize) {
C_SaferCond ctx2;
http_client.get_size(&size, &ctx2);
HttpRequest expected_req;
EmptyHttpRequest expected_req;
expected_req.method(boost::beast::http::verb::head);
client_read_request(socket, expected_req);
@ -795,7 +801,7 @@ TEST_F(TestMockMigrationHttpClient, GetSizeError) {
C_SaferCond ctx2;
http_client.get_size(&size, &ctx2);
HttpRequest expected_req;
EmptyHttpRequest expected_req;
expected_req.method(boost::beast::http::verb::head);
client_read_request(socket, expected_req);
@ -810,5 +816,55 @@ TEST_F(TestMockMigrationHttpClient, GetSizeError) {
ASSERT_EQ(0, ctx3.wait());
}
TEST_F(TestMockMigrationHttpClient, Read) {
MockTestImageCtx mock_test_image_ctx(*m_image_ctx);
MockHttpClient http_client(&mock_test_image_ctx,
get_local_url(URL_SCHEME_HTTP));
boost::asio::ip::tcp::socket socket(*m_image_ctx->asio_engine);
C_SaferCond on_connect_ctx;
client_accept(&socket, false, &on_connect_ctx);
C_SaferCond ctx1;
http_client.open(&ctx1);
ASSERT_EQ(0, on_connect_ctx.wait());
ASSERT_EQ(0, ctx1.wait());
bufferlist bl;
C_SaferCond ctx2;
http_client.read({{0, 128}, {256, 64}}, &bl, &ctx2);
EmptyHttpRequest expected_req1;
expected_req1.method(boost::beast::http::verb::get);
expected_req1.set(boost::beast::http::field::range, "bytes=0-127");
client_read_request(socket, expected_req1);
EmptyHttpRequest expected_req2;
expected_req2.method(boost::beast::http::verb::get);
expected_req2.set(boost::beast::http::field::range, "bytes=256-319");
client_read_request(socket, expected_req2);
HttpResponse expected_res1;
expected_res1.result(boost::beast::http::status::partial_content);
expected_res1.body() = std::string(128, '1');
client_write_response(socket, expected_res1);
HttpResponse expected_res2;
expected_res2.result(boost::beast::http::status::partial_content);
expected_res2.body() = std::string(64, '2');
client_write_response(socket, expected_res2);
ASSERT_EQ(192, ctx2.wait());
bufferlist expect_bl;
expect_bl.append(std::string(128, '1'));
expect_bl.append(std::string(64, '2'));
ASSERT_EQ(expect_bl, bl);
C_SaferCond ctx3;
http_client.close(&ctx3);
ASSERT_EQ(0, ctx3.wait());
}
} // namespace migration
} // namespace librbd

View File

@ -35,30 +35,9 @@ struct HttpClient<MockTestImageCtx> {
MOCK_METHOD1(open, void(Context*));
MOCK_METHOD1(close, void(Context*));
MOCK_METHOD2(get_size, void(uint64_t*, Context*));
MOCK_METHOD3(issue, void(const boost::beast::http::request<
boost::beast::http::empty_body>&,
boost::beast::http::response<
boost::beast::http::string_body>*,
Context*));
template <class Body, typename Completion>
void issue(boost::beast::http::request<Body>&& request,
Completion&& completion) {
struct ContextImpl : public Context {
boost::beast::http::response<boost::beast::http::string_body> res;
Completion completion;
ContextImpl(Completion&& completion) : completion(std::move(completion)) {
}
void finish(int r) override {
completion(r, std::move(res));
}
};
auto ctx = new ContextImpl(std::move(completion));
issue(request, &ctx->res, ctx);
MOCK_METHOD3(do_read, void(const io::Extents&, bufferlist*, Context*));
void read(io::Extents&& extents, bufferlist* bl, Context* ctx) {
do_read(extents, bl, ctx);
}
HttpClient() {
@ -69,14 +48,6 @@ struct HttpClient<MockTestImageCtx> {
HttpClient<MockTestImageCtx>* HttpClient<MockTestImageCtx>::s_instance = nullptr;
} // namespace migration
namespace util {
inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) {
return image_ctx->image_ctx;
}
} // namespace util
} // namespace librbd
#include "librbd/migration/HttpStream.cc"
@ -87,6 +58,7 @@ namespace migration {
using ::testing::_;
using ::testing::Invoke;
using ::testing::InSequence;
using ::testing::WithArgs;
class TestMockMigrationHttpStream : public TestMockFixture {
public:
@ -120,29 +92,18 @@ public:
}));
}
void expect_read(MockHttpClient& mock_http_client, io::Extent byte_extent,
const std::string& data, int r) {
std::stringstream byte_range;
byte_range << byte_extent.first << "-"
<< (byte_extent.first + byte_extent.second - 1);
EXPECT_CALL(mock_http_client, issue(_, _, _))
.WillOnce(Invoke(
[data, r, byte_range=byte_range.str()]
(const boost::beast::http::request<
boost::beast::http::empty_body>& request,
boost::beast::http::response<
boost::beast::http::string_body>* response, Context* ctx) {
ASSERT_EQ("bytes=" + byte_range,
request[boost::beast::http::field::range]);
response->result(boost::beast::http::status::partial_content);
response->set(boost::beast::http::field::content_range,
"bytes " + byte_range + "/*");
response->body() = data;
response->prepare_payload();
ctx->complete(r);
}));
void expect_read(MockHttpClient& mock_http_client, io::Extents byte_extents,
const bufferlist& bl, int r) {
uint64_t len = 0;
for (auto [_, byte_len] : byte_extents) {
len += byte_len;
}
EXPECT_CALL(mock_http_client, do_read(byte_extents, _, _))
.WillOnce(WithArgs<1, 2>(Invoke(
[len, bl, r](bufferlist* out_bl, Context* ctx) {
*out_bl = bl;
ctx->complete(r < 0 ? r : len);
})));
}
json_spirit::mObject json_object;
@ -206,8 +167,9 @@ TEST_F(TestMockMigrationHttpStream, Read) {
auto mock_http_client = new MockHttpClient();
expect_open(*mock_http_client, 0);
expect_read(*mock_http_client, {0, 128}, std::string(128, '1'), 0);
expect_read(*mock_http_client, {256, 64}, std::string(64, '2'), 0);
bufferlist expect_bl;
expect_bl.append(std::string(192, '1'));
expect_read(*mock_http_client, {{0, 128}, {256, 64}}, expect_bl, 0);
expect_close(*mock_http_client, 0);
@ -221,10 +183,6 @@ TEST_F(TestMockMigrationHttpStream, Read) {
bufferlist bl;
mock_http_stream.read({{0, 128}, {256, 64}}, &bl, &ctx2);
ASSERT_EQ(192, ctx2.wait());
bufferlist expect_bl;
expect_bl.append(std::string(128, '1'));
expect_bl.append(std::string(64, '2'));
ASSERT_EQ(expect_bl, bl);
C_SaferCond ctx3;

View File

@ -0,0 +1,238 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "test/librbd/test_mock_fixture.h"
#include "test/librbd/test_support.h"
#include "include/rbd_types.h"
#include "common/ceph_mutex.h"
#include "librbd/migration/HttpClient.h"
#include "librbd/migration/S3Stream.h"
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "json_spirit/json_spirit.h"
#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/http.hpp>
namespace librbd {
namespace {
struct MockTestImageCtx : public MockImageCtx {
MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
}
};
} // anonymous namespace
namespace migration {
template <>
struct HttpClient<MockTestImageCtx> {
static HttpClient* s_instance;
static HttpClient* create(MockTestImageCtx*, const std::string&) {
ceph_assert(s_instance != nullptr);
return s_instance;
}
HttpProcessorInterface* http_processor = nullptr;
void set_http_processor(HttpProcessorInterface* http_processor) {
this->http_processor = http_processor;
}
MOCK_METHOD1(open, void(Context*));
MOCK_METHOD1(close, void(Context*));
MOCK_METHOD2(get_size, void(uint64_t*, Context*));
MOCK_METHOD3(do_read, void(const io::Extents&, bufferlist*, Context*));
void read(io::Extents&& extents, bufferlist* bl, Context* ctx) {
do_read(extents, bl, ctx);
}
HttpClient() {
s_instance = this;
}
};
HttpClient<MockTestImageCtx>* HttpClient<MockTestImageCtx>::s_instance = nullptr;
} // namespace migration
} // namespace librbd
#include "librbd/migration/S3Stream.cc"
namespace librbd {
namespace migration {
using ::testing::_;
using ::testing::Invoke;
using ::testing::InSequence;
using ::testing::WithArgs;
class TestMockMigrationS3Stream : public TestMockFixture {
public:
typedef S3Stream<MockTestImageCtx> MockS3Stream;
typedef HttpClient<MockTestImageCtx> MockHttpClient;
using EmptyBody = boost::beast::http::empty_body;
using EmptyRequest = boost::beast::http::request<EmptyBody>;
librbd::ImageCtx *m_image_ctx;
void SetUp() override {
TestMockFixture::SetUp();
ASSERT_EQ(0, open_image(m_image_name, &m_image_ctx));
json_object["url"] = "http://some.site/bucket/file";
json_object["access_key"] = "0555b35654ad1656d804";
json_object["secret_key"] = "h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==";
}
void expect_open(MockHttpClient& mock_http_client, int r) {
EXPECT_CALL(mock_http_client, open(_))
.WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
}
void expect_close(MockHttpClient& mock_http_client, int r) {
EXPECT_CALL(mock_http_client, close(_))
.WillOnce(Invoke([r](Context* ctx) { ctx->complete(r); }));
}
void expect_get_size(MockHttpClient& mock_http_client, uint64_t size, int r) {
EXPECT_CALL(mock_http_client, get_size(_, _))
.WillOnce(Invoke([size, r](uint64_t* out_size, Context* ctx) {
*out_size = size;
ctx->complete(r);
}));
}
void expect_read(MockHttpClient& mock_http_client, io::Extents byte_extents,
const bufferlist& bl, int r) {
uint64_t len = 0;
for (auto [_, byte_len] : byte_extents) {
len += byte_len;
}
EXPECT_CALL(mock_http_client, do_read(byte_extents, _, _))
.WillOnce(WithArgs<1, 2>(Invoke(
[len, bl, r](bufferlist* out_bl, Context* ctx) {
*out_bl = bl;
ctx->complete(r < 0 ? r : len);
})));
}
json_spirit::mObject json_object;
};
TEST_F(TestMockMigrationS3Stream, OpenClose) {
MockTestImageCtx mock_image_ctx(*m_image_ctx);
InSequence seq;
auto mock_http_client = new MockHttpClient();
expect_open(*mock_http_client, 0);
expect_close(*mock_http_client, 0);
MockS3Stream mock_http_stream(&mock_image_ctx, json_object);
C_SaferCond ctx1;
mock_http_stream.open(&ctx1);
ASSERT_EQ(0, ctx1.wait());
C_SaferCond ctx2;
mock_http_stream.close(&ctx2);
ASSERT_EQ(0, ctx2.wait());
}
TEST_F(TestMockMigrationS3Stream, GetSize) {
MockTestImageCtx mock_image_ctx(*m_image_ctx);
InSequence seq;
auto mock_http_client = new MockHttpClient();
expect_open(*mock_http_client, 0);
expect_get_size(*mock_http_client, 128, 0);
expect_close(*mock_http_client, 0);
MockS3Stream mock_http_stream(&mock_image_ctx, json_object);
C_SaferCond ctx1;
mock_http_stream.open(&ctx1);
ASSERT_EQ(0, ctx1.wait());
C_SaferCond ctx2;
uint64_t size;
mock_http_stream.get_size(&size, &ctx2);
ASSERT_EQ(0, ctx2.wait());
ASSERT_EQ(128, size);
C_SaferCond ctx3;
mock_http_stream.close(&ctx3);
ASSERT_EQ(0, ctx3.wait());
}
TEST_F(TestMockMigrationS3Stream, Read) {
MockTestImageCtx mock_image_ctx(*m_image_ctx);
InSequence seq;
auto mock_http_client = new MockHttpClient();
expect_open(*mock_http_client, 0);
bufferlist expect_bl;
expect_bl.append(std::string(192, '1'));
expect_read(*mock_http_client, {{0, 128}, {256, 64}}, expect_bl, 0);
expect_close(*mock_http_client, 0);
MockS3Stream mock_http_stream(&mock_image_ctx, json_object);
C_SaferCond ctx1;
mock_http_stream.open(&ctx1);
ASSERT_EQ(0, ctx1.wait());
C_SaferCond ctx2;
bufferlist bl;
mock_http_stream.read({{0, 128}, {256, 64}}, &bl, &ctx2);
ASSERT_EQ(192, ctx2.wait());
ASSERT_EQ(expect_bl, bl);
C_SaferCond ctx3;
mock_http_stream.close(&ctx3);
ASSERT_EQ(0, ctx3.wait());
}
TEST_F(TestMockMigrationS3Stream, ProcessRequest) {
MockTestImageCtx mock_image_ctx(*m_image_ctx);
InSequence seq;
auto mock_http_client = new MockHttpClient();
expect_open(*mock_http_client, 0);
expect_close(*mock_http_client, 0);
MockS3Stream mock_http_stream(&mock_image_ctx, json_object);
C_SaferCond ctx1;
mock_http_stream.open(&ctx1);
ASSERT_EQ(0, ctx1.wait());
EmptyRequest request;
request.method(boost::beast::http::verb::get);
request.target("/bucket/resource");
mock_http_client->http_processor->process_request(request);
// basic test for date and known portion of authorization
ASSERT_EQ(1U, request.count(boost::beast::http::field::date));
ASSERT_EQ(1U, request.count(boost::beast::http::field::authorization));
ASSERT_TRUE(boost::algorithm::starts_with(
request[boost::beast::http::field::authorization],
"AWS 0555b35654ad1656d804:"));
C_SaferCond ctx2;
mock_http_stream.close(&ctx2);
ASSERT_EQ(0, ctx2.wait());
}
} // namespace migration
} // namespace librbd