mirror of
https://github.com/ceph/ceph
synced 2025-01-21 02:31:19 +00:00
Merge remote-tracking branch 'gh/next'
This commit is contained in:
commit
4fd60ac29b
@ -4151,6 +4151,7 @@ struct get_obj_data : public RefCountedObject {
|
||||
atomic_t cancelled;
|
||||
atomic_t err_code;
|
||||
Throttle throttle;
|
||||
list<bufferlist> read_list;
|
||||
|
||||
get_obj_data(CephContext *_cct)
|
||||
: cct(_cct),
|
||||
@ -4186,7 +4187,6 @@ struct get_obj_data : public RefCountedObject {
|
||||
|
||||
c->wait_for_complete_and_cb();
|
||||
int r = c->get_return_value();
|
||||
c->release();
|
||||
|
||||
lock.Lock();
|
||||
completion_map.erase(cur_ofs);
|
||||
@ -4195,6 +4195,8 @@ struct get_obj_data : public RefCountedObject {
|
||||
*done = true;
|
||||
}
|
||||
lock.Unlock();
|
||||
|
||||
c->release();
|
||||
|
||||
return r;
|
||||
}
|
||||
@ -4338,14 +4340,7 @@ void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
|
||||
goto done_unlock;
|
||||
}
|
||||
|
||||
for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
|
||||
bufferlist& bl = *iter;
|
||||
int r = d->client_cb->handle_data(bl, 0, bl.length());
|
||||
if (r < 0) {
|
||||
d->set_cancelled(r);
|
||||
break;
|
||||
}
|
||||
}
|
||||
d->read_list.splice(d->read_list.end(), bl_list);
|
||||
|
||||
done_unlock:
|
||||
d->data_lock.Unlock();
|
||||
@ -4354,6 +4349,37 @@ done:
|
||||
return;
|
||||
}
|
||||
|
||||
int RGWRados::flush_read_list(struct get_obj_data *d)
|
||||
{
|
||||
d->data_lock.Lock();
|
||||
list<bufferlist> l;
|
||||
l.swap(d->read_list);
|
||||
d->get();
|
||||
d->read_list.clear();
|
||||
|
||||
d->data_lock.Unlock();
|
||||
|
||||
int r = 0;
|
||||
|
||||
list<bufferlist>::iterator iter;
|
||||
for (iter = l.begin(); iter != l.end(); ++iter) {
|
||||
bufferlist& bl = *iter;
|
||||
r = d->client_cb->handle_data(bl, 0, bl.length());
|
||||
if (r < 0) {
|
||||
dout(0) << "ERROR: flush_read_list(): d->client_c->handle_data() returned " << r << dendl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
d->data_lock.Lock();
|
||||
d->put();
|
||||
if (r < 0) {
|
||||
d->set_cancelled(r);
|
||||
}
|
||||
d->data_lock.Unlock();
|
||||
return r;
|
||||
}
|
||||
|
||||
int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
|
||||
rgw_obj& obj,
|
||||
off_t obj_ofs,
|
||||
@ -4398,6 +4424,10 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
|
||||
}
|
||||
}
|
||||
|
||||
r = flush_read_list(d);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
|
||||
|
||||
d->throttle.get(len);
|
||||
@ -4458,6 +4488,12 @@ int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
|
||||
data->cancel_all_io();
|
||||
break;
|
||||
}
|
||||
r = flush_read_list(data);
|
||||
if (r < 0) {
|
||||
dout(10) << "get_obj_iterate() r=" << r << ", canceling all io" << dendl;
|
||||
data->cancel_all_io();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
|
@ -1263,6 +1263,8 @@ public:
|
||||
off_t ofs, off_t end,
|
||||
RGWGetDataCB *cb);
|
||||
|
||||
int flush_read_list(struct get_obj_data *d);
|
||||
|
||||
int get_obj_iterate_cb(void *ctx, RGWObjState *astate,
|
||||
rgw_obj& obj,
|
||||
off_t obj_ofs, off_t read_ofs, off_t len,
|
||||
|
Loading…
Reference in New Issue
Block a user