From c1a081c00f803fc28e51f155f25abe8346ce5f13 Mon Sep 17 00:00:00 2001 From: Stefan Becker Date: Tue, 22 Mar 2016 13:48:07 +0200 Subject: [PATCH] Add GNU make jobserver client support - add new TokenPool interface - GNU make implementation for TokenPool parses and verifies the magic information from the MAKEFLAGS environment variable - RealCommandRunner tries to acquire TokenPool * if no token pool is available then there is no change in behaviour - When a token pool is available then RealCommandRunner behaviour changes as follows * CanRunMore() only returns true if TokenPool::Acquire() returns true * StartCommand() calls TokenPool::Reserve() * WaitForCommand() calls TokenPool::Release() Documentation for GNU make jobserver http://make.mad-scientist.net/papers/jobserver-implementation/ Fixes https://github.com/ninja-build/ninja/issues/1139 Add TokenPool monitoring to SubprocessSet::DoWork() Improve on the original jobserver client implementation. This makes ninja a more aggressive GNU make jobserver client. - add monitor interface to TokenPool - TokenPool is passed down when main loop indicates that more work is ready and would be allowed to start if a token becomes available - posix: update DoWork() to monitor TokenPool read file descriptor - WaitForCommand() exits when DoWork() sets token flag - Main loop starts over when WaitForCommand() sets token exit status Ignore jobserver when -jN is forced on command line This emulates the behaviour of GNU make. - add parallelism_from_cmdline flag to build configuration - set the flag when -jN is given on command line - pass the flag to TokenPool::Get() - GNUmakeTokenPool::Setup() * prints a warning when the flag is true and jobserver was detected * returns false, i.e. jobserver will be ignored - ignore config.parallelism in CanRunMore() when we have a valid TokenPool, because it is always initialized to a default value when not given on the command line Honor -lN from MAKEFLAGS This emulates the behaviour of GNU make.
- build: make a copy of max_load_average and pass it to TokenPool. - GNUmakeTokenPool: if we detect a jobserver and a valid -lN argument in MAKEFLAGS then set max_load_average to N. Use LinePrinter for TokenPool messages - replace printf() with calls to LinePrinter - print GNU make jobserver message only when verbose build is requested Prepare PR for merging - fix Windows build error in no-op TokenPool implementation - improve Acquire() to block for a maximum of 100ms - address review comments Add tests for TokenPool - TokenPool setup - GetMonitorFd() API - implicit token and tokens in jobserver pipe - Acquire() / Reserve() / Release() protocol - Clear() API Add tests for subprocess module - add TokenPoolTest stub to provide TokenPool::GetMonitorFd() - add two tests * both tests set up a dummy GNUmake jobserver pipe * both tests call DoWork() with TokenPoolTest * test 1: verify that DoWork() detects when a token is available * test 2: verify that DoWork() works as before without a token - the tests are not compiled in under Windows Add tests for build module Add tests that verify the token functionality of the builder main loop. We replace the default fake command runner with a special version where the tests can control each call to AcquireToken(), CanRunMore() and WaitForCommand(). Add Win32 implementation for GNUmakeTokenPool GNU make uses a semaphore as jobserver protocol on Win32. See also https://www.gnu.org/software/make/manual/html_node/Windows-Jobserver.html Usage is pretty simple and straightforward, i.e. WaitForSingleObject() to obtain a token and ReleaseSemaphore() to return it. Unfortunately subprocess-win32.cc uses an I/O completion port (IOCP). IOCPs aren't waitable objects, i.e. we can't use WaitForMultipleObjects() to wait on the IOCP and the token semaphore at the same time. 
Therefore GNUmakeTokenPoolWin32 creates a child thread that waits on the token semaphore and posts a dummy I/O completion status on the IOCP when it was able to obtain a token. That unblocks SubprocessSet::DoWork() and it can then check if a token became available or not. - split existing GNUmakeTokenPool into common and platform bits - add GNUmakeTokenPool interface - move the Posix bits to GNUmakeTokenPoolPosix - add the Win32 bits as GNUmakeTokenPoolWin32 - move Setup() method up to TokenPool interface - update Subprocess & TokenPool tests accordingly Prepare PR for merging - part II - remove unnecessary "struct" from TokenPool - add PAPCFUNC cast to QueueUserAPC() - remove hard-coded MAKEFLAGS string from win32 - remove useless build test CompleteNoWork - rename TokenPoolTest to TestTokenPool - add tokenpool modules to CMake build - remove unused no-op TokenPool implementation - address review comments from https://github.com/ninja-build/ninja/pull/1140#pullrequestreview-195195803 https://github.com/ninja-build/ninja/pull/1140#pullrequestreview-185089255 https://github.com/ninja-build/ninja/pull/1140#issuecomment-473898963 https://github.com/ninja-build/ninja/pull/1140#issuecomment-596624610 --- CMakeLists.txt | 8 +- configure.py | 7 +- src/build.cc | 127 ++++++++--- src/build.h | 12 +- src/build_test.cc | 363 +++++++++++++++++++++++++++++++- src/exit_status.h | 3 +- src/ninja.cc | 1 + src/subprocess-posix.cc | 33 ++- src/subprocess-win32.cc | 11 +- src/subprocess.h | 8 +- src/subprocess_test.cc | 149 +++++++++++-- src/tokenpool-gnu-make-posix.cc | 202 ++++++++++++++++++ src/tokenpool-gnu-make-win32.cc | 239 +++++++++++++++++++++ src/tokenpool-gnu-make.cc | 108 ++++++++++ src/tokenpool-gnu-make.h | 40 ++++ src/tokenpool.h | 42 ++++ src/tokenpool_test.cc | 269 +++++++++++++++++++++++ 17 files changed, 1562 insertions(+), 60 deletions(-) create mode 100644 src/tokenpool-gnu-make-posix.cc create mode 100644 src/tokenpool-gnu-make-win32.cc create mode 100644
src/tokenpool-gnu-make.cc create mode 100644 src/tokenpool-gnu-make.h create mode 100644 src/tokenpool.h create mode 100644 src/tokenpool_test.cc --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,6 +94,7 @@ add_library(libninja OBJECT src/parser.cc src/state.cc src/string_piece_util.cc + src/tokenpool-gnu-make.cc src/util.cc src/version.cc ) @@ -104,12 +105,16 @@ if(WIN32) src/msvc_helper-win32.cc src/msvc_helper_main-win32.cc src/getopt.c + src/tokenpool-gnu-make-win32.cc ) if(MSVC) target_sources(libninja PRIVATE src/minidump-win32.cc) endif() else() - target_sources(libninja PRIVATE src/subprocess-posix.cc) + target_sources(libninja PRIVATE + src/subprocess-posix.cc + src/tokenpool-gnu-make-posix.cc + ) if(CMAKE_SYSTEM_NAME STREQUAL "OS400" OR CMAKE_SYSTEM_NAME STREQUAL "AIX") target_sources(libninja PRIVATE src/getopt.c) endif() @@ -182,6 +187,7 @@ if(BUILD_TESTING) src/string_piece_util_test.cc src/subprocess_test.cc src/test.cc + src/tokenpool_test.cc src/util_test.cc ) if(WIN32) --- a/configure.py +++ b/configure.py @@ -514,11 +514,13 @@ for name in ['build', 'parser', 'state', 'string_piece_util', + 'tokenpool-gnu-make', 'util', 'version']: objs += cxx(name, variables=cxxvariables) if platform.is_windows(): for name in ['subprocess-win32', + 'tokenpool-gnu-make-win32', 'includes_normalize-win32', 'msvc_helper-win32', 'msvc_helper_main-win32']: @@ -527,7 +529,9 @@ if platform.is_windows(): objs += cxx('minidump-win32', variables=cxxvariables) objs += cc('getopt') else: - objs += cxx('subprocess-posix') + for name in ['subprocess-posix', + 'tokenpool-gnu-make-posix']: + objs += cxx(name) if platform.is_aix(): objs += cc('getopt') if platform.is_msvc(): @@ -582,6 +586,7 @@ for name in ['build_log_test', 'string_piece_util_test', 'subprocess_test', 'test', + 'tokenpool_test', 'util_test']: objs += cxx(name, variables=cxxvariables) if platform.is_windows(): --- a/src/build.cc +++ b/src/build.cc @@ -38,6 +38,7 @@ #include "graph.h" #include "state.h" #include 
"subprocess.h" +#include "tokenpool.h" #include "util.h" using namespace std; @@ -50,8 +51,9 @@ struct DryRunCommandRunner : public Comm // Overridden from CommandRunner: virtual bool CanRunMore() const; + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); private: queue finished_; @@ -61,12 +63,16 @@ bool DryRunCommandRunner::CanRunMore() c return true; } +bool DryRunCommandRunner::AcquireToken() { + return true; +} + bool DryRunCommandRunner::StartCommand(Edge* edge) { finished_.push(edge); return true; } -bool DryRunCommandRunner::WaitForCommand(Result* result) { +bool DryRunCommandRunner::WaitForCommand(Result* result, bool more_ready) { if (finished_.empty()) return false; @@ -379,7 +385,7 @@ void Plan::EdgeWanted(const Edge* edge) } Edge* Plan::FindWork() { - if (ready_.empty()) + if (!more_ready()) return NULL; set::iterator e = ready_.begin(); Edge* edge = *e; @@ -665,19 +671,39 @@ void Plan::Dump() const { } struct RealCommandRunner : public CommandRunner { - explicit RealCommandRunner(const BuildConfig& config) : config_(config) {} - virtual ~RealCommandRunner() {} + explicit RealCommandRunner(const BuildConfig& config); + virtual ~RealCommandRunner(); virtual bool CanRunMore() const; + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); virtual vector GetActiveEdges(); virtual void Abort(); const BuildConfig& config_; + // copy of config_.max_load_average; can be modified by TokenPool setup + double max_load_average_; SubprocessSet subprocs_; + TokenPool* tokens_; map subproc_to_edge_; }; +RealCommandRunner::RealCommandRunner(const BuildConfig& config) : config_(config) { + max_load_average_ = config.max_load_average; + if ((tokens_ = TokenPool::Get()) != NULL) { + if 
(!tokens_->Setup(config_.parallelism_from_cmdline, + config_.verbosity == BuildConfig::VERBOSE, + max_load_average_)) { + delete tokens_; + tokens_ = NULL; + } + } +} + +RealCommandRunner::~RealCommandRunner() { + delete tokens_; +} + vector RealCommandRunner::GetActiveEdges() { vector edges; for (map::iterator e = subproc_to_edge_.begin(); @@ -688,14 +714,23 @@ vector RealCommandRunner::GetActi void RealCommandRunner::Abort() { subprocs_.Clear(); + if (tokens_) + tokens_->Clear(); } bool RealCommandRunner::CanRunMore() const { - size_t subproc_number = - subprocs_.running_.size() + subprocs_.finished_.size(); - return (int)subproc_number < config_.parallelism - && ((subprocs_.running_.empty() || config_.max_load_average <= 0.0f) - || GetLoadAverage() < config_.max_load_average); + bool parallelism_limit_not_reached = + tokens_ || // ignore config_.parallelism + ((int) (subprocs_.running_.size() + + subprocs_.finished_.size()) < config_.parallelism); + return parallelism_limit_not_reached + && (subprocs_.running_.empty() || + (max_load_average_ <= 0.0f || + GetLoadAverage() < max_load_average_)); +} + +bool RealCommandRunner::AcquireToken() { + return (!tokens_ || tokens_->Acquire()); } bool RealCommandRunner::StartCommand(Edge* edge) { @@ -703,19 +738,33 @@ bool RealCommandRunner::StartCommand(Edg Subprocess* subproc = subprocs_.Add(command, edge->use_console()); if (!subproc) return false; + if (tokens_) + tokens_->Reserve(); subproc_to_edge_.insert(make_pair(subproc, edge)); return true; } -bool RealCommandRunner::WaitForCommand(Result* result) { +bool RealCommandRunner::WaitForCommand(Result* result, bool more_ready) { Subprocess* subproc; - while ((subproc = subprocs_.NextFinished()) == NULL) { - bool interrupted = subprocs_.DoWork(); + subprocs_.ResetTokenAvailable(); + while (((subproc = subprocs_.NextFinished()) == NULL) && + !subprocs_.IsTokenAvailable()) { + bool interrupted = subprocs_.DoWork(more_ready ? 
tokens_ : NULL); if (interrupted) return false; } + // token became available + if (subproc == NULL) { + result->status = ExitTokenAvailable; + return true; + } + + // command completed + if (tokens_) + tokens_->Release(); + result->status = subproc->Finish(); result->output = subproc->GetOutput(); @@ -825,38 +874,42 @@ bool Builder::Build(string* err) { // command runner. // Second, we attempt to wait for / reap the next finished command. while (plan_.more_to_do()) { - // See if we can start any more commands. - if (failures_allowed && command_runner_->CanRunMore()) { - if (Edge* edge = plan_.FindWork()) { - if (edge->GetBindingBool("generator")) { - scan_.build_log()->Close(); - } + // See if we can start any more commands... + bool can_run_more = + failures_allowed && + plan_.more_ready() && + command_runner_->CanRunMore(); + + // ... but we also need a token to do that. + if (can_run_more && command_runner_->AcquireToken()) { + Edge* edge = plan_.FindWork(); + if (edge->GetBindingBool("generator")) { + scan_.build_log()->Close(); + } + if (!StartEdge(edge, err)) { + Cleanup(); + status_->BuildFinished(); + return false; + } - if (!StartEdge(edge, err)) { + if (edge->is_phony()) { + if (!plan_.EdgeFinished(edge, Plan::kEdgeSucceeded, err)) { Cleanup(); status_->BuildFinished(); return false; } - - if (edge->is_phony()) { - if (!plan_.EdgeFinished(edge, Plan::kEdgeSucceeded, err)) { - Cleanup(); - status_->BuildFinished(); - return false; - } - } else { - ++pending_commands; - } - - // We made some progress; go back to the main loop. - continue; + } else { + ++pending_commands; } + + // We made some progress; go back to the main loop. + continue; } // See if we can reap any finished commands. 
if (pending_commands) { CommandRunner::Result result; - if (!command_runner_->WaitForCommand(&result) || + if (!command_runner_->WaitForCommand(&result, can_run_more) || result.status == ExitInterrupted) { Cleanup(); status_->BuildFinished(); @@ -864,6 +917,10 @@ bool Builder::Build(string* err) { return false; } + // We might be able to start another command; start the main loop over. + if (result.status == ExitTokenAvailable) + continue; + --pending_commands; if (!FinishCommand(&result, err)) { Cleanup(); --- a/src/build.h +++ b/src/build.h @@ -55,6 +55,9 @@ struct Plan { /// Returns true if there's more work to be done. bool more_to_do() const { return wanted_edges_ > 0 && command_edges_ > 0; } + /// Returns true if there's more edges ready to start + bool more_ready() const { return !ready_.empty(); } + /// Dumps the current state of the plan. void Dump() const; @@ -139,6 +142,7 @@ private: struct CommandRunner { virtual ~CommandRunner() {} virtual bool CanRunMore() const = 0; + virtual bool AcquireToken() = 0; virtual bool StartCommand(Edge* edge) = 0; /// The result of waiting for a command. @@ -150,7 +154,9 @@ struct CommandRunner { bool success() const { return status == ExitSuccess; } }; /// Wait for a command to complete, or return false if interrupted. - virtual bool WaitForCommand(Result* result) = 0; + /// If more_ready is true then the optional TokenPool is monitored too + /// and we return when a token becomes available. + virtual bool WaitForCommand(Result* result, bool more_ready) = 0; virtual std::vector GetActiveEdges() { return std::vector(); } virtual void Abort() {} @@ -158,7 +164,8 @@ struct CommandRunner { /// Options (e.g. verbosity, parallelism) passed to a build. 
struct BuildConfig { - BuildConfig() : verbosity(NORMAL), dry_run(false), parallelism(1), + BuildConfig() : verbosity(NORMAL), dry_run(false), + parallelism(1), parallelism_from_cmdline(false), failures_allowed(1), max_load_average(-0.0f) {} enum Verbosity { @@ -169,6 +176,7 @@ struct BuildConfig { Verbosity verbosity; bool dry_run; int parallelism; + bool parallelism_from_cmdline; int failures_allowed; /// The maximum load average we must not exceed. A negative value /// means that we do not have any limit. --- a/src/build_test.cc +++ b/src/build_test.cc @@ -15,6 +15,7 @@ #include "build.h" #include +#include #include "build_log.h" #include "deps_log.h" @@ -473,8 +474,9 @@ struct FakeCommandRunner : public Comman // CommandRunner impl virtual bool CanRunMore() const; + virtual bool AcquireToken(); virtual bool StartCommand(Edge* edge); - virtual bool WaitForCommand(Result* result); + virtual bool WaitForCommand(Result* result, bool more_ready); virtual vector GetActiveEdges(); virtual void Abort(); @@ -580,6 +582,10 @@ bool FakeCommandRunner::CanRunMore() con return active_edges_.size() < max_active_edges_; } +bool FakeCommandRunner::AcquireToken() { + return true; +} + bool FakeCommandRunner::StartCommand(Edge* edge) { assert(active_edges_.size() < max_active_edges_); assert(find(active_edges_.begin(), active_edges_.end(), edge) @@ -625,7 +631,7 @@ bool FakeCommandRunner::StartCommand(Edg return true; } -bool FakeCommandRunner::WaitForCommand(Result* result) { +bool FakeCommandRunner::WaitForCommand(Result* result, bool more_ready) { if (active_edges_.empty()) return false; @@ -3302,3 +3308,356 @@ TEST_F(BuildTest, DyndepTwoLevelDiscover EXPECT_EQ("touch tmp", command_runner_.commands_ran_[3]); EXPECT_EQ("touch out", command_runner_.commands_ran_[4]); } + +/// The token tests are concerned with the main loop functionality when +// the CommandRunner has an active TokenPool. 
It is therefore intentional +// that the plan doesn't complete and that builder_.Build() returns false! + +/// Fake implementation of CommandRunner that simulates a TokenPool +struct FakeTokenCommandRunner : public CommandRunner { + explicit FakeTokenCommandRunner() {} + + // CommandRunner impl + virtual bool CanRunMore() const; + virtual bool AcquireToken(); + virtual bool StartCommand(Edge* edge); + virtual bool WaitForCommand(Result* result, bool more_ready); + virtual vector GetActiveEdges(); + virtual void Abort(); + + vector commands_ran_; + vector edges_; + + vector acquire_token_; + vector can_run_more_; + vector wait_for_command_; +}; + +bool FakeTokenCommandRunner::CanRunMore() const { + if (can_run_more_.size() == 0) { + EXPECT_FALSE("unexpected call to CommandRunner::CanRunMore()"); + return false; + } + + bool result = can_run_more_[0]; + + // Unfortunately CanRunMore() isn't "const" for tests + const_cast(this)->can_run_more_.erase( + const_cast(this)->can_run_more_.begin() + ); + + return result; +} + +bool FakeTokenCommandRunner::AcquireToken() { + if (acquire_token_.size() == 0) { + EXPECT_FALSE("unexpected call to CommandRunner::AcquireToken()"); + return false; + } + + bool result = acquire_token_[0]; + acquire_token_.erase(acquire_token_.begin()); + return result; +} + +bool FakeTokenCommandRunner::StartCommand(Edge* edge) { + commands_ran_.push_back(edge->EvaluateCommand()); + edges_.push_back(edge); + return true; +} + +bool FakeTokenCommandRunner::WaitForCommand(Result* result, bool more_ready) { + if (wait_for_command_.size() == 0) { + EXPECT_FALSE("unexpected call to CommandRunner::WaitForCommand()"); + return false; + } + + bool expected = wait_for_command_[0]; + if (expected != more_ready) { + EXPECT_EQ(expected, more_ready); + return false; + } + wait_for_command_.erase(wait_for_command_.begin()); + + if (edges_.size() == 0) + return false; + + Edge* edge = edges_[0]; + result->edge = edge; + + if (more_ready && + (edge->rule().name() == 
"token-available")) { + result->status = ExitTokenAvailable; + } else { + edges_.erase(edges_.begin()); + result->status = ExitSuccess; + } + + return true; +} + +vector FakeTokenCommandRunner::GetActiveEdges() { + return edges_; +} + +void FakeTokenCommandRunner::Abort() { + edges_.clear(); +} + +struct BuildTokenTest : public BuildTest { + virtual void SetUp(); + virtual void TearDown(); + + FakeTokenCommandRunner token_command_runner_; + + void ExpectAcquireToken(int count, ...); + void ExpectCanRunMore(int count, ...); + void ExpectWaitForCommand(int count, ...); + +private: + void EnqueueBooleans(vector& booleans, int count, va_list ao); +}; + +void BuildTokenTest::SetUp() { + BuildTest::SetUp(); + + // replace FakeCommandRunner with FakeTokenCommandRunner + builder_.command_runner_.release(); + builder_.command_runner_.reset(&token_command_runner_); +} +void BuildTokenTest::TearDown() { + EXPECT_EQ(0u, token_command_runner_.acquire_token_.size()); + EXPECT_EQ(0u, token_command_runner_.can_run_more_.size()); + EXPECT_EQ(0u, token_command_runner_.wait_for_command_.size()); + + BuildTest::TearDown(); +} + +void BuildTokenTest::ExpectAcquireToken(int count, ...) { + va_list ap; + va_start(ap, count); + EnqueueBooleans(token_command_runner_.acquire_token_, count, ap); + va_end(ap); +} + +void BuildTokenTest::ExpectCanRunMore(int count, ...) { + va_list ap; + va_start(ap, count); + EnqueueBooleans(token_command_runner_.can_run_more_, count, ap); + va_end(ap); +} + +void BuildTokenTest::ExpectWaitForCommand(int count, ...) 
{ + va_list ap; + va_start(ap, count); + EnqueueBooleans(token_command_runner_.wait_for_command_, count, ap); + va_end(ap); +} + +void BuildTokenTest::EnqueueBooleans(vector& booleans, int count, va_list ap) { + while (count--) { + int value = va_arg(ap, int); + booleans.push_back(!!value); // force bool + } +} + +TEST_F(BuildTokenTest, DoNotAquireToken) { + // plan should execute one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat1", &err)); + ASSERT_EQ("", err); + + // pretend we can't run anything + ExpectCanRunMore(1, false); + + EXPECT_FALSE(builder_.Build(&err)); + EXPECT_EQ("stuck [this is a bug]", err); + + EXPECT_EQ(0u, token_command_runner_.commands_ran_.size()); +} + +TEST_F(BuildTokenTest, DoNotStartWithoutToken) { + // plan should execute one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat1", &err)); + ASSERT_EQ("", err); + + // we could run a command but do not have a token for it + ExpectCanRunMore(1, true); + ExpectAcquireToken(1, false); + + EXPECT_FALSE(builder_.Build(&err)); + EXPECT_EQ("stuck [this is a bug]", err); + + EXPECT_EQ(0u, token_command_runner_.commands_ran_.size()); +} + +TEST_F(BuildTokenTest, CompleteOneStep) { + // plan should execute one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat1", &err)); + ASSERT_EQ("", err); + + // allow running of one command + ExpectCanRunMore(1, true); + ExpectAcquireToken(1, true); + // block and wait for command to finalize + ExpectWaitForCommand(1, false); + + EXPECT_TRUE(builder_.Build(&err)); + EXPECT_EQ("", err); + + EXPECT_EQ(1u, token_command_runner_.commands_ran_.size()); + EXPECT_TRUE(token_command_runner_.commands_ran_[0] == "cat in1 > cat1"); +} + +TEST_F(BuildTokenTest, AcquireOneToken) { + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat12", &err)); + ASSERT_EQ("", err); + + // allow running of one command + ExpectCanRunMore(3, true, false, false); + ExpectAcquireToken(1, true); + // block and wait for 
command to finalize + ExpectWaitForCommand(1, false); + + EXPECT_FALSE(builder_.Build(&err)); + EXPECT_EQ("stuck [this is a bug]", err); + + EXPECT_EQ(1u, token_command_runner_.commands_ran_.size()); + // any of the two dependencies could have been executed + EXPECT_TRUE(token_command_runner_.commands_ran_[0] == "cat in1 > cat1" || + token_command_runner_.commands_ran_[0] == "cat in1 in2 > cat2"); +} + +TEST_F(BuildTokenTest, WantTwoTokens) { + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat12", &err)); + ASSERT_EQ("", err); + + // allow running of one command + ExpectCanRunMore(3, true, true, false); + ExpectAcquireToken(2, true, false); + // wait for command to finalize or token to become available + ExpectWaitForCommand(1, true); + + EXPECT_FALSE(builder_.Build(&err)); + EXPECT_EQ("stuck [this is a bug]", err); + + EXPECT_EQ(1u, token_command_runner_.commands_ran_.size()); + // any of the two dependencies could have been executed + EXPECT_TRUE(token_command_runner_.commands_ran_[0] == "cat in1 > cat1" || + token_command_runner_.commands_ran_[0] == "cat in1 in2 > cat2"); +} + +TEST_F(BuildTokenTest, CompleteTwoSteps) { + ASSERT_NO_FATAL_FAILURE(AssertParse(&state_, +"build out1: cat in1\n" +"build out2: cat out1\n")); + + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("out2", &err)); + ASSERT_EQ("", err); + + // allow running of two commands + ExpectCanRunMore(2, true, true); + ExpectAcquireToken(2, true, true); + // wait for commands to finalize + ExpectWaitForCommand(2, false, false); + + EXPECT_TRUE(builder_.Build(&err)); + EXPECT_EQ("", err); + + EXPECT_EQ(2u, token_command_runner_.commands_ran_.size()); + EXPECT_TRUE(token_command_runner_.commands_ran_[0] == "cat in1 > out1"); + EXPECT_TRUE(token_command_runner_.commands_ran_[1] == "cat out1 > out2"); +} + +TEST_F(BuildTokenTest, TwoCommandsInParallel) { + ASSERT_NO_FATAL_FAILURE(AssertParse(&state_, +"rule 
token-available\n" +" command = cat $in > $out\n" +"build out1: token-available in1\n" +"build out2: token-available in2\n" +"build out12: cat out1 out2\n")); + + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("out12", &err)); + ASSERT_EQ("", err); + + // 1st command: token available -> allow running + // 2nd command: no token available but becomes available later + ExpectCanRunMore(4, true, true, true, false); + ExpectAcquireToken(3, true, false, true); + // 1st call waits for command to finalize or token to become available + // 2nd call waits for command to finalize + // 3rd call waits for command to finalize + ExpectWaitForCommand(3, true, false, false); + + EXPECT_FALSE(builder_.Build(&err)); + EXPECT_EQ("stuck [this is a bug]", err); + + EXPECT_EQ(2u, token_command_runner_.commands_ran_.size()); + EXPECT_TRUE((token_command_runner_.commands_ran_[0] == "cat in1 > out1" && + token_command_runner_.commands_ran_[1] == "cat in2 > out2") || + (token_command_runner_.commands_ran_[0] == "cat in2 > out2" && + token_command_runner_.commands_ran_[1] == "cat in1 > out1")); +} + +TEST_F(BuildTokenTest, CompleteThreeStepsSerial) { + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("cat12", &err)); + ASSERT_EQ("", err); + + // allow running of all commands + ExpectCanRunMore(4, true, true, true, true); + ExpectAcquireToken(4, true, false, true, true); + // wait for commands to finalize + ExpectWaitForCommand(3, true, false, false); + + EXPECT_TRUE(builder_.Build(&err)); + EXPECT_EQ("", err); + + EXPECT_EQ(3u, token_command_runner_.commands_ran_.size()); + EXPECT_TRUE((token_command_runner_.commands_ran_[0] == "cat in1 > cat1" && + token_command_runner_.commands_ran_[1] == "cat in1 in2 > cat2") || + (token_command_runner_.commands_ran_[0] == "cat in1 in2 > cat2" && + token_command_runner_.commands_ran_[1] == "cat in1 > cat1" )); + EXPECT_TRUE(token_command_runner_.commands_ran_[2] == "cat 
cat1 cat2 > cat12"); +} + +TEST_F(BuildTokenTest, CompleteThreeStepsParallel) { + ASSERT_NO_FATAL_FAILURE(AssertParse(&state_, +"rule token-available\n" +" command = cat $in > $out\n" +"build out1: token-available in1\n" +"build out2: token-available in2\n" +"build out12: cat out1 out2\n")); + + // plan should execute more than one command + string err; + EXPECT_TRUE(builder_.AddTarget("out12", &err)); + ASSERT_EQ("", err); + + // allow running of all commands + ExpectCanRunMore(4, true, true, true, true); + ExpectAcquireToken(4, true, false, true, true); + // wait for commands to finalize + ExpectWaitForCommand(4, true, false, false, false); + + EXPECT_TRUE(builder_.Build(&err)); + EXPECT_EQ("", err); + + EXPECT_EQ(3u, token_command_runner_.commands_ran_.size()); + EXPECT_TRUE((token_command_runner_.commands_ran_[0] == "cat in1 > out1" && + token_command_runner_.commands_ran_[1] == "cat in2 > out2") || + (token_command_runner_.commands_ran_[0] == "cat in2 > out2" && + token_command_runner_.commands_ran_[1] == "cat in1 > out1")); + EXPECT_TRUE(token_command_runner_.commands_ran_[2] == "cat out1 out2 > out12"); +} --- a/src/exit_status.h +++ b/src/exit_status.h @@ -18,7 +18,8 @@ enum ExitStatus { ExitSuccess, ExitFailure, - ExitInterrupted + ExitTokenAvailable, + ExitInterrupted, }; #endif // NINJA_EXIT_STATUS_H_ --- a/src/ninja.cc +++ b/src/ninja.cc @@ -1289,6 +1289,7 @@ int ReadFlags(int* argc, char*** argv, // We want to run N jobs in parallel. For N = 0, INT_MAX // is close enough to infinite for most sane builds. config->parallelism = value > 0 ? value : INT_MAX; + config->parallelism_from_cmdline = true; break; } case 'k': { --- a/src/subprocess-posix.cc +++ b/src/subprocess-posix.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include "subprocess.h" +#include "tokenpool.h" #include #include @@ -249,7 +250,7 @@ Subprocess *SubprocessSet::Add(const str } #ifdef USE_PPOLL -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(TokenPool* tokens) { vector fds; nfds_t nfds = 0; @@ -263,6 +264,12 @@ bool SubprocessSet::DoWork() { ++nfds; } + if (tokens) { + pollfd pfd = { tokens->GetMonitorFd(), POLLIN | POLLPRI, 0 }; + fds.push_back(pfd); + ++nfds; + } + interrupted_ = 0; int ret = ppoll(&fds.front(), nfds, NULL, &old_mask_); if (ret == -1) { @@ -295,11 +302,20 @@ bool SubprocessSet::DoWork() { ++i; } + if (tokens) { + pollfd *pfd = &fds[nfds - 1]; + if (pfd->fd >= 0) { + assert(pfd->fd == tokens->GetMonitorFd()); + if (pfd->revents != 0) + token_available_ = true; + } + } + return IsInterrupted(); } #else // !defined(USE_PPOLL) -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(TokenPool* tokens) { fd_set set; int nfds = 0; FD_ZERO(&set); @@ -314,6 +330,13 @@ bool SubprocessSet::DoWork() { } } + if (tokens) { + int fd = tokens->GetMonitorFd(); + FD_SET(fd, &set); + if (nfds < fd+1) + nfds = fd+1; + } + interrupted_ = 0; int ret = pselect(nfds, &set, 0, 0, 0, &old_mask_); if (ret == -1) { @@ -342,6 +365,12 @@ bool SubprocessSet::DoWork() { ++i; } + if (tokens) { + int fd = tokens->GetMonitorFd(); + if ((fd >= 0) && FD_ISSET(fd, &set)) + token_available_ = true; + } + return IsInterrupted(); } #endif // !defined(USE_PPOLL) --- a/src/subprocess-win32.cc +++ b/src/subprocess-win32.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include "subprocess.h" +#include "tokenpool.h" #include #include @@ -251,11 +252,14 @@ Subprocess *SubprocessSet::Add(const str return subprocess; } -bool SubprocessSet::DoWork() { +bool SubprocessSet::DoWork(TokenPool* tokens) { DWORD bytes_read; Subprocess* subproc; OVERLAPPED* overlapped; + if (tokens) + tokens->WaitForTokenAvailability(ioport_); + if (!GetQueuedCompletionStatus(ioport_, &bytes_read, (PULONG_PTR)&subproc, &overlapped, INFINITE)) { if (GetLastError() != ERROR_BROKEN_PIPE) @@ -266,6 +270,11 @@ bool SubprocessSet::DoWork() { // delivered by NotifyInterrupted above. return true; + if (tokens && tokens->TokenIsAvailable((ULONG_PTR)subproc)) { + token_available_ = true; + return false; + } + subproc->OnPipeReady(); if (subproc->Done()) { --- a/src/subprocess.h +++ b/src/subprocess.h @@ -76,6 +76,8 @@ struct Subprocess { friend struct SubprocessSet; }; +struct TokenPool; + /// SubprocessSet runs a ppoll/pselect() loop around a set of Subprocesses. /// DoWork() waits for any state change in subprocesses; finished_ /// is a queue of subprocesses as they finish. @@ -84,13 +86,17 @@ struct SubprocessSet { ~SubprocessSet(); Subprocess* Add(const std::string& command, bool use_console = false); - bool DoWork(); + bool DoWork(struct TokenPool* tokens); Subprocess* NextFinished(); void Clear(); std::vector running_; std::queue finished_; + bool token_available_; + bool IsTokenAvailable() { return token_available_; } + void ResetTokenAvailable() { token_available_ = false; } + #ifdef _WIN32 static BOOL WINAPI NotifyInterrupted(DWORD dwCtrlType); static HANDLE ioport_; --- a/src/subprocess_test.cc +++ b/src/subprocess_test.cc @@ -13,6 +13,7 @@ // limitations under the License. 
 #include "subprocess.h"
+#include "tokenpool.h"
 
 #include "test.h"
 
@@ -34,8 +35,30 @@ const char* kSimpleCommand = "cmd /c dir
 const char* kSimpleCommand = "ls /";
 #endif
 
+struct TestTokenPool : public TokenPool {  // stub: only the monitoring hooks do real work
+  bool Acquire()     { return false; }
+  void Reserve()     {}
+  void Release()     {}
+  void Clear()       {}
+  bool Setup(bool ignore_unused, bool verbose, double& max_load_average) { return false; }
+
+#ifdef _WIN32
+  bool _token_available;  // not initialized here; tests set it before calling DoWork(&tokens_)
+  void WaitForTokenAvailability(HANDLE ioport) {
+    if (_token_available)
+      // unblock GetQueuedCompletionStatus() with this object as completion key
+      PostQueuedCompletionStatus(ioport, 0, (ULONG_PTR) this, NULL);
+  }
+  bool TokenIsAvailable(ULONG_PTR key) { return key == (ULONG_PTR) this; }
+#else
+  int _fd;  // not initialized here; tests store the read end of their pipe
+  int GetMonitorFd() { return _fd; }
+#endif
+};
+
 struct SubprocessTest : public testing::Test {
   SubprocessSet subprocs_;
+  TestTokenPool tokens_;
 };
 
 }  // anonymous namespace
@@ -45,10 +68,12 @@ TEST_F(SubprocessTest, BadCommandStderr)
   Subprocess* subproc = subprocs_.Add("cmd /c ninja_no_such_command");
   ASSERT_NE((Subprocess *) 0, subproc);
 
+  subprocs_.ResetTokenAvailable();
   while (!subproc->Done()) {
     // Pretend we discovered that stderr was ready for writing.
-    subprocs_.DoWork();
+    subprocs_.DoWork(NULL);
   }
+  ASSERT_FALSE(subprocs_.IsTokenAvailable());
 
   EXPECT_EQ(ExitFailure, subproc->Finish());
   EXPECT_NE("", subproc->GetOutput());
@@ -59,10 +84,12 @@ TEST_F(SubprocessTest, NoSuchCommand) {
   Subprocess* subproc = subprocs_.Add("ninja_no_such_command");
   ASSERT_NE((Subprocess *) 0, subproc);
 
+  subprocs_.ResetTokenAvailable();
   while (!subproc->Done()) {
     // Pretend we discovered that stderr was ready for writing.
- subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitFailure, subproc->Finish()); EXPECT_NE("", subproc->GetOutput()); @@ -78,9 +105,11 @@ TEST_F(SubprocessTest, InterruptChild) { Subprocess* subproc = subprocs_.Add("kill -INT $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -90,7 +119,7 @@ TEST_F(SubprocessTest, InterruptParent) ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -102,9 +131,11 @@ TEST_F(SubprocessTest, InterruptChildWit Subprocess* subproc = subprocs_.Add("kill -TERM $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -114,7 +145,7 @@ TEST_F(SubprocessTest, InterruptParentWi ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -126,9 +157,11 @@ TEST_F(SubprocessTest, InterruptChildWit Subprocess* subproc = subprocs_.Add("kill -HUP $$"); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitInterrupted, subproc->Finish()); } @@ -138,7 +171,7 @@ TEST_F(SubprocessTest, InterruptParentWi ASSERT_NE((Subprocess *) 0, subproc); while (!subproc->Done()) { - bool interrupted = subprocs_.DoWork(); + bool interrupted = subprocs_.DoWork(NULL); if (interrupted) return; } @@ -153,9 +186,11 @@ 
TEST_F(SubprocessTest, Console) { subprocs_.Add("test -t 0 -a -t 1 -a -t 2", /*use_console=*/true); ASSERT_NE((Subprocess*)0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); EXPECT_EQ(ExitSuccess, subproc->Finish()); } @@ -167,9 +202,11 @@ TEST_F(SubprocessTest, SetWithSingle) { Subprocess* subproc = subprocs_.Add(kSimpleCommand); ASSERT_NE((Subprocess *) 0, subproc); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); ASSERT_EQ(ExitSuccess, subproc->Finish()); ASSERT_NE("", subproc->GetOutput()); @@ -200,12 +237,13 @@ TEST_F(SubprocessTest, SetWithMulti) { ASSERT_EQ("", processes[i]->GetOutput()); } + subprocs_.ResetTokenAvailable(); while (!processes[0]->Done() || !processes[1]->Done() || !processes[2]->Done()) { ASSERT_GT(subprocs_.running_.size(), 0u); - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } - + ASSERT_FALSE(subprocs_.IsTokenAvailable()); ASSERT_EQ(0u, subprocs_.running_.size()); ASSERT_EQ(3u, subprocs_.finished_.size()); @@ -237,8 +275,10 @@ TEST_F(SubprocessTest, SetWithLots) { ASSERT_NE((Subprocess *) 0, subproc); procs.push_back(subproc); } + subprocs_.ResetTokenAvailable(); while (!subprocs_.running_.empty()) - subprocs_.DoWork(); + subprocs_.DoWork(NULL); + ASSERT_FALSE(subprocs_.IsTokenAvailable()); for (size_t i = 0; i < procs.size(); ++i) { ASSERT_EQ(ExitSuccess, procs[i]->Finish()); ASSERT_NE("", procs[i]->GetOutput()); @@ -254,10 +294,91 @@ TEST_F(SubprocessTest, SetWithLots) { // that stdin is closed. 
TEST_F(SubprocessTest, ReadStdin) { Subprocess* subproc = subprocs_.Add("cat -"); + subprocs_.ResetTokenAvailable(); while (!subproc->Done()) { - subprocs_.DoWork(); + subprocs_.DoWork(NULL); } + ASSERT_FALSE(subprocs_.IsTokenAvailable()); ASSERT_EQ(ExitSuccess, subproc->Finish()); ASSERT_EQ(1u, subprocs_.finished_.size()); } #endif // _WIN32 + +TEST_F(SubprocessTest, TokenAvailable) { + Subprocess* subproc = subprocs_.Add(kSimpleCommand); + ASSERT_NE((Subprocess *) 0, subproc); + + // simulate GNUmake jobserver pipe with 1 token +#ifdef _WIN32 + tokens_._token_available = true; +#else + int fds[2]; + ASSERT_EQ(0u, pipe(fds)); + tokens_._fd = fds[0]; + ASSERT_EQ(1u, write(fds[1], "T", 1)); +#endif + + subprocs_.ResetTokenAvailable(); + subprocs_.DoWork(&tokens_); +#ifdef _WIN32 + tokens_._token_available = false; + // we need to loop here as we have no conrol where the token + // I/O completion post ends up in the queue + while (!subproc->Done() && !subprocs_.IsTokenAvailable()) { + subprocs_.DoWork(&tokens_); + } +#endif + + EXPECT_TRUE(subprocs_.IsTokenAvailable()); + EXPECT_EQ(0u, subprocs_.finished_.size()); + + // remove token to let DoWork() wait for command again +#ifndef _WIN32 + char token; + ASSERT_EQ(1u, read(fds[0], &token, 1)); +#endif + + while (!subproc->Done()) { + subprocs_.DoWork(&tokens_); + } + +#ifndef _WIN32 + close(fds[1]); + close(fds[0]); +#endif + + EXPECT_EQ(ExitSuccess, subproc->Finish()); + EXPECT_NE("", subproc->GetOutput()); + + EXPECT_EQ(1u, subprocs_.finished_.size()); +} + +TEST_F(SubprocessTest, TokenNotAvailable) { + Subprocess* subproc = subprocs_.Add(kSimpleCommand); + ASSERT_NE((Subprocess *) 0, subproc); + + // simulate GNUmake jobserver pipe with 0 tokens +#ifdef _WIN32 + tokens_._token_available = false; +#else + int fds[2]; + ASSERT_EQ(0u, pipe(fds)); + tokens_._fd = fds[0]; +#endif + + subprocs_.ResetTokenAvailable(); + while (!subproc->Done()) { + subprocs_.DoWork(&tokens_); + } + +#ifndef _WIN32 + close(fds[1]); + 
close(fds[0]); +#endif + + EXPECT_FALSE(subprocs_.IsTokenAvailable()); + EXPECT_EQ(ExitSuccess, subproc->Finish()); + EXPECT_NE("", subproc->GetOutput()); + + EXPECT_EQ(1u, subprocs_.finished_.size()); +} --- /dev/null +++ b/src/tokenpool-gnu-make-posix.cc @@ -0,0 +1,202 @@ +// Copyright 2016-2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "tokenpool-gnu-make.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// TokenPool implementation for GNU make jobserver - POSIX implementation +// (http://make.mad-scientist.net/papers/jobserver-implementation/) +struct GNUmakeTokenPoolPosix : public GNUmakeTokenPool { + GNUmakeTokenPoolPosix(); + virtual ~GNUmakeTokenPoolPosix(); + + virtual int GetMonitorFd(); + + virtual const char* GetEnv(const char* name) { return getenv(name); }; + virtual bool ParseAuth(const char* jobserver); + virtual bool AcquireToken(); + virtual bool ReturnToken(); + + private: + int rfd_; + int wfd_; + + struct sigaction old_act_; + bool restore_; + + static int dup_rfd_; + static void CloseDupRfd(int signum); + + bool CheckFd(int fd); + bool SetAlarmHandler(); +}; + +GNUmakeTokenPoolPosix::GNUmakeTokenPoolPosix() : rfd_(-1), wfd_(-1), restore_(false) { +} + +GNUmakeTokenPoolPosix::~GNUmakeTokenPoolPosix() { + Clear(); + if (restore_) + sigaction(SIGALRM, &old_act_, NULL); +} + +bool GNUmakeTokenPoolPosix::CheckFd(int fd) { + if (fd < 0) + 
return false; + int ret = fcntl(fd, F_GETFD); + if (ret < 0) + return false; + return true; +} + +int GNUmakeTokenPoolPosix::dup_rfd_ = -1; + +void GNUmakeTokenPoolPosix::CloseDupRfd(int signum) { + close(dup_rfd_); + dup_rfd_ = -1; +} + +bool GNUmakeTokenPoolPosix::SetAlarmHandler() { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = CloseDupRfd; + if (sigaction(SIGALRM, &act, &old_act_) < 0) { + perror("sigaction:"); + return false; + } + restore_ = true; + return true; +} + +bool GNUmakeTokenPoolPosix::ParseAuth(const char* jobserver) { + int rfd = -1; + int wfd = -1; + if ((sscanf(jobserver, "%*[^=]=%d,%d", &rfd, &wfd) == 2) && + CheckFd(rfd) && + CheckFd(wfd) && + SetAlarmHandler()) { + rfd_ = rfd; + wfd_ = wfd; + return true; + } + + return false; +} + +bool GNUmakeTokenPoolPosix::AcquireToken() { + // Please read + // + // http://make.mad-scientist.net/papers/jobserver-implementation/ + // + // for the reasoning behind the following code. + // + // Try to read one character from the pipe. Returns true on success. + // + // First check if read() would succeed without blocking. +#ifdef USE_PPOLL + pollfd pollfds[] = {{rfd_, POLLIN, 0}}; + int ret = poll(pollfds, 1, 0); +#else + fd_set set; + struct timeval timeout = { 0, 0 }; + FD_ZERO(&set); + FD_SET(rfd_, &set); + int ret = select(rfd_ + 1, &set, NULL, NULL, &timeout); +#endif + if (ret > 0) { + // Handle potential race condition: + // - the above check succeeded, i.e. read() should not block + // - the character disappears before we call read() + // + // Create a duplicate of rfd_. The duplicate file descriptor dup_rfd_ + // can safely be closed by signal handlers without affecting rfd_. 
+ dup_rfd_ = dup(rfd_); + + if (dup_rfd_ != -1) { + struct sigaction act, old_act; + int ret = 0; + + // Temporarily replace SIGCHLD handler with our own + memset(&act, 0, sizeof(act)); + act.sa_handler = CloseDupRfd; + if (sigaction(SIGCHLD, &act, &old_act) == 0) { + struct itimerval timeout; + + // install a 100ms timeout that generates SIGALARM on expiration + memset(&timeout, 0, sizeof(timeout)); + timeout.it_value.tv_usec = 100 * 1000; // [ms] -> [usec] + if (setitimer(ITIMER_REAL, &timeout, NULL) == 0) { + char buf; + + // Now try to read() from dup_rfd_. Return values from read(): + // + // 1. token read -> 1 + // 2. pipe closed -> 0 + // 3. alarm expires -> -1 (EINTR) + // 4. child exits -> -1 (EINTR) + // 5. alarm expired before entering read() -> -1 (EBADF) + // 6. child exited before entering read() -> -1 (EBADF) + // 7. child exited before handler is installed -> go to 1 - 3 + ret = read(dup_rfd_, &buf, 1); + + // disarm timer + memset(&timeout, 0, sizeof(timeout)); + setitimer(ITIMER_REAL, &timeout, NULL); + } + + sigaction(SIGCHLD, &old_act, NULL); + } + + CloseDupRfd(0); + + // Case 1 from above list + if (ret > 0) + return true; + } + } + + // read() would block, i.e. no token available, + // cases 2-6 from above list or + // select() / poll() / dup() / sigaction() / setitimer() failed + return false; +} + +bool GNUmakeTokenPoolPosix::ReturnToken() { + const char buf = '+'; + while (1) { + int ret = write(wfd_, &buf, 1); + if (ret > 0) + return true; + if ((ret != -1) || (errno != EINTR)) + return false; + // write got interrupted - retry + } +} + +int GNUmakeTokenPoolPosix::GetMonitorFd() { + return rfd_; +} + +TokenPool* TokenPool::Get() { + return new GNUmakeTokenPoolPosix; +} --- /dev/null +++ b/src/tokenpool-gnu-make-win32.cc @@ -0,0 +1,239 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "tokenpool-gnu-make.h"
+
+// Always include this first.
+// Otherwise the other system headers don't work correctly under Win32
+#include
+
+#include
+#include
+#include
+
+#include "util.h"
+
+// TokenPool implementation for GNU make jobserver - Win32 implementation
+// (https://www.gnu.org/software/make/manual/html_node/Windows-Jobserver.html)
+struct GNUmakeTokenPoolWin32 : public GNUmakeTokenPool {
+  GNUmakeTokenPoolWin32();
+  virtual ~GNUmakeTokenPoolWin32();
+
+  virtual void WaitForTokenAvailability(HANDLE ioport);
+  virtual bool TokenIsAvailable(ULONG_PTR key);
+
+  virtual const char* GetEnv(const char* name);
+  virtual bool ParseAuth(const char* jobserver);
+  virtual bool AcquireToken();
+  virtual bool ReturnToken();
+
+ private:
+  // Semaphore for GNU make jobserver protocol
+  HANDLE semaphore_jobserver_;
+  // Semaphore Child -> Parent
+  // - child releases it before entering wait on jobserver semaphore
+  // - parent blocks on it to know when child enters wait
+  HANDLE semaphore_enter_wait_;
+  // Semaphore Parent -> Child
+  // - parent releases it to allow child to restart loop
+  // - child blocks on it to know when to restart loop
+  HANDLE semaphore_restart_;
+  // set to false if child should exit loop and terminate thread
+  bool running_;
+  // child thread
+  HANDLE child_;
+  // I/O completion port from SubprocessSet
+  HANDLE ioport_;
+
+
+  DWORD SemaphoreThread();
+  void ReleaseSemaphore(HANDLE semaphore);
+  void WaitForObject(HANDLE object);
+  static DWORD WINAPI SemaphoreThreadWrapper(LPVOID param);
+  static void CALLBACK NoopAPCFunc(ULONG_PTR param);
+};
+
+GNUmakeTokenPoolWin32::GNUmakeTokenPoolWin32() : semaphore_jobserver_(NULL),
+                                                 semaphore_enter_wait_(NULL),
+                                                 semaphore_restart_(NULL),
+                                                 running_(false),
+                                                 child_(NULL),
+                                                 ioport_(NULL) {
+}
+
+GNUmakeTokenPoolWin32::~GNUmakeTokenPoolWin32() {
+  Clear();
+
+  if (child_) {
+    // tell child to exit; the APC breaks a still pending alertable token wait
+    running_ = false;
+    QueueUserAPC((PAPCFUNC)&NoopAPCFunc, child_, (ULONG_PTR)NULL);
+    ReleaseSemaphore(semaphore_restart_);
+    WaitForObject(child_);
+    CloseHandle(child_);
+    child_ = NULL;
+  }
+
+  // the child thread is gone, i.e. nobody waits on the jobserver semaphore
+  if (semaphore_jobserver_)
+    CloseHandle(semaphore_jobserver_);
+  semaphore_jobserver_ = NULL;
+  if (semaphore_restart_) {
+    CloseHandle(semaphore_restart_);
+    semaphore_restart_ = NULL;
+  }
+  if (semaphore_enter_wait_) {
+    CloseHandle(semaphore_enter_wait_);
+    semaphore_enter_wait_ = NULL;
+  }
+}
+
+const char* GNUmakeTokenPoolWin32::GetEnv(const char* name) {
+  // getenv() does not work correctly together with tokenpool_tests.cc
+  static char buffer[32767];  // documented maximum size of an environment variable
+  DWORD len = GetEnvironmentVariable(name, buffer, sizeof(buffer));
+  // 0: not set; >= size: value did not fit, buffer contents are undefined
+  return ((len == 0) || (len >= sizeof(buffer))) ? NULL : buffer;
+}
+
+bool GNUmakeTokenPoolWin32::ParseAuth(const char* jobserver) {
+  // match "--jobserver-auth=gmake_semaphore_..."
+  const char* start = strchr(jobserver, '=');
+  if (start) {
+    const char* end = start;
+    unsigned int len;
+    char c, *auth;
+
+    while ((c = *++end) != '\0')
+      if (!(isalnum((unsigned char)c) || (c == '_')))
+        break;
+    len = end - start; // includes string terminator in count
+
+    if ((len > 1) && ((auth = (char*)malloc(len)) != NULL)) {
+      strncpy(auth, start + 1, len - 1);
+      auth[len - 1] = '\0';
+
+      if ((semaphore_jobserver_ =
+           OpenSemaphore(SEMAPHORE_ALL_ACCESS, /* Semaphore access setting */
+                         FALSE,                /* Child processes DON'T inherit */
+                         auth                  /* Semaphore name */
+                        )) != NULL) {
+        free(auth);
+        return true;
+      }
+
+      free(auth);
+    }
+  }
+
+  return false;
+}
+
+bool GNUmakeTokenPoolWin32::AcquireToken() {
+  return WaitForSingleObject(semaphore_jobserver_, 0) == WAIT_OBJECT_0;
+}
+
+bool GNUmakeTokenPoolWin32::ReturnToken() {
+  ReleaseSemaphore(semaphore_jobserver_);
+  return true;
+}
+
+DWORD GNUmakeTokenPoolWin32::SemaphoreThread() {
+  while (running_) {
+    // indicate to parent that we are entering wait
+    ReleaseSemaphore(semaphore_enter_wait_);
+
+    // alertable wait forever on token semaphore
+    if (WaitForSingleObjectEx(semaphore_jobserver_, INFINITE, TRUE) == WAIT_OBJECT_0) {
+      // release token again for AcquireToken()
+      ReleaseSemaphore(semaphore_jobserver_);
+
+      // indicate to parent on ioport that a token might be available
+      if (!PostQueuedCompletionStatus(ioport_, 0, (ULONG_PTR) this, NULL))
+        Win32Fatal("PostQueuedCompletionStatus");
+    }
+
+    // wait for parent to allow loop restart
+    WaitForObject(semaphore_restart_);
+    // semaphore is now in nonsignaled state again for next run...
+  }
+
+  return 0;
+}
+
+DWORD WINAPI GNUmakeTokenPoolWin32::SemaphoreThreadWrapper(LPVOID param) {
+  GNUmakeTokenPoolWin32* This = (GNUmakeTokenPoolWin32*) param;
+  return This->SemaphoreThread();
+}
+
+void CALLBACK GNUmakeTokenPoolWin32::NoopAPCFunc(ULONG_PTR param) {
+}
+
+void GNUmakeTokenPoolWin32::WaitForTokenAvailability(HANDLE ioport) {
+  if (child_ == NULL) {
+    // first invocation
+    //
+    // subprocess-win32.cc uses I/O completion port (IOCP) which can't be
+    // used as a waitable object. Therefore we can't use WaitMultipleObjects()
+    // to wait on the IOCP and the token semaphore at the same time. Create
+    // a child thread that waits on the semaphore and posts an I/O completion
+    ioport_ = ioport;
+
+    // create both semaphores in nonsignaled state
+    if ((semaphore_enter_wait_ = CreateSemaphore(NULL, 0, 1, NULL))
+        == NULL)
+      Win32Fatal("CreateSemaphore/enter_wait");
+    if ((semaphore_restart_ = CreateSemaphore(NULL, 0, 1, NULL))
+        == NULL)
+      Win32Fatal("CreateSemaphore/restart");
+
+    // start child thread
+    running_ = true;
+    if ((child_ = CreateThread(NULL, 0, &SemaphoreThreadWrapper, this, 0, NULL))
+        == NULL)
+      Win32Fatal("CreateThread");
+
+  } else {
+    // all further invocations - allow child thread to loop
+    ReleaseSemaphore(semaphore_restart_);
+  }
+
+  // wait for child thread to enter wait
+  WaitForObject(semaphore_enter_wait_);
+  // semaphore is now in nonsignaled state again for next run...
+
+  // now SubprocessSet::DoWork() can enter GetQueuedCompletionStatus()...
+}
+
+bool GNUmakeTokenPoolWin32::TokenIsAvailable(ULONG_PTR key) {
+  // alert child thread to break wait on token semaphore
+  QueueUserAPC((PAPCFUNC)&NoopAPCFunc, child_, (ULONG_PTR)NULL);
+
+  // return true when GetQueuedCompletionStatus() returned our key
+  return key == (ULONG_PTR) this;
+}
+
+void GNUmakeTokenPoolWin32::ReleaseSemaphore(HANDLE semaphore) {
+  if (!::ReleaseSemaphore(semaphore, 1, NULL))
+    Win32Fatal("ReleaseSemaphore");
+}
+
+void GNUmakeTokenPoolWin32::WaitForObject(HANDLE object) {
+  if (WaitForSingleObject(object, INFINITE) != WAIT_OBJECT_0)
+    Win32Fatal("WaitForSingleObject");
+}
+
+TokenPool* TokenPool::Get() {
+  return new GNUmakeTokenPoolWin32;
+}
--- /dev/null
+++ b/src/tokenpool-gnu-make.cc
@@ -0,0 +1,108 @@
+// Copyright 2016-2018 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "tokenpool-gnu-make.h"
+
+#include
+#include
+#include
+
+#include "line_printer.h"
+
+// TokenPool implementation for GNU make jobserver - common bits
+// every instance owns an implicit token -> available_ == 1
+GNUmakeTokenPool::GNUmakeTokenPool() : available_(1), used_(0) {
+}
+
+GNUmakeTokenPool::~GNUmakeTokenPool() {
+}
+
+bool GNUmakeTokenPool::Setup(bool ignore,
+                             bool verbose,
+                             double& max_load_average) {
+  const char* value = GetEnv("MAKEFLAGS");
+  if (!value)
+    return false;
+
+  // GNU make <= 4.1
+  const char* jobserver = strstr(value, "--jobserver-fds=");
+  if (!jobserver)
+    // GNU make >= 4.2
+    jobserver = strstr(value, "--jobserver-auth=");
+  if (jobserver) {
+    LinePrinter printer;
+
+    if (ignore) {
+      printer.PrintOnNewLine("ninja: warning: -jN forced on command line; ignoring GNU make jobserver.\n");
+    } else {
+      if (ParseAuth(jobserver)) {
+        const char* l_arg = strstr(value, " -l");
+        double load_limit = -1;  // GNU make accepts fractional limits, e.g. -l2.5
+
+        if (verbose) {
+          printer.PrintOnNewLine("ninja: using GNU make jobserver.\n");
+        }
+
+        // translate GNU make -lN to ninja -lN
+        if (l_arg &&
+            (sscanf(l_arg + 3, "%lf", &load_limit) == 1) &&
+            (load_limit > 0)) {
+          max_load_average = load_limit;
+        }
+
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+bool GNUmakeTokenPool::Acquire() {
+  if (available_ > 0)
+    return true;
+
+  if (AcquireToken()) {
+    // token acquired
+    available_++;
+    return true;
+  }
+
+  // no token available
+  return false;
+}
+
+void GNUmakeTokenPool::Reserve() {
+  available_--;
+  used_++;
+}
+
+void GNUmakeTokenPool::Return() {
+  if (ReturnToken())
+    available_--;
+}
+
+void GNUmakeTokenPool::Release() {
+  available_++;
+  used_--;
+  if (available_ > 1)  // keep only the implicit token, hand back the surplus
+    Return();
+}
+
+void GNUmakeTokenPool::Clear() {
+  while (used_ > 0)
+    Release();
+  while ((available_ > 1) && ReturnToken())
+    available_--;  // stop at the first failure instead of spinning forever
+}
--- /dev/null
+++ b/src/tokenpool-gnu-make.h
@@ -0,0 +1,40 @@
+// Copyright 2016-2018 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+#include "tokenpool.h"
+// interface to GNU make token pool: token accounting shared by all platforms
+struct GNUmakeTokenPool : public TokenPool {
+  GNUmakeTokenPool();
+  ~GNUmakeTokenPool();
+
+  // token pool implementation
+  virtual bool Acquire();
+  virtual void Reserve();
+  virtual void Release();
+  virtual void Clear();
+  virtual bool Setup(bool ignore, bool verbose, double& max_load_average);
+
+  // platform specific implementation
+  virtual const char* GetEnv(const char* name) = 0;
+  virtual bool ParseAuth(const char* jobserver) = 0;
+  virtual bool AcquireToken() = 0;
+  virtual bool ReturnToken() = 0;
+
+ private:
+  int available_;  // tokens held but not reserved (starts at 1: implicit token)
+  int used_;       // tokens reserved via Reserve() and not yet released
+
+  void Return();
+};
--- /dev/null
+++ b/src/tokenpool.h
@@ -0,0 +1,42 @@
+// Copyright 2016-2018 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+#ifdef _WIN32
+#include
+#endif
+
+// interface to token pool
+struct TokenPool {
+  virtual ~TokenPool() {}
+
+  virtual bool Acquire() = 0;  // returns true if a token can be reserved now
+  virtual void Reserve() = 0;  // marks one acquired token as used
+  virtual void Release() = 0;  // gives one used token back to the pool
+  virtual void Clear() = 0;    // releases all used tokens
+
+  // returns false if token pool setup failed
+  virtual bool Setup(bool ignore, bool verbose, double& max_load_average) = 0;
+
+#ifdef _WIN32
+  virtual void WaitForTokenAvailability(HANDLE ioport) = 0;
+  // returns true if a token has become available
+  // key is result from GetQueuedCompletionStatus()
+  virtual bool TokenIsAvailable(ULONG_PTR key) = 0;
+#else
+  virtual int GetMonitorFd() = 0;  // descriptor SubprocessSet::DoWork() polls for tokens
+#endif
+
+  // returns NULL if token pool is not available
+  static TokenPool* Get();
+};
--- /dev/null
+++ b/src/tokenpool_test.cc
@@ -0,0 +1,269 @@
+// Copyright 2018 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "tokenpool.h"
+
+#include "test.h"
+
+#ifdef _WIN32
+#include
+#else
+#include
+#endif
+
+#include
+#include
+
+#ifdef _WIN32
+// should contain all valid characters
+#define SEMAPHORE_NAME      "abcdefghijklmnopqrstuvwxyz01234567890_"
+#define AUTH_FORMAT(tmpl)   "foo " tmpl "=%s bar"
+#define ENVIRONMENT_CLEAR() SetEnvironmentVariable("MAKEFLAGS", NULL)
+#define ENVIRONMENT_INIT(v) SetEnvironmentVariable("MAKEFLAGS", v)
+#else
+#define AUTH_FORMAT(tmpl)   "foo " tmpl "=%d,%d bar"
+#define ENVIRONMENT_CLEAR() unsetenv("MAKEFLAGS")
+#define ENVIRONMENT_INIT(v) setenv("MAKEFLAGS", v, true)
+#endif
+
+namespace {
+
+const double kLoadAverageDefault = -1.23456789;
+
+struct TokenPoolTest : public testing::Test {
+  double load_avg_;
+  TokenPool* tokens_;
+  char buf_[1024];
+#ifdef _WIN32
+  const char* semaphore_name_;
+  HANDLE semaphore_;
+#else
+  int fds_[2];
+#endif
+
+  virtual void SetUp() {
+    load_avg_ = kLoadAverageDefault;
+    tokens_ = NULL;
+    ENVIRONMENT_CLEAR();
+#ifdef _WIN32
+    semaphore_name_ = SEMAPHORE_NAME;
+    if ((semaphore_ = CreateSemaphore(0, 0, 2, SEMAPHORE_NAME)) == NULL)
+#else
+    if (pipe(fds_) < 0)
+#endif
+      ASSERT_TRUE(false);
+  }
+
+  void CreatePool(const char* format, bool ignore_jobserver = false) {
+    if (format) {
+      sprintf(buf_, format,
+#ifdef _WIN32
+              semaphore_name_
+#else
+              fds_[0], fds_[1]
+#endif
+      );
+      ENVIRONMENT_INIT(buf_);
+    }
+    if ((tokens_ = TokenPool::Get()) != NULL) {
+      if (!tokens_->Setup(ignore_jobserver, false, load_avg_)) {
+        delete tokens_;
+        tokens_ = NULL;
+      }
+    }
+  }
+
+  void CreateDefaultPool() {
+    CreatePool(AUTH_FORMAT("--jobserver-auth"));
+  }
+
+  virtual void TearDown() {
+    if (tokens_)
+      delete tokens_;
+#ifdef _WIN32
+    CloseHandle(semaphore_);
+#else
+    close(fds_[0]);
+    close(fds_[1]);
+#endif
+    ENVIRONMENT_CLEAR();
+  }
+};
+
+} // anonymous namespace
+
+// verifies that no pool is set up when MAKEFLAGS is not set
+TEST_F(TokenPoolTest, NoTokenPool) {
+  CreatePool(NULL, false);
+
+  EXPECT_EQ(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+}
+
+TEST_F(TokenPoolTest, SuccessfulOldSetup) {
+  // GNUmake <= 4.1
+  CreatePool(AUTH_FORMAT("--jobserver-fds"));
+
+  EXPECT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+}
+
+TEST_F(TokenPoolTest, SuccessfulNewSetup) {
+  // GNUmake >= 4.2
+  CreateDefaultPool();
+
+  EXPECT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+}
+
+TEST_F(TokenPoolTest, IgnoreWithJN) {
+  CreatePool(AUTH_FORMAT("--jobserver-auth"), true);
+
+  EXPECT_EQ(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+}
+
+TEST_F(TokenPoolTest, HonorLN) {
+  CreatePool(AUTH_FORMAT("-l9 --jobserver-auth"));
+
+  EXPECT_NE(NULL, tokens_);
+  EXPECT_EQ(9.0, load_avg_);
+}
+
+#ifdef _WIN32
+TEST_F(TokenPoolTest, SemaphoreNotFound) {
+  semaphore_name_ = SEMAPHORE_NAME "_foobar";
+  CreateDefaultPool();
+
+  EXPECT_EQ(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+}
+
+TEST_F(TokenPoolTest, TokenIsAvailable) {
+  CreateDefaultPool();
+
+  ASSERT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+
+  EXPECT_TRUE(tokens_->TokenIsAvailable((ULONG_PTR)tokens_));
+}
+#else
+TEST_F(TokenPoolTest, MonitorFD) {
+  CreateDefaultPool();
+
+  ASSERT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+
+  EXPECT_EQ(fds_[0], tokens_->GetMonitorFd());
+}
+#endif
+
+TEST_F(TokenPoolTest, ImplicitToken) {
+  CreateDefaultPool();
+
+  ASSERT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_FALSE(tokens_->Acquire());
+  tokens_->Release();
+  EXPECT_TRUE(tokens_->Acquire());
+}
+
+TEST_F(TokenPoolTest, TwoTokens) {
+  CreateDefaultPool();
+
+  ASSERT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+
+  // implicit token
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_FALSE(tokens_->Acquire());
+
+  // jobserver offers 2nd token
+#ifdef _WIN32
+  LONG previous;
+  ASSERT_TRUE(ReleaseSemaphore(semaphore_, 1, &previous));
+  ASSERT_EQ(0, previous);
+#else
+  ASSERT_EQ(1u, write(fds_[1], "T", 1));
+#endif
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_FALSE(tokens_->Acquire());
+
+  // release 2nd token
+  tokens_->Release();
+  EXPECT_TRUE(tokens_->Acquire());
+
+  // release implicit token - must return 2nd token back to jobserver
+  tokens_->Release();
+  EXPECT_TRUE(tokens_->Acquire());
+
+  // there must be one token available
+#ifdef _WIN32
+  EXPECT_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore_, 0));
+  EXPECT_TRUE(ReleaseSemaphore(semaphore_, 1, &previous));
+  EXPECT_EQ(0, previous);
+#else
+  EXPECT_EQ(1u, read(fds_[0], buf_, sizeof(buf_)));
+#endif
+
+  // implicit token
+  EXPECT_TRUE(tokens_->Acquire());
+}
+
+TEST_F(TokenPoolTest, Clear) {
+  CreateDefaultPool();
+
+  ASSERT_NE(NULL, tokens_);
+  EXPECT_EQ(kLoadAverageDefault, load_avg_);
+
+  // implicit token
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_FALSE(tokens_->Acquire());
+
+  // jobserver offers 2nd & 3rd token
+#ifdef _WIN32
+  LONG previous;
+  ASSERT_TRUE(ReleaseSemaphore(semaphore_, 2, &previous));
+  ASSERT_EQ(0, previous);
+#else
+  ASSERT_EQ(2u, write(fds_[1], "TT", 2));
+#endif
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_TRUE(tokens_->Acquire());
+  tokens_->Reserve();
+  EXPECT_FALSE(tokens_->Acquire());
+
+  tokens_->Clear();
+  EXPECT_TRUE(tokens_->Acquire());
+
+  // there must be two tokens available
+#ifdef _WIN32
+  EXPECT_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore_, 0));
+  EXPECT_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore_, 0));
+  EXPECT_TRUE(ReleaseSemaphore(semaphore_, 2, &previous));
+  EXPECT_EQ(0, previous);
+#else
+  EXPECT_EQ(2u, read(fds_[0], buf_, sizeof(buf_)));
+#endif
+
+  // implicit token
+  EXPECT_TRUE(tokens_->Acquire());
+}