Merge pull request #31909 from rosinL/wip-fix-dpdk-cond-wait

msg/async/dpdk: exit condition waiting when DPDKStack is destructed

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2021-10-15 19:25:55 +08:00 committed by GitHub
commit cae9552863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 38 deletions

View File

@ -249,12 +249,12 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)
//
funcs.push_back(std::move(func));
int r = 0;
r = dpdk::eal::init(cct);
r = eal.start();
if (r < 0) {
lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
lderr(cct) << __func__ << " start dpdk rte failed, r=" << r << dendl;
ceph_abort();
}
// if dpdk::eal::init already called by NVMEDevice, we will select 1..n
// if eal.start already called by NVMEDevice, we will select 1..n
// cores
unsigned nr_worker = funcs.size();
ceph_assert(rte_lcore_count() >= nr_worker);
@ -265,7 +265,7 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)
}
}
void *adapted_func = static_cast<void*>(&funcs.back());
dpdk::eal::execute_on_master([adapted_func, core_id, this]() {
eal.execute_on_master([adapted_func, core_id, this]() {
int r = rte_eal_remote_launch(dpdk_thread_adaptor, adapted_func, core_id);
if (r < 0) {
lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
@ -276,7 +276,9 @@ void DPDKStack::spawn_worker(std::function<void ()> &&func)
void DPDKStack::join_worker(unsigned i)
{
dpdk::eal::execute_on_master([&]() {
eal.execute_on_master([&]() {
rte_eal_wait_lcore(i+1);
});
if (i+1 == get_num_worker())
eal.stop();
}

View File

@ -25,6 +25,7 @@
#include "const.h"
#include "IP.h"
#include "Packet.h"
#include "dpdk_rte.h"
class interface;
@ -246,6 +247,7 @@ class DPDKWorker : public Worker {
friend class DPDKServerSocketImpl<tcp4>;
};
using namespace dpdk;
class DPDKStack : public NetworkStack {
std::vector<std::function<void()> > funcs;
@ -254,13 +256,15 @@ class DPDKStack : public NetworkStack {
}
public:
explicit DPDKStack(CephContext *cct): NetworkStack(cct) {
explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) {
funcs.reserve(cct->_conf->ms_async_op_threads);
}
virtual bool support_local_listen_table() const override { return true; }
virtual void spawn_worker(std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
private:
dpdk::eal eal;
};
#endif

View File

@ -34,12 +34,6 @@ namespace dpdk {
return v;
}
bool eal::initialized = false;
std::thread eal::t;
std::mutex eal::lock;
std::condition_variable eal::cond;
std::list<std::function<void()>> eal::funcs;
static int bitcount(unsigned long long n)
{
return std::bitset<CHAR_BIT * sizeof(n)>{n}.count();
@ -75,31 +69,33 @@ namespace dpdk {
return count;
}
int eal::init(CephContext *c)
bool eal::rte_initialized = false;
int eal::start()
{
if (initialized) {
return 1;
}
bool done = false;
auto coremask = c->_conf.get_val<std::string>("ms_dpdk_coremask");
auto coremask = cct->_conf.get_val<std::string>("ms_dpdk_coremask");
int coremaskbit = coremask_bitcount(coremask.c_str());
if (coremaskbit <= 0
|| static_cast<uint64_t>(coremaskbit) < c->_conf->ms_async_op_threads)
|| static_cast<uint64_t>(coremaskbit) < cct->_conf->ms_async_op_threads)
return -EINVAL;
t = std::thread([&]() {
// TODO: Inherit these from the app parameters - "opts"
std::vector<std::vector<char>> args {
string2vector("ceph"),
string2vector("-c"), string2vector(c->_conf.get_val<std::string>("ms_dpdk_coremask")),
string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel),
string2vector("-c"), string2vector(cct->_conf.get_val<std::string>("ms_dpdk_coremask")),
string2vector("-n"), string2vector(cct->_conf->ms_dpdk_memory_channel),
};
std::optional<std::string> hugepages_path;
if (!c->_conf->ms_dpdk_hugepages.empty()) {
hugepages_path.emplace(c->_conf->ms_dpdk_hugepages);
if (!cct->_conf->ms_dpdk_hugepages.empty()) {
hugepages_path.emplace(cct->_conf->ms_dpdk_hugepages);
}
// If "hugepages" is not provided and DPDK PMD drivers mode is requested -
@ -123,13 +119,13 @@ namespace dpdk {
args.push_back(string2vector("-m"));
args.push_back(string2vector(size_MB_str.str()));
} else if (!c->_conf->ms_dpdk_pmd.empty()) {
} else if (!cct->_conf->ms_dpdk_pmd.empty()) {
args.push_back(string2vector("--no-huge"));
}
std::string rte_file_prefix;
rte_file_prefix = "rte_";
rte_file_prefix += c->_conf->name.to_str();
rte_file_prefix += cct->_conf->name.to_str();
args.push_back(string2vector("--file-prefix"));
args.push_back(string2vector(rte_file_prefix));
@ -138,27 +134,28 @@ namespace dpdk {
for (auto&& a: args) {
cargs.push_back(a.data());
}
/* initialise the EAL for all */
int ret = rte_eal_init(cargs.size(), cargs.data());
if (ret < 0)
return ret;
if (!rte_initialized) {
/* initialise the EAL for all */
int ret = rte_eal_init(cargs.size(), cargs.data());
if (ret < 0)
return;
rte_initialized = true;
}
std::unique_lock<std::mutex> l(lock);
initialized = true;
done = true;
cond.notify_all();
while (true) {
while (!stopped) {
cond.wait(l, [this] { return !funcs.empty() || stopped; });
if (!funcs.empty()) {
auto f = std::move(funcs.front());
funcs.pop_front();
f();
cond.notify_all();
} else {
cond.wait(l);
}
}
});
t.detach();
std::unique_lock<std::mutex> l(lock);
while (!done)
cond.wait(l);
@ -182,4 +179,13 @@ namespace dpdk {
return memsize;
}
void eal::stop()
{
assert(initialized);
assert(!stopped);
stopped = true;
cond.notify_all();
t.join();
}
} // namespace dpdk

View File

@ -46,12 +46,10 @@ namespace dpdk {
class eal {
public:
using cpuset = std::bitset<RTE_MAX_LCORE>;
static std::mutex lock;
static std::condition_variable cond;
static std::list<std::function<void()>> funcs;
static int init(CephContext *c);
static void execute_on_master(std::function<void()> &&f) {
explicit eal(CephContext *cct) : cct(cct) {}
int start();
void stop();
void execute_on_master(std::function<void()> &&f) {
bool done = false;
std::unique_lock<std::mutex> l(lock);
funcs.emplace_back([&]() { f(); done = true; });
@ -65,9 +63,16 @@ class eal {
*
* @return
*/
static size_t mem_size(int num_cpus);
static bool initialized;
static std::thread t;
size_t mem_size(int num_cpus);
static bool rte_initialized;
private:
CephContext *cct;
bool initialized = false;
bool stopped = false;
std::thread t;
std::mutex lock;
std::condition_variable cond;
std::list<std::function<void()>> funcs;
};
} // namespace dpdk