mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-03-06 03:18:43 +00:00
BUG/MEDIUM: stream: Save unprocessed events for a stream
A stream can be awakened for different reasons. During its processing, it can be early stopped if no buffer is available. In this situation, the reason why the stream was awakened is lost, because we rely on the task state, which is reset after each processing loop. In many cases, that's not a big deal. But it can be useful to accumulate the task states if the stream processing is interrupted, especially if some filters need to be called. To be clearer, here is an simple example: 1) A stream is awakened with the reason TASK_WOKEN_MSG. 2) Because no buffer is available, the processing is interrupted, the stream is back to sleep. And the task state is reset. 3) Some buffers become available, so the stream is awakened with the reason TASK_WOKEN_RES. At this step, the previous reason (TASK_WOKEN_MSG) is lost. Now, the task states are saved for a stream and reset only when the stream processing is not interrupted. The correspoing bitfield represents the pending events for a stream. And we use this one instead of the task state during the stream processing. Note that TASK_WOKEN_TIMER and TASK_WOKEN_RES are always removed because these events are always handled during the stream processing. [wt: backport to 1.7 and 1.6]
This commit is contained in:
parent
34c5cc98da
commit
9d810cae11
@ -129,6 +129,9 @@ struct stream {
|
||||
struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */
|
||||
|
||||
struct task *task; /* the task associated with this stream */
|
||||
unsigned short pending_events; /* the pending events not yet processed by the stream.
|
||||
* This is a bit field of TASK_WOKEN_* */
|
||||
|
||||
struct list list; /* position in global streams list */
|
||||
struct list by_srv; /* position in server stream list */
|
||||
struct list back_refs; /* list of users tracking this stream */
|
||||
|
16
src/stream.c
16
src/stream.c
@ -145,6 +145,7 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
|
||||
s->unique_id = NULL;
|
||||
|
||||
s->task = t;
|
||||
s->pending_events = 0;
|
||||
t->process = process_stream;
|
||||
t->context = s;
|
||||
t->expire = TICK_ETERNITY;
|
||||
@ -1584,10 +1585,13 @@ struct task *process_stream(struct task *t)
|
||||
si_f->flags |= SI_FL_DONT_WAKE;
|
||||
si_b->flags |= SI_FL_DONT_WAKE;
|
||||
|
||||
/* update pending events */
|
||||
s->pending_events |= (t->state & TASK_WOKEN_ANY);
|
||||
|
||||
/* 1a: Check for low level timeouts if needed. We just set a flag on
|
||||
* stream interfaces when their timeouts have expired.
|
||||
*/
|
||||
if (unlikely(t->state & TASK_WOKEN_TIMER)) {
|
||||
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
||||
stream_int_check_timeouts(si_f);
|
||||
stream_int_check_timeouts(si_b);
|
||||
|
||||
@ -1635,7 +1639,7 @@ struct task *process_stream(struct task *t)
|
||||
(CF_SHUTR|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_SHUTW|
|
||||
CF_WRITE_ACTIVITY|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT)) &&
|
||||
!((si_f->flags | si_b->flags) & (SI_FL_EXP|SI_FL_ERR)) &&
|
||||
((t->state & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
|
||||
((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
|
||||
si_f->flags &= ~SI_FL_DONT_WAKE;
|
||||
si_b->flags &= ~SI_FL_DONT_WAKE;
|
||||
goto update_exp_and_leave;
|
||||
@ -1769,7 +1773,7 @@ struct task *process_stream(struct task *t)
|
||||
((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
|
||||
si_f->state != rq_prod_last ||
|
||||
si_b->state != rq_cons_last ||
|
||||
s->task->state & TASK_WOKEN_MSG) {
|
||||
s->pending_events & TASK_WOKEN_MSG) {
|
||||
unsigned int flags = req->flags;
|
||||
|
||||
if (si_f->state >= SI_ST_EST) {
|
||||
@ -1868,7 +1872,7 @@ struct task *process_stream(struct task *t)
|
||||
(res->flags ^ rpf_last) & CF_MASK_STATIC ||
|
||||
si_f->state != rp_cons_last ||
|
||||
si_b->state != rp_prod_last ||
|
||||
s->task->state & TASK_WOKEN_MSG) {
|
||||
s->pending_events & TASK_WOKEN_MSG) {
|
||||
unsigned int flags = res->flags;
|
||||
|
||||
if ((res->flags & CF_MASK_ANALYSER) &&
|
||||
@ -2369,6 +2373,9 @@ struct task *process_stream(struct task *t)
|
||||
req->rex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* Reset pending events now */
|
||||
s->pending_events = 0;
|
||||
|
||||
update_exp_and_leave:
|
||||
/* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */
|
||||
t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire),
|
||||
@ -2402,6 +2409,7 @@ struct task *process_stream(struct task *t)
|
||||
if (!tick_isset(t->expire))
|
||||
ABORT_NOW();
|
||||
#endif
|
||||
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
||||
stream_release_buffers(s);
|
||||
return t; /* nothing more to do */
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user