diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 1de125f17..f4da4ddd7 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -246,8 +246,8 @@ struct spoe_context { struct stream *strm; /* The stream that should be offloaded */ struct list *messages; /* List of messages that will be sent during the stream processing */ - struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */ - struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */ + struct buffer *buffer; /* Buffer used to store a encoded messages */ + struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct list list; enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ @@ -270,6 +270,8 @@ struct spoe_appctx { unsigned int flags; /* SPOE_APPCTX_FL_* */ unsigned int status_code; /* SPOE_FRM_ERR_* */ + struct buffer *buffer; /* Buffer used to store a encoded messages */ + struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */ struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */ struct list list; /* next spoe appctx for the same agent */ }; @@ -309,8 +311,8 @@ char spoe_reason[256]; struct flt_ops spoe_ops; static int queue_spoe_context(struct spoe_context *ctx); -static int acquire_spoe_buffer(struct spoe_context *ctx); -static void release_spoe_buffer(struct spoe_context *ctx); +static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); +static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait); /******************************************************************** * helper functions/globals @@ -913,15 +915,15 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, idx += encode_spoe_varint(ctx->stream_id, frame+idx); idx += encode_spoe_varint(ctx->frame_id, frame+idx); - /* Copy encoded messages */ - if (idx + ctx->buffer->i > size) { + /* check the buffer size */ + if (idx + SPOE_APPCTX(appctx)->buffer->i > size) { spoe_status_code = SPOE_FRM_ERR_TOO_BIG; return 0; } /* Copy encoded messages */ - memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i); - idx += ctx->buffer->i; + memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i); + idx += SPOE_APPCTX(appctx)->buffer->i; return idx; } @@ -1230,9 +1232,13 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) return 0; found: - if (!acquire_spoe_buffer(ctx)) + if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait)) return 1; /* Retry later */ + /* Transfer the buffer ownership to the SPOE context */ + ctx->buffer = SPOE_APPCTX(appctx)->buffer; + SPOE_APPCTX(appctx)->buffer = &buf_empty; + /* Copy encoded actions */ memcpy(ctx->buffer->p, frame+idx, size-idx); ctx->buffer->i = size-idx; @@ -1379,6 +1385,15 @@ recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) /******************************************************************** * Functions that manage the SPOE applet ********************************************************************/ +static int +wakeup_spoe_appctx(struct appctx *appctx) +{ + si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); + appctx_wakeup(appctx); + return 1; +} + /* Callback function that catches applet timeouts. If a timeout occurred, we set * <appctx->st1> flag and the SPOE applet is woken up. */ static struct task * @@ -1391,9 +1406,7 @@ process_spoe_applet(struct task * task) task->expire = TICK_ETERNITY; appctx->st1 = SPOE_APPCTX_ERR_TOUT; } - si_applet_want_get(appctx->owner); - si_applet_want_put(appctx->owner); - appctx_wakeup(appctx); + wakeup_spoe_appctx(appctx); return task; } @@ -1441,6 +1454,7 @@ release_spoe_applet(struct appctx *appctx) task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); } + release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx)); if (!LIST_ISEMPTY(&agent->applets)) @@ -1633,6 +1647,11 @@ handle_processing_spoe_applet(struct appctx *appctx) } ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + + /* Transfer the buffer ownership to the SPOE appctx */ + SPOE_APPCTX(appctx)->buffer = ctx->buffer; + ctx->buffer = &buf_empty; + ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size); if (ret > 1) ret = send_spoe_frame(appctx, frame, ret); @@ -1646,7 +1665,7 @@ handle_processing_spoe_applet(struct appctx *appctx) agent->sending_rate++; ctx->state = SPOE_CTX_ST_ERROR; ctx->status_code = (spoe_status_code + 0x100); - release_spoe_buffer(ctx); + release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); @@ -1661,7 +1680,7 @@ handle_processing_spoe_applet(struct appctx *appctx) default: agent->sending_rate++; ctx->state = SPOE_CTX_ST_WAITING_ACK; - release_spoe_buffer(ctx); + release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait); LIST_DEL(&ctx->list); LIST_INIT(&ctx->list); if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) @@ -1963,6 +1982,11 @@ create_spoe_appctx(struct spoe_config *conf) SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size; SPOE_APPCTX(appctx)->flags = 0; SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE; + SPOE_APPCTX(appctx)->buffer = &buf_empty; + + LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list); + SPOE_APPCTX(appctx)->buffer_wait.target = appctx; + SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx; LIST_INIT(&SPOE_APPCTX(appctx)->list); LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue); @@ -2093,9 +2117,7 @@ queue_spoe_context(struct spoe_context *ctx) list_for_each_entry(spoe_appctx, &agent->applets, list) { appctx = spoe_appctx->owner; if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { - si_applet_want_get(appctx->owner); - si_applet_want_put(appctx->owner); - appctx_wakeup(appctx); + wakeup_spoe_appctx(appctx); LIST_DEL(&spoe_appctx->list); LIST_ADDQ(&agent->applets, &spoe_appctx->list); break; @@ -2379,7 +2401,7 @@ start_event_processing(struct spoe_context *ctx, int dir) if (ctx->flags & SPOE_CTX_FL_PROCESS) goto wait; - if (!acquire_spoe_buffer(ctx)) + if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait)) goto wait; /* Set the right flag to prevent request and response processing @@ -2405,7 +2427,7 @@ stop_event_processing(struct spoe_context *ctx) /* Reset processing timer */ ctx->process_exp = TICK_ETERNITY; - release_spoe_buffer(ctx); + release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait); if (!LIST_ISEMPTY(&ctx->list)) { LIST_DEL(&ctx->list); @@ -2539,39 +2561,41 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, * Functions that create/destroy SPOE contexts **************************************************************************/ static int -acquire_spoe_buffer(struct spoe_context *ctx) +acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) { - if (ctx->buffer != &buf_empty) + if (*buf != &buf_empty) return 1; - if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { - LIST_DEL(&ctx->buffer_wait.list); - LIST_INIT(&ctx->buffer_wait.list); + if (!LIST_ISEMPTY(&buffer_wait->list)) { + LIST_DEL(&buffer_wait->list); + LIST_INIT(&buffer_wait->list); } - if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) + if (b_alloc_margin(buf, global.tune.reserved_bufs)) return 1; - LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); + LIST_ADDQ(&buffer_wq, &buffer_wait->list); return 0; } static void -release_spoe_buffer(struct spoe_context *ctx) +release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) { - if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { - LIST_DEL(&ctx->buffer_wait.list); - LIST_INIT(&ctx->buffer_wait.list); + if (!LIST_ISEMPTY(&buffer_wait->list)) { + LIST_DEL(&buffer_wait->list); + LIST_INIT(&buffer_wait->list); } /* Release the buffer if needed */ - if (ctx->buffer != &buf_empty) { - b_free(&ctx->buffer); - offer_buffers(ctx, tasks_run_queue + applets_active_queue); + if (*buf != &buf_empty) { + b_free(buf); + offer_buffers(buffer_wait->target, + tasks_run_queue + applets_active_queue); } } -static int wakeup_spoe_context(struct spoe_context *ctx) +static int +wakeup_spoe_context(struct spoe_context *ctx) { task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); return 1; @@ -2643,7 +2667,6 @@ sig_stop_spoe(struct sig_handler *sh) list_for_each_entry(fconf, &p->filter_configs, list) { struct spoe_config *conf; struct spoe_agent *agent; - struct appctx *appctx; struct spoe_appctx *spoe_appctx; if (fconf->id != spoe_filter_id) @@ -2653,10 +2676,7 @@ sig_stop_spoe(struct sig_handler *sh) agent = conf->agent; list_for_each_entry(spoe_appctx, &agent->applets, list) { - appctx = spoe_appctx->owner; - si_applet_want_get(appctx->owner); - si_applet_want_put(appctx->owner); - appctx_wakeup(appctx); + wakeup_spoe_appctx(spoe_appctx->owner); } } p = p->next; @@ -2818,7 +2838,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter) if (tick_is_expired(ctx->process_exp, now_ms)) { s->pending_events |= TASK_WOKEN_MSG; - release_spoe_buffer(ctx); + release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait); } }