rgw: prepare RGWRadosPutObj for fetch_remote_obj

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-10-10 15:54:26 -04:00
parent a7eab94ba8
commit 1ecd78b2da

View File

@ -7538,59 +7538,28 @@ bool RGWRados::aio_completed(void *handle)
return c->is_safe();
}
// PutObj filter that buffers data so we don't try to compress tiny blocks.
// libcurl reads in 16k at a time, and we need at least 64k to get a good
// compression ratio
class RGWPutObj_Buffer : public RGWPutObj_Filter {
const unsigned buffer_size;
bufferlist buffer;
public:
RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
: RGWPutObj_Filter(next), buffer_size(buffer_size) {
ceph_assert(isp2(buffer_size)); // must be power of 2
}
int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
bool *again) override {
if (*again || !bl.length()) {
// flush buffered data
return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
}
// transform offset to the beginning of the buffer
ofs = ofs - buffer.length();
buffer.claim_append(bl);
if (buffer.length() < buffer_size) {
*again = false; // don't come back until there's more data
return 0;
}
const auto count = p2align(buffer.length(), buffer_size);
buffer.splice(0, count, &bl);
return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
}
};
class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
{
CephContext* cct;
rgw_obj obj;
RGWPutObjDataProcessor *filter;
rgw::putobj::DataProcessor *filter;
boost::optional<RGWPutObj_Compress>& compressor;
boost::optional<RGWPutObj_Buffer> buffering;
boost::optional<rgw::putobj::ChunkProcessor> buffering;
CompressorRef& plugin;
RGWPutObjProcessor_Atomic *processor;
rgw::putobj::ObjectProcessor *processor;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
uint64_t extra_data_left;
uint64_t data_len;
map<string, bufferlist> src_attrs;
off_t ofs{0};
off_t lofs{0}; /* logical ofs */
uint64_t ofs{0};
uint64_t lofs{0}; /* logical ofs */
public:
RGWRadosPutObj(CephContext* cct,
CompressorRef& plugin,
boost::optional<RGWPutObj_Compress>& compressor,
RGWPutObjProcessor_Atomic *p,
rgw::putobj::ObjectProcessor *p,
void (*_progress_cb)(off_t, void *),
void *_progress_data) :
cct(cct),
@ -7619,22 +7588,23 @@ public:
if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
//do not compress if object is encrypted
#if 0
compressor = boost::in_place(cct, plugin, filter);
// add a filter that buffers data so we don't try to compress tiny blocks.
// libcurl reads in 16k at a time, and we need at least 64k to get a good
// compression ratio
constexpr unsigned buffer_size = 512 * 1024;
buffering = boost::in_place(&*compressor, buffer_size);
filter = &*buffering;
#endif
}
return 0;
}
int handle_data(bufferlist& bl, bool *pause) override {
if (progress_cb) {
progress_cb(lofs, progress_data);
progress_cb(data_len, progress_data);
}
if (extra_data_left) {
size_t extra_len = bl.length();
uint64_t extra_len = bl.length();
if (extra_len > extra_data_left)
extra_len = extra_data_left;
@ -7656,32 +7626,17 @@ public:
ceph_assert(uint64_t(ofs) >= extra_data_len);
lofs = ofs - extra_data_len;
uint64_t size = bl.length();
ofs += size;
data_len += bl.length();
bool again = false;
const uint64_t lofs = data_len;
data_len += size;
do {
void *handle = NULL;
rgw_raw_obj obj;
uint64_t size = bl.length();
int ret = filter->handle_data(bl, lofs, &handle, &obj, &again);
if (ret < 0)
return ret;
ofs += size;
ret = filter->throttle_data(handle, obj, size, false);
if (ret < 0)
return ret;
} while (again);
return 0;
return filter->process(std::move(bl), lofs);
}
int flush() {
bufferlist bl;
return put_data_and_throttle(filter, bl, ofs, false);
return filter->process({}, data_len);
}
bufferlist& get_extra_data() { return extra_data_bl; }
@ -7698,12 +7653,9 @@ public:
}
int complete(const string& etag, real_time *mtime, real_time set_mtime,
map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
}
bool is_canceled() {
return processor->is_canceled();
map<string, bufferlist>& attrs, real_time delete_at,
rgw_zone_set *zones_trace, bool *canceled) {
return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace, canceled);
}
};
@ -8164,11 +8116,12 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
bool canceled = false;
ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace, &canceled);
if (ret < 0) {
goto set_err_state;
}
if (copy_if_newer && cb.is_canceled()) {
if (copy_if_newer && canceled) {
ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
obj_ctx.obj.invalidate(dest_obj); /* object was overwritten */
ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false);