Merge pull request #14839 from chardan/jfw-wip-halflife_atomic_t-ebirah

rgw: migrate atomic_t to std::atomic<> (ebirah)

Reviewed-by: Casey Bodley <cbodley@redhat.com>

commit 1e0b919e51
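The pattern applied throughout the diff below replaces Ceph's wrapper type atomic_t (from include/atomic.h) with std::atomic<> from the standard <atomic> header: set(1) becomes an assignment, read() becomes an implicit load, and inc() becomes a pre-increment. A minimal before/after sketch of the shape of that change, using an illustrative Worker class rather than any specific RGW type:

    #include <atomic>

    // Illustrative only -- not a class from this diff.
    class Worker {
      // was: atomic_t down_flag;
      std::atomic<bool> down_flag = { false };

    public:
      void stop() {
        down_flag = true;            // was: down_flag.set(1);
      }
      bool going_down() const {
        return down_flag;            // was: down_flag.read() != 0;
      }
    };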
@@ -21,6 +21,11 @@
 #include "common/ceph_context.h"
 #include "common/valgrind.h"
 
+#include <atomic>
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
 struct RefCountedObject {
 private:
   mutable atomic_t nref;
@@ -1950,11 +1950,11 @@ int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time
 
 bool RGWDataChangesLog::going_down()
 {
-  return (down_flag.read() != 0);
+  return down_flag;
 }
 
 RGWDataChangesLog::~RGWDataChangesLog() {
-  down_flag.set(1);
+  down_flag = true;
   renew_thread->stop();
   renew_thread->join();
   delete renew_thread;
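The two bodies above rely on std::atomic<T>'s conversion and assignment operators: reading the atomic in a value or boolean context performs a sequentially consistent load, and assigning to it performs a sequentially consistent store, so `return down_flag;` and `down_flag = true;` are drop-in replacements for read() and set(1). Spelled out with the explicit member functions (a sketch, not code from the diff):

    #include <atomic>

    std::atomic<bool> down_flag{false};

    bool going_down() {
      // `return down_flag;` is equivalent to an explicit seq_cst load.
      return down_flag.load(std::memory_order_seq_cst);
    }

    void shut_down() {
      // `down_flag = true;` is equivalent to an explicit seq_cst store.
      down_flag.store(true, std::memory_order_seq_cst);
    }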
@@ -404,7 +404,7 @@ class RGWDataChangesLog {
   RWLock modified_lock;
   map<int, set<string> > modified_shards;
 
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
 
   struct ChangeStatus {
     real_time cur_expiration;
@@ -1,10 +1,11 @@
 
 
 #include "common/ceph_json.h"
 
 #include "rgw_coroutine.h"
 #include "rgw_boost_asio_yield.h"
 
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -68,7 +69,7 @@ int RGWCompletionManager::get_next(void **user_info)
   Mutex::Locker l(lock);
   while (complete_reqs.empty()) {
     cond.Wait(lock);
-    if (going_down.read() != 0) {
+    if (going_down) {
      return -ECANCELED;
    }
  }
@@ -94,7 +95,7 @@ void RGWCompletionManager::go_down()
   for (auto cn : cns) {
     cn->unregister();
   }
-  going_down.set(1);
+  going_down = true;
   cond.Signal();
 }
 
@@ -460,7 +461,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   bool canceled = false; // set on going_down
   RGWCoroutinesEnv env;
 
-  uint64_t run_context = run_context_count.inc();
+  uint64_t run_context = ++run_context_count;
 
   lock.get_write();
   set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
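Pre-increment on a std::atomic is an atomic fetch_add(1) that yields the new value, so `++run_context_count` preserves the old inc() semantics, assuming atomic_t::inc() returned the incremented value -- which is consistent with how its result is used elsewhere in this diff (for example `inc() == 1` in RGWCoroutinesManager::stop()). A standalone sketch:

    #include <atomic>
    #include <cstdint>

    std::atomic<int64_t> run_context_count = { 0 };

    uint64_t next_run_context() {
      // Atomically adds 1 and returns the new value, matching the old
      // run_context_count.inc() usage.
      return ++run_context_count;
    }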
@@ -475,7 +476,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   env.manager = this;
   env.scheduled_stacks = &scheduled_stacks;
 
-  for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) {
+  for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
     lock.get_write();
 
     RGWCoroutinesStack *stack = *iter;
@@ -566,7 +567,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
-      if (going_down.read() > 0) {
+      if (going_down) {
        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
        ret = -ECANCELED;
        canceled = true;
@@ -585,7 +586,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   }
 
   lock.get_write();
-  if (!context_stacks.empty() && !going_down.read()) {
+  if (!context_stacks.empty() && !going_down) {
     JSONFormatter formatter(true);
     formatter.open_array_section("context_stacks");
     for (auto& s : context_stacks) {
@@ -595,7 +596,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
     lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
     formatter.flush(*_dout);
     *_dout << dendl;
-    assert(context_stacks.empty() || going_down.read()); // assert on deadlock
+    assert(context_stacks.empty() || going_down); // assert on deadlock
   }
 
   for (auto stack : context_stacks) {
@@ -22,6 +22,8 @@
 #include "rgw_common.h"
 #include "rgw_boost_asio_coroutine.h"
 
+#include <atomic>
+
 #define RGW_ASYNC_OPS_MGR_WINDOW 100
 
 class RGWCoroutinesStack;
@@ -39,7 +41,7 @@ class RGWCompletionManager : public RefCountedObject {
 
   SafeTimer timer;
 
-  atomic_t going_down;
+  std::atomic<bool> going_down = { false };
 
   map<void *, void *> waiters;
 
@@ -506,9 +508,9 @@ public:
 
 class RGWCoroutinesManager {
   CephContext *cct;
-  atomic_t going_down;
+  std::atomic<bool> going_down = { false };
 
-  atomic64_t run_context_count;
+  std::atomic<int64_t> run_context_count = { 0 };
   map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
 
   RWLock lock;
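Every converted member above gets an explicit brace initializer (`= { false }`, `= { 0 }`) because, before C++20, a default-constructed std::atomic holds an indeterminate value; the initializer restores the zero starting state the old members were relied on to have. This is also why the RGWRados constructor later in the diff can drop max_req_id(0) from its initializer list. A sketch of the declaration style, with an illustrative class name:

    #include <atomic>
    #include <cstdint>

    struct Manager {  // illustrative, not an RGW class
      // Explicit initializers avoid an indeterminate pre-C++20 default.
      std::atomic<bool> going_down = { false };
      std::atomic<int64_t> run_context_count = { 0 };
    };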
@@ -542,7 +544,8 @@ public:
   int run(list<RGWCoroutinesStack *>& ops);
   int run(RGWCoroutine *op);
   void stop() {
-    if (going_down.inc() == 1) {
+    bool expected = false;
+    if (going_down.compare_exchange_strong(expected, true)) {
      completion_mgr->go_down();
    }
  }
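The old guard `going_down.inc() == 1` let only the first caller of stop() tear down the completion manager; with std::atomic<bool> the same run-once behaviour is expressed with compare_exchange_strong, which flips the flag from false to true and reports whether this caller performed the flip. A self-contained sketch of the idiom (names and the guarded branch body are illustrative):

    #include <atomic>
    #include <cstdio>

    std::atomic<bool> going_down = { false };

    void stop() {
      bool expected = false;
      // Succeeds for exactly one caller: the one that changes false -> true.
      if (going_down.compare_exchange_strong(expected, true)) {
        std::puts("first stop(): shutting down");  // e.g. completion_mgr->go_down()
      }
    }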
@@ -65,7 +65,7 @@ void RGWAsyncRadosProcessor::start() {
 }
 
 void RGWAsyncRadosProcessor::stop() {
-  going_down.set(1);
+  going_down = true;
   m_tp.drain(&req_wq);
   m_tp.stop();
   for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
@@ -627,7 +627,7 @@ int RGWContinuousLeaseCR::operate()
     return set_cr_done();
   }
   reenter(this) {
-    while (!going_down.read()) {
+    while (!going_down) {
       yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));
 
       caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
@@ -6,6 +6,8 @@
 #include "common/WorkQueue.h"
 #include "common/Throttle.h"
 
+#include <atomic>
+
 class RGWAsyncRadosRequest : public RefCountedObject {
   RGWCoroutine *caller;
   RGWAioCompletionNotifier *notifier;
@@ -57,7 +59,7 @@ public:
 
 class RGWAsyncRadosProcessor {
   deque<RGWAsyncRadosRequest *> m_req_queue;
-  atomic_t going_down;
+  std::atomic<bool> going_down = { false };
 protected:
   RGWRados *store;
   ThreadPool m_tp;
@@ -91,7 +93,7 @@ public:
   void queue(RGWAsyncRadosRequest *req);
 
   bool is_going_down() {
-    return (going_down.read() != 0);
+    return going_down;
   }
 };
 
@@ -1014,7 +1016,7 @@ class RGWContinuousLeaseCR : public RGWCoroutine {
   int interval;
 
   Mutex lock;
-  atomic_t going_down;
+  std::atomic<bool> going_down = { false };
   bool locked{false};
 
   RGWCoroutine *caller;
@@ -1044,7 +1046,7 @@ public:
   }
 
   void go_down() {
-    going_down.set(1);
+    going_down = true;
     wakeup();
   }
 
@@ -27,6 +27,8 @@
 #include "rgw_file.h"
 #include "rgw_lib_frontend.h"
 
+#include <atomic>
+
 #define dout_subsys ceph_subsys_rgw
 
 using namespace rgw;
@@ -37,7 +39,7 @@ namespace rgw {
 
   const string RGWFileHandle::root_name = "/";
 
-  atomic<uint32_t> RGWLibFS::fs_inst_counter;
+  std::atomic<uint32_t> RGWLibFS::fs_inst_counter;
 
   uint32_t RGWLibFS::write_completion_interval_s = 10;
 
@@ -745,7 +745,7 @@ namespace rgw {
     RGWUserInfo user;
     RGWAccessKey key; // XXXX acc_key
 
-    static atomic<uint32_t> fs_inst_counter;
+    static std::atomic<uint32_t> fs_inst_counter;
 
     static uint32_t write_completion_interval_s;
     std::string fsid;
@@ -253,7 +253,7 @@ int RGWGC::process()
 
 bool RGWGC::going_down()
 {
-  return (down_flag.read() != 0);
+  return down_flag;
 }
 
 void RGWGC::start_processor()
@@ -264,7 +264,7 @@ void RGWGC::start_processor()
 
 void RGWGC::stop_processor()
 {
-  down_flag.set(1);
+  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
@@ -6,7 +6,6 @@
 
 
 #include "include/types.h"
-#include "include/atomic.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "common/Cond.h"
@@ -15,12 +14,14 @@
 #include "rgw_rados.h"
 #include "cls/rgw/cls_rgw_types.h"
 
+#include <atomic>
+
 class RGWGC {
   CephContext *cct;
   RGWRados *store;
   int max_objs;
   string *obj_names;
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
 
   int tag_index(const string& tag);
 
@@ -16,6 +16,8 @@
 
 #include "rgw_coroutine.h"
 
+#include <atomic>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
@@ -24,7 +26,7 @@ struct rgw_http_req_data : public RefCountedObject {
   curl_slist *h;
   uint64_t id;
   int ret;
-  atomic_t done;
+  std::atomic<bool> done = { false };
   RGWHTTPClient *client;
   void *user_info;
   bool registered;
@@ -58,12 +60,12 @@ struct rgw_http_req_data : public RefCountedObject {
 
     easy_handle = NULL;
     h = NULL;
-    done.set(1);
+    done = true;
     cond.Signal();
   }
 
   bool is_done() {
-    return done.read() != 0;
+    return done;
   }
 
   int get_retcode() {
@@ -900,14 +902,14 @@ int RGWHTTPManager::set_threaded()
 
 void RGWHTTPManager::stop()
 {
-  if (is_stopped.read()) {
+  if (is_stopped) {
    return;
  }
 
-  is_stopped.set(1);
+  is_stopped = true;
 
  if (is_threaded) {
-    going_down.set(1);
+    going_down = true;
    signal_thread();
    reqs_thread->join();
    delete reqs_thread;
@@ -935,7 +937,7 @@ void *RGWHTTPManager::reqs_thread_entry()
 
   ldout(cct, 20) << __func__ << ": start" << dendl;
 
-  while (!going_down.read()) {
+  while (!going_down) {
    int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
    if (ret < 0) {
      dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
@@ -6,10 +6,11 @@
 
 #include "common/RWLock.h"
 #include "common/Cond.h"
-#include "include/atomic.h"
 #include "rgw_common.h"
 #include "rgw_string.h"
 
+#include <atomic>
+
 using param_pair_t = pair<string, string>;
 using param_vec_t = vector<param_pair_t>;
 
@@ -33,7 +34,7 @@ class RGWHTTPClient
   string last_url;
   bool verify_ssl; // Do not validate self signed certificates, default to false
 
-  atomic_t stopped;
+  std::atomic<unsigned> stopped { 0 };
 
 protected:
   CephContext *cct;
@@ -219,8 +220,8 @@ class RGWHTTPManager {
   RGWCompletionManager *completion_mgr;
   void *multi_handle;
   bool is_threaded;
-  atomic_t going_down;
-  atomic_t is_stopped;
+  std::atomic<unsigned> going_down { 0 };
+  std::atomic<unsigned> is_stopped { 0 };
 
   RWLock reqs_lock;
   map<uint64_t, rgw_http_req_data *> reqs;
@@ -613,7 +613,7 @@ int TokenCache::RevokeThread::check_revoked()
 
 bool TokenCache::going_down() const
 {
-  return (down_flag.read() != 0);
+  return down_flag;
 }
 
 void* TokenCache::RevokeThread::entry()
@@ -13,6 +13,8 @@
 #include "rgw_http_client.h"
 #include "common/Cond.h"
 
+#include <atomic>
+
 int rgw_open_cms_envelope(CephContext *cct,
                           const std::string& src,
                           std::string& dst); /* out */
@@ -216,7 +218,7 @@ class TokenCache {
     list<string>::iterator lru_iter;
   };
 
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
 
   class RevokeThread : public Thread {
     friend class TokenCache;
@@ -271,7 +273,7 @@ class TokenCache {
   }
 
   ~TokenCache() {
-    down_flag.set(1);
+    down_flag = true;
 
     revocator.stop();
     revocator.join();
@@ -655,7 +655,7 @@ void RGWLC::start_processor()
 
 void RGWLC::stop_processor()
 {
-  down_flag.set(1);
+  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
@@ -672,7 +672,7 @@ void RGWLC::LCWorker::stop()
 
 bool RGWLC::going_down()
 {
-  return (down_flag.read() != 0);
+  return down_flag;
 }
 
 bool RGWLC::LCWorker::should_work(utime_t& now)
@@ -9,7 +9,6 @@
 #include "common/debug.h"
 
 #include "include/types.h"
-#include "include/atomic.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "common/Cond.h"
@@ -19,6 +18,8 @@
 #include "rgw_multi.h"
 #include "cls/rgw/cls_rgw_types.h"
 
+#include <atomic>
+
 using namespace std;
 #define HASH_PRIME 7877
 #define MAX_ID_LEN 255
@@ -227,7 +228,7 @@ class RGWLC {
   RGWRados *store;
   int max_objs;
   string *obj_names;
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
   string cookie;
 
   class LCWorker : public Thread {
@@ -13,6 +13,8 @@
 #include "rgw_loadgen.h"
 #include "rgw_client_io.h"
 
+#include <atomic>
+
 #define dout_subsys ceph_subsys_rgw
 
 extern void signal_shutdown();
@@ -37,7 +39,7 @@ void RGWLoadGenProcess::run()
 
   vector<string> buckets(num_buckets);
 
-  atomic_t failed;
+  std::atomic<long int> failed = { 0 };
 
   for (i = 0; i < num_buckets; i++) {
     buckets[i] = "/loadgen";
@@ -51,7 +53,7 @@ void RGWLoadGenProcess::run()
 
   string *objs = new string[num_objs];
 
-  if (failed.read()) {
+  if (failed) {
    derr << "ERROR: bucket creation failed" << dendl;
    goto done;
  }
@@ -69,7 +71,7 @@ void RGWLoadGenProcess::run()
 
   checkpoint();
 
-  if (failed.read()) {
+  if (failed) {
    derr << "ERROR: bucket creation failed" << dendl;
    goto done;
  }
@@ -102,7 +104,7 @@ done:
 
 void RGWLoadGenProcess::gen_request(const string& method,
                                     const string& resource,
-                                    int content_length, atomic_t* fail_flag)
+                                    int content_length, std::atomic<long int>* fail_flag)
 {
   RGWLoadGenRequest* req =
     new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
@@ -138,7 +140,7 @@ void RGWLoadGenProcess::handle_request(RGWRequest* r)
     dout(20) << "process_request() returned " << ret << dendl;
 
     if (req->fail_flag) {
-      req->fail_flag->inc();
+      req->fail_flag++;
    }
  }
 
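In the hunk above, fail_flag is a pointer to a std::atomic counter (see the RGWLoadGenRequest and gen_request declarations later in the diff), so `req->fail_flag++` as written advances the pointer rather than the counter. Assuming the intent is the same as the old `fail_flag->inc()` -- to bump the shared failure count -- the increment goes through the pointee, as in this sketch:

    #include <atomic>
    #include <cstdint>

    void record_failure(std::atomic<int64_t>* fail_flag) {
      if (fail_flag) {
        (*fail_flag)++;            // atomically increments the counter itself
        // equivalently: fail_flag->fetch_add(1);
      }
    }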
@@ -60,6 +60,7 @@
 #include <map>
 #include <string>
 #include <vector>
+#include <atomic>
 
 #include "include/types.h"
 #include "common/BackTrace.h"
@@ -77,11 +78,11 @@ static sig_t sighandler_alrm;
 class RGWProcess;
 
 static int signal_fd[2] = {0, 0};
-static atomic_t disable_signal_fd;
+static std::atomic<int64_t> disable_signal_fd = { 0 };
 
 void signal_shutdown()
 {
-  if (!disable_signal_fd.read()) {
+  if (!disable_signal_fd) {
    int val = 0;
    int ret = write(signal_fd[0], (char *)&val, sizeof(val));
    if (ret < 0) {
@@ -230,7 +230,7 @@ bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
 
 bool RGWObjectExpirer::going_down()
 {
-  return (down_flag.read() != 0);
+  return down_flag;
 }
 
 void RGWObjectExpirer::start_processor()
@@ -241,7 +241,7 @@ void RGWObjectExpirer::start_processor()
 
 void RGWObjectExpirer::stop_processor()
 {
-  down_flag.set(1);
+  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
@@ -8,6 +8,7 @@
 #include <iostream>
 #include <sstream>
 #include <string>
+#include <atomic>
 
 #include "auth/Crypto.h"
 
@@ -37,6 +38,8 @@
 #include "rgw_usage.h"
 #include "rgw_replica_log.h"
 
+#include <atomic>
+
 class RGWObjectExpirer {
 protected:
   RGWRados *store;
@@ -65,7 +68,7 @@ protected:
   };
 
   OEWorker *worker;
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
 
 public:
   explicit RGWObjectExpirer(RGWRados *_store)
@@ -17,6 +17,8 @@
 #include "common/WorkQueue.h"
 #include "common/Throttle.h"
 
+#include <atomic>
+
 #if !defined(dout_subsys)
 #define dout_subsys ceph_subsys_rgw
 #define def_dout_subsys
@@ -182,7 +184,7 @@ public:
   void checkpoint();
   void handle_request(RGWRequest* req) override;
   void gen_request(const string& method, const string& resource,
-                   int content_length, atomic_t* fail_flag);
+                   int content_length, std::atomic<int64_t>* fail_flag);
 
   void set_access_key(RGWAccessKey& key) { access_key = key; }
 };
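Note that this header declares gen_request with std::atomic<int64_t>*, while the definition in the loadgen hunks above uses std::atomic<long int>*; the two only name the same function where int64_t is a typedef for long int, as on common LP64 Linux builds. A hypothetical compile-time check that makes the assumption explicit (not part of the diff):

    #include <atomic>
    #include <cstdint>
    #include <type_traits>

    // Declaration and definition must agree on the parameter type; this
    // only holds where int64_t and long int are the same type.
    static_assert(std::is_same<std::atomic<std::int64_t>,
                               std::atomic<long int>>::value,
                  "std::atomic<int64_t> and std::atomic<long int> differ");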
@@ -26,6 +26,8 @@
 #include "rgw_bucket.h"
 #include "rgw_user.h"
 
+#include <atomic>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
@@ -410,7 +412,7 @@ void UserAsyncRefreshHandler::handle_response(int r)
 }
 
 class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
   RWLock rwlock;
   map<rgw_bucket, rgw_user> modified_buckets;
 
@@ -569,11 +571,11 @@ public:
   }
 
   bool going_down() {
-    return (down_flag.read() != 0);
+    return down_flag;
   }
 
   void stop() {
-    down_flag.set(1);
+    down_flag = true;
     rwlock.get_write();
     stop_thread(&buckets_sync_thread);
     rwlock.unlock();
@@ -15,11 +15,11 @@
 #ifndef CEPH_RGW_QUOTA_H
 #define CEPH_RGW_QUOTA_H
 
 
 #include "include/utime.h"
-#include "include/atomic.h"
 #include "common/lru_map.h"
 
+#include <atomic>
+
 static inline int64_t rgw_rounded_kb(int64_t bytes)
 {
   return (bytes + 1023) / 1024;
@@ -55,6 +55,7 @@ using namespace librados;
 #include <string>
 #include <iostream>
 #include <vector>
+#include <atomic>
 #include <list>
 #include <map>
 #include "auth/Crypto.h" // get_random_bytes()
@@ -71,6 +72,8 @@ using namespace librados;
 
 #include "compressor/Compressor.h"
 
+#include <atomic>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
@@ -2958,7 +2961,7 @@ protected:
   CephContext *cct;
   RGWRados *store;
 
-  atomic_t down_flag;
+  std::atomic<bool> down_flag = { false };
 
   string thread_name;
 
@@ -2974,7 +2977,8 @@ public:
   virtual int init() { return 0; }
   virtual int process() = 0;
 
-  bool going_down() { return down_flag.read() != 0; }
+  bool going_down() { return down_flag; }
+
   void start();
   void stop();
 };
@@ -2987,7 +2991,7 @@ void RGWRadosThread::start()
 
 void RGWRadosThread::stop()
 {
-  down_flag.set(1);
+  down_flag = true;
  stop_process();
  if (worker) {
    worker->stop();
@@ -5213,14 +5217,14 @@ int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *
     if (delim_pos >= 0) {
       string s = cur_marker.name.substr(0, delim_pos);
       s.append(bigger_than_delim);
-      cur_marker.set(s);
+      cur_marker = s;
     }
   }
 
   string skip_after_delim;
   while (truncated && count <= max) {
     if (skip_after_delim > cur_marker.name) {
-      cur_marker.set(skip_after_delim);
+      cur_marker = skip_after_delim;
       ldout(cct, 20) << "setting cur_marker=" << cur_marker.name << "[" << cur_marker.instance << "]" << dendl;
     }
     std::map<string, rgw_bucket_dir_entry> ent_map;
@@ -9794,8 +9798,8 @@ struct get_obj_data : public RefCountedObject {
   Mutex data_lock;
   list<get_obj_aio_data> aio_data;
   RGWGetDataCB *client_cb;
-  atomic_t cancelled;
-  atomic_t err_code;
+  std::atomic<bool> cancelled = { false };
+  std::atomic<int64_t> err_code = { 0 };
   Throttle throttle;
   list<bufferlist> read_list;
 
@@ -9807,16 +9811,16 @@ struct get_obj_data : public RefCountedObject {
       throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
   ~get_obj_data() override { }
   void set_cancelled(int r) {
-    cancelled.set(1);
-    err_code.set(r);
+    cancelled = true;
+    err_code = r;
   }
 
   bool is_cancelled() {
-    return cancelled.read() == 1;
+    return cancelled;
   }
 
   int get_err_code() {
-    return err_code.read();
+    return err_code;
   }
 
   int wait_next_io(bool *done) {
@@ -11660,7 +11664,7 @@ int RGWRados::pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<rgw_bucket_
     if (filter && !filter->filter(oid, oid))
       continue;
 
-    e.key.set(oid);
+    e.key = oid;
    objs.push_back(e);
  }
 
@@ -2199,7 +2199,7 @@ class RGWRados
 
   void get_bucket_instance_ids(const RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
 
-  atomic64_t max_req_id;
+  std::atomic<int64_t> max_req_id = { 0 };
   Mutex lock;
   Mutex watchers_lock;
   SafeTimer *timer;
@@ -2298,7 +2298,7 @@ protected:
 
   RGWPeriod current_period;
 public:
-  RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+  RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
                run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
                data_notifier(NULL), meta_sync_processor_thread(NULL),
@@ -2320,7 +2320,7 @@ public:
                meta_mgr(NULL), data_log(NULL) {}
 
   uint64_t get_new_req_id() {
-    return max_req_id.inc();
+    return ++max_req_id;
   }
 
   librados::IoCtx* get_lc_pool_ctx() {
@@ -12,8 +12,11 @@
 #if defined(WITH_RADOSGW_FCGI_FRONTEND)
 #include "rgw_fcgi.h"
 #endif
 
 #include "common/QueueRing.h"
+
+#include <atomic>
+
 struct RGWRequest
 {
   uint64_t id;
@@ -56,10 +59,10 @@ struct RGWLoadGenRequest : public RGWRequest {
   string method;
   string resource;
   int content_length;
-  atomic_t* fail_flag;
+  std::atomic<int64_t>* fail_flag = nullptr;
 
   RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
-                    atomic_t *ff)
+                    std::atomic<int64_t> *ff)
     : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl),
       fail_flag(ff) {}
 };
@@ -26,7 +26,7 @@ int RGWRESTConn::get_url(string& endpoint)
     return -EIO;
   }
 
-  int i = counter.inc();
+  int i = ++counter;
   endpoint = endpoints[i % endpoints.size()];
 
   return 0;
@@ -40,7 +40,7 @@ string RGWRESTConn::get_url()
     return endpoint;
   }
 
-  int i = counter.inc();
+  int i = ++counter;
   endpoint = endpoints[i % endpoints.size()];
 
   return endpoint;
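Both get_url() variants pick the next endpoint round-robin by bumping a shared atomic counter and taking it modulo the endpoint count; `++counter` keeps the value-returning behaviour of the old inc(). A self-contained sketch of the same idiom with illustrative names (the real code has earlier early-return paths, e.g. -EIO, visible in the hunks above):

    #include <atomic>
    #include <cstdint>
    #include <string>
    #include <vector>

    class EndpointPicker {  // illustrative, not RGWRESTConn
      std::vector<std::string> endpoints;
      std::atomic<int64_t> counter = { 0 };

    public:
      explicit EndpointPicker(std::vector<std::string> eps)
        : endpoints(std::move(eps)) {}

      // Thread-safe round-robin selection; assumes endpoints is non-empty.
      std::string next() {
        int64_t i = ++counter;
        return endpoints[i % endpoints.size()];
      }
    };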
@@ -9,6 +9,7 @@
 #include "common/ceph_json.h"
 #include "common/RefCountedObj.h"
 
+#include <atomic>
 
 class CephContext;
 class RGWRados;
@@ -55,7 +56,7 @@ class RGWRESTConn
   RGWAccessKey key;
   string self_zone_group;
   string remote_id;
-  atomic_t counter;
+  std::atomic<int64_t> counter = { 0 };
 
 public:
 
@@ -52,7 +52,7 @@ RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const string& source_zone, const
   ::encode(info, bl);
   store->time_log_prepare_entry(entry, real_clock::now(), section, name, bl);
 
-  uint32_t shard_id = counter.inc() % num_shards;
+  uint32_t shard_id = ++counter % num_shards;
 
 
   return new RGWRadosTimelogAddCR(store, oids[shard_id], entry);
@@ -290,7 +290,7 @@ int RGWRemoteMetaLog::init()
 
 void RGWRemoteMetaLog::finish()
 {
-  going_down.set(1);
+  going_down = true;
   stop();
 }
 
@@ -1992,7 +1992,7 @@ int RGWRemoteMetaLog::run_sync()
   // get shard count and oldest log period from master
   rgw_mdlog_info mdlog_info;
   for (;;) {
-    if (going_down.read()) {
+    if (going_down) {
      ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
@@ -2013,7 +2013,7 @@ int RGWRemoteMetaLog::run_sync()
 
   rgw_meta_sync_status sync_status;
   do {
-    if (going_down.read()) {
+    if (going_down) {
      ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
      return 0;
    }
@@ -2112,7 +2112,7 @@ int RGWRemoteMetaLog::run_sync()
      ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
      return -EIO;
    }
-  } while (!going_down.read());
+  } while (!going_down);
 
   return 0;
 }
@@ -8,6 +8,8 @@
 #include "include/stringify.h"
 #include "common/RWLock.h"
 
+#include <atomic>
+
 #define ERROR_LOGGER_SHARDS 32
 #define RGW_SYNC_ERROR_LOG_SHARD_PREFIX "sync.error-log"
 
@@ -65,7 +67,7 @@ class RGWSyncErrorLogger {
   vector<string> oids;
   int num_shards;
 
-  atomic_t counter;
+  std::atomic<int64_t> counter = { 0 };
 public:
   RGWSyncErrorLogger(RGWRados *_store, const string &oid_prefix, int _num_shards);
   RGWCoroutine *log_error_cr(const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message);
@@ -195,7 +197,7 @@ class RGWRemoteMetaLog : public RGWCoroutinesManager {
   void init_sync_env(RGWMetaSyncEnv *env);
   int store_sync_info(const rgw_meta_sync_info& sync_info);
 
-  atomic_t going_down;
+  std::atomic<bool> going_down = { false };
 
 public:
   RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,