diff --git a/input/input.h b/input/input.h index 41432eb54d..a5710b6065 100644 --- a/input/input.h +++ b/input/input.h @@ -257,4 +257,13 @@ struct mp_ipc_ctx *mp_init_ipc(struct mp_client_api *client_api, struct mpv_global *global); void mp_uninit_ipc(struct mp_ipc_ctx *ctx); +// Serialize the given mpv_event structure to JSON. Returns an allocated string. +struct mpv_event; +char *mp_json_encode_event(struct mpv_event *event); + +// Given the raw IPC input buffer "buf", remove the first newline-separated +// command, execute it and return the result (if any) as an allocated string. +struct mpv_handle; +char *mp_ipc_consume_next_command(struct mpv_handle *client, void *ctx, bstr *buf); + #endif /* MPLAYER_INPUT_H */ diff --git a/input/ipc-unix.c b/input/ipc-unix.c new file mode 100644 index 0000000000..8b8a158c5f --- /dev/null +++ b/input/ipc-unix.c @@ -0,0 +1,422 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with mpv. If not, see . + */ + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "config.h" + +#include "osdep/io.h" +#include "osdep/threads.h" + +#include "common/common.h" +#include "common/global.h" +#include "common/msg.h" +#include "input/input.h" +#include "libmpv/client.h" +#include "options/options.h" +#include "options/path.h" +#include "player/client.h" + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +struct mp_ipc_ctx { + struct mp_log *log; + struct mp_client_api *client_api; + const char *path; + + pthread_t thread; + int death_pipe[2]; +}; + +struct client_arg { + struct mp_log *log; + struct mpv_handle *client; + + char *client_name; + int client_fd; + bool close_client_fd; + + bool writable; +}; + +static int ipc_write_str(struct client_arg *client, const char *buf) +{ + size_t count = strlen(buf); + while (count > 0) { + ssize_t rc = send(client->client_fd, buf, count, MSG_NOSIGNAL); + if (rc <= 0) { + if (rc == 0) + return -1; + + if (errno == EBADF) { + client->writable = false; + return 0; + } + + if (errno == EINTR) + continue; + + if (errno == EAGAIN) + return 0; + + return rc; + } + + count -= rc; + buf += rc; + } + + return 0; +} + +static void *client_thread(void *p) +{ + pthread_detach(pthread_self()); + + int rc; + + struct client_arg *arg = p; + bstr client_msg = { talloc_strdup(NULL, ""), 0 }; + + mpthread_set_name(arg->client_name); + + int pipe_fd = mpv_get_wakeup_pipe(arg->client); + if (pipe_fd < 0) { + MP_ERR(arg, "Could not get wakeup pipe\n"); + goto done; + } + + MP_VERBOSE(arg, "Client connected\n"); + + struct pollfd fds[2] = { + {.events = POLLIN, .fd = pipe_fd}, + {.events = POLLIN, .fd = arg->client_fd}, + }; + + fcntl(arg->client_fd, F_SETFL, fcntl(arg->client_fd, F_GETFL, 0) | O_NONBLOCK); + mpv_suspend(arg->client); + + while (1) { + rc = poll(fds, 2, 0); + if (rc == 0) { + mpv_resume(arg->client); + rc = poll(fds, 2, -1); + mpv_suspend(arg->client); + } + if (rc < 0) { + MP_ERR(arg, "Poll error\n"); + continue; + } + + if (fds[0].revents & POLLIN) { + char discard[100]; + read(pipe_fd, discard, sizeof(discard)); + + while (1) { + mpv_event *event = mpv_wait_event(arg->client, 0); + + if (event->event_id == MPV_EVENT_NONE) + break; + + if (event->event_id == MPV_EVENT_SHUTDOWN) + goto done; + + if (!arg->writable) + continue; + + char *event_msg = mp_json_encode_event(event); + if (!event_msg) { + MP_ERR(arg, "Encoding error\n"); + goto done; + } + + rc = ipc_write_str(arg, event_msg); + talloc_free(event_msg); + if (rc < 0) { + MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno)); + goto done; + } + } + } + + if (fds[1].revents & (POLLIN | POLLHUP)) { + while (1) { + char buf[128]; + bstr append = { buf, 0 }; + + ssize_t bytes = read(arg->client_fd, buf, sizeof(buf)); + if (bytes < 0) { + if (errno == EAGAIN) + break; + + MP_ERR(arg, "Read error (%s)\n", mp_strerror(errno)); + goto done; + } + + if (bytes == 0) { + MP_VERBOSE(arg, "Client disconnected\n"); + goto done; + } + + append.len = bytes; + + bstr_xappend(NULL, &client_msg, append); + + while (bstrchr(client_msg, '\n') != -1) { + char *reply_msg = mp_ipc_consume_next_command(arg->client, + NULL, &client_msg); + + if (reply_msg && arg->writable) { + rc = ipc_write_str(arg, reply_msg); + if (rc < 0) { + MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno)); + talloc_free(reply_msg); + goto done; + } + } + + talloc_free(reply_msg); + } + } + } + } + +done: + if (client_msg.len > 0) + MP_WARN(arg, "Ignoring unterminated command on disconnect.\n"); + talloc_free(client_msg.start); + if (arg->close_client_fd) + close(arg->client_fd); + mpv_detach_destroy(arg->client); + talloc_free(arg); + return NULL; +} + +static void ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client) +{ + client->client = mp_new_client(ctx->client_api, client->client_name), + client->log = mp_client_get_log(client->client); + + pthread_t client_thr; + if (pthread_create(&client_thr, NULL, client_thread, client)) { + mpv_detach_destroy(client->client); + if (client->close_client_fd) + close(client->client_fd); + talloc_free(client); + } +} + +static void ipc_start_client_json(struct mp_ipc_ctx *ctx, int id, int fd) +{ + struct client_arg *client = talloc_ptrtype(NULL, client); + *client = (struct client_arg){ + .client_name = talloc_asprintf(client, "ipc-%d", id), + .client_fd = fd, + .close_client_fd = true, + + .writable = true, + }; + + ipc_start_client(ctx, client); +} + +static void ipc_start_client_text(struct mp_ipc_ctx *ctx, const char *path) +{ + int mode = O_RDONLY; + int client_fd = -1; + bool close_client_fd = true; + bool writable = false; + + if (strcmp(path, "/dev/stdin") == 0) { // for symmetry with Linux + client_fd = STDIN_FILENO; + close_client_fd = false; + } else if (strncmp(path, "fd://", 5) == 0) { + char *end = NULL; + client_fd = strtol(path + 5, &end, 0); + if (!end || end == path + 5 || end[0]) { + MP_ERR(ctx, "Invalid FD: %s\n", path); + return; + } + close_client_fd = false; + writable = true; // maybe + } else { + // Use RDWR for FIFOs to ensure they stay open over multiple accesses. + struct stat st; + if (stat(path, &st) == 0 && S_ISFIFO(st.st_mode)) + mode = O_RDWR; + client_fd = open(path, mode); + } + if (client_fd < 0) { + MP_ERR(ctx, "Could not open '%s'\n", path); + return; + } + + struct client_arg *client = talloc_ptrtype(NULL, client); + *client = (struct client_arg){ + .client_name = "input-file", + .client_fd = client_fd, + .close_client_fd = close_client_fd, + .writable = writable, + }; + + ipc_start_client(ctx, client); +} + +static void *ipc_thread(void *p) +{ + int rc; + + int ipc_fd; + struct sockaddr_un ipc_un = {0}; + + struct mp_ipc_ctx *arg = p; + + mpthread_set_name("ipc socket listener"); + + MP_VERBOSE(arg, "Starting IPC master\n"); + + ipc_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ipc_fd < 0) { + MP_ERR(arg, "Could not create IPC socket\n"); + goto done; + } + +#if HAVE_FCHMOD + fchmod(ipc_fd, 0600); +#endif + + size_t path_len = strlen(arg->path); + if (path_len >= sizeof(ipc_un.sun_path) - 1) { + MP_ERR(arg, "Could not create IPC socket\n"); + goto done; + } + + ipc_un.sun_family = AF_UNIX, + strncpy(ipc_un.sun_path, arg->path, sizeof(ipc_un.sun_path)); + + unlink(ipc_un.sun_path); + + if (ipc_un.sun_path[0] == '@') { + ipc_un.sun_path[0] = '\0'; + path_len--; + } + + size_t addr_len = offsetof(struct sockaddr_un, sun_path) + 1 + path_len; + rc = bind(ipc_fd, (struct sockaddr *) &ipc_un, addr_len); + if (rc < 0) { + MP_ERR(arg, "Could not bind IPC socket\n"); + goto done; + } + + rc = listen(ipc_fd, 10); + if (rc < 0) { + MP_ERR(arg, "Could not listen on IPC socket\n"); + goto done; + } + + int client_num = 0; + + struct pollfd fds[2] = { + {.events = POLLIN, .fd = arg->death_pipe[0]}, + {.events = POLLIN, .fd = ipc_fd}, + }; + + while (1) { + rc = poll(fds, 2, -1); + if (rc < 0) { + MP_ERR(arg, "Poll error\n"); + continue; + } + + if (fds[0].revents & POLLIN) + goto done; + + if (fds[1].revents & POLLIN) { + int client_fd = accept(ipc_fd, NULL, NULL); + if (client_fd < 0) { + MP_ERR(arg, "Could not accept IPC client\n"); + goto done; + } + + ipc_start_client_json(arg, client_num++, client_fd); + } + } + +done: + if (ipc_fd >= 0) + close(ipc_fd); + + return NULL; +} + +struct mp_ipc_ctx *mp_init_ipc(struct mp_client_api *client_api, + struct mpv_global *global) +{ + struct MPOpts *opts = global->opts; + + struct mp_ipc_ctx *arg = talloc_ptrtype(NULL, arg); + *arg = (struct mp_ipc_ctx){ + .log = mp_log_new(arg, global->log, "ipc"), + .client_api = client_api, + .path = mp_get_user_path(arg, global, opts->ipc_path), + .death_pipe = {-1, -1}, + }; + char *input_file = mp_get_user_path(arg, global, opts->input_file); + + if (input_file && *input_file) + ipc_start_client_text(arg, input_file); + + if (!opts->ipc_path || !*opts->ipc_path) + goto out; + + if (mp_make_wakeup_pipe(arg->death_pipe) < 0) + goto out; + + if (pthread_create(&arg->thread, NULL, ipc_thread, arg)) + goto out; + + return arg; + +out: + close(arg->death_pipe[0]); + close(arg->death_pipe[1]); + talloc_free(arg); + return NULL; +} + +void mp_uninit_ipc(struct mp_ipc_ctx *arg) +{ + if (!arg) + return; + + write(arg->death_pipe[1], &(char){0}, 1); + pthread_join(arg->thread, NULL); + + close(arg->death_pipe[0]); + close(arg->death_pipe[1]); + talloc_free(arg); +} diff --git a/input/ipc-win.c b/input/ipc-win.c new file mode 100644 index 0000000000..3c5f3832aa --- /dev/null +++ b/input/ipc-win.c @@ -0,0 +1,408 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with mpv. If not, see . + */ + +#include + +#include "config.h" + +#include "osdep/io.h" +#include "osdep/threads.h" +#include "osdep/windows_utils.h" + +#include "common/common.h" +#include "common/global.h" +#include "common/msg.h" +#include "input/input.h" +#include "libmpv/client.h" +#include "options/options.h" +#include "player/client.h" + +struct mp_ipc_ctx { + struct mp_log *log; + struct mp_client_api *client_api; + const wchar_t *path; + + pthread_t thread; + HANDLE death_event; +}; + +struct client_arg { + struct mp_log *log; + struct mpv_handle *client; + + char *client_name; + HANDLE client_h; + bool writable; + OVERLAPPED write_ol; +}; + +static void wakeup_cb(void *d) +{ + HANDLE event = d; + SetEvent(event); +} + +// Wrapper for ReadFile that treats ERROR_IO_PENDING as success +static DWORD async_read(HANDLE file, void *buf, unsigned size, OVERLAPPED* ol) +{ + DWORD err = ReadFile(file, buf, size, NULL, ol) ? 0 : GetLastError(); + return err == ERROR_IO_PENDING ? 0 : err; +} + +// Wrapper for WriteFile that treats ERROR_IO_PENDING as success +static DWORD async_write(HANDLE file, const void *buf, unsigned size, OVERLAPPED* ol) +{ + DWORD err = WriteFile(file, buf, size, NULL, ol) ? 0 : GetLastError(); + return err == ERROR_IO_PENDING ? 0 : err; +} + +static bool pipe_error_is_fatal(DWORD error) +{ + switch (error) { + case 0: + case ERROR_HANDLE_EOF: + case ERROR_BROKEN_PIPE: + case ERROR_PIPE_NOT_CONNECTED: + case ERROR_NO_DATA: + return false; + } + return true; +} + +static DWORD ipc_write_str(struct client_arg *arg, const char *buf) +{ + DWORD error = 0; + + if ((error = async_write(arg->client_h, buf, strlen(buf), &arg->write_ol))) + goto done; + if (!GetOverlappedResult(arg->client_h, &arg->write_ol, &(DWORD){0}, TRUE)) { + error = GetLastError(); + goto done; + } + +done: + if (pipe_error_is_fatal(error)) { + MP_VERBOSE(arg, "Error writing to pipe: %s\n", + mp_HRESULT_to_str(HRESULT_FROM_WIN32(error))); + } + + if (error) + arg->writable = false; + return error; +} + +static void report_read_error(struct client_arg *arg, DWORD error) +{ + // Only report the error if it's not just due to the pipe closing + if (pipe_error_is_fatal(error)) { + MP_ERR(arg, "Error reading from pipe: %s\n", + mp_HRESULT_to_str(HRESULT_FROM_WIN32(error))); + } else { + MP_VERBOSE(arg, "Client disconnected\n"); + } +} + +static void *client_thread(void *p) +{ + pthread_detach(pthread_self()); + + struct client_arg *arg = p; + char buf[4096]; + HANDLE wakeup_event = CreateEventW(NULL, TRUE, FALSE, NULL); + OVERLAPPED ol = { .hEvent = CreateEventW(NULL, TRUE, TRUE, NULL) }; + bstr client_msg = { talloc_strdup(NULL, ""), 0 }; + DWORD ioerr = 0; + DWORD r; + + mpthread_set_name(arg->client_name); + + arg->write_ol.hEvent = CreateEventW(NULL, TRUE, TRUE, NULL); + if (!wakeup_event || !ol.hEvent || !arg->write_ol.hEvent) { + MP_ERR(arg, "Couldn't create events\n"); + goto done; + } + + MP_VERBOSE(arg, "Client connected\n"); + + mpv_set_wakeup_callback(arg->client, wakeup_cb, wakeup_event); + mpv_suspend(arg->client); + + // Do the first read operation on the pipe + if ((ioerr = async_read(arg->client_h, buf, 4096, &ol))) { + report_read_error(arg, ioerr); + goto done; + } + + while (1) { + HANDLE handles[] = { wakeup_event, ol.hEvent }; + int n = WaitForMultipleObjects(2, handles, FALSE, 0); + if (n == WAIT_TIMEOUT) { + mpv_resume(arg->client); + n = WaitForMultipleObjects(2, handles, FALSE, INFINITE); + mpv_suspend(arg->client); + } + + switch (n) { + case WAIT_OBJECT_0: // wakeup_event + ResetEvent(wakeup_event); + + while (1) { + mpv_event *event = mpv_wait_event(arg->client, 0); + + if (event->event_id == MPV_EVENT_NONE) + break; + + if (event->event_id == MPV_EVENT_SHUTDOWN) + goto done; + + if (!arg->writable) + continue; + + char *event_msg = mp_json_encode_event(event); + if (!event_msg) { + MP_ERR(arg, "Encoding error\n"); + goto done; + } + + ipc_write_str(arg, event_msg); + talloc_free(event_msg); + } + + break; + case WAIT_OBJECT_0 + 1: // ol.hEvent + // Complete the read operation on the pipe + if (!GetOverlappedResult(arg->client_h, &ol, &r, TRUE)) { + report_read_error(arg, GetLastError()); + goto done; + } + + bstr_xappend(NULL, &client_msg, (bstr){buf, r}); + while (bstrchr(client_msg, '\n') != -1) { + char *reply_msg = mp_ipc_consume_next_command(arg->client, + NULL, &client_msg); + if (reply_msg && arg->writable) + ipc_write_str(arg, reply_msg); + talloc_free(reply_msg); + } + + // Begin the next read operation on the pipe + if ((ioerr = async_read(arg->client_h, buf, 4096, &ol))) { + report_read_error(arg, ioerr); + goto done; + } + break; + default: + MP_ERR(arg, "WaitForMultipleObjects failed\n"); + goto done; + } + } + +done: + if (client_msg.len > 0) + MP_WARN(arg, "Ignoring unterminated command on disconnect.\n"); + + if (CancelIoEx(arg->client_h, &ol) || GetLastError() != ERROR_NOT_FOUND) + GetOverlappedResult(arg->client_h, &ol, &(DWORD){0}, TRUE); + if (wakeup_event) + CloseHandle(wakeup_event); + if (ol.hEvent) + CloseHandle(ol.hEvent); + if (arg->write_ol.hEvent) + CloseHandle(arg->write_ol.hEvent); + + CloseHandle(arg->client_h); + mpv_detach_destroy(arg->client); + talloc_free(arg); + return NULL; +} + +static void ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client) +{ + client->client = mp_new_client(ctx->client_api, client->client_name), + client->log = mp_client_get_log(client->client); + + pthread_t client_thr; + if (pthread_create(&client_thr, NULL, client_thread, client)) { + mpv_detach_destroy(client->client); + CloseHandle(client->client_h); + talloc_free(client); + } +} + +static void ipc_start_client_json(struct mp_ipc_ctx *ctx, int id, HANDLE h) +{ + struct client_arg *client = talloc_ptrtype(NULL, client); + *client = (struct client_arg){ + .client_name = talloc_asprintf(client, "ipc-%d", id), + .client_h = h, + .writable = true, + }; + + ipc_start_client(ctx, client); +} + +static void *ipc_thread(void *p) +{ + // Use PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE so message framing is + // maintained for message-mode clients, but byte-mode clients can still + // connect, send and receive data. This is the most compatible mode. + static const DWORD state = + PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT | + PIPE_REJECT_REMOTE_CLIENTS; + static const DWORD mode = + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED; + static const DWORD bufsiz = 4096; + + struct mp_ipc_ctx *arg = p; + HANDLE server = INVALID_HANDLE_VALUE; + HANDLE client = INVALID_HANDLE_VALUE; + int client_num = 0; + + mpthread_set_name("ipc named pipe listener"); + MP_VERBOSE(arg, "Starting IPC master\n"); + + OVERLAPPED ol = { .hEvent = CreateEventW(NULL, TRUE, TRUE, NULL) }; + if (!ol.hEvent) { + MP_ERR(arg, "Couldn't create event"); + goto done; + } + + server = CreateNamedPipeW(arg->path, mode | FILE_FLAG_FIRST_PIPE_INSTANCE, + state, PIPE_UNLIMITED_INSTANCES, bufsiz, bufsiz, 0, NULL); + if (server == INVALID_HANDLE_VALUE) { + MP_ERR(arg, "Couldn't create first pipe instance: %s\n", + mp_LastError_to_str()); + goto done; + } + + while (1) { + DWORD err = ConnectNamedPipe(server, &ol) ? 0 : GetLastError(); + + if (err == ERROR_IO_PENDING) { + int n = WaitForMultipleObjects(2, (HANDLE[]) { + arg->death_event, + ol.hEvent, + }, FALSE, INFINITE) - WAIT_OBJECT_0; + + switch (n) { + case 0: + // Stop waiting for new clients + CancelIo(server); + GetOverlappedResult(server, &ol, &(DWORD){0}, TRUE); + goto done; + case 1: + // Complete the ConnectNamedPipe request + err = GetOverlappedResult(server, &ol, &(DWORD){0}, TRUE) + ? 0 : GetLastError(); + break; + default: + MP_ERR(arg, "WaitForMultipleObjects failed\n"); + goto done; + } + } + + // ERROR_PIPE_CONNECTED is returned if a client connects before + // ConnectNamedPipe is called. ERROR_NO_DATA is returned if a client + // connects, (possibly) writes data and exits before ConnectNamedPipe + // is called. Both cases should be handled as normal connections. + if (err == ERROR_PIPE_CONNECTED || err == ERROR_NO_DATA) + err = 0; + + if (err) { + MP_ERR(arg, "ConnectNamedPipe failed: %s\n", + mp_HRESULT_to_str(HRESULT_FROM_WIN32(err))); + goto done; + } + + // Create the next pipe instance before the client thread to avoid the + // theoretical race condition where the client thread immediately + // closes the handle and there are no active instances of the pipe + client = server; + server = CreateNamedPipeW(arg->path, mode, state, + PIPE_UNLIMITED_INSTANCES, bufsiz, bufsiz, 0, NULL); + if (server == INVALID_HANDLE_VALUE) { + MP_ERR(arg, "Couldn't create additional pipe instance: %s\n", + mp_LastError_to_str()); + goto done; + } + + ipc_start_client_json(arg, client_num++, client); + client = NULL; + } + +done: + if (client != INVALID_HANDLE_VALUE) + CloseHandle(client); + if (server != INVALID_HANDLE_VALUE) + CloseHandle(server); + if (ol.hEvent) + CloseHandle(ol.hEvent); + return NULL; +} + +struct mp_ipc_ctx *mp_init_ipc(struct mp_client_api *client_api, + struct mpv_global *global) +{ + struct MPOpts *opts = global->opts; + + struct mp_ipc_ctx *arg = talloc_ptrtype(NULL, arg); + *arg = (struct mp_ipc_ctx){ + .log = mp_log_new(arg, global->log, "ipc"), + .client_api = client_api, + }; + + if (!opts->ipc_path || !*opts->ipc_path) + goto out; + + // Ensure the path is a legal Win32 pipe name by prepending \\.\pipe\ if + // it's not already present. Qt's QLocalSocket uses the same logic, so + // cross-platform programs that use paths like /tmp/mpv-socket should just + // work. (Win32 converts this path to \Device\NamedPipe\tmp\mpv-socket) + if (!strncmp(opts->ipc_path, "\\\\.\\pipe\\", 9)) { + arg->path = mp_from_utf8(arg, opts->ipc_path); + } else { + char *path = talloc_asprintf(NULL, "\\\\.\\pipe\\%s", opts->ipc_path); + arg->path = mp_from_utf8(arg, path); + talloc_free(path); + } + + if (!(arg->death_event = CreateEventW(NULL, TRUE, FALSE, NULL))) + goto out; + + if (pthread_create(&arg->thread, NULL, ipc_thread, arg)) + goto out; + + return arg; + +out: + if (arg->death_event) + CloseHandle(arg->death_event); + talloc_free(arg); + return NULL; +} + +void mp_uninit_ipc(struct mp_ipc_ctx *arg) +{ + if (!arg) + return; + + SetEvent(arg->death_event); + pthread_join(arg->thread, NULL); + + CloseHandle(arg->death_event); + talloc_free(arg); +} diff --git a/input/ipc.c b/input/ipc.c index c628fc9ea4..c7563e30d8 100644 --- a/input/ipc.c +++ b/input/ipc.c @@ -15,58 +15,16 @@ * License along with mpv. If not, see . */ -#include -#include -#include -#include - -#include -#include -#include -#include -#include - #include "config.h" -#include "osdep/io.h" -#include "osdep/threads.h" - -#include "common/common.h" -#include "common/global.h" #include "common/msg.h" #include "input/input.h" -#include "libmpv/client.h" -#include "misc/bstr.h" #include "misc/json.h" #include "options/m_option.h" #include "options/options.h" #include "options/path.h" #include "player/client.h" -#ifndef MSG_NOSIGNAL -#define MSG_NOSIGNAL 0 -#endif - -struct mp_ipc_ctx { - struct mp_log *log; - struct mp_client_api *client_api; - const char *path; - - pthread_t thread; - int death_pipe[2]; -}; - -struct client_arg { - struct mp_log *log; - struct mpv_handle *client; - - char *client_name; - int client_fd; - bool close_client_fd; - - bool writable; -}; - static mpv_node *mpv_node_map_get(mpv_node *src, const char *key) { if (src->format != MPV_FORMAT_NODE_MAP) @@ -219,7 +177,7 @@ static void mpv_event_to_node(void *ta_parent, mpv_event *event, mpv_node *dst) } } -static char *json_encode_event(mpv_event *event) +char *mp_json_encode_event(mpv_event *event) { void *ta_parent = talloc_new(NULL); mpv_node event_node = {.format = MPV_FORMAT_NODE_MAP, .u.list = NULL}; @@ -236,11 +194,12 @@ static char *json_encode_event(mpv_event *event) } // Function is allowed to modify src[n]. -static char *json_execute_command(struct client_arg *arg, void *ta_parent, +static char *json_execute_command(struct mpv_handle *client, void *ta_parent, char *src) { int rc; const char *cmd = NULL; + struct mp_log *log = mp_client_get_log(client); mpv_node msg_node; mpv_node reply_node = {.format = MPV_FORMAT_NODE_MAP, .u.list = NULL}; @@ -248,7 +207,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, rc = json_parse(ta_parent, &msg_node, &src, 3); if (rc < 0) { - MP_ERR(arg, "malformed JSON received\n"); + mp_err(log, "malformed JSON received\n"); rc = MPV_ERROR_INVALID_PARAMETER; goto error; } @@ -278,11 +237,11 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, cmd = cmd_str_node->u.string; if (!strcmp("client_name", cmd)) { - const char *client_name = mpv_client_name(arg->client); + const char *client_name = mpv_client_name(client); mpv_node_map_add_string(ta_parent, &reply_node, "data", client_name); rc = MPV_ERROR_SUCCESS; } else if (!strcmp("get_time_us", cmd)) { - int64_t time_us = mpv_get_time_us(arg->client); + int64_t time_us = mpv_get_time_us(client); mpv_node_map_add_int64(ta_parent, &reply_node, "data", time_us); rc = MPV_ERROR_SUCCESS; } else if (!strcmp("get_version", cmd)) { @@ -302,7 +261,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_get_property(arg->client, cmd_node->u.list->values[1].u.string, + rc = mpv_get_property(client, cmd_node->u.list->values[1].u.string, MPV_FORMAT_NODE, &result_node); if (rc >= 0) { mpv_node_map_add(ta_parent, &reply_node, "data", &result_node); @@ -319,7 +278,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - char *result = mpv_get_property_string(arg->client, + char *result = mpv_get_property_string(client, cmd_node->u.list->values[1].u.string); if (!result) { mpv_node_map_add_null(ta_parent, &reply_node, "data"); @@ -338,7 +297,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_set_property(arg->client, cmd_node->u.list->values[1].u.string, + rc = mpv_set_property(client, cmd_node->u.list->values[1].u.string, MPV_FORMAT_NODE, &cmd_node->u.list->values[2]); } else if (!strcmp("set_property_string", cmd)) { if (cmd_node->u.list->num != 3) { @@ -356,7 +315,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_set_property_string(arg->client, + rc = mpv_set_property_string(client, cmd_node->u.list->values[1].u.string, cmd_node->u.list->values[2].u.string); } else if (!strcmp("observe_property", cmd)) { @@ -375,7 +334,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_observe_property(arg->client, + rc = mpv_observe_property(client, cmd_node->u.list->values[1].u.int64, cmd_node->u.list->values[2].u.string, MPV_FORMAT_NODE); @@ -395,7 +354,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_observe_property(arg->client, + rc = mpv_observe_property(client, cmd_node->u.list->values[1].u.int64, cmd_node->u.list->values[2].u.string, MPV_FORMAT_STRING); @@ -410,8 +369,8 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_unobserve_property(arg->client, - cmd_node->u.list->values[1].u.int64); + rc = mpv_unobserve_property(client, + cmd_node->u.list->values[1].u.int64); } else if (!strcmp("request_log_messages", cmd)) { if (cmd_node->u.list->num != 2) { rc = MPV_ERROR_INVALID_PARAMETER; @@ -423,13 +382,13 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, goto error; } - rc = mpv_request_log_messages(arg->client, + rc = mpv_request_log_messages(client, cmd_node->u.list->values[1].u.string); } else if (!strcmp("suspend", cmd)) { - mpv_suspend(arg->client); + mpv_suspend(client); rc = MPV_ERROR_SUCCESS; } else if (!strcmp("resume", cmd)) { - mpv_resume(arg->client); + mpv_resume(client); rc = MPV_ERROR_SUCCESS; } else if (!strcmp("enable_event", cmd) || !strcmp("disable_event", cmd)) @@ -449,7 +408,7 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, char *name = cmd_node->u.list->values[1].u.string; if (strcmp(name, "all") == 0) { for (int n = 0; n < 64; n++) - mpv_request_event(arg->client, n, enable); + mpv_request_event(client, n, enable); rc = MPV_ERROR_SUCCESS; } else { int event = -1; @@ -462,12 +421,12 @@ static char *json_execute_command(struct client_arg *arg, void *ta_parent, rc = MPV_ERROR_INVALID_PARAMETER; goto error; } - rc = mpv_request_event(arg->client, event, enable); + rc = mpv_request_event(client, event, enable); } } else { mpv_node result_node; - rc = mpv_command_node(arg->client, cmd_node, &result_node); + rc = mpv_command_node(client, cmd_node, &result_node); if (rc >= 0) mpv_node_map_add(ta_parent, &reply_node, "data", &result_node); } @@ -490,382 +449,35 @@ error: return output; } -static char *text_execute_command(struct client_arg *arg, void *tmp, char *src) +static char *text_execute_command(struct mpv_handle *client, void *tmp, char *src) { - mpv_command_string(arg->client, src); + mpv_command_string(client, src); return NULL; } -static int ipc_write_str(struct client_arg *client, const char *buf) +char *mp_ipc_consume_next_command(struct mpv_handle *client, void *ctx, bstr *buf) { - size_t count = strlen(buf); - while (count > 0) { - ssize_t rc = send(client->client_fd, buf, count, MSG_NOSIGNAL); - if (rc <= 0) { - if (rc == 0) - return -1; + void *tmp = talloc_new(NULL); - if (errno == EBADF) { - client->writable = false; - return 0; - } + bstr rest; + bstr line = bstr_getline(*buf, &rest); + char *line0 = bstrto0(tmp, line); + talloc_steal(tmp, buf->start); + *buf = bstrdup(NULL, rest); - if (errno == EINTR) - continue; + json_skip_whitespace(&line0); - if (errno == EAGAIN) - return 0; - - return rc; - } - - count -= rc; - buf += rc; - } - - return 0; -} - -static void *client_thread(void *p) -{ - pthread_detach(pthread_self()); - - int rc; - - struct client_arg *arg = p; - bstr client_msg = { talloc_strdup(NULL, ""), 0 }; - - mpthread_set_name(arg->client_name); - - int pipe_fd = mpv_get_wakeup_pipe(arg->client); - if (pipe_fd < 0) { - MP_ERR(arg, "Could not get wakeup pipe\n"); - goto done; - } - - MP_VERBOSE(arg, "Client connected\n"); - - struct pollfd fds[2] = { - {.events = POLLIN, .fd = pipe_fd}, - {.events = POLLIN, .fd = arg->client_fd}, - }; - - fcntl(arg->client_fd, F_SETFL, fcntl(arg->client_fd, F_GETFL, 0) | O_NONBLOCK); - mpv_suspend(arg->client); - - while (1) { - rc = poll(fds, 2, 0); - if (rc == 0) { - mpv_resume(arg->client); - rc = poll(fds, 2, -1); - mpv_suspend(arg->client); - } - if (rc < 0) { - MP_ERR(arg, "Poll error\n"); - continue; - } - - if (fds[0].revents & POLLIN) { - char discard[100]; - read(pipe_fd, discard, sizeof(discard)); - - while (1) { - mpv_event *event = mpv_wait_event(arg->client, 0); - - if (event->event_id == MPV_EVENT_NONE) - break; - - if (event->event_id == MPV_EVENT_SHUTDOWN) - goto done; - - if (!arg->writable) - continue; - - char *event_msg = json_encode_event(event); - if (!event_msg) { - MP_ERR(arg, "Encoding error\n"); - goto done; - } - - rc = ipc_write_str(arg, event_msg); - talloc_free(event_msg); - if (rc < 0) { - MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno)); - goto done; - } - } - } - - if (fds[1].revents & (POLLIN | POLLHUP)) { - while (1) { - char buf[128]; - bstr append = { buf, 0 }; - - ssize_t bytes = read(arg->client_fd, buf, sizeof(buf)); - if (bytes < 0) { - if (errno == EAGAIN) - break; - - MP_ERR(arg, "Read error (%s)\n", mp_strerror(errno)); - goto done; - } - - if (bytes == 0) { - MP_VERBOSE(arg, "Client disconnected\n"); - goto done; - } - - append.len = bytes; - - bstr_xappend(NULL, &client_msg, append); - - while (bstrchr(client_msg, '\n') != -1) { - void *tmp = talloc_new(NULL); - bstr rest; - bstr line = bstr_getline(client_msg, &rest); - char *line0 = bstrto0(tmp, line); - talloc_steal(tmp, client_msg.start); - client_msg = bstrdup(NULL, rest); - - json_skip_whitespace(&line0); - - char *reply_msg = NULL; - if (line0[0] == '\0' || line0[0] == '#') { - // skip - } else if (line0[0] == '{') { - reply_msg = json_execute_command(arg, tmp, line0); - } else { - reply_msg = text_execute_command(arg, tmp, line0); - } - - if (reply_msg && arg->writable) { - rc = ipc_write_str(arg, reply_msg); - if (rc < 0) { - MP_ERR(arg, "Write error (%s)\n", mp_strerror(errno)); - talloc_free(tmp); - goto done; - } - } - - talloc_free(tmp); - } - } - } - } - -done: - if (client_msg.len > 0) - MP_WARN(arg, "Ignoring unterminated command on disconnect.\n"); - talloc_free(client_msg.start); - if (arg->close_client_fd) - close(arg->client_fd); - mpv_detach_destroy(arg->client); - talloc_free(arg); - return NULL; -} - -static void ipc_start_client(struct mp_ipc_ctx *ctx, struct client_arg *client) -{ - client->client = mp_new_client(ctx->client_api, client->client_name), - client->log = mp_client_get_log(client->client); - - pthread_t client_thr; - if (pthread_create(&client_thr, NULL, client_thread, client)) { - mpv_detach_destroy(client->client); - if (client->close_client_fd) - close(client->client_fd); - talloc_free(client); - } -} - -static void ipc_start_client_json(struct mp_ipc_ctx *ctx, int id, int fd) -{ - struct client_arg *client = talloc_ptrtype(NULL, client); - *client = (struct client_arg){ - .client_name = talloc_asprintf(client, "ipc-%d", id), - .client_fd = fd, - .close_client_fd = true, - - .writable = true, - }; - - ipc_start_client(ctx, client); -} - -static void ipc_start_client_text(struct mp_ipc_ctx *ctx, const char *path) -{ - int mode = O_RDONLY; - int client_fd = -1; - bool close_client_fd = true; - bool writable = false; - - if (strcmp(path, "/dev/stdin") == 0) { // for symmetry with Linux - client_fd = STDIN_FILENO; - close_client_fd = false; - } else if (strncmp(path, "fd://", 5) == 0) { - char *end = NULL; - client_fd = strtol(path + 5, &end, 0); - if (!end || end == path + 5 || end[0]) { - MP_ERR(ctx, "Invalid FD: %s\n", path); - return; - } - close_client_fd = false; - writable = true; // maybe + char *reply_msg = NULL; + if (line0[0] == '\0' || line0[0] == '#') { + // skip + } else if (line0[0] == '{') { + reply_msg = json_execute_command(client, tmp, line0); } else { - // Use RDWR for FIFOs to ensure they stay open over multiple accesses. - struct stat st; - if (stat(path, &st) == 0 && S_ISFIFO(st.st_mode)) - mode = O_RDWR; - client_fd = open(path, mode); - } - if (client_fd < 0) { - MP_ERR(ctx, "Could not open '%s'\n", path); - return; + reply_msg = text_execute_command(client, tmp, line0); } - struct client_arg *client = talloc_ptrtype(NULL, client); - *client = (struct client_arg){ - .client_name = "input-file", - .client_fd = client_fd, - .close_client_fd = close_client_fd, - .writable = writable, - }; - - ipc_start_client(ctx, client); -} - -static void *ipc_thread(void *p) -{ - int rc; - - int ipc_fd; - struct sockaddr_un ipc_un = {0}; - - struct mp_ipc_ctx *arg = p; - - mpthread_set_name("ipc socket listener"); - - MP_VERBOSE(arg, "Starting IPC master\n"); - - ipc_fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (ipc_fd < 0) { - MP_ERR(arg, "Could not create IPC socket\n"); - goto done; - } - -#if HAVE_FCHMOD - fchmod(ipc_fd, 0600); -#endif - - size_t path_len = strlen(arg->path); - if (path_len >= sizeof(ipc_un.sun_path) - 1) { - MP_ERR(arg, "Could not create IPC socket\n"); - goto done; - } - - ipc_un.sun_family = AF_UNIX, - strncpy(ipc_un.sun_path, arg->path, sizeof(ipc_un.sun_path)); - - unlink(ipc_un.sun_path); - - if (ipc_un.sun_path[0] == '@') { - ipc_un.sun_path[0] = '\0'; - path_len--; - } - - size_t addr_len = offsetof(struct sockaddr_un, sun_path) + 1 + path_len; - rc = bind(ipc_fd, (struct sockaddr *) &ipc_un, addr_len); - if (rc < 0) { - MP_ERR(arg, "Could not bind IPC socket\n"); - goto done; - } - - rc = listen(ipc_fd, 10); - if (rc < 0) { - MP_ERR(arg, "Could not listen on IPC socket\n"); - goto done; - } - - int client_num = 0; - - struct pollfd fds[2] = { - {.events = POLLIN, .fd = arg->death_pipe[0]}, - {.events = POLLIN, .fd = ipc_fd}, - }; - - while (1) { - rc = poll(fds, 2, -1); - if (rc < 0) { - MP_ERR(arg, "Poll error\n"); - continue; - } - - if (fds[0].revents & POLLIN) - goto done; - - if (fds[1].revents & POLLIN) { - int client_fd = accept(ipc_fd, NULL, NULL); - if (client_fd < 0) { - MP_ERR(arg, "Could not accept IPC client\n"); - goto done; - } - - ipc_start_client_json(arg, client_num++, client_fd); - } - } - -done: - if (ipc_fd >= 0) - close(ipc_fd); - - return NULL; -} - -struct mp_ipc_ctx *mp_init_ipc(struct mp_client_api *client_api, - struct mpv_global *global) -{ - struct MPOpts *opts = global->opts; - - struct mp_ipc_ctx *arg = talloc_ptrtype(NULL, arg); - *arg = (struct mp_ipc_ctx){ - .log = mp_log_new(arg, global->log, "ipc"), - .client_api = client_api, - .path = mp_get_user_path(arg, global, opts->ipc_path), - .death_pipe = {-1, -1}, - }; - char *input_file = mp_get_user_path(arg, global, opts->input_file); - - if (input_file && *input_file) - ipc_start_client_text(arg, input_file); - - if (!opts->ipc_path || !*opts->ipc_path) - goto out; - - if (mp_make_wakeup_pipe(arg->death_pipe) < 0) - goto out; - - if (pthread_create(&arg->thread, NULL, ipc_thread, arg)) - goto out; - - return arg; - -out: - close(arg->death_pipe[0]); - close(arg->death_pipe[1]); - talloc_free(arg); - return NULL; -} - -void mp_uninit_ipc(struct mp_ipc_ctx *arg) -{ - if (!arg) - return; - - write(arg->death_pipe[1], &(char){0}, 1); - pthread_join(arg->thread, NULL); - - close(arg->death_pipe[0]); - close(arg->death_pipe[1]); - talloc_free(arg); + talloc_steal(ctx, reply_msg); + talloc_free(tmp); + return reply_msg; } diff --git a/player/main.c b/player/main.c index 70176f2d35..21c273338c 100644 --- a/player/main.c +++ b/player/main.c @@ -472,9 +472,7 @@ int mp_initialize(struct MPContext *mpctx, char **options) if (opts->force_vo == 2 && handle_force_window(mpctx, false) < 0) return -1; -#if !defined(__MINGW32__) mpctx->ipc_ctx = mp_init_ipc(mpctx->clients, mpctx->global); -#endif #ifdef _WIN32 if (opts->w32_priority > 0) diff --git a/wscript_build.py b/wscript_build.py index 1bf67b0b59..fb98affe89 100644 --- a/wscript_build.py +++ b/wscript_build.py @@ -190,7 +190,9 @@ def build(ctx): ( "input/cmd_parse.c" ), ( "input/event.c" ), ( "input/input.c" ), - ( "input/ipc.c", "!mingw" ), + ( "input/ipc.c" ), + ( "input/ipc-unix.c", "!mingw" ), + ( "input/ipc-win.c", "mingw" ), ( "input/keycodes.c" ), ( "input/pipe-win32.c", "mingw" ),