diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 6ac2daf0058..1e498e4623f 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7538,59 +7538,28 @@ bool RGWRados::aio_completed(void *handle) return c->is_safe(); } -// PutObj filter that buffers data so we don't try to compress tiny blocks. -// libcurl reads in 16k at a time, and we need at least 64k to get a good -// compression ratio -class RGWPutObj_Buffer : public RGWPutObj_Filter { - const unsigned buffer_size; - bufferlist buffer; - public: - RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size) - : RGWPutObj_Filter(next), buffer_size(buffer_size) { - ceph_assert(isp2(buffer_size)); // must be power of 2 - } - - int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, - bool *again) override { - if (*again || !bl.length()) { - // flush buffered data - return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again); - } - // transform offset to the beginning of the buffer - ofs = ofs - buffer.length(); - buffer.claim_append(bl); - if (buffer.length() < buffer_size) { - *again = false; // don't come back until there's more data - return 0; - } - const auto count = p2align(buffer.length(), buffer_size); - buffer.splice(0, count, &bl); - return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again); - } -}; - class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB { CephContext* cct; rgw_obj obj; - RGWPutObjDataProcessor *filter; + rgw::putobj::DataProcessor *filter; boost::optional& compressor; - boost::optional buffering; + boost::optional buffering; CompressorRef& plugin; - RGWPutObjProcessor_Atomic *processor; + rgw::putobj::ObjectProcessor *processor; void (*progress_cb)(off_t, void *); void *progress_data; bufferlist extra_data_bl; uint64_t extra_data_left; uint64_t data_len; map src_attrs; - off_t ofs{0}; - off_t lofs{0}; /* logical ofs */ + uint64_t ofs{0}; + uint64_t lofs{0}; /* logical ofs */ public: RGWRadosPutObj(CephContext* cct, CompressorRef& plugin, boost::optional& compressor, - RGWPutObjProcessor_Atomic *p, + rgw::putobj::ObjectProcessor *p, void (*_progress_cb)(off_t, void *), void *_progress_data) : cct(cct), @@ -7619,22 +7588,23 @@ public: if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) { //do not compress if object is encrypted -#if 0 compressor = boost::in_place(cct, plugin, filter); + // add a filter that buffers data so we don't try to compress tiny blocks. + // libcurl reads in 16k at a time, and we need at least 64k to get a good + // compression ratio constexpr unsigned buffer_size = 512 * 1024; buffering = boost::in_place(&*compressor, buffer_size); filter = &*buffering; -#endif } return 0; } int handle_data(bufferlist& bl, bool *pause) override { if (progress_cb) { - progress_cb(lofs, progress_data); + progress_cb(data_len, progress_data); } if (extra_data_left) { - size_t extra_len = bl.length(); + uint64_t extra_len = bl.length(); if (extra_len > extra_data_left) extra_len = extra_data_left; @@ -7656,32 +7626,17 @@ public: ceph_assert(uint64_t(ofs) >= extra_data_len); - lofs = ofs - extra_data_len; + uint64_t size = bl.length(); + ofs += size; - data_len += bl.length(); - bool again = false; + const uint64_t lofs = data_len; + data_len += size; - do { - void *handle = NULL; - rgw_raw_obj obj; - uint64_t size = bl.length(); - int ret = filter->handle_data(bl, lofs, &handle, &obj, &again); - if (ret < 0) - return ret; - - ofs += size; - - ret = filter->throttle_data(handle, obj, size, false); - if (ret < 0) - return ret; - } while (again); - - return 0; + return filter->process(std::move(bl), lofs); } int flush() { - bufferlist bl; - return put_data_and_throttle(filter, bl, ofs, false); + return filter->process({}, data_len); } bufferlist& get_extra_data() { return extra_data_bl; } @@ -7698,12 +7653,9 @@ public: } int complete(const string& etag, real_time *mtime, real_time set_mtime, - map& attrs, real_time delete_at, rgw_zone_set *zones_trace) { - return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace); - } - - bool is_canceled() { - return processor->is_canceled(); + map& attrs, real_time delete_at, + rgw_zone_set *zones_trace, bool *canceled) { + return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace, canceled); } }; @@ -8164,11 +8116,12 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, #define MAX_COMPLETE_RETRY 100 for (i = 0; i < MAX_COMPLETE_RETRY; i++) { - ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace); + bool canceled = false; + ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace, &canceled); if (ret < 0) { goto set_err_state; } - if (copy_if_newer && cb.is_canceled()) { + if (copy_if_newer && canceled) { ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl; obj_ctx.obj.invalidate(dest_obj); /* object was overwritten */ ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false);