mirror of
https://github.com/schoebel/mars
synced 2025-01-13 02:15:06 +00:00
client: flush old buffers when channel is changed
This commit is contained in:
parent
8045e6b632
commit
bf2358f4dc
@ -772,6 +772,8 @@ static int sender_thread(void *data)
|
||||
struct client_brick *brick = output->brick;
|
||||
struct client_channel *ch = NULL;
|
||||
bool do_timeout = false;
|
||||
bool cork = false;
|
||||
bool old_cork;
|
||||
int ch_skip = max_client_bulk;
|
||||
int status = -ESHUTDOWN;
|
||||
unsigned long flags;
|
||||
@ -782,7 +784,6 @@ static int sender_thread(void *data)
|
||||
struct mref_object *mref;
|
||||
int min_nr;
|
||||
int max_nr;
|
||||
bool cork;
|
||||
|
||||
// timeouting is a rather expensive operation, don't do it too often
|
||||
if (do_timeout) {
|
||||
@ -798,6 +799,11 @@ static int sender_thread(void *data)
|
||||
|
||||
|
||||
if (output->get_info) {
|
||||
if (ch && old_cork) {
|
||||
/* flush old buffer */
|
||||
old_cork = false;
|
||||
mars_send_raw(&ch->socket, NULL, 0, false);
|
||||
}
|
||||
ch = _get_channel(bundle, 0, 1);
|
||||
if (unlikely(!ch)) {
|
||||
do_timeout = true;
|
||||
@ -856,6 +862,11 @@ static int sender_thread(void *data)
|
||||
!mars_socket_is_alive(&ch->socket))
|
||||
do_timeout = true;
|
||||
if (do_timeout || ch->ch_nr >= max_nr || --ch_skip < 0) {
|
||||
if (ch && old_cork) {
|
||||
/* flush old buffer */
|
||||
old_cork = false;
|
||||
mars_send_raw(&ch->socket, NULL, 0, false);
|
||||
}
|
||||
ch = _get_channel(bundle, min_nr, max_nr);
|
||||
if (unlikely(!ch)) {
|
||||
// notice: this will re-assign hash_head without harm
|
||||
@ -876,6 +887,7 @@ static int sender_thread(void *data)
|
||||
spin_unlock(&output->lock);
|
||||
|
||||
status = mars_send_mref(&ch->socket, mref, cork);
|
||||
old_cork = cork;
|
||||
if (unlikely(status < 0)) {
|
||||
_hash_insert(output, mref_a);
|
||||
do_timeout = true;
|
||||
|
@ -515,9 +515,11 @@ restart:
|
||||
}
|
||||
|
||||
if (msock->s_pos + rest < PAGE_SIZE) {
|
||||
memcpy(msock->s_buffer + msock->s_pos, buf, rest);
|
||||
msock->s_pos += rest;
|
||||
sent += rest;
|
||||
if (buf) {
|
||||
memcpy(msock->s_buffer + msock->s_pos, buf, rest);
|
||||
msock->s_pos += rest;
|
||||
sent += rest;
|
||||
}
|
||||
rest = 0;
|
||||
status = sent;
|
||||
if (cork)
|
||||
|
Loading…
Reference in New Issue
Block a user