infra: introduce temporary bounce buffers

This commit is contained in:
Thomas Schoebel-Theuer 2019-07-25 12:59:32 +02:00 committed by Thomas Schoebel-Theuer
parent 3029876200
commit 00834a2370
3 changed files with 25 additions and 6 deletions

View File

@ -410,13 +410,18 @@ err:
MARS_FAT("internal pointer corruption\n"); MARS_FAT("internal pointer corruption\n");
} }
int log_read(struct log_status *logst,
int log_read(struct log_status *logst, bool sloppy, struct log_header *lh, void **payload, int *payload_len) bool sloppy,
struct log_header *lh,
void **payload, int *payload_len,
void **dealloc)
{ {
struct mref_object *mref; struct mref_object *mref;
int old_offset; int old_offset;
int status; int status;
*dealloc = NULL;
restart: restart:
status = 0; status = 0;
mref = logst->read_mref; mref = logst->read_mref;
@ -485,6 +490,7 @@ restart:
lh, lh,
payload, payload,
payload_len, payload_len,
dealloc,
&logst->seq_nr); &logst->seq_nr);
if (unlikely(status == 0)) { if (unlikely(status == 0)) {
@ -516,6 +522,10 @@ done_put:
logst->offset = 0; logst->offset = 0;
} }
if (status == -EAGAIN && old_offset > 0) { if (status == -EAGAIN && old_offset > 0) {
if (*dealloc) {
brick_mem_free(*dealloc);
*dealloc = NULL;
}
goto restart; goto restart;
} }
goto done; goto done;
@ -528,8 +538,6 @@ done_free:
goto done; goto done;
} }
EXPORT_SYMBOL_GPL(log_read);
int log_scan(void *buf, int log_scan(void *buf,
int len, int len,
@ -538,6 +546,7 @@ int log_scan(void *buf,
bool sloppy, bool sloppy,
struct log_header *lh, struct log_header *lh,
void **payload, int *payload_len, void **payload, int *payload_len,
void **dealloc,
unsigned int *seq_nr) unsigned int *seq_nr)
{ {
bool dirty = false; bool dirty = false;

View File

@ -116,6 +116,7 @@ extern int log_scan(void *buf,
bool sloppy, bool sloppy,
struct log_header *lh, struct log_header *lh,
void **payload, int *payload_len, void **payload, int *payload_len,
void **dealloc,
unsigned int *seq_nr); unsigned int *seq_nr);
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
@ -170,7 +171,11 @@ void *log_reserve(struct log_status *logst, struct log_header *lh);
bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private); bool log_finalize(struct log_status *logst, int len, void (*endio)(void *private, int error), void *private);
int log_read(struct log_status *logst, bool sloppy, struct log_header *lh, void **payload, int *payload_len); int log_read(struct log_status *logst,
bool sloppy,
struct log_header *lh,
void **payload, int *payload_len,
void **dealloc);
///////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////

View File

@ -3016,6 +3016,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
{ {
struct trans_logger_input *input = brick->inputs[brick->log_input_nr]; struct trans_logger_input *input = brick->inputs[brick->log_input_nr];
struct log_header lh = {}; struct log_header lh = {};
void *dealloc = NULL;
loff_t start_pos; loff_t start_pos;
loff_t end_pos; loff_t end_pos;
loff_t finished_pos = -1; loff_t finished_pos = -1;
@ -3055,8 +3056,10 @@ void trans_logger_replay(struct trans_logger_brick *brick)
status = 0; // treat as EOF status = 0; // treat as EOF
break; break;
} }
brick_mem_free(dealloc);
status = log_read(&input->logst, false, &lh, &buf, &len); status = log_read(&input->logst, false, &lh,
&buf, &len, &dealloc);
new_finished_pos = input->logst.log_pos + input->logst.offset; new_finished_pos = input->logst.log_pos + input->logst.offset;
MARS_RPL("read %lld %lld\n", finished_pos, new_finished_pos); MARS_RPL("read %lld %lld\n", finished_pos, new_finished_pos);
@ -3153,6 +3156,8 @@ void trans_logger_replay(struct trans_logger_brick *brick)
_exit_inputs(brick, false); _exit_inputs(brick, false);
} }
brick_mem_free(dealloc);
MARS_INF("waiting for finish...\n"); MARS_INF("waiting for finish...\n");
brick_wait(brick->worker_event, brick_wait(brick->worker_event,