server: separate flying reads and write counters

This commit is contained in:
Thomas Schoebel-Theuer 2023-01-16 13:34:23 +01:00 committed by Thomas Schoebel-Theuer
parent e33ec25563
commit 7f876a1a82
2 changed files with 33 additions and 12 deletions

View File

@ -115,18 +115,21 @@ int cb_thread(void *data)
brick_wake_smp(&brick->startup_event);
while (!brick_thread_should_stop() ||
atomic_read(&brick->in_flight) > 0) {
atomic_read(&brick->in_flight_reads) > 0 ||
atomic_read(&brick->in_flight_writes) > 0) {
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct list_head *tmp;
unsigned long wait_jiffies =
brick_thread_should_stop() ? 1 * HZ : 2;
bool cork;
bool was_write;
brick_wait_smp(
brick->cb_event,
atomic_read(&brick->in_flight) > 0,
atomic_read(&brick->in_flight_reads) > 0 ||
atomic_read(&brick->in_flight_writes) > 0,
wait_jiffies);
/* Try to get the next request for callback over
@ -139,6 +142,7 @@ int cb_thread(void *data)
*/
tmp = NULL;
cork = false;
was_write = false;
mutex_lock(&brick->cb_mutex);
if (!list_empty(&brick->cb_write_list)) {
tmp = brick->cb_write_list.next;
@ -146,6 +150,7 @@ int cb_thread(void *data)
cork =
!list_empty(&brick->cb_write_list) ||
!list_empty(&brick->cb_read_list);
was_write = true;
} else if (!list_empty(&brick->cb_read_list)) {
tmp = brick->cb_read_list.next;
list_del_init(tmp);
@ -202,7 +207,10 @@ int cb_thread(void *data)
if (mref_a->do_put) {
GENERIC_INPUT_CALL_VOID(brick->inputs[0], mref_put, mref);
atomic_dec(&brick->in_flight);
if (was_write)
atomic_dec(&brick->in_flight_writes);
else
atomic_dec(&brick->in_flight_reads);
} else {
mars_free_mref(mref);
}
@ -266,6 +274,7 @@ int server_io(struct server_brick *brick, struct mars_socket *sock, struct mars_
{
struct mref_object *mref;
struct server_mref_aspect *mref_a;
bool is_write;
int amount;
int status = -ENOTRECOVERABLE;
@ -314,7 +323,11 @@ int server_io(struct server_brick *brick, struct mars_socket *sock, struct mars_
mref_a->first_len = mref->ref_len;
}
mref_a->do_put = true;
atomic_inc(&brick->in_flight);
is_write = (mref->ref_flags & MREF_WRITE);
if (is_write)
atomic_inc(&brick->in_flight_writes);
else
atomic_inc(&brick->in_flight_reads);
GENERIC_INPUT_CALL_VOID(brick->inputs[0], mref_io, mref);
@ -323,7 +336,7 @@ done:
}
static
void _clean_list(struct server_brick *brick, struct list_head *start)
void _clean_list(struct server_brick *brick, struct list_head *start, bool was_write)
{
for (;;) {
struct server_mref_aspect *mref_a;
@ -342,7 +355,10 @@ void _clean_list(struct server_brick *brick, struct list_head *start)
if (mref_a->do_put) {
GENERIC_INPUT_CALL_VOID(brick->inputs[0], mref_put, mref);
atomic_dec(&brick->in_flight);
if (was_write)
atomic_dec(&brick->in_flight_writes);
else
atomic_dec(&brick->in_flight_reads);
} else {
mars_free_mref(mref);
}
@ -454,7 +470,7 @@ int handler_thread(void *data)
}
#endif
if (!mars_socket_is_alive(sock) &&
atomic_read(&brick->in_flight) <= 0 &&
atomic_read(&brick->in_flight_reads) + atomic_read(&brick->in_flight_writes) <= 0 &&
brick->conn_brick) {
if (mars_disconnect((void*)brick->inputs[0]) >= 0)
brick->conn_brick = NULL;
@ -851,8 +867,8 @@ static int server_switch(struct server_brick *brick)
current->comm);
goto done;
}
_clean_list(brick, &brick->cb_read_list);
_clean_list(brick, &brick->cb_write_list);
_clean_list(brick, &brick->cb_read_list, false);
_clean_list(brick, &brick->cb_write_list, true);
mutex_unlock(&brick->cb_mutex);
mars_power_led_off((void*)brick, true);
@ -879,10 +895,12 @@ char *server_statistics(struct server_brick *brick, int verbose)
snprintf(res, 1024,
"cb_running = %d "
"handler_running = %d "
"in_flight = %d\n",
"in_flight_reads = %d "
"in_flight_writes = %d\n",
brick->cb_running,
brick->handler_running,
atomic_read(&brick->in_flight));
atomic_read(&brick->in_flight_reads),
atomic_read(&brick->in_flight_writes));
return res;
}
@ -958,6 +976,8 @@ static int server_brick_construct(struct server_brick *brick)
mutex_init(&brick->cb_mutex);
INIT_LIST_HEAD(&brick->cb_read_list);
INIT_LIST_HEAD(&brick->cb_write_list);
atomic_set(&brick->in_flight_reads, 0);
atomic_set(&brick->in_flight_writes, 0);
return 0;
}

View File

@ -64,7 +64,8 @@ struct server_brick {
struct mutex cb_mutex;
struct list_head cb_read_list;
struct list_head cb_write_list;
atomic_t in_flight;
atomic_t in_flight_reads;
atomic_t in_flight_writes;
bool cb_running;
bool handler_running;
};