1
0
mirror of https://github.com/mpv-player/mpv synced 2025-03-11 08:37:59 +00:00

lua: subprocess: use overlapped I/O on Windows

Instead of threads, use overlapped (asynchronous) I/O to read from both
stdout and stderr. Like in d0643fa, stdout and stderr could be closed at
different times, so a sparse_wait function is added to wrap
WaitForMultipleObjects and skip NULL handles.
This commit is contained in:
James Ross-Gowan 2014-11-17 16:33:39 +11:00 committed by wm4
parent c862955bac
commit 67132750a6

View File

@ -1173,12 +1173,7 @@ typedef void (*read_cb)(void *ctx, char *data, size_t size);
#ifdef __MINGW32__
#include <windows.h>
#include "osdep/io.h"
struct subprocess_ctx {
HANDLE stderr_read;
void *cb_ctx;
read_cb on_stderr;
};
#include "osdep/atomics.h"
static void write_arg(bstr *cmdline, char *arg)
{
@ -1248,38 +1243,99 @@ static wchar_t *write_cmdline(void *ctx, char **argv)
return wcmdline;
}
static void *stderr_routine(void *arg)
static int create_overlapped_pipe(HANDLE *read, HANDLE *write)
{
struct subprocess_ctx *ctx = arg;
static atomic_ulong counter = ATOMIC_VAR_INIT(0);
// Read from stderr until it's closed on process exit
char buf[4096];
DWORD r;
while (ReadFile(ctx->stderr_read, buf, 4096, &r, NULL))
ctx->on_stderr(ctx->cb_ctx, buf, r);
// Generate pipe name
unsigned long id = atomic_fetch_add(&counter, 1);
unsigned pid = GetCurrentProcessId();
wchar_t buf[36];
swprintf(buf, sizeof(buf), L"\\\\?\\pipe\\mpv-anon-%08x-%08lx", pid, id);
return NULL;
// The function for creating anonymous pipes (CreatePipe) can't create
// overlapped pipes, so instead, use a named pipe with a unique name
*read = CreateNamedPipeW(buf, PIPE_ACCESS_INBOUND |
FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS,
1, 0, 4096, 0, NULL);
if (!*read)
goto error;
// Open the write end of the pipe as a synchronous handle
*write = CreateFileW(buf, GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL, NULL);
if (*write == INVALID_HANDLE_VALUE)
goto error;
return 0;
error:
*read = *write = INVALID_HANDLE_VALUE;
return -1;
}
// Helper method similar to sparse_poll, skips NULL handles
static int sparse_wait(HANDLE *handles, unsigned num_handles)
{
unsigned w_num_handles = 0;
HANDLE w_handles[num_handles];
int map[num_handles];
for (unsigned i = 0; i < num_handles; i++) {
if (!handles[i])
continue;
w_handles[w_num_handles] = handles[i];
map[w_num_handles] = i;
w_num_handles++;
}
if (w_num_handles == 0)
return -1;
DWORD i = WaitForMultipleObjects(w_num_handles, w_handles, FALSE, INFINITE);
i -= WAIT_OBJECT_0;
if (i >= w_num_handles)
return -1;
return map[i];
}
// Wrapper for ReadFile that treats ERROR_IO_PENDING as success
static int async_read(HANDLE file, void *buf, unsigned size, OVERLAPPED* ol)
{
if (!ReadFile(file, buf, size, NULL, ol))
return (GetLastError() == ERROR_IO_PENDING) ? 0 : -1;
return 0;
}
static int subprocess(char **args, struct mp_cancel *cancel, void *ctx,
read_cb on_stdout, read_cb on_stderr, char **error)
{
wchar_t *tmp = talloc_new(NULL);
HANDLE stdout_read = NULL, stdout_write = NULL;
HANDLE stderr_read = NULL, stderr_write = NULL;
int status = -1;
struct {
HANDLE read;
HANDLE write;
OVERLAPPED ol;
char buf[4096];
read_cb read_cb;
} pipes[2] = {
{ .read_cb = on_stdout },
{ .read_cb = on_stderr },
};
// If the function exits before CreateProcess, there was an init error
*error = "init";
if (!CreatePipe(&stdout_read, &stdout_write, NULL, 0))
goto done;
if (!CreatePipe(&stderr_read, &stderr_write, NULL, 0))
goto done;
if (!SetHandleInformation(stdout_write, HANDLE_FLAG_INHERIT, 1))
goto done;
if (!SetHandleInformation(stderr_write, HANDLE_FLAG_INHERIT, 1))
goto done;
for (int i = 0; i < 2; i++) {
pipes[i].ol.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
if (!pipes[i].ol.hEvent)
goto done;
if (create_overlapped_pipe(&pipes[i].read, &pipes[i].write))
goto done;
if (!SetHandleInformation(pipes[i].write, HANDLE_FLAG_INHERIT, 1))
goto done;
}
// Convert the args array to a UTF-16 Windows command-line string
wchar_t *cmdline = write_cmdline(tmp, args);
@ -1289,8 +1345,8 @@ static int subprocess(char **args, struct mp_cancel *cancel, void *ctx,
.cb = sizeof(si),
.dwFlags = STARTF_USESTDHANDLES,
.hStdInput = NULL,
.hStdOutput = stdout_write,
.hStdError = stderr_write,
.hStdOutput = pipes[0].write,
.hStdError = pipes[1].write,
};
if (!CreateProcessW(NULL, cmdline, NULL, NULL, TRUE,
@ -1303,42 +1359,63 @@ static int subprocess(char **args, struct mp_cancel *cancel, void *ctx,
// Init is finished
*error = NULL;
// Close our copy of the write end of the pipes
CloseHandle(stdout_write);
stdout_write = NULL;
CloseHandle(stderr_write);
stderr_write = NULL;
// List of handles to watch with sparse_wait
HANDLE handles[] = { pipes[0].ol.hEvent, pipes[1].ol.hEvent, pi.hProcess };
// Create a thread to read stderr output
pthread_t stderr_thread;
struct subprocess_ctx sctx = {
.stderr_read = stderr_read,
.cb_ctx = ctx,
.on_stderr = on_stderr,
};
if (pthread_create(&stderr_thread, NULL, stderr_routine, &sctx))
goto done;
for (int i = 0; i < 2; i++) {
// Close our copy of the write end of the pipes
CloseHandle(pipes[i].write);
pipes[i].write = NULL;
// Do the first read operation on each pipe
if (async_read(pipes[i].read, pipes[i].buf, 4096, &pipes[i].ol)) {
CloseHandle(pipes[i].read);
handles[i] = pipes[i].read = NULL;
}
}
// Read from stdout until it's closed on process exit
char buf[4096];
DWORD r;
while (ReadFile(stdout_read, buf, 4096, &r, NULL))
on_stdout(ctx, buf, r);
pthread_join(stderr_thread, NULL);
// Get process exit code
DWORD exit_code;
WaitForSingleObject(pi.hProcess, INFINITE);
GetExitCodeProcess(pi.hProcess, &exit_code);
status = exit_code;
while (pipes[0].read || pipes[1].read || pi.hProcess) {
int i = sparse_wait(handles, MP_ARRAY_SIZE(handles));
switch (i) {
case 0:
case 1:
// Complete the read operation on the pipe
if (!GetOverlappedResult(pipes[i].read, &pipes[i].ol, &r, TRUE)) {
CloseHandle(pipes[i].read);
handles[i] = pipes[i].read = NULL;
break;
}
pipes[i].read_cb(ctx, pipes[i].buf, r);
// Begin the next read operation on the pipe
if (async_read(pipes[i].read, pipes[i].buf, 4096, &pipes[i].ol)) {
CloseHandle(pipes[i].read);
handles[i] = pipes[i].read = NULL;
}
break;
case 2:
GetExitCodeProcess(pi.hProcess, &exit_code);
status = exit_code;
CloseHandle(pi.hProcess);
handles[i] = pi.hProcess = NULL;
break;
default:
goto done;
}
}
done:
CloseHandle(stdout_read);
CloseHandle(stdout_write);
CloseHandle(stderr_read);
CloseHandle(stderr_write);
CloseHandle(pi.hProcess);
for (int i = 0; i < 2; i++) {
if (pipes[i].ol.hEvent) CloseHandle(pipes[i].ol.hEvent);
if (pipes[i].read) CloseHandle(pipes[i].read);
if (pipes[i].write) CloseHandle(pipes[i].write);
}
if (pi.hProcess) CloseHandle(pi.hProcess);
talloc_free(tmp);
return status;
}