all: deprecate mref_rw and mref_may_write

Thomas Schoebel-Theuer 2019-03-14 21:11:51 +01:00
parent af89dba044
commit 40e72f9e7d
15 changed files with 172 additions and 117 deletions

View File

@@ -178,7 +178,7 @@ void log_flush(struct log_status *logst)
logst->private = NULL;
SETUP_CALLBACK(mref, log_write_endio, cb_info);
cb_info->logst = logst;
mref->ref_rw = 1;
mref->ref_flags |= MREF_WRITE | MREF_MAY_WRITE;
mars_trace(mref, "log_flush");
@@ -243,7 +243,7 @@ void *log_reserve(struct log_status *logst, struct log_header *lh)
mref->ref_pos = logst->log_pos;
mref->ref_len = logst->chunk_size ? logst->chunk_size : total_len;
mref->ref_may_write = WRITE;
mref->ref_flags = MREF_MAY_WRITE;
mref->ref_prio = logst->io_prio;
for (;;) {
@@ -465,7 +465,7 @@ restart:
}
SETUP_CALLBACK(mref, log_read_endio, logst);
mref->ref_rw = READ;
logst->offset = 0;
logst->got = false;
logst->do_free = false;

View File

@@ -153,11 +153,15 @@ enum _MREF_FLAGS {
_MREF_UPTODATE,
_MREF_READING,
_MREF_WRITING,
_MREF_WRITE,
_MREF_MAY_WRITE,
};
#define MREF_UPTODATE (1UL << _MREF_UPTODATE)
#define MREF_READING (1UL << _MREF_READING)
#define MREF_WRITING (1UL << _MREF_WRITING)
#define MREF_WRITE (1UL << _MREF_WRITE)
#define MREF_MAY_WRITE (1UL << _MREF_MAY_WRITE)
#define MREF_OBJECT(OBJTYPE) \
CALLBACK_OBJECT(OBJTYPE); \
@@ -165,7 +169,6 @@ enum _MREF_FLAGS {
void *ref_data; /* preset to NULL for buffered IO */ \
loff_t ref_pos; \
int ref_len; \
int ref_may_write; \
int ref_prio; \
int ref_timeout; \
int ref_cs_mode; /* 0 = off, 1 = checksum + data, 2 = checksum only */ \
@@ -174,13 +177,15 @@ enum _MREF_FLAGS {
/* maintained by the ref implementation, readable for callers */ \
loff_t ref_total_size; /* just for info, need not be implemented */ \
unsigned char ref_checksum[16]; \
int ref_rw; \
int ref_id; /* not mandatory; may be used for identification */ \
bool ref_skip_sync; /* skip sync for this particular mref */ \
/* maintained by the ref implementation, incrementable for \
* callers (but not decrementable! use ref_put()) */ \
bool ref_initialized; /* internally used for checking */ \
tatomic_t ref_count; \
/* deprecated, to be removed in future */ \
int ref_rw; \
int ref_may_write; \
/* internal */ \
atomic_trace_t ref_at; \
TRACING_INFO;
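
The heart of the change: two deprecated int fields collapse into the single ref_flags bitmask, with MREF_WRITE and MREF_MAY_WRITE occupying the bit positions defined above. A minimal standalone C sketch of the old-to-new translation (the helper make_ref_flags is illustrative, not part of the commit):

#include <assert.h>
#include <stdio.h>

/* Same bit positions as the new enum in mars.h */
#define MREF_WRITE     (1UL << 3)
#define MREF_MAY_WRITE (1UL << 4)

/* Illustrative helper: translate the deprecated pair into flags.
 * READ == 0 and WRITE == 1, following the old kernel convention. */
static unsigned long make_ref_flags(int ref_rw, int ref_may_write)
{
	unsigned long flags = 0;

	if (ref_rw)
		flags |= MREF_WRITE | MREF_MAY_WRITE; /* a write implies may-write */
	else if (ref_may_write)
		flags |= MREF_MAY_WRITE; /* read now, possibly upgraded to a write later */
	return flags;
}

int main(void)
{
	assert(make_ref_flags(0, 0) == 0);                             /* plain read */
	assert(make_ref_flags(0, 1) == MREF_MAY_WRITE);                /* buffered read-for-write */
	assert(make_ref_flags(1, 1) == (MREF_WRITE | MREF_MAY_WRITE)); /* write */
	printf("flag translation OK\n");
	return 0;
}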

View File

@@ -161,8 +161,10 @@ done:
if (likely(mref_a && mref_a->object)) {
unsigned long long latency;
int rw = mref_a->object->ref_flags & MREF_WRITE ? 1 : 0;
latency = cpu_clock(raw_smp_processor_id()) - mref_a->enqueue_stamp;
threshold_check(&aio_io_threshold[mref_a->object->ref_rw & 1], latency);
threshold_check(&aio_io_threshold[rw], latency);
}
return mref_a;
}
@@ -227,9 +229,6 @@ static int aio_ref_get(struct aio_output *output, struct mref_object *mref)
MARS_ERR("ENOMEM %d bytes\n", mref->ref_len);
return -ENOMEM;
}
#if 0 // ???
mref->ref_flags = 0;
#endif
mref_a->do_dealloc = true;
atomic_inc(&output->total_alloc_count);
atomic_inc(&output->alloc_count);
@@ -282,7 +281,7 @@ void _complete(struct aio_output *output, struct aio_mref_aspect *mref_a, int er
CHECKED_CALLBACK(mref, err, err_found);
done:
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
mf_dirty_append(output->mf, DIRTY_FINISHED, mref->ref_pos + mref->ref_len);
atomic_dec(&output->write_count);
} else {
@@ -345,7 +344,7 @@ static void aio_ref_io(struct aio_output *output, struct mref_object *mref)
atomic_inc(&output->work_count);
// statistics
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
atomic_inc(&output->total_write_count);
atomic_inc(&output->write_count);
} else {
@@ -359,7 +358,8 @@ static void aio_ref_io(struct aio_output *output, struct mref_object *mref)
mapfree_set(output->mf, mref->ref_pos, -1);
MARS_IO("AIO rw=%d pos=%lld len=%d data=%p\n", mref->ref_rw, mref->ref_pos, mref->ref_len, mref->ref_data);
MARS_IO("AIO flags=%ux pos=%lld len=%d data=%p\n",
mref->ref_flags, mref->ref_pos, mref->ref_len, mref->ref_data);
mref_a = aio_mref_get_aspect(output->brick, mref);
if (unlikely(!mref_a)) {
@@ -378,9 +378,10 @@ static int aio_submit(struct aio_output *output, struct aio_mref_aspect *mref_a,
struct mref_object *mref = mref_a->object;
mm_segment_t oldfs;
int res;
__u32 rw = mref->ref_flags & MREF_WRITE ? 1 : 0;
struct iocb iocb = {
.aio_data = (__u64)mref_a,
.aio_lio_opcode = use_fdsync ? IOCB_CMD_FDSYNC : (mref->ref_rw != 0 ? IOCB_CMD_PWRITE : IOCB_CMD_PREAD),
.aio_lio_opcode = use_fdsync ? IOCB_CMD_FDSYNC : (rw ? IOCB_CMD_PWRITE : IOCB_CMD_PREAD),
.aio_fildes = output->fd,
.aio_buf = (unsigned long)mref->ref_data,
.aio_nbytes = mref->ref_len,
@@ -388,7 +389,7 @@ static int aio_submit(struct aio_output *output, struct aio_mref_aspect *mref_a,
// .aio_reqprio = something(mref->ref_prio) field exists, but not yet implemented in kernelspace :(
};
struct iocb *iocbp = &iocb;
struct timing_stats *this_timing = &timings[mref->ref_rw & 1];
struct timing_stats *this_timing = &timings[rw];
unsigned long long latency;
mars_trace(mref, "aio_submit");
@@ -682,21 +683,23 @@ static int aio_event_thread(void *data)
}
mref = mref_a->object;
MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw);
MARS_IO("AIO done %p pos = %lld len = %d flags = %ux\n",
mref, mref->ref_pos, mref->ref_len,
mref->ref_flags);
mapfree_set(output->mf, mref->ref_pos, mref->ref_pos + mref->ref_len);
if (mref->ref_rw)
if (mref->ref_flags & MREF_WRITE)
mf_dirty_append(output->mf, DIRTY_COMPLETED, mref->ref_pos + mref->ref_len);
/* Workaround for never implemented aio_fsync operation,
* see also upstream commit 723c038475b78edc9327eb952f95f9881cc9d7.
* FIXME: don't use aio anymore at all in the long-term future.
*/
if (output->brick->o_fdsync
&& err >= 0
&& mref->ref_rw != READ
&& !mref->ref_skip_sync
&& !mref_a->resubmit++) {
if (output->brick->o_fdsync &&
err >= 0 &&
(mref->ref_flags & MREF_WRITE) &&
!mref->ref_skip_sync &&
!mref_a->resubmit++) {
mars_trace(mref, "aio_fsync");
_enqueue(other, mref_a, mref->ref_prio, true);
continue;
@@ -964,7 +967,7 @@ static int aio_submit_thread(void *data)
mapfree_set(output->mf, mref->ref_pos, -1);
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
mf_dirty_append(output->mf, DIRTY_SUBMITTED, mref->ref_pos + mref->ref_len);
}
@@ -972,7 +975,7 @@ static int aio_submit_thread(void *data)
// check for reads crossing the EOF boundary (special case)
if (mref->ref_timeout > 0 &&
!mref->ref_rw &&
!(mref->ref_flags & MREF_WRITE) &&
mref->ref_pos + mref->ref_len > mref->ref_total_size) {
loff_t len = mref->ref_total_size - mref->ref_pos;
if (len > 0) {
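
The per-direction statistics arrays (aio_io_threshold[2], timings[2]) keep their two-slot layout, so the patch derives the 0/1 index from the flag word instead of masking ref_rw & 1. A standalone sketch of that idiom (struct and helper names illustrative):

#include <assert.h>

#define MREF_WRITE (1UL << 3) /* same bit as in mars.h */

struct timing_stats { unsigned long long sum_ns; unsigned long count; };

static struct timing_stats timings[2]; /* [0] = reads, [1] = writes */

static void account_latency(unsigned long ref_flags, unsigned long long ns)
{
	int rw = (ref_flags & MREF_WRITE) ? 1 : 0; /* flag -> array index */

	timings[rw].sum_ns += ns;
	timings[rw].count++;
}

int main(void)
{
	account_latency(0, 100);          /* a read */
	account_latency(MREF_WRITE, 200); /* a write */
	assert(timings[0].count == 1 && timings[1].count == 1);
	return 0;
}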

View File

@@ -433,7 +433,7 @@ void _bio_ref_io(struct bio_output *output, struct mref_object *mref, bool cork)
bio_get(bio);
rw = mref->ref_rw & 1;
rw = (mref->ref_flags & MREF_WRITE) ? WRITE : READ;
if (cork) {
// adapt to different kernel versions (TBD: improve)
#ifdef REQ_IDLE
@@ -551,7 +551,8 @@ void bio_ref_io(struct bio_output *output, struct mref_object *mref)
atomic_inc(&mars_global_io_flying);
if (mref->ref_prio == MARS_PRIO_LOW ||
(mref->ref_prio == MARS_PRIO_NORMAL && mref->ref_rw)) {
(mref->ref_prio == MARS_PRIO_NORMAL &&
(mref->ref_flags & MREF_WRITE))) {
struct bio_mref_aspect *mref_a = bio_mref_get_aspect(output->brick, mref);
struct bio_brick *brick = output->brick;
unsigned long flags;
@@ -641,6 +642,7 @@ int bio_response_thread(void *data)
struct bio_mref_aspect *mref_a;
struct mref_object *mref;
unsigned long long latency;
int rw;
int code;
if (list_empty(&tmp_list)) {
@@ -658,10 +660,10 @@ int bio_response_thread(void *data)
mref_a = container_of(tmp, struct bio_mref_aspect, io_head);
mref = mref_a->object;
rw = mref->ref_flags & MREF_WRITE ? 1 : 0;
latency = cpu_clock(raw_smp_processor_id()) - mref_a->start_stamp;
threshold_check(&bio_io_threshold[mref->ref_rw & 1], latency);
threshold_check(&bio_io_threshold[rw], latency);
code = mref_a->status_code;
#ifdef IO_DEBUGGING

View File

@@ -518,7 +518,7 @@ again:
/* Important optimization: treat whole buffer as uptodate
* upon full write.
*/
if (mref->ref_may_write != READ &&
if ((mref->ref_flags & MREF_MAY_WRITE) &&
((!base_offset && mref->ref_len >= brick->backing_size) ||
(mref->ref_pos >= brick->base_info.current_size && brick->base_info.current_size > 0))) {
new->bf_flags |= MREF_UPTODATE;
@@ -603,7 +603,8 @@ static void buf_ref_put(struct buf_output *output, struct mref_object *mref)
static void _buf_endio(struct generic_callback *cb);
static int _buf_make_io(struct buf_brick *brick, struct buf_head *bf, void *start_data, loff_t start_pos, int start_len, int rw)
static int _buf_make_io(struct buf_brick *brick, struct buf_head *bf, void *start_data,
loff_t start_pos, int start_len, __u32 ref_flags)
{
struct buf_input *input;
int status = -EINVAL;
@@ -629,7 +630,8 @@ static int _buf_make_io(struct buf_brick *brick, struct buf_head *bf, void *star
}
#endif
MARS_DBG("bf = %p rw = %d start = %lld len = %d flags = %x\n", bf, rw, start_pos, start_len, bf->bf_flags);
MARS_DBG("bf = %p %ux start = %lld len = %d flags = %x\n", bf,
ref_flags, start_pos, start_len, bf->bf_flags);
atomic_set(&bf->bf_io_count, 0);
status = -ENOMEM;
@@ -655,8 +657,7 @@ static int _buf_make_io(struct buf_brick *brick, struct buf_head *bf, void *star
mref->ref_pos = start_pos;
mref->ref_len = start_len;
mref->ref_may_write = rw;
mref->ref_rw = rw;
mref->ref_flags = ref_flags;
mref->ref_data = start_data;
status = GENERIC_INPUT_CALL(input, mref_get, mref);
@@ -840,7 +841,7 @@ static void _buf_endio(struct generic_callback *cb)
if (start_len) {
MARS_DBG("ATTENTION restart %d\n", start_len);
_buf_make_io(brick, bf, start_data, start_pos, start_len, WRITE);
_buf_make_io(brick, bf, start_data, start_pos, start_len, MREF_WRITE);
}
// drop the extra reference from above
_bf_put(bf);
@@ -886,11 +887,12 @@ static void buf_ref_io(struct buf_output *output, struct mref_object *mref)
_mref_get(mref);
CHECK_ATOMIC(&bf->bf_hash_count, 1);
MARS_DBG("IO mref=%p rw=%d bf=%p flags=%x\n", mref, mref->ref_rw, bf, bf->bf_flags);
MARS_DBG("IO mref=%p %d bf=%p flags=%x\n", mref, mref->ref_flags,
bf, bf->bf_flags);
if (mref->ref_rw != READ) {
if (mref->ref_flags & MREF_WRITE) {
loff_t end;
if (unlikely(mref->ref_may_write == READ)) {
if (unlikely(!(mref->ref_flags & MREF_MAY_WRITE))) {
MARS_ERR("sorry, you have forgotten to set ref_may_write\n");
goto callback;
}
@@ -920,7 +922,7 @@ static void buf_ref_io(struct buf_output *output, struct mref_object *mref)
goto already_done;
}
if (mref->ref_rw != 0) { // WRITE
if (mref->ref_flags & MREF_WRITE) {
#ifdef FAKE_WRITES
bf->bf_flags |= MREF_UPTODATE;
goto already_done;
@@ -980,7 +982,7 @@ static void buf_ref_io(struct buf_output *output, struct mref_object *mref)
if (likely(delay)) {
atomic_inc(&brick->nr_io_pending);
atomic_inc(&brick->io_count);
if (mref->ref_rw != 0)
if (mref->ref_flags & MREF_WRITE)
atomic_inc(&brick->write_count);
}
@@ -991,7 +993,7 @@ static void buf_ref_io(struct buf_output *output, struct mref_object *mref)
goto no_callback;
}
status = _buf_make_io(brick, bf, start_data, start_pos, start_len, mref->ref_rw);
status = _buf_make_io(brick, bf, start_data, start_pos, start_len, mref->ref_flags);
if (likely(status >= 0)) {
/* No immediate callback, this time.
* Callbacks will be called later from _bf_endio().
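
The "treat whole buffer as uptodate upon full write" optimization above now keys on MREF_MAY_WRITE instead of ref_may_write != READ. A standalone sketch of the predicate, with the brick geometry reduced to plain parameters (the name skip_read_in is illustrative):

#include <assert.h>
#include <stdbool.h>

typedef long long loff_t;

#define MREF_MAY_WRITE (1UL << 4)

/* A buffer that will be fully overwritten, or that lies entirely beyond
 * the current EOF, need not be read in before it can be written. */
static bool skip_read_in(unsigned long ref_flags, loff_t base_offset, int ref_len,
			 int backing_size, loff_t ref_pos, loff_t current_size)
{
	if (!(ref_flags & MREF_MAY_WRITE))
		return false;
	return (!base_offset && ref_len >= backing_size) ||
	       (ref_pos >= current_size && current_size > 0);
}

int main(void)
{
	/* full overwrite of an aligned buffer: no read-in needed */
	assert(skip_read_in(MREF_MAY_WRITE, 0, 4096, 4096, 0, 1 << 20));
	/* partial write in the middle: the buffer must be read first */
	assert(!skip_read_in(MREF_MAY_WRITE, 512, 1024, 4096, 512, 1 << 20));
	return 0;
}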

View File

@@ -451,7 +451,6 @@ static int client_ref_get(struct client_output *output, struct mref_object *mref
return -ENOMEM;
mref_a->do_dealloc = true;
mref->ref_flags = 0;
}
_mref_get_first(mref);
@@ -497,7 +496,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
}
while (output->brick->max_flying > 0 && atomic_read(&output->fly_count) > output->brick->max_flying) {
MARS_IO("sleeping request pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count));
MARS_IO("sleeping request pos = %lld len = %d flags = %ux (flying = %d)\n",
mref->ref_pos, mref->ref_len, mref->ref_flags,
atomic_read(&output->fly_count));
#ifdef IO_DEBUGGING
brick_msleep(3000);
#else
@@ -516,7 +517,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
mref_a->submit_jiffies = jiffies;
_hash_insert(output, mref_a);
MARS_IO("added request id = %d pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count));
MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n",
mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_flags,
atomic_read(&output->fly_count));
wake_up_interruptible_all(&output->bundle.sender_event);
@@ -617,10 +620,14 @@ int receiver_thread(void *data)
goto done;
}
MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
MARS_IO("got callback id = %d, old pos = %lld len = %d flags = %ux\n",
mref->ref_id, mref->ref_pos, mref->ref_len,
mref->ref_flags);
status = mars_recv_cb(&ch->socket, mref, &cmd);
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
MARS_IO("new status = %d, pos = %lld len = %d flags = %ux\n",
status, mref->ref_pos, mref->ref_len,
mref->ref_flags);
if (unlikely(status < 0)) {
MARS_WRN("interrupted data transfer during callback on '%s' @%s, status = %d\n",
output->bundle.path,
@@ -879,7 +886,7 @@ static int sender_thread(void *data)
// try to spread reads over multiple channels....
min_nr = 0;
max_nr = max_client_channels;
if (!mref->ref_rw) {
if (!(mref->ref_flags & MREF_WRITE)) {
/* optionally separate reads from writes */
if (brick->separate_reads && max_nr > 1)
min_nr = 1;
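
The sender now derives its channel policy from the flag word: reads may be spread over several channels, while writes are presumably kept on one channel to preserve ordering (that branch is not shown in this hunk, so the sketch below treats it as an assumption):

#include <assert.h>

#define MREF_WRITE (1UL << 3)

struct channel_choice { int min_nr; int max_nr; };

static struct channel_choice pick_channels(unsigned long ref_flags,
					   int max_client_channels,
					   int separate_reads)
{
	struct channel_choice c = { 0, max_client_channels };

	if (!(ref_flags & MREF_WRITE)) {
		/* reads: optionally keep them off the write channel */
		if (separate_reads && c.max_nr > 1)
			c.min_nr = 1;
	} else {
		c.max_nr = 1; /* assumed: writes stay on one channel */
	}
	return c;
}

int main(void)
{
	assert(pick_channels(MREF_WRITE, 4, 1).max_nr == 1);
	assert(pick_channels(0, 4, 1).min_nr == 1);
	return 0;
}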

View File

@@ -131,7 +131,6 @@ int _clear_clash(struct copy_brick *brick)
static
int _determine_input(struct copy_brick *brick, struct mref_object *mref)
{
int rw;
int below;
int behind;
loff_t ref_end;
@@ -142,8 +141,7 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref)
ref_end = mref->ref_pos + mref->ref_len;
below = ref_end <= brick->copy_start;
behind = !brick->copy_end || mref->ref_pos >= brick->copy_end;
rw = mref->ref_may_write | mref->ref_rw;
if (rw) {
if (mref->ref_flags & (MREF_WRITE | MREF_MAY_WRITE)) {
if (!behind) {
brick->low_dirty = true;
if (!below) {
@@ -266,7 +264,7 @@ exit:
_clash(brick);
}
st->active[queue] = false;
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
atomic_dec(&brick->copy_write_flight);
atomic_dec(&global_copy_write_flight);
} else {
@@ -282,7 +280,7 @@ err:
}
static
int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_t pos, loff_t end_pos, int rw, int cs_mode)
int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_t pos, loff_t end_pos, __u32 flags, int cs_mode)
{
struct mref_object *mref;
struct copy_mref_aspect *mref_a;
@@ -308,8 +306,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
mref_a->brick = brick;
mref_a->queue = queue;
mref->ref_may_write = rw;
mref->ref_rw = rw;
mref->ref_flags = flags;
mref->ref_data = data;
mref->ref_pos = pos;
mref->ref_cs_mode = cs_mode;
@@ -319,7 +316,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
len = end_pos - pos;
}
mref->ref_len = len;
mref->ref_prio = rw ?
mref->ref_prio = (flags & MREF_WRITE) ?
mars_copy_write_prio :
mars_copy_read_prio;
if (mref->ref_prio < MARS_PRIO_HIGH || mref->ref_prio > MARS_PRIO_LOW)
@@ -345,13 +342,11 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
GET_STATE(brick, index).len = mref->ref_len;
}
//MARS_IO("queue = %d index = %d pos = %lld len = %d rw = %d\n", queue, index, mref->ref_pos, mref->ref_len, rw);
st = &GET_STATE(brick, index);
st->table[queue] = mref;
st->active[queue] = true;
if (rw) {
if (flags & MREF_WRITE) {
atomic_inc(&brick->copy_write_flight);
atomic_inc(&global_copy_write_flight);
} else {
@@ -446,7 +441,7 @@ restart:
is_read_limited(brick))
goto idle;
status = _make_mref(brick, index, 0, NULL, pos, brick->copy_end, READ, brick->verify_mode ? 2 : 0);
status = _make_mref(brick, index, 0, NULL, pos, brick->copy_end, 0, brick->verify_mode ? 2 : 0);
if (unlikely(status < 0)) {
MARS_DBG("status = %d\n", status);
progress = status;
@@ -461,7 +456,7 @@ restart:
next_state = COPY_STATE_START2;
/* fallthrough */
case COPY_STATE_START2:
status = _make_mref(brick, index, 1, NULL, pos, brick->copy_end, READ, 2);
status = _make_mref(brick, index, 1, NULL, pos, brick->copy_end, 0, 2);
if (unlikely(status < 0)) {
MARS_DBG("status = %d\n", status);
progress = status;
@@ -523,7 +518,7 @@ restart:
if (mref0->ref_cs_mode > 1) { // re-read, this time with data
_clear_mref(brick, index, 0);
status = _make_mref(brick, index, 0, NULL, pos, brick->copy_end, READ, 0);
status = _make_mref(brick, index, 0, NULL, pos, brick->copy_end, 0, 0);
if (unlikely(status < 0)) {
MARS_DBG("status = %d\n", status);
progress = status;
@@ -569,7 +564,7 @@ restart:
break;
}
/* start writeout */
status = _make_mref(brick, index, 1, mref0->ref_data, pos, pos + mref0->ref_len, WRITE, 0);
status = _make_mref(brick, index, 1, mref0->ref_data, pos, pos + mref0->ref_len, MREF_WRITE | MREF_MAY_WRITE, 0);
if (unlikely(status < 0)) {
MARS_DBG("status = %d\n", status);
progress = status;
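
From here on, callers of _make_mref() pass 0 for reads and MREF_WRITE | MREF_MAY_WRITE for writeout, and the request priority follows the flag. A compact sketch (the prio values are placeholders for mars_copy_read_prio / mars_copy_write_prio):

#include <assert.h>

#define MREF_WRITE     (1U << 3)
#define MREF_MAY_WRITE (1U << 4)

static int copy_read_prio  = -1; /* placeholder, e.g. MARS_PRIO_HIGH */
static int copy_write_prio =  0; /* placeholder, e.g. MARS_PRIO_NORMAL */

static int pick_prio(unsigned int flags)
{
	return (flags & MREF_WRITE) ? copy_write_prio : copy_read_prio;
}

int main(void)
{
	assert(pick_prio(0) == copy_read_prio);                            /* COPY_STATE_START read */
	assert(pick_prio(MREF_WRITE | MREF_MAY_WRITE) == copy_write_prio); /* writeout */
	return 0;
}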

View File

@@ -302,7 +302,9 @@ void mars_log_trace(struct mref_object *mref)
diff = mref->ref_trace_stamp[mref->ref_traces-1] - mref->ref_trace_stamp[0];
len = scnprintf(buf, PAGE_SIZE, "%c ;%12lld ;%6d;%10llu", mref->ref_rw ? 'W' : 'R', mref->ref_pos, mref->ref_len, diff / 1000);
len = scnprintf(buf, PAGE_SIZE, "%c ;%12lld ;%6d;%10llu",
mref->ref_flags & MREF_WRITE ? 'W' : 'R',
mref->ref_pos, mref->ref_len, diff / 1000);
old = start_trace_clock;
for (i = 0; i < mref->ref_traces; i++) {

View File

@@ -199,7 +199,6 @@ void if_endio(struct generic_callback *cb)
struct if_mref_aspect *mref_a = cb->cb_private;
struct if_input *input;
int k;
int rw;
int error;
LAST_CALLBACK(cb);
@@ -213,8 +212,7 @@ void if_endio(struct generic_callback *cb)
mars_trace(mref_a->object, "if_endio");
mars_log_trace(mref_a->object);
rw = mref_a->object->ref_rw;
MARS_IO("rw = %d bio_count = %d\n", rw, mref_a->bio_count);
MARS_IO("flags = %ux bio_count = %d\n", mref->ref_flags, mref_a->bio_count);
for (k = 0; k < mref_a->bio_count; k++) {
struct bio_wrapper *biow;
@@ -258,7 +256,8 @@ void if_endio(struct generic_callback *cb)
#endif
// end_remove_this
}
MARS_IO("calling end_io() rw = %d error = %d\n", rw, error);
MARS_IO("calling end_io() flags = %ux error = %d\n",
mref->ref_flags, error);
// remove_this
#ifdef MARS_HAS_BI_STATUS
// end_remove_this
@@ -277,7 +276,7 @@ void if_endio(struct generic_callback *cb)
brick_mem_free(biow);
}
atomic_dec(&input->brick->flying_count);
if (rw) {
if (mref_a->object->ref_flags & MREF_WRITE) {
atomic_dec(&input->write_flying_count);
} else {
atomic_dec(&input->read_flying_count);
@@ -350,7 +349,7 @@ void _if_unplug(struct if_input *input)
atomic_inc(&input->brick->flying_count);
atomic_inc(&input->total_fire_count);
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
atomic_inc(&input->write_flying_count);
} else {
atomic_inc(&input->read_flying_count);
@@ -669,7 +668,10 @@ void if_make_request(struct request_queue *q, struct bio *bio)
tmp_a = container_of(tmp, struct if_mref_aspect, hash_head);
tmp_mref = tmp_a->object;
if (tmp_a->orig_page != page || tmp_mref->ref_rw != rw || tmp_a->bio_count >= MAX_BIO || tmp_a->current_len + bv_len > tmp_a->max_len) {
if (tmp_a->orig_page != page ||
(tmp_mref->ref_flags & MREF_WRITE) != (rw ? MREF_WRITE : 0) ||
tmp_a->bio_count >= MAX_BIO ||
tmp_a->current_len + bv_len > tmp_a->max_len) {
continue;
}
@@ -746,7 +748,7 @@ void if_make_request(struct request_queue *q, struct bio *bio)
SETUP_CALLBACK(mref, if_endio, mref_a);
mref_a->input = input;
mref->ref_rw = mref->ref_may_write = rw;
mref->ref_flags = rw ? MREF_WRITE | MREF_MAY_WRITE : 0;
mref->ref_pos = pos;
mref->ref_len = prefetch_len;
mref->ref_data = data; // direct IO

View File

@@ -1209,6 +1209,18 @@ int _mars_recv_cmd(struct mars_socket *msock, struct mars_cmd *cmd, int line)
return status;
}
/* This should be removed some day
*/
static void _send_deprecated(struct mref_object *mref)
{
/* compatibility to old protocol */
if (mref->ref_flags & MREF_WRITE)
mref->ref_rw = 1;
if (mref->ref_flags & MREF_MAY_WRITE)
mref->ref_may_write = 1;
}
int mars_send_mref(struct mars_socket *msock, struct mref_object *mref, bool cork)
{
struct mars_cmd cmd = {
@@ -1218,7 +1230,10 @@ int mars_send_mref(struct mars_socket *msock, struct mref_object *mref, bool cor
int seq = 0;
int status;
if (mref->ref_rw != 0 && mref->ref_data && mref->ref_cs_mode < 2)
/* compatibility to old protocol */
_send_deprecated(mref);
if ((mref->ref_flags & MREF_WRITE) && mref->ref_data && mref->ref_cs_mode < 2)
cmd.cmd_code |= CMD_FLAG_HAS_DATA;
if (!cork || !msock->s_pos)
@@ -1241,6 +1256,21 @@ done:
}
EXPORT_SYMBOL_GPL(mars_send_mref);
/* This should be removed some day
*/
static void _recv_deprecated(struct mref_object *mref)
{
/* compatibility to old protocol */
if (mref->ref_rw) {
mref->ref_rw = 0;
mref->ref_flags |= MREF_WRITE;
}
if (mref->ref_may_write) {
mref->ref_may_write = 0;
mref->ref_flags |= MREF_MAY_WRITE;
}
}
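
Together the two shims keep the wire protocol compatible in both directions: the sender refills the deprecated integers before transmission, and the receiver folds them back into ref_flags. A standalone round-trip sketch (the struct is reduced to the relevant fields):

#include <assert.h>

#define MREF_WRITE     (1UL << 3)
#define MREF_MAY_WRITE (1UL << 4)

struct mref_object {
	unsigned long ref_flags;
	int ref_rw;        /* deprecated, kept only for the old protocol */
	int ref_may_write; /* deprecated, kept only for the old protocol */
};

static void send_deprecated(struct mref_object *mref)
{
	if (mref->ref_flags & MREF_WRITE)
		mref->ref_rw = 1;
	if (mref->ref_flags & MREF_MAY_WRITE)
		mref->ref_may_write = 1;
}

static void recv_deprecated(struct mref_object *mref)
{
	if (mref->ref_rw) {
		mref->ref_rw = 0;
		mref->ref_flags |= MREF_WRITE;
	}
	if (mref->ref_may_write) {
		mref->ref_may_write = 0;
		mref->ref_flags |= MREF_MAY_WRITE;
	}
}

int main(void)
{
	struct mref_object m = { MREF_WRITE | MREF_MAY_WRITE, 0, 0 };

	send_deprecated(&m); /* as in mars_send_mref() */
	m.ref_flags = 0;     /* pretend only the old fields crossed the wire */
	recv_deprecated(&m); /* as in mars_recv_mref() */
	assert(m.ref_flags == (MREF_WRITE | MREF_MAY_WRITE));
	return 0;
}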
int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd)
{
int status;
@@ -1249,6 +1279,9 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct m
if (status < 0)
goto done;
/* compatibility to old protocol */
_recv_deprecated(mref);
if (cmd->cmd_stamp.tv_sec)
set_lamport_nonstrict(&cmd->cmd_stamp);
@@ -1277,7 +1310,12 @@ int mars_send_cb(struct mars_socket *msock, struct mref_object *mref, bool cork)
int seq = 0;
int status;
if (mref->ref_rw == 0 && mref->ref_data && mref->ref_cs_mode < 2)
/* compatibility to old protocol */
_send_deprecated(mref);
if (!(mref->ref_flags & MREF_WRITE) &&
mref->ref_data &&
mref->ref_cs_mode < 2)
cmd.cmd_code |= CMD_FLAG_HAS_DATA;
if (!cork || !msock->s_pos)
@@ -1309,6 +1347,9 @@ int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref, struct mar
if (status < 0)
goto done;
/* compatibility to old protocol */
_recv_deprecated(mref);
if (cmd->cmd_stamp.tv_sec)
set_lamport_nonstrict(&cmd->cmd_stamp);

View File

@@ -192,7 +192,6 @@ void server_endio(struct generic_callback *cb)
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct server_brick *brick;
int rw;
unsigned long flags;
mref_a = cb->cb_private;
@@ -210,10 +209,8 @@ void server_endio(struct generic_callback *cb)
return;
}
rw = mref->ref_rw;
traced_lock(&brick->cb_lock, flags);
if (rw) {
if (mref->ref_flags & MREF_WRITE) {
list_add_tail(&mref_a->cb_head, &brick->cb_write_list);
} else {
list_add_tail(&mref_a->cb_head, &brick->cb_read_list);

View File

@@ -68,7 +68,7 @@ static int sio_ref_get(struct sio_output *output, struct mref_object *mref)
/* Only check reads.
* Writes behind EOF are always allowed (sparse files)
*/
if (!mref->ref_may_write) {
if (!(mref->ref_flags & MREF_MAY_WRITE)) {
loff_t len = total_size - mref->ref_pos;
if (unlikely(len <= 0)) {
/* Special case: allow reads starting _exactly_ at EOF when a timeout is specified.
@@ -100,9 +100,6 @@ static int sio_ref_get(struct sio_output *output, struct mref_object *mref)
MARS_ERR("ENOMEM %d bytes\n", mref->ref_len);
return -ENOMEM;
}
#if 0 // ???
mref->ref_flags = 0;
#endif
mref_a->do_dealloc = true;
//atomic_inc(&output->total_alloc_count);
//atomic_inc(&output->alloc_count);
@@ -376,7 +373,7 @@ void _complete(struct sio_output *output, struct mref_object *mref, int err)
done:
#if 0
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
atomic_dec(&output->write_count);
} else {
atomic_dec(&output->read_count);
@@ -416,7 +413,7 @@ void _sio_ref_io(struct sio_threadinfo *tinfo, struct mref_object *mref)
sync_file(output);
}
if (mref->ref_rw == READ) {
if (!(mref->ref_flags & MREF_WRITE)) {
status = read_aops(output, mref);
} else {
mf_dirty_append(output->mf, DIRTY_SUBMITTED, mref->ref_pos + mref->ref_len);
@@ -434,7 +431,7 @@ done:
_complete(output, mref, status);
atomic_dec(&tinfo->fly_count);
if (mref->ref_rw && status >= 0)
if ((mref->ref_flags & MREF_WRITE) && status >= 0)
mf_dirty_append(output->mf, DIRTY_FINISHED, mref->ref_pos + mref->ref_len);
}
@@ -469,7 +466,7 @@ void sio_ref_io(struct sio_output *output, struct mref_object *mref)
mapfree_set(output->mf, mref->ref_pos, -1);
index = 0;
if (mref->ref_rw == READ) {
if (!(mref->ref_flags & MREF_WRITE)) {
traced_lock(&output->g_lock, flags);
index = output->index++;
traced_unlock(&output->g_lock, flags);

View File

@@ -680,7 +680,6 @@ int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_a
atomic_inc(&brick->total_sshadow_buffered_count);
#endif
}
mref->ref_flags = mshadow->ref_flags;
mref_a->shadow_ref = mshadow_a;
mref_a->my_brick = brick;
@@ -781,7 +780,6 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
#endif
}
mref_a->my_brick = brick;
mref->ref_flags = 0;
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
#ifdef ADDITIONAL_COUNTERS
@@ -839,7 +837,7 @@ int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object
mref->ref_len = REGION_SIZE - base_offset;
}
if (mref->ref_may_write == READ) {
if (!(mref->ref_flags & MREF_MAY_WRITE)) {
return _read_ref_get(output, mref_a);
}
@@ -954,8 +952,9 @@ restart:
}
// only READ is allowed on non-shadow buffers
if (unlikely(mref->ref_rw != READ && !mref_a->is_emergency)) {
MARS_FAT("bad operation %d on non-shadow\n", mref->ref_rw);
if (unlikely((mref->ref_flags & MREF_WRITE) && !mref_a->is_emergency)) {
MARS_FAT("bad operation %ux on non-shadow\n",
mref->ref_flags);
}
// no shadow => call through
@@ -1043,7 +1042,7 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
#ifdef ADDITIONAL_COUNTERS
// statistics
if (mref->ref_rw) {
if (mref->ref_flags & MREF_WRITE) {
atomic_inc(&brick->total_write_count);
} else {
atomic_inc(&brick->total_read_count);
@@ -1069,8 +1068,9 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
}
// only READ is allowed on non-shadow buffers
if (unlikely(mref->ref_rw != READ && !mref_a->is_emergency)) {
MARS_FAT("bad operation %d on non-shadow\n", mref->ref_rw);
if (unlikely((mref->ref_flags & MREF_WRITE) && !mref_a->is_emergency)) {
MARS_FAT("bad operation %ux on non-shadow\n",
mref->ref_flags);
}
atomic_inc(&brick->any_fly_count);
@@ -1186,7 +1186,6 @@ void wb_endio(struct generic_callback *cb)
struct mref_object *sub_mref;
struct trans_logger_brick *brick;
struct writeback_info *wb;
int rw;
atomic_t *dec;
void (**_endio)(struct generic_callback *cb);
void (*endio)(struct generic_callback *cb);
@@ -1211,14 +1210,18 @@ void wb_endio(struct generic_callback *cb)
atomic_dec(&brick->wb_balance_count);
#endif
rw = sub_mref_a->orig_rw;
dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
if (sub_mref_a->orig_flags & MREF_WRITE) {
dec = &wb->w_sub_write_count;
_endio = &wb->write_endio;
} else {
dec = &wb->w_sub_read_count;
_endio = &wb->read_endio;
}
CHECK_ATOMIC(dec, 1);
if (!atomic_dec_and_test(dec)) {
goto done;
}
_endio = rw ? &wb->write_endio : &wb->read_endio;
endio = *_endio;
*_endio = NULL;
if (likely(endio)) {
@@ -1312,8 +1315,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
sub_mref->ref_pos = pos;
sub_mref->ref_len = len;
sub_mref->ref_may_write = READ;
sub_mref->ref_rw = READ;
sub_mref->ref_flags = 0;
sub_mref->ref_data = NULL;
sub_mref_a = trans_logger_mref_get_aspect(brick, sub_mref);
@@ -1325,7 +1327,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
sub_mref_a->log_input = log_input;
atomic_inc(&log_input->log_ref_count);
sub_mref_a->my_brick = brick;
sub_mref_a->orig_rw = READ;
sub_mref_a->orig_flags = 0;
sub_mref_a->wb = wb;
status = GENERIC_INPUT_CALL(read_input, mref_get, sub_mref);
@@ -1389,8 +1391,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
sub_mref->ref_pos = pos;
sub_mref->ref_len = this_len;
sub_mref->ref_may_write = WRITE;
sub_mref->ref_rw = WRITE;
sub_mref->ref_flags = MREF_WRITE | MREF_MAY_WRITE;
sub_mref->ref_data = data;
sub_mref_a = trans_logger_mref_get_aspect(brick, sub_mref);
@@ -1403,7 +1404,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
sub_mref_a->log_input = log_input;
atomic_inc(&log_input->log_ref_count);
sub_mref_a->my_brick = brick;
sub_mref_a->orig_rw = WRITE;
sub_mref_a->orig_flags = MREF_WRITE;
sub_mref_a->wb = wb;
status = GENERIC_INPUT_CALL(write_input, mref_get, sub_mref);
@@ -1731,9 +1732,11 @@ bool prep_phase_startio(struct trans_logger_mref_aspect *mref_a)
brick = mref_a->my_brick;
CHECK_PTR(brick, err);
MARS_IO("pos = %lld len = %d rw = %d\n", mref->ref_pos, mref->ref_len, mref->ref_rw);
MARS_IO("pos = %lld len = %d flags = %ux\n",
mref->ref_pos, mref->ref_len,
mref->ref_flags);
if (mref->ref_rw == READ) {
if (!(mref->ref_flags & MREF_WRITE)) {
// nothing to do: directly signal success.
struct mref_object *shadow = shadow_a->object;
if (unlikely(shadow == mref)) {
@@ -2961,8 +2964,7 @@ int replay_data(struct trans_logger_brick *brick, loff_t pos, void *buf, int len
mref->ref_pos = pos;
mref->ref_data = NULL;
mref->ref_len = len;
mref->ref_may_write = WRITE;
mref->ref_rw = WRITE;
mref->ref_flags = MREF_WRITE | MREF_MAY_WRITE;
status = GENERIC_INPUT_CALL(input, mref_get, mref);
if (unlikely(status < 0)) {

View File

@@ -148,7 +148,7 @@ struct trans_logger_mref_aspect {
struct trans_logger_mref_aspect *shadow_ref;
struct trans_logger_mref_aspect *orig_mref_a;
void *shadow_data;
int orig_rw;
int orig_flags;
int wb_error;
bool do_dealloc;
bool do_buffered;

View File

@@ -89,15 +89,15 @@ static void _usebuf_endio(struct generic_callback *cb)
CHECK_PTR(sub_mref, done);
if (mref->ref_data != sub_mref->ref_data && cb->cb_error >= 0) {
if (sub_mref->ref_may_write == 0) {
if (!(sub_mref->ref_flags & MREF_MAY_WRITE)) {
if (sub_mref->ref_flags & MREF_UPTODATE) {
_usebuf_copy(mref, sub_mref, 0);
mref->ref_flags |= MREF_UPTODATE;
}
#ifndef FAKE_ALL
} else if (sub_mref->ref_rw == 0) {
} else if (!(sub_mref->ref_flags & MREF_WRITE)) {
MARS_IO("re-kick %p\n", sub_mref);
sub_mref->ref_rw = 1;
sub_mref->ref_flags |= MREF_WRITE;
_usebuf_copy(mref, sub_mref, 1);
mref->ref_flags |= MREF_UPTODATE;
GENERIC_INPUT_CALL(mref_a->input, mref_io, sub_mref);
@@ -158,7 +158,7 @@ static int usebuf_ref_get(struct usebuf_output *output, struct mref_object *mref
mref_a->sub_mref_a = sub_mref_a;
sub_mref->ref_pos = mref->ref_pos;
sub_mref->ref_len = mref->ref_len;
sub_mref->ref_may_write = mref->ref_may_write;
sub_mref->ref_flags = mref->ref_flags & MREF_MAY_WRITE;
#ifdef DIRECT_IO // shortcut solely for testing: do direct IO
if (!mref->ref_data)
MARS_ERR("NULL.......\n");
@@ -253,26 +253,26 @@ static void usebuf_ref_io(struct usebuf_output *output, struct mref_object *mref
goto err;
}
if (mref->ref_rw != 0 && sub_mref->ref_may_write == 0) {
if ((mref->ref_flags & MREF_WRITE) && !(sub_mref->ref_flags & MREF_MAY_WRITE)) {
MARS_ERR("mref_may_write was not set before\n");
goto err;
}
_mref_get(mref);
sub_mref->ref_rw = mref->ref_rw;
sub_mref->ref_flags |= mref->ref_flags & MREF_WRITE;
sub_mref->ref_len = mref->ref_len;
mref_a->input = input;
/* Optimization: when buffered IO is used and buffer is already
* uptodate, skip real IO operation.
*/
if (mref->ref_rw != 0) {
if (mref->ref_flags & MREF_WRITE) {
#ifdef DIRECT_WRITE
sub_mref->ref_rw = 1;
sub_mref->ref_flags |= MREF_WRITE;
#else // normal case
sub_mref->ref_rw = 0;
sub_mref->ref_flags &= ~MREF_WRITE;
if (sub_mref->ref_flags & MREF_UPTODATE) {
sub_mref->ref_rw = 1;
sub_mref->ref_flags |= MREF_WRITE;
}
#endif
} else if (sub_mref->ref_flags & MREF_UPTODATE) {
@@ -281,7 +281,7 @@ static void usebuf_ref_io(struct usebuf_output *output, struct mref_object *mref
return;
}
if (mref->ref_data != sub_mref->ref_data) {
if (sub_mref->ref_rw != 0) {
if (sub_mref->ref_flags & MREF_WRITE) {
_usebuf_copy(mref, sub_mref, 1);
mref->ref_flags |= MREF_UPTODATE;
}
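
The usebuf hunks above exercise all three bitmask idioms the new scheme relies on: inheriting one bit (& MREF_MAY_WRITE), setting (|= MREF_WRITE) and clearing (&= ~MREF_WRITE). A closing sketch of the buffered-write decision, assuming the normal (non-DIRECT_WRITE) path:

#include <assert.h>

#define MREF_UPTODATE  (1UL << 0)
#define MREF_WRITE     (1UL << 3)
#define MREF_MAY_WRITE (1UL << 4)

/* Mirrors the normal branch of usebuf_ref_io(): a write is first demoted
 * to a read-in of the buffer, unless the buffer is already uptodate, in
 * which case it goes out as a write immediately. */
static unsigned long plan_sub_io(unsigned long sub_flags, int caller_writes)
{
	if (caller_writes) {
		sub_flags &= ~MREF_WRITE; /* default: read the buffer in first */
		if (sub_flags & MREF_UPTODATE)
			sub_flags |= MREF_WRITE; /* uptodate: write through */
	}
	return sub_flags;
}

int main(void)
{
	/* not uptodate: the write must wait for the read-in */
	assert(!(plan_sub_io(MREF_MAY_WRITE, 1) & MREF_WRITE));
	/* uptodate: write immediately */
	assert(plan_sub_io(MREF_MAY_WRITE | MREF_UPTODATE, 1) & MREF_WRITE);
	return 0;
}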