MEDIUM: applet: Handle channel's STREAMER flags on applets size

Till now, it was not possible to notify an producing applet is streaming
data. It means, it was not possible to set CF_STREAMER and CF_STREAMER_FLAGS
on the input channel of an applet streaming data.

While it is not a big deal for most of applets, it is interesting for the
cache. Because there are now dedicated functions to deal with these flags,
we can use them in task_run_applet() to set/unset these flags on the input
channel.

This patch relies on "MINOR: channel: Use dedicated functions to deal with
STREAMER flags".
This commit is contained in:
Christopher Faulet 2023-11-21 08:03:37 +01:00
parent a40321eb3b
commit 52c84ab0e0

View File

@ -404,8 +404,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
{
struct appctx *app = context;
struct stconn *sc, *sco;
struct channel *ic, *oc;
unsigned int rate;
size_t count;
size_t input, output;
int did_send = 0;
TRACE_ENTER(APPLET_EV_PROCESS, app);
@ -434,6 +435,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
sc = appctx_sc(app);
sco = sc_opposite(sc);
ic = sc_ic(sc);
oc = sc_oc(sc);
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
* that one applet which ignores any event will not spin.
@ -450,7 +454,10 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
if (!sc_alloc_ibuf(sc, &app->buffer_wait))
applet_have_more_data(app);
count = co_data(sc_oc(sc));
channel_check_idletimer(ic);
input = channel_data(ic);
output = co_data(oc);
app->applet->fct(app);
TRACE_POINT(APPLET_EV_PROCESS, app);
@ -458,9 +465,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
/* now check if the applet has released some room and forgot to
* notify the other side about it.
*/
if (count != co_data(sc_oc(sc))) {
sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
if (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)
if (output != co_data(oc)) {
oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
if (sco->room_needed < 0 || channel_recv_max(oc) >= sco->room_needed)
sc_have_room(sco);
did_send = 1;
}
@ -469,14 +476,17 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
sc_have_room(sco);
}
if (sc_ic(sc)->flags & CF_READ_EVENT)
input = channel_data(ic) - input;
if (input) {
channel_check_xfer(ic, input);
sc_ep_report_read_activity(sc);
}
if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) {
sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR);
}
if (!co_data(sc_oc(sc))) {
if (!co_data(oc)) {
if (did_send)
sc_ep_report_send_activity(sc);
}
@ -495,7 +505,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
}
sc->app_ops->wake(sc);
channel_release_buffer(sc_ic(sc), &app->buffer_wait);
channel_release_buffer(ic, &app->buffer_wait);
TRACE_LEAVE(APPLET_EV_PROCESS, app);
return t;
}