1
0
mirror of https://github.com/mpv-player/mpv synced 2025-01-16 20:14:57 +00:00
mpv/input/ipc-unix.c
Kacper Michajłow 55ed50ba90 mp_thread: prefer tracking threads with id
This change essentially removes mp_thread_self() and instead add
mp_thread_id to track threads and have ability to query current thread
id during runtime.

This will be useful for upcoming win32 implementation, where accessing
thread handle is different than on pthreads. Greatly reduces complexity.
Otherweis locked map of tid <-> handle is required which is completely
unnecessary for all mpv use-cases.

Note that this is the mp_thread_id, not to confuse with system tid. For
example on threads-posix implementation it is simply pthread_t.
2023-11-05 17:36:17 +00:00

445 lines
11 KiB
C

/*
* This file is part of mpv.
*
* mpv is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* mpv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with mpv. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <unistd.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include "osdep/io.h"
#include "osdep/threads.h"
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
#include "input/input.h"
#include "libmpv/client.h"
#include "options/m_config.h"
#include "options/options.h"
#include "options/path.h"
#include "player/client.h"
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
struct mp_ipc_ctx {
struct mp_log *log;
struct mp_client_api *client_api;
const char *path;
mp_thread thread;
int death_pipe[2];
};
struct client_arg {
struct mp_log *log;
struct mpv_handle *client;
const char *client_name;
int client_fd;
bool close_client_fd;
bool quit_on_close;
bool writable;
};
static int ipc_write_str(struct client_arg *client, const char *buf)
{
size_t count = strlen(buf);
while (count > 0) {
ssize_t rc = send(client->client_fd, buf, count, MSG_NOSIGNAL);
if (rc <= 0) {
if (rc == 0)
return -1;
if (errno == EBADF || errno == ENOTSOCK) {
client->writable = false;
return 0;
}
if (errno == EINTR || errno == EAGAIN)
continue;
return rc;
}
count -= rc;
buf += rc;
}
return 0;
}
static MP_THREAD_VOID client_thread(void *p)
{
// We don't use MSG_NOSIGNAL because the moldy fruit OS doesn't support it.
struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = SA_RESTART };
sigfillset(&sa.sa_mask);
sigaction(SIGPIPE, &sa, NULL);
int rc;
struct client_arg *arg = p;
bstr client_msg = { talloc_strdup(NULL, ""), 0 };
char *tname = talloc_asprintf(NULL, "ipc/%s", arg->client_name);
mp_thread_set_name(tname);
talloc_free(tname);
int pipe_fd = mpv_get_wakeup_pipe(arg->client);
if (pipe_fd < 0) {
MP_ERR(arg, "Could not get wakeup pipe\n");
goto done;
}
MP_VERBOSE(arg, "Client connected\n");
struct pollfd fds[2] = {
{.events = POLLIN, .fd = pipe_fd},
{.events = POLLIN, .fd = arg->client_fd},
};
fcntl(arg->client_fd, F_SETFL, fcntl(arg->client_fd, F_GETFL, 0) | O_NONBLOCK);
while (1) {
rc = poll(fds, 2, 0);
if (rc == 0)
rc = poll(fds, 2, -1);
if (rc < 0) {
MP_ERR(arg, "Poll error\n");
continue;
}
if (fds[0].revents & POLLIN) {
mp_flush_wakeup_pipe(pipe_fd);
while (1) {
mpv_event *event = mpv_wait_event(arg->client, 0);
if (event->event_id == MPV_EVENT_NONE)
break;
if (event->event_id == MPV_EVENT_SHUTDOWN)
goto done;
if (!arg->writable)
continue;
char *event_msg = mp_json_encode_event(event);
if (!event_msg) {
MP_ERR(arg, "Encoding error\n");
goto done;
}
rc = ipc_write_str(arg, event_msg);
talloc_free(event_msg);
if (rc < 0) {
MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno));
goto done;
}
}
}
if (fds[1].revents & (POLLIN | POLLHUP | POLLNVAL)) {
while (1) {
char buf[128];
bstr append = { buf, 0 };
ssize_t bytes = read(arg->client_fd, buf, sizeof(buf));
if (bytes < 0) {
if (errno == EAGAIN)
break;
MP_ERR(arg, "Read error (%s)\n", mp_strerror(errno));
goto done;
}
if (bytes == 0) {
MP_VERBOSE(arg, "Client disconnected\n");
goto done;
}
append.len = bytes;
bstr_xappend(NULL, &client_msg, append);
while (bstrchr(client_msg, '\n') != -1) {
char *reply_msg = mp_ipc_consume_next_command(arg->client,
NULL, &client_msg);
if (reply_msg && arg->writable) {
rc = ipc_write_str(arg, reply_msg);
if (rc < 0) {
MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno));
talloc_free(reply_msg);
goto done;
}
}
talloc_free(reply_msg);
}
}
}
}
done:
if (client_msg.len > 0)
MP_WARN(arg, "Ignoring unterminated command on disconnect.\n");
talloc_free(client_msg.start);
if (arg->close_client_fd)
close(arg->client_fd);
struct mpv_handle *h = arg->client;
bool quit = arg->quit_on_close;
talloc_free(arg);
if (quit) {
mpv_terminate_destroy(h);
} else {
mpv_destroy(h);
}
MP_THREAD_RETURN();
}
static bool ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client,
bool free_on_init_fail)
{
if (!client->client)
client->client = mp_new_client(ctx->client_api, client->client_name);
if (!client->client)
goto err;
client->log = mp_client_get_log(client->client);
mp_thread client_thr;
if (mp_thread_create(&client_thr, client_thread, client))
goto err;
mp_thread_detach(client_thr);
return true;
err:
if (free_on_init_fail) {
if (client->client)
mpv_destroy(client->client);
if (client->close_client_fd)
close(client->client_fd);
}
talloc_free(client);
return false;
}
static void ipc_start_client_json(struct mp_ipc_ctx *ctx, int id, int fd)
{
struct client_arg *client = talloc_ptrtype(NULL, client);
*client = (struct client_arg){
.client_name =
id >= 0 ? talloc_asprintf(client, "ipc-%d", id) : "ipc",
.client_fd = fd,
.close_client_fd = id >= 0,
.quit_on_close = id < 0,
.writable = true,
};
ipc_start_client(ctx, client, true);
}
bool mp_ipc_start_anon_client(struct mp_ipc_ctx *ctx, struct mpv_handle *h,
int out_fd[2])
{
int pair[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pair))
return false;
mp_set_cloexec(pair[0]);
mp_set_cloexec(pair[1]);
struct client_arg *client = talloc_ptrtype(NULL, client);
*client = (struct client_arg){
.client = h,
.client_name = mpv_client_name(h),
.client_fd = pair[1],
.close_client_fd = true,
.writable = true,
};
if (!ipc_start_client(ctx, client, false)) {
close(pair[0]);
close(pair[1]);
return false;
}
out_fd[0] = pair[0];
out_fd[1] = -1;
return true;
}
static MP_THREAD_VOID ipc_thread(void *p)
{
int rc;
int ipc_fd;
struct sockaddr_un ipc_un = {0};
struct mp_ipc_ctx *arg = p;
mp_thread_set_name("ipc/socket");
MP_VERBOSE(arg, "Starting IPC master\n");
ipc_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (ipc_fd < 0) {
MP_ERR(arg, "Could not create IPC socket\n");
goto done;
}
fchmod(ipc_fd, 0600);
size_t path_len = strlen(arg->path);
if (path_len >= sizeof(ipc_un.sun_path) - 1) {
MP_ERR(arg, "Could not create IPC socket\n");
goto done;
}
ipc_un.sun_family = AF_UNIX,
strncpy(ipc_un.sun_path, arg->path, sizeof(ipc_un.sun_path) - 1);
unlink(ipc_un.sun_path);
if (ipc_un.sun_path[0] == '@') {
ipc_un.sun_path[0] = '\0';
path_len--;
}
size_t addr_len = offsetof(struct sockaddr_un, sun_path) + 1 + path_len;
rc = bind(ipc_fd, (struct sockaddr *) &ipc_un, addr_len);
if (rc < 0) {
MP_ERR(arg, "Could not bind IPC socket\n");
goto done;
}
rc = listen(ipc_fd, 10);
if (rc < 0) {
MP_ERR(arg, "Could not listen on IPC socket\n");
goto done;
}
MP_VERBOSE(arg, "Listening to IPC socket.\n");
int client_num = 0;
struct pollfd fds[2] = {
{.events = POLLIN, .fd = arg->death_pipe[0]},
{.events = POLLIN, .fd = ipc_fd},
};
while (1) {
rc = poll(fds, 2, -1);
if (rc < 0) {
MP_ERR(arg, "Poll error\n");
continue;
}
if (fds[0].revents & POLLIN)
goto done;
if (fds[1].revents & POLLIN) {
int client_fd = accept(ipc_fd, NULL, NULL);
if (client_fd < 0) {
MP_ERR(arg, "Could not accept IPC client\n");
goto done;
}
ipc_start_client_json(arg, client_num++, client_fd);
}
}
done:
if (ipc_fd >= 0)
close(ipc_fd);
MP_THREAD_RETURN();
}
struct mp_ipc_ctx *mp_init_ipc(struct mp_client_api *client_api,
struct mpv_global *global)
{
struct MPOpts *opts = mp_get_config_group(NULL, global, &mp_opt_root);
struct mp_ipc_ctx *arg = talloc_ptrtype(NULL, arg);
*arg = (struct mp_ipc_ctx){
.log = mp_log_new(arg, global->log, "ipc"),
.client_api = client_api,
.path = mp_get_user_path(arg, global, opts->ipc_path),
.death_pipe = {-1, -1},
};
if (opts->ipc_client && opts->ipc_client[0]) {
int fd = -1;
if (strncmp(opts->ipc_client, "fd://", 5) == 0) {
char *end;
unsigned long l = strtoul(opts->ipc_client + 5, &end, 0);
if (!end[0] && l <= INT_MAX)
fd = l;
}
if (fd < 0) {
MP_ERR(arg, "Invalid IPC client argument: '%s'\n", opts->ipc_client);
} else {
ipc_start_client_json(arg, -1, fd);
}
}
talloc_free(opts);
if (!arg->path || !arg->path[0])
goto out;
if (mp_make_wakeup_pipe(arg->death_pipe) < 0)
goto out;
if (mp_thread_create(&arg->thread, ipc_thread, arg))
goto out;
return arg;
out:
if (arg->death_pipe[0] >= 0) {
close(arg->death_pipe[0]);
close(arg->death_pipe[1]);
}
talloc_free(arg);
return NULL;
}
void mp_uninit_ipc(struct mp_ipc_ctx *arg)
{
if (!arg)
return;
(void)write(arg->death_pipe[1], &(char){0}, 1);
mp_thread_join(arg->thread);
close(arg->death_pipe[0]);
close(arg->death_pipe[1]);
talloc_free(arg);
}