logger: reduce atomic_t overhead for production

This commit is contained in:
Thomas Schoebel-Theuer 2019-03-12 11:38:35 +01:00
parent 079a5f7714
commit 5b2672e211
2 changed files with 78 additions and 1 deletions

View File

@ -219,7 +219,9 @@ void qq_mref_insert(struct logger_queue *q, struct trans_logger_mref_aspect *mre
{
struct mref_object *mref = mref_a->object;
_mref_get(mref); // must be paired with __trans_logger_ref_put()
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&q->q_brick->inner_balance_count);
#endif
mars_trace(mref, q->q_insert_info);
@ -365,7 +367,9 @@ struct trans_logger_mref_aspect *hash_find(struct trans_logger_brick *brick, lof
struct trans_logger_mref_aspect *res;
//unsigned int flags;
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_hash_find_count);
#endif
down_read(&start->hash_mutex);
@ -394,9 +398,11 @@ void hash_insert(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
_mref_check(elem_a->object);
#endif
#ifdef ADDITIONAL_COUNTERS
// only for statistics:
atomic_inc(&brick->hash_count);
atomic_inc(&brick->total_hash_insert_count);
#endif
down_write(&start->hash_mutex);
@ -428,7 +434,9 @@ void hash_extend(struct trans_logger_brick *brick, loff_t *_pos, int *_len, stru
CHECK_HEAD_EMPTY(collect_list);
}
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_hash_extend_count);
#endif
down_read(&start->hash_mutex);
@ -558,7 +566,9 @@ void hash_put_all(struct trans_logger_brick *brick, struct list_head *list)
list_del_init(&elem_a->hash_head);
elem_a->is_hashed = false;
#ifdef ADDITIONAL_COUNTERS
atomic_dec(&brick->hash_count);
#endif
}
err:
@ -661,7 +671,9 @@ int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_a
if (!mref->ref_data) { // buffered IO
mref->ref_data = mref_a->shadow_data;
mref_a->do_buffered = true;
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_sshadow_buffered_count);
#endif
}
mref->ref_flags = mshadow->ref_flags;
mref_a->shadow_ref = mshadow_a;
@ -670,6 +682,8 @@ int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_a
/* Get an ordinary internal reference
*/
_mref_get_first(mref); // must be paired with __trans_logger_ref_put()
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->inner_balance_count);
/* The internal reference from slave to master is already
@ -678,9 +692,9 @@ int _make_sshadow(struct trans_logger_output *output, struct trans_logger_mref_a
* It is compensated by master transition in __trans_logger_ref_put()
*/
atomic_inc(&brick->inner_balance_count);
atomic_inc(&brick->sshadow_count);
atomic_inc(&brick->total_sshadow_count);
#endif
#if 1
if (unlikely(mref->ref_len <= 0)) {
MARS_ERR("oops, len = %d\n", mref->ref_len);
@ -757,18 +771,24 @@ int _write_ref_get(struct trans_logger_output *output, struct trans_logger_mref_
if (!mref->ref_data) { // buffered IO
mref->ref_data = data;
mref_a->do_buffered = true;
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_mshadow_buffered_count);
#endif
}
mref_a->my_brick = brick;
mref->ref_flags = 0;
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->mshadow_count);
atomic_inc(&brick->total_mshadow_count);
#endif
atomic_inc(&global_mshadow_count);
atomic64_add(mref->ref_len, &global_mshadow_used);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->inner_balance_count);
#endif
_mref_get_first(mref); // must be paired with __trans_logger_ref_put()
return mref->ref_len;
@ -792,7 +812,9 @@ int trans_logger_ref_get(struct trans_logger_output *output, struct mref_object
CHECK_PTR(mref_a, err);
CHECK_ASPECT(mref_a, mref, err);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->outer_balance_count);
#endif
if (mref->ref_initialized) { // setup already performed
MARS_IO("again %d\n", atomic_read(&mref->ref_count.ta_atomic));
@ -870,7 +892,9 @@ restart:
_mref_check(shadow_a->object);
finished = _mref_put(mref);
#ifdef ADDITIONAL_COUNTERS
atomic_dec(&brick->inner_balance_count);
#endif
if (unlikely(finished && mref_a->is_hashed)) {
MARS_ERR("trying to put a hashed mref, pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
finished = false; // leaves a memleak
@ -895,7 +919,9 @@ restart:
if (shadow_a != mref_a) { // we are a slave shadow
//MARS_DBG("slave\n");
#ifdef ADDITIONAL_COUNTERS
atomic_dec(&brick->sshadow_count);
#endif
CHECK_HEAD_EMPTY(&mref_a->hash_head);
trans_logger_free_mref(mref);
// now put the master shadow
@ -913,7 +939,9 @@ restart:
if (mref_a->do_buffered) {
mref->ref_data = NULL;
}
#ifdef ADDITIONAL_COUNTERS
atomic_dec(&brick->mshadow_count);
#endif
atomic_dec(&global_mshadow_count);
atomic64_sub(mref->ref_len, &global_mshadow_used);
trans_logger_free_mref(mref);
@ -954,8 +982,11 @@ err:
static noinline
void trans_logger_ref_put(struct trans_logger_output *output, struct mref_object *mref)
{
#ifdef ADDITIONAL_COUNTERS
struct trans_logger_brick *brick = output->brick;
atomic_dec(&brick->outer_balance_count);
#endif
_trans_logger_ref_put(output, mref);
}
@ -979,7 +1010,9 @@ void _trans_logger_endio(struct generic_callback *cb)
NEXT_CHECKED_CALLBACK(cb, err);
atomic_dec(&brick->any_fly_count);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_cb_count);
#endif
brick_wake(&brick->worker_event, brick->worker_flag);
return;
@ -1003,12 +1036,14 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
MARS_IO("pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
#ifdef ADDITIONAL_COUNTERS
// statistics
if (mref->ref_rw) {
atomic_inc(&brick->total_write_count);
} else {
atomic_inc(&brick->total_read_count);
}
#endif
// is this a shadow buffer?
shadow_a = mref_a->shadow_ref;
@ -1019,7 +1054,9 @@ void trans_logger_ref_io(struct trans_logger_output *output, struct mref_object
CHECK_HEAD_EMPTY(&mref_a->pos_head);
#endif
_mref_get(mref); // must be paired with __trans_logger_ref_put()
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->inner_balance_count);
#endif
qq_mref_insert(&brick->q_phase[0], mref_a);
brick_wake(&brick->worker_event, brick->worker_flag);
@ -1060,7 +1097,9 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
CHECK_PTR(brick, err);
CHECK_PTR(log_input, err);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_writeback_count);
#endif
tmp = &orig_mref_a->pos_head;
@ -1163,7 +1202,9 @@ void wb_endio(struct generic_callback *cb)
wb->w_error = cb->cb_error;
}
#ifdef ADDITIONAL_COUNTERS
atomic_dec(&brick->wb_balance_count);
#endif
rw = sub_mref_a->orig_rw;
dec = rw ? &wb->w_sub_write_count : &wb->w_sub_read_count;
@ -1290,7 +1331,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_read_list);
atomic_inc(&wb->w_sub_read_count);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->wb_balance_count);
#endif
this_len = sub_mref->ref_len;
pos += this_len;
@ -1315,7 +1358,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
int diff;
int status;
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_hash_find_count);
#endif
orig_mref_a = _hash_find(&wb->w_collect_list, pos, &this_len, true, false);
if (unlikely(!orig_mref_a)) {
@ -1365,7 +1410,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
list_add_tail(&sub_mref_a->sub_head, &wb->w_sub_write_list);
atomic_inc(&wb->w_sub_write_count);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->wb_balance_count);
#endif
this_len = sub_mref->ref_len;
pos += this_len;
@ -1626,7 +1673,9 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
* This may happen rather early in phase0_preio().
*/
_mref_get(orig_mref); // must be paired with __trans_logger_ref_put()
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->inner_balance_count);
#endif
atomic_inc(&brick->log_fly_count);
ok = log_finalize(logst, orig_mref->ref_len, phase0_endio, orig_mref_a);
@ -2028,7 +2077,9 @@ void phase3_endio(struct generic_callback *cb)
hash_put_all(brick, &wb->w_collect_list);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_writeback_cluster_count);
#endif
free_writeback(wb);
@ -2301,7 +2352,9 @@ int _do_ranking(struct trans_logger_brick *brick)
if (delay_callers) {
if (!brick->delay_callers) {
brick->delay_callers = true;
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_delay_count);
#endif
}
} else if (brick->delay_callers) {
brick->delay_callers = false;
@ -2514,7 +2567,9 @@ void _flush_inputs(struct trans_logger_brick *brick)
struct trans_logger_input *input = brick->inputs[i];
struct log_status *logst = &input->logst;
if (input->is_operating && logst->count > 0) {
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_flush_count);
#endif
log_flush(logst);
}
}
@ -2628,7 +2683,9 @@ void trans_logger_log(struct trans_logger_brick *brick)
}),
HZ / 10);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_round_count);
#endif
if (brick->cease_logging) {
brick->stopped_logging = true;
@ -2784,9 +2841,11 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
&& (_has_conflict(brick, mref_a) ? conflicts++ : (ok = true), ok),
60 * HZ);
#ifdef ADDITIONAL_COUNTERS
atomic_inc(&brick->total_replay_count);
if (conflicts)
atomic_inc(&brick->total_replay_conflict_count);
#endif
down_write(&brick->replay_mutex);
was_empty = !!list_empty(&mref_a->replay_head);
@ -3162,6 +3221,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
"inf_max_pos1 = %lld "
"inf_min_pos2 = %lld "
"inf_max_pos2 = %lld | "
#ifdef ADDITIONAL_COUNTERS
"total hash_insert=%d "
"hash_find=%d "
"hash_extend=%d "
@ -3180,13 +3240,16 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
"rounds=%d "
"restarts=%d "
"delays=%d | "
#endif
"current #mrefs = %d "
"shadow_mem_used=%ld/%lld "
"replay_count=%d "
#ifdef ADDITIONAL_COUNTERS
"mshadow=%d/%d "
"sshadow=%d "
"hash_count=%d "
"balance=%d/%d/%d/%d "
#endif
"pos_count1=%d "
"pos_count2=%d "
"log_refs1=%d "
@ -3216,6 +3279,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
brick->inputs[TL_INPUT_LOG1]->inf.inf_max_pos,
brick->inputs[TL_INPUT_LOG2]->inf.inf_min_pos,
brick->inputs[TL_INPUT_LOG2]->inf.inf_max_pos,
#ifdef ADDITIONAL_COUNTERS
atomic_read(&brick->total_hash_insert_count),
atomic_read(&brick->total_hash_find_count),
atomic_read(&brick->total_hash_extend_count),
@ -3239,10 +3303,12 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
atomic_read(&brick->total_round_count),
atomic_read(&brick->total_restart_count),
atomic_read(&brick->total_delay_count),
#endif
atomic_read(&brick->mref_object_layout.alloc_count),
atomic64_read(&brick->shadow_mem_used) / 1024,
brick_global_memlimit,
atomic_read(&brick->replay_count),
#ifdef ADDITIONAL_COUNTERS
atomic_read(&brick->mshadow_count),
brick->shadow_mem_limit,
atomic_read(&brick->sshadow_count),
@ -3251,6 +3317,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
atomic_read(&brick->inner_balance_count),
atomic_read(&brick->outer_balance_count),
atomic_read(&brick->wb_balance_count),
#endif
atomic_read(&brick->inputs[TL_INPUT_LOG1]->pos_count),
atomic_read(&brick->inputs[TL_INPUT_LOG2]->pos_count),
atomic_read(&brick->inputs[TL_INPUT_LOG1]->log_ref_count),
@ -3281,6 +3348,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
static noinline
void trans_logger_reset_statistics(struct trans_logger_brick *brick)
{
#ifdef ADDITIONAL_COUNTERS
atomic_set(&brick->total_hash_insert_count, 0);
atomic_set(&brick->total_hash_find_count, 0);
atomic_set(&brick->total_hash_extend_count, 0);
@ -3300,6 +3368,7 @@ void trans_logger_reset_statistics(struct trans_logger_brick *brick)
atomic_set(&brick->total_round_count, 0);
atomic_set(&brick->total_restart_count, 0);
atomic_set(&brick->total_delay_count, 0);
#endif
}
@ -3399,7 +3468,9 @@ int trans_logger_brick_construct(struct trans_logger_brick *brick)
}
}
#ifdef ADDITIONAL_COUNTERS
atomic_set(&brick->hash_count, 0);
#endif
init_rwsem(&brick->replay_mutex);
INIT_LIST_HEAD(&brick->replay_list);
INIT_LIST_HEAD(&brick->group_head);

View File

@ -37,6 +37,10 @@
#include "lib_timing.h"
#include "lib_rank.h"
#ifdef CONFIG_MARS_DEBUG
#define ADDITIONAL_COUNTERS
#endif
///////////////////////// global tuning ////////////////////////
/* 0 = early completion of all writes
@ -201,6 +205,7 @@ struct trans_logger_brick {
atomic_t replay_count;
atomic_t any_fly_count;
atomic_t log_fly_count;
#ifdef ADDITIONAL_COUNTERS
atomic_t hash_count;
atomic_t mshadow_count;
atomic_t sshadow_count;
@ -227,6 +232,7 @@ struct trans_logger_brick {
atomic_t total_round_count;
atomic_t total_restart_count;
atomic_t total_delay_count;
#endif
// queues
struct logger_queue q_phase[LOGGER_QUEUES];
struct rank_data rkd[LOGGER_QUEUES];