Merge pull request #32742 from tchaikov/wip-crimson-thread-pool

crimson/thread: generalize Task so it works w/ func returns void

Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
Kefu Chai 2020-01-21 22:32:12 +08:00 committed by GitHub
commit 951dd14e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 7 deletions

View File

@ -22,28 +22,43 @@ struct WorkItem {
virtual void process() = 0;
};
template<typename Func, typename T = std::invoke_result_t<Func>>
template<typename Func>
struct Task final : WorkItem {
Func func;
seastar::future_state<T> state;
crimson::thread::Condition on_done;
using T = std::invoke_result_t<Func>;
using future_state_t = std::conditional_t<std::is_void_v<T>,
seastar::future_state<>,
seastar::future_state<T>>;
using futurator_t = seastar::futurize<T>;
public:
explicit Task(Func&& f)
: func(std::move(f))
{}
void process() override {
try {
state.set(func());
if constexpr (std::is_void_v<T>) {
func();
state.set();
} else {
state.set(func());
}
} catch (...) {
state.set_exception(std::current_exception());
}
on_done.notify();
}
seastar::future<T> get_future() {
typename futurator_t::type get_future() {
return on_done.wait().then([this] {
return seastar::make_ready_future<T>(state.get0(std::move(state).get()));
if (state.failed()) {
return futurator_t::make_exception_future(state.get_exception());
} else {
return futurator_t::from_tuple(state.get_value());
}
});
}
private:
Func func;
future_state_t state;
crimson::thread::Condition on_done;
};
struct SubmitQueue {

View File

@ -25,6 +25,12 @@ seastar::future<> test_accumulate(ThreadPool& tp) {
});
}
seastar::future<> test_void_return(ThreadPool& tp) {
return tp.submit([=] {
std::this_thread::sleep_for(10ns);
});
}
int main(int argc, char** argv)
{
ThreadPool tp{2, 128, 0};
@ -32,6 +38,8 @@ int main(int argc, char** argv)
return app.run(argc, argv, [&tp] {
return tp.start().then([&tp] {
return test_accumulate(tp);
}).then([&tp] {
return test_void_return(tp);
}).handle_exception([](auto e) {
std::cerr << "Error: " << e << std::endl;
seastar::engine().exit(1);