mirror of https://git.ffmpeg.org/ffmpeg.git
fate/api: test threadmessage
This commit is contained in:
parent
f98abe0ee7
commit
a26e4215b9
|
@ -3,6 +3,7 @@ APITESTPROGS-$(call DEMDEC, H264, H264) += api-h264
|
|||
APITESTPROGS-yes += api-seek
|
||||
APITESTPROGS-yes += api-codec-param
|
||||
APITESTPROGS-$(call DEMDEC, H263, H263) += api-band
|
||||
APITESTPROGS-$(HAVE_PTHREADS) += api-threadmessage
|
||||
APITESTPROGS += $(APITESTPROGS-yes)
|
||||
|
||||
APITESTOBJS := $(APITESTOBJS:%=$(APITESTSDIR)%) $(APITESTPROGS:%=$(APITESTSDIR)/%-test.o)
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
||||
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
* THE SOFTWARE.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Thread message API test
|
||||
*/
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "libavutil/avassert.h"
|
||||
#include "libavutil/avstring.h"
|
||||
#include "libavutil/frame.h"
|
||||
#include "libavutil/threadmessage.h"
|
||||
|
||||
struct sender_data {
|
||||
int id;
|
||||
pthread_t tid;
|
||||
int workload;
|
||||
AVThreadMessageQueue *queue;
|
||||
};
|
||||
|
||||
/* same as sender_data but shuffled for testing purpose */
|
||||
struct receiver_data {
|
||||
pthread_t tid;
|
||||
int workload;
|
||||
int id;
|
||||
AVThreadMessageQueue *queue;
|
||||
};
|
||||
|
||||
struct message {
|
||||
AVFrame *frame;
|
||||
// we add some junk in the message to make sure the message size is >
|
||||
// sizeof(void*)
|
||||
int magic;
|
||||
};
|
||||
|
||||
#define MAGIC 0xdeadc0de
|
||||
|
||||
static void free_frame(void *arg)
|
||||
{
|
||||
struct message *msg = arg;
|
||||
av_assert0(msg->magic == MAGIC);
|
||||
av_frame_free(&msg->frame);
|
||||
}
|
||||
|
||||
static void *sender_thread(void *arg)
|
||||
{
|
||||
int i, ret = 0;
|
||||
struct sender_data *wd = arg;
|
||||
|
||||
av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
|
||||
for (i = 0; i < wd->workload; i++) {
|
||||
if (rand() % wd->workload < wd->workload / 10) {
|
||||
av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
|
||||
av_thread_message_flush(wd->queue);
|
||||
} else {
|
||||
char *val;
|
||||
AVDictionary *meta = NULL;
|
||||
struct message msg = {
|
||||
.magic = MAGIC,
|
||||
.frame = av_frame_alloc(),
|
||||
};
|
||||
|
||||
if (!msg.frame) {
|
||||
ret = AVERROR(ENOMEM);
|
||||
break;
|
||||
}
|
||||
|
||||
/* we add some metadata to identify the frames */
|
||||
val = av_asprintf("frame %d/%d from sender %d",
|
||||
i + 1, wd->workload, wd->id);
|
||||
if (!val) {
|
||||
av_frame_free(&msg.frame);
|
||||
ret = AVERROR(ENOMEM);
|
||||
break;
|
||||
}
|
||||
ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
|
||||
if (ret < 0) {
|
||||
av_frame_free(&msg.frame);
|
||||
break;
|
||||
}
|
||||
av_frame_set_metadata(msg.frame, meta);
|
||||
|
||||
/* allocate a real frame in order to simulate "real" work */
|
||||
msg.frame->format = AV_PIX_FMT_RGBA;
|
||||
msg.frame->width = 320;
|
||||
msg.frame->height = 240;
|
||||
ret = av_frame_get_buffer(msg.frame, 32);
|
||||
if (ret < 0) {
|
||||
av_frame_free(&msg.frame);
|
||||
break;
|
||||
}
|
||||
|
||||
/* push the frame in the common queue */
|
||||
av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
|
||||
wd->id, i + 1, wd->workload, msg.frame);
|
||||
ret = av_thread_message_queue_send(wd->queue, &msg, 0);
|
||||
if (ret < 0) {
|
||||
av_frame_free(&msg.frame);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
|
||||
wd->id, av_err2str(ret));
|
||||
av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *receiver_thread(void *arg)
|
||||
{
|
||||
int i, ret = 0;
|
||||
struct receiver_data *rd = arg;
|
||||
|
||||
for (i = 0; i < rd->workload; i++) {
|
||||
if (rand() % rd->workload < rd->workload / 10) {
|
||||
av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue\n", rd->id);
|
||||
av_thread_message_flush(rd->queue);
|
||||
} else {
|
||||
struct message msg;
|
||||
AVDictionary *meta;
|
||||
AVDictionaryEntry *e;
|
||||
|
||||
ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
|
||||
if (ret < 0)
|
||||
break;
|
||||
av_assert0(msg.magic == MAGIC);
|
||||
meta = av_frame_get_metadata(msg.frame);
|
||||
e = av_dict_get(meta, "sig", NULL, 0);
|
||||
av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
|
||||
av_frame_free(&msg.frame);
|
||||
}
|
||||
}
|
||||
|
||||
av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
|
||||
av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int get_workload(int minv, int maxv)
|
||||
{
|
||||
return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
|
||||
}
|
||||
|
||||
int main(int ac, char **av)
|
||||
{
|
||||
int i, ret = 0;
|
||||
int max_queue_size;
|
||||
int nb_senders, sender_min_load, sender_max_load;
|
||||
int nb_receivers, receiver_min_load, receiver_max_load;
|
||||
struct sender_data *senders;
|
||||
struct receiver_data *receivers;
|
||||
AVThreadMessageQueue *queue = NULL;
|
||||
|
||||
if (ac != 8) {
|
||||
av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
|
||||
"<nb_senders> <sender_min_send> <sender_max_send> "
|
||||
"<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
max_queue_size = atoi(av[1]);
|
||||
nb_senders = atoi(av[2]);
|
||||
sender_min_load = atoi(av[3]);
|
||||
sender_max_load = atoi(av[4]);
|
||||
nb_receivers = atoi(av[5]);
|
||||
receiver_min_load = atoi(av[6]);
|
||||
receiver_max_load = atoi(av[7]);
|
||||
|
||||
if (max_queue_size <= 0 ||
|
||||
nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
|
||||
nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
|
||||
av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
|
||||
"%d receivers receiving [%d-%d]\n", max_queue_size,
|
||||
nb_senders, sender_min_load, sender_max_load,
|
||||
nb_receivers, receiver_min_load, receiver_max_load);
|
||||
|
||||
senders = av_mallocz_array(nb_senders, sizeof(*senders));
|
||||
receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
|
||||
if (!senders || !receivers) {
|
||||
ret = AVERROR(ENOMEM);
|
||||
goto end;
|
||||
}
|
||||
|
||||
ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
|
||||
if (ret < 0)
|
||||
goto end;
|
||||
|
||||
av_thread_message_queue_set_free_func(queue, free_frame);
|
||||
|
||||
#define SPAWN_THREADS(type) do { \
|
||||
for (i = 0; i < nb_##type##s; i++) { \
|
||||
struct type##_data *td = &type##s[i]; \
|
||||
\
|
||||
td->id = i; \
|
||||
td->queue = queue; \
|
||||
td->workload = get_workload(type##_min_load, type##_max_load); \
|
||||
\
|
||||
ret = pthread_create(&td->tid, NULL, type##_thread, td); \
|
||||
if (ret) { \
|
||||
const int err = AVERROR(ret); \
|
||||
av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
|
||||
" thread: %s\n", av_err2str(err)); \
|
||||
goto end; \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define WAIT_THREADS(type) do { \
|
||||
for (i = 0; i < nb_##type##s; i++) { \
|
||||
struct type##_data *td = &type##s[i]; \
|
||||
\
|
||||
ret = pthread_join(td->tid, NULL); \
|
||||
if (ret) { \
|
||||
const int err = AVERROR(ret); \
|
||||
av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
|
||||
" thread: %s\n", av_err2str(err)); \
|
||||
goto end; \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
SPAWN_THREADS(receiver);
|
||||
SPAWN_THREADS(sender);
|
||||
|
||||
WAIT_THREADS(sender);
|
||||
WAIT_THREADS(receiver);
|
||||
|
||||
end:
|
||||
av_thread_message_queue_free(&queue);
|
||||
av_freep(&senders);
|
||||
av_freep(&receivers);
|
||||
|
||||
if (ret < 0 && ret != AVERROR_EOF) {
|
||||
av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -28,6 +28,12 @@ FATE_API_SAMPLES_LIBAVFORMAT-yes += fate-api-jpeg-codec-param
|
|||
fate-api-jpeg-codec-param: $(APITESTSDIR)/api-codec-param-test$(EXESUF)
|
||||
fate-api-jpeg-codec-param: CMD = run $(APITESTSDIR)/api-codec-param-test $(TARGET_SAMPLES)/exif/image_small.jpg
|
||||
|
||||
FATE_API-$(CONFIG_AVUTIL) += fate-api-threadmessage
|
||||
fate-api-threadmessage: $(APITESTSDIR)/api-threadmessage-test$(EXESUF)
|
||||
fate-api-threadmessage: CMD = run $(APITESTSDIR)/api-threadmessage-test 3 10 30 50 2 20 40
|
||||
fate-api-threadmessage: CMP = null
|
||||
fate-api-threadmessage: REF = /dev/null
|
||||
|
||||
FATE_API_SAMPLES-$(CONFIG_AVFORMAT) += $(FATE_API_SAMPLES_LIBAVFORMAT-yes)
|
||||
|
||||
ifdef SAMPLES
|
||||
|
|
Loading…
Reference in New Issue