rgw: implement RGWStreamIOReorderingEngine.

Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
This commit is contained in:
Radoslaw Zarzynski 2016-08-02 15:30:57 +02:00
parent c1780267ff
commit d54eb7e42c

View File

@ -6,6 +6,8 @@
#include <type_traits>
#include <boost/optional.hpp>
#include "rgw_common.h"
#include "rgw_client_io.h"
@ -290,11 +292,114 @@ public:
}
};
template <typename T>
RGWStreamIOConLenControllingEngine<T> add_conlen_controlling(T&& t) {
return RGWStreamIOConLenControllingEngine<T>(std::move(t));
}
/* Filter that rectifies the wrong behaviour of some clients of the RGWStreamIO
* interface. Should be removed after fixing those clients. */
template <typename T>
class RGWStreamIOReorderingEngine : public RGWDecoratedStreamIO<T> {
protected:
enum class ReorderState {
RGW_EARLY_HEADERS, /* Got headers sent before calling send_status. */
RGW_STATUS_SEEN, /* Status has been seen. */
RGW_DATA /* Header has been completed. */
} phase;
boost::optional<uint64_t> content_length;
ceph::bufferlist early_header_data;
ceph::bufferlist header_data;
int write_data(const char* const buf, const int len) override {
switch (phase) {
case ReorderState::RGW_EARLY_HEADERS:
early_header_data.append(buf, len);
return len;
case ReorderState::RGW_STATUS_SEEN:
header_data.append(buf, len);
return len;
case ReorderState::RGW_DATA:
return RGWDecoratedStreamIO<T>::write_data(buf, len);
}
return -EIO;
}
public:
template <typename U>
RGWStreamIOReorderingEngine(U&& decoratee)
: RGWDecoratedStreamIO<T>(std::move(decoratee)),
phase(ReorderState::RGW_EARLY_HEADERS) {
}
int send_status(const int status, const char* const status_name) override {
phase = ReorderState::RGW_STATUS_SEEN;
return RGWDecoratedStreamIO<T>::send_status(status, status_name);
}
int send_content_length(const uint64_t len) override {
if (ReorderState::RGW_EARLY_HEADERS == phase) {
/* Oh great, someone tries to send content length before status. */
content_length = len;
return 0;
} else {
return RGWDecoratedStreamIO<T>::send_content_length(len);
}
}
int complete_header() override {
size_t sent = 0;
/* Change state in order to immediately send everything we get. */
phase = ReorderState::RGW_DATA;
/* Sent content length if necessary. */
if (content_length) {
ssize_t rc = RGWDecoratedStreamIO<T>::send_content_length(*content_length);
if (rc < 0) {
return rc;
} else {
sent += rc;
}
}
/* Header data in buffers are already counted. */
if (header_data.length()) {
ssize_t rc = RGWDecoratedStreamIO<T>::write_data(header_data.c_str(),
header_data.length());
if (rc < 0) {
return rc;
} else {
sent += rc;
}
header_data.clear();
}
if (early_header_data.length()) {
ssize_t rc = RGWDecoratedStreamIO<T>::write_data(early_header_data.c_str(),
early_header_data.length());
if (rc < 0) {
return rc;
} else {
sent += rc;
}
early_header_data.clear();
}
return sent + RGWDecoratedStreamIO<T>::complete_header();
}
};
template <typename T>
RGWStreamIOReorderingEngine<T> add_reordering(T&& t) {
return RGWStreamIOReorderingEngine<T>(std::move(t));
}
#endif /* CEPH_RGW_CLIENT_IO_DECOIMPL_H */