Merge PR #39191 into master

* refs/pull/39191/head:
	pybind/mgr/snap_schedule: use ceph VFS
	pybind/mgr/snap_schedule: idempotentize table creation
	mgr: add ceph sqlite VFS
	doc: add libcephsqlite
	ceph.spec,debian: package libcephsqlite
	test/libcephsqlite,qa: add tests for libcephsqlite
	libcephsqlite: rework architecture and backend
	SimpleRADOSStriper: wait for finished aios after write
	SimpleRADOSStriper: add new minimal async striper
	mon: define simple-rados-client-with-blocklist profile
	librados: define must renew lock flag
	common: add timeval conversion for durations
	Revert "libradosstriper: add function to read into char*"
	test_libcephsqlite: test random inserts
	cephsqlite: fix compiler errors
	cmake: improve build inst for cephsqlite
	libcephsqlite: sqlite interface to RADOS
	libradosstriper: add function to read into char*

Reviewed-by: Neha Ojha <nojha@redhat.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
Patrick Donnelly 2021-03-22 10:06:08 -07:00
commit 6ad5901422
33 changed files with 3759 additions and 38 deletions


@ -281,6 +281,11 @@ endif(WITH_QATZIP)
# needs mds and? XXX
option(WITH_LIBCEPHFS "libcephfs client library" ON)
option(WITH_LIBCEPHSQLITE "libcephsqlite client library" ON)
if(WITH_LIBCEPHSQLITE)
find_package(SQLite3 REQUIRED)
endif()
# key-value store
option(WITH_KVS "Key value store is here" ON)


@ -1,3 +1,14 @@
>=17.0.0
* A new library is available, libcephsqlite. It provides a SQLite Virtual File
System (VFS) on top of RADOS. The database and journals are striped over
RADOS across multiple objects for virtually unlimited scaling and throughput
only limited by the SQLite client. Applications using SQLite may change to
the Ceph VFS with minimal changes, usually just by specifying the alternate
  VFS. We expect the library to be most impactful and useful for applications
  that were storing state in RADOS omap, especially those not striping data
  across objects, which limits scalability.
>=16.0.0
--------
* mgr-pg_autoscaler: Autoscaler will now start out by scaling each


@ -230,6 +230,7 @@ BuildRequires: procps
BuildRequires: python%{python3_pkgversion}
BuildRequires: python%{python3_pkgversion}-devel
BuildRequires: snappy-devel
BuildRequires: sqlite-devel
BuildRequires: sudo
BuildRequires: pkgconfig(udev)
BuildRequires: util-linux
@ -554,6 +555,7 @@ Group: System/Filesystems
%endif
Requires: ceph-base = %{_epoch_prefix}%{version}-%{release}
Requires: ceph-mgr-modules-core = %{_epoch_prefix}%{version}-%{release}
Requires: libcephsqlite = %{_epoch_prefix}%{version}-%{release}
%if 0%{?weak_deps}
Recommends: ceph-mgr-dashboard = %{_epoch_prefix}%{version}-%{release}
Recommends: ceph-mgr-diskprediction-local = %{_epoch_prefix}%{version}-%{release}
@ -922,6 +924,33 @@ Obsoletes: python-rados < %{_epoch_prefix}%{version}-%{release}
This package contains Python 3 libraries for interacting with Ceph RADOS
object store.
%package -n libcephsqlite
Summary: SQLite3 VFS for Ceph
%if 0%{?suse_version}
Group: System/Libraries
%endif
Requires: librados2 = %{_epoch_prefix}%{version}-%{release}
Requires: sqlite-libs
%description -n libcephsqlite
A SQLite3 VFS for storing and manipulating databases stored on Ceph's RADOS
distributed object store.
%package -n libcephsqlite-devel
Summary: SQLite3 VFS for Ceph headers
%if 0%{?suse_version}
Group: Development/Libraries/C and C++
%endif
Requires: sqlite-devel
Requires: libcephsqlite = %{_epoch_prefix}%{version}-%{release}
Requires: librados-devel = %{_epoch_prefix}%{version}-%{release}
Requires: libradospp-devel = %{_epoch_prefix}%{version}-%{release}
Obsoletes: ceph-devel < %{_epoch_prefix}%{version}-%{release}
Provides: libcephsqlite-devel = %{_epoch_prefix}%{version}-%{release}
Obsoletes: libcephsqlite-devel < %{_epoch_prefix}%{version}-%{release}
%description -n libcephsqlite-devel
A SQLite3 VFS for storing and manipulating databases stored on Ceph's RADOS
distributed object store.
%if 0%{with libradosstriper}
%package -n libradosstriper1
Summary: RADOS striping interface
@ -2165,6 +2194,16 @@ fi
%{python3_sitearch}/rados.cpython*.so
%{python3_sitearch}/rados-*.egg-info
%files -n libcephsqlite
%{_libdir}/libcephsqlite.so
%post -n libcephsqlite -p /sbin/ldconfig
%postun -n libcephsqlite -p /sbin/ldconfig
%files -n libcephsqlite-devel
%{_includedir}/libcephsqlite.h
%if 0%{with libradosstriper}
%files -n libradosstriper1
%{_libdir}/libradosstriper.so.*


@ -0,0 +1,12 @@
find_path(SQLite3_INCLUDE_DIR NAMES sqlite3.h)
find_library(SQLite3_LIBRARY NAMES sqlite3 sqlite)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(SQLite3 DEFAULT_MSG SQLite3_LIBRARY SQLite3_INCLUDE_DIR)
if(NOT TARGET SQLite3::SQLite3)
add_library(SQLite3::SQLite3 UNKNOWN IMPORTED)
set_target_properties(SQLite3::SQLite3 PROPERTIES
IMPORTED_LOCATION "${SQLite3_LIBRARY}"
INTERFACE_INCLUDE_DIRECTORIES "${SQLite3_INCLUDE_DIR}")
endif()

debian/control

@ -60,6 +60,7 @@ Build-Depends: automake,
# Crimson libprotobuf-dev,
# Crimson libsctp-dev,
libsnappy-dev,
libsqlite3-dev,
libssl-dev,
libtool,
libudev-dev,
@ -226,6 +227,7 @@ Package: ceph-mgr
Architecture: linux-any
Depends: ceph-base (= ${binary:Version}),
ceph-mgr-modules-core (= ${binary:Version}),
libsqlite3-mod-ceph,
python3-bcrypt,
python3-cherrypy3,
python3-distutils,
@ -736,6 +738,43 @@ Description: RADOS distributed object store client C++ library (development file
This package contains development files needed for building C++ applications that
link against librados.
Package: libsqlite3-mod-ceph
Architecture: any
Section: libs
Depends: ${misc:Depends},
${shlibs:Depends},
Description: SQLite3 VFS for Ceph
A SQLite3 VFS for storing and manipulating databases stored on Ceph's RADOS
distributed object store.
.
This package contains the loadable extension module for SQLite3.
Package: libsqlite3-mod-ceph-dbg
Architecture: any
Section: debug
Priority: extra
Depends: libsqlite3-mod-ceph (= ${binary:Version}),
libsqlite3-0-dbgsym,
${misc:Depends},
Description: debugging symbols for libsqlite3-mod-ceph
A SQLite3 VFS for storing and manipulating databases stored on Ceph's RADOS
distributed object store.
.
This package contains debugging symbols for libsqlite3-mod-ceph.
Package: libsqlite3-mod-ceph-dev
Architecture: any
Section: libdevel
Depends: libsqlite3-mod-ceph (= ${binary:Version}),
libsqlite3-dev,
${misc:Depends},
Description: SQLite3 VFS for Ceph (development files)
A SQLite3 VFS for storing and manipulating databases stored on Ceph's RADOS
distributed object store.
.
This package contains development files needed for building applications that
link against libsqlite3-mod-ceph.
Package: libradosstriper1
Architecture: linux-any
Section: libs


@ -0,0 +1 @@
usr/include/libcephsqlite.h

debian/libsqlite3-mod-ceph.install

@ -0,0 +1 @@
usr/lib/libcephsqlite.so

debian/rules

@ -107,6 +107,7 @@ override_dh_strip:
dh_strip -prbd-nbd --dbg-package=rbd-nbd-dbg
dh_strip -pceph-common --dbg-package=ceph-common-dbg
dh_strip -plibrados2 --dbg-package=librados2-dbg
dh_strip -plibsqlite3-mod-ceph --dbg-package=libsqlite3-mod-ceph-dbg
dh_strip -plibradosstriper1 --dbg-package=libradosstriper1-dbg
dh_strip -plibrbd1 --dbg-package=librbd1-dbg
dh_strip -plibcephfs2 --dbg-package=libcephfs2-dbg


@ -21,4 +21,5 @@ Ceph, your own interface to Ceph, etc.).
librados (C) <librados>
librados (C++) <libradospp>
librados (Python) <python>
libcephsqlite (SQLite) <libcephsqlite>
object class <objclass-sdk>


@ -0,0 +1,438 @@
.. _libcephsqlite:
================
Ceph SQLite VFS
================
This `SQLite VFS`_ may be used for storing and accessing a `SQLite`_ database
backed by RADOS. This allows you to fully decentralize your database using
Ceph's object store for improved availability, accessibility, and use of
storage.
Note what this is not: a distributed SQL engine. SQLite on RADOS can be thought
of as analogous to RBD, as compared to CephFS: RBD puts a disk image on RADOS for
exclusive access by a single machine and generally does not allow parallel
access by other machines; on the other hand, CephFS allows fully distributed
access to a file system from many client mounts. SQLite on RADOS is meant to be
accessed by a single SQLite client database connection at a given time. The
database may be manipulated safely by multiple clients only in a serial fashion
controlled by RADOS locks managed by the Ceph SQLite VFS.
Usage
^^^^^
Normal unmodified applications (including the sqlite command-line toolset
binary) may load the *ceph* VFS using the `SQLite Extension Loading API`_.
.. code:: sql
.LOAD libcephsqlite.so
or during the invocation of ``sqlite3``
.. code:: sh
sqlite3 -cmd '.load libcephsqlite.so'
A database file is formatted as a SQLite URI::
file:///<"*"poolid|poolname>:[namespace]/<dbname>?vfs=ceph
The RADOS ``namespace`` is optional. Note the triple ``///`` in the path. The URI
authority must be empty or localhost in SQLite. Only the path part of the URI
is parsed. For this reason, the URI will not parse properly if you only use two
``//``.
A complete example of (optionally) creating a database and opening:
.. code:: sh
sqlite3 -cmd '.load libcephsqlite.so' -cmd '.open file:///foo:bar/baz.db?vfs=ceph'
Note you cannot specify the database file as the normal positional argument to
``sqlite3``. This is because the ``.load libcephsqlite.so`` command is applied
after opening the database, but opening the database depends on the extension
being loaded first.
An example passing the pool integer id and no RADOS namespace:
.. code:: sh
sqlite3 -cmd '.load libcephsqlite.so' -cmd '.open file:///*2:/baz.db?vfs=ceph'
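Applications that link against ``libcephsqlite`` can register the VFS
programmatically rather than through the shell. The following is a minimal,
illustrative sketch (not a complete program) following the initialization flow
described in ``libcephsqlite.h``; the pool, namespace, and database names are
placeholders.

.. code:: cpp

    // Sketch: register the ceph VFS via the auto-extension hook, then open a
    // RADOS-backed database by URI. Error handling is minimal; names are examples.
    #include <sqlite3.h>
    #include "libcephsqlite.h"

    int open_ceph_db(sqlite3** db)
    {
      // Run the libcephsqlite init on every new connection; this registers the
      // "ceph" VFS (see the comment in libcephsqlite.h).
      sqlite3_auto_extension((void (*)())sqlite3_cephsqlite_init);

      // Open a throwaway in-memory database so the init runs and the VFS is registered.
      sqlite3* scratch = nullptr;
      if (sqlite3_open_v2(":memory:", &scratch, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK)
        return -1;
      sqlite3_close(scratch);

      // Open the RADOS-backed database; SQLITE_OPEN_URI allows ?vfs=ceph to select the VFS.
      return sqlite3_open_v2("file:///foo:bar/baz.db?vfs=ceph", db,
                             SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
                             nullptr);
    }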
Like other Ceph tools, the *ceph* VFS looks at some environment variables that
help with configuring which Ceph cluster to communicate with and which
credential to use. Here would be a typical configuration:
.. code:: sh
export CEPH_CONF=/path/to/ceph.conf
export CEPH_KEYRING=/path/to/ceph.keyring
export CEPH_ARGS='--id myclientid'
./runmyapp
# or
sqlite3 -cmd '.load libcephsqlite.so' -cmd '.open file:///foo:bar/baz.db?vfs=ceph'
The default operation would look at the standard Ceph configuration file path
using the ``client.admin`` user.
User
^^^^
The *ceph* VFS requires a user credential with read access to the monitors, the
ability to blocklist dead clients of the database, and access to the OSDs
hosting the database. This can be granted with authorization caps as simple as:
.. code:: sh
ceph auth get-or-create client.X mon 'allow r, allow command "osd blocklist" with blocklistop=add' osd 'allow rwx'
.. note:: The terminology has changed from ``blacklist`` to ``blocklist``; older clusters may require using the older terms.
You may also simplify this by using the ``simple-rados-client-with-blocklist`` profile:
.. code:: sh
ceph auth get-or-create client.X mon 'profile simple-rados-client-with-blocklist' osd 'allow rwx'
To learn why blocklisting is necessary, see :ref:`libcephsqlite-corrupt`.
Page Size
^^^^^^^^^
SQLite allows configuring the page size prior to creating a new database. It is
advisable to increase this config to 65536 (64K) when using RADOS-backed
databases to reduce the number of OSD reads/writes and thereby improve
throughput and latency.
.. code:: sql
PRAGMA page_size = 65536
You may also try other values according to your application needs but note that
64K is the max imposed by SQLite.
Cache
^^^^^
The ceph VFS does not do any caching of reads or buffering of writes. Instead,
and more appropriately, the SQLite page cache is used. You may find that the
default page cache is too small for most workloads; if so, increase it significantly:
.. code:: sql
PRAGMA cache_size = 4096
This will cache 4096 pages, or 256MB (with a 64K ``page_size``).
Journal Persistence
^^^^^^^^^^^^^^^^^^^
By default, SQLite deletes the journal for every transaction. This can be
expensive as the *ceph* VFS must delete every object backing the journal for each
transaction. For this reason, it is much faster and simpler to ask SQLite to
**persist** the journal. In this mode, SQLite will invalidate the journal via a
write to its header. This is done as:
.. code:: sql
PRAGMA journal_mode = PERSIST
The cost of this may be increased unused space according to the high-water size
of the rollback journal (based on transaction type and size).
Exclusive Lock Mode
^^^^^^^^^^^^^^^^^^^
SQLite operates in a ``NORMAL`` locking mode where each transaction requires
locking the backing database file. This can add unnecessary overhead to
transactions when you know there's only ever one user of the database at a
given time. You can have SQLite lock the database once for the duration of the
connection using:
.. code:: sql
PRAGMA locking_mode = EXCLUSIVE
This can more than **halve** the time taken to perform a transaction. Keep in
mind this prevents other clients from accessing the database.
In this locking mode, each write transaction to the database requires 3
synchronization events: once to write to the journal, another to write to the
database file, and a final write to invalidate the journal header (in
``PERSIST`` journaling mode).
WAL Journal
^^^^^^^^^^^
The `WAL Journal Mode`_ is only available when SQLite is operating in exclusive
lock mode. This is because it requires shared memory communication with other
readers and writers when in the ``NORMAL`` locking mode.
As with local disk databases, WAL mode may significantly reduce small
transaction latency. Testing has shown it can provide more than 50% speedup
over persisted rollback journals in exclusive locking mode. You can expect
around 150-250 transactions per second depending on size.
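As an illustration of the pragmas discussed above, here is a minimal sketch
(assuming ``db`` is a connection already opened with ``vfs=ceph``) that enables
the exclusive locking mode and WAL journaling from application code; it is an
example, not a required setup.

.. code:: cpp

    // Sketch: apply the tuning pragmas from the preceding sections to an
    // already-open ceph-backed connection. Values are the examples given above.
    #include <sqlite3.h>

    int tune_connection(sqlite3* db)
    {
      const char* pragmas =
        "PRAGMA page_size = 65536;"        // only takes effect before the database is first written
        "PRAGMA cache_size = 4096;"        // 4096 pages * 64K page size = 256MB of page cache
        "PRAGMA locking_mode = EXCLUSIVE;" // hold the database lock for the connection's lifetime
        "PRAGMA journal_mode = WAL;";      // WAL requires the exclusive locking mode with this VFS
      char* err = nullptr;
      int rc = sqlite3_exec(db, pragmas, nullptr, nullptr, &err);
      if (rc != SQLITE_OK)
        sqlite3_free(err);
      return rc;
    }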
Performance Notes
^^^^^^^^^^^^^^^^^
The filing backend for the database on RADOS is asynchronous as much as
possible. Still, performance can be anywhere from 3x-10x slower than a local
database on SSD. Latency can be a major factor. It is advisable to be familiar
with SQL transactions and other strategies for efficient database updates.
Depending on the performance of the underlying pool, you can expect small
transactions to take up to 30 milliseconds to complete. If you use the
``EXCLUSIVE`` locking mode, it can be reduced further to 15 milliseconds per
transaction. A WAL journal in ``EXCLUSIVE`` locking mode can further reduce
this as low as ~2-5 milliseconds (or the time to complete a RADOS write; you
won't get better than that!).
There is no limit to the size of a SQLite database on RADOS imposed by the Ceph
VFS. There are standard `SQLite Limits`_ to be aware of, notably the maximum
database size of 281 TB. Large databases may or may not be performant on Ceph.
Experimentation for your own use-case is advised.
Be aware that read-heavy queries could take significant amounts of time as
reads are necessarily synchronous (due to the VFS API). No readahead is yet
performed by the VFS.
Recommended Use-Cases
^^^^^^^^^^^^^^^^^^^^^
The original purpose of this module was to support saving relational or large
data in RADOS which needs to span multiple objects. Many current applications
with trivial state try to use RADOS omap storage on a single object but this
cannot scale without striping data across multiple objects. Unfortunately, it
is non-trivial to design a store spanning multiple objects which is consistent
and also simple to use. SQLite can be used to bridge that gap.
Parallel Access
^^^^^^^^^^^^^^^
The VFS does not yet support concurrent readers. All database access is protected
by a single exclusive lock.
Export or Extract Database out of RADOS
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The database is striped on RADOS and can be extracted using the RADOS cli toolset.
.. code:: sh
rados --pool=foo --striper get bar.db local-bar.db
rados --pool=foo --striper get bar.db-journal local-bar.db-journal
sqlite3 local-bar.db ...
Keep in mind the rollback journal is also striped and will need to be extracted
as well if the database was in the middle of a transaction. If you're using
WAL, that journal will need to be extracted as well.
Keep in mind that extracting the database using the striper uses the same RADOS
locks as those used by the *ceph* VFS. However, the journal file locks are not
used by the *ceph* VFS (SQLite only locks the main database file) so there is a
potential race with other SQLite clients when extracting both files. That could
result in fetching a corrupt journal.
Instead of manually extracting the files, it is advisable to use the
`SQLite Backup`_ mechanism.
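For example, here is a minimal sketch of such a backup from application code;
the destination filename is illustrative and error handling is abbreviated.

.. code:: cpp

    // Sketch: copy a ceph-backed database to a local file using the SQLite
    // online backup API rather than extracting raw objects.
    #include <sqlite3.h>

    int backup_to_local(sqlite3* src)  // src opened with vfs=ceph
    {
      sqlite3* dst = nullptr;
      int rc = sqlite3_open_v2("local-baz.db", &dst,
                               SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
      if (rc != SQLITE_OK)
        return rc;
      if (sqlite3_backup* b = sqlite3_backup_init(dst, "main", src, "main"); b != nullptr) {
        sqlite3_backup_step(b, -1);   // copy all pages
        sqlite3_backup_finish(b);
      }
      rc = sqlite3_errcode(dst);
      sqlite3_close(dst);
      return rc;
    }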
Temporary Tables
^^^^^^^^^^^^^^^^
Temporary tables backed by the ceph VFS are not supported. The main reason for
this is that the VFS lacks context about where it should put the database, i.e.
which RADOS pool. The persistent database associated with the temporary
database is not communicated via the SQLite VFS API.
Instead, it's suggested to attach a secondary local or `In-Memory Database`_
and put the temporary tables there. Alternatively, you may set a connection
pragma:
.. code:: sql
PRAGMA temp_store=memory
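As a sketch of the attach approach (the schema name and table below are
illustrative, not part of the VFS):

.. code:: cpp

    // Sketch: keep scratch tables off the ceph VFS by attaching an in-memory
    // database and creating working tables there.
    #include <sqlite3.h>

    int attach_scratch(sqlite3* db)
    {
      const char* sql =
        "ATTACH DATABASE ':memory:' AS scratch;"
        "CREATE TABLE scratch.work (k INTEGER PRIMARY KEY, v BLOB);";
      return sqlite3_exec(db, sql, nullptr, nullptr, nullptr);
    }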
.. _libcephsqlite-breaking-locks:
Breaking Locks
^^^^^^^^^^^^^^
Access to the database file is protected by an exclusive lock on the first
object stripe of the database. If the application fails without unlocking the
database (e.g. a segmentation fault), the lock is not automatically unlocked,
even if the client connection is blocklisted afterward. Eventually, the lock
will time out, subject to this configuration::
cephsqlite_lock_renewal_timeout = 30000
The timeout is in milliseconds. Once the timeout is reached, the OSD will
expire the lock and allow clients to relock. When this occurs, the database
will be recovered by SQLite and the in-progress transaction rolled back. The
new client recovering the database will also blocklist the old client to
prevent potential database corruption from rogue writes.
The holder of the exclusive lock on the database will periodically renew the
lock so that it is not lost. This is necessary for large transactions or
database connections operating in ``EXCLUSIVE`` locking mode. The lock renewal
interval is adjustable via::
cephsqlite_lock_renewal_interval = 2000
This configuration is also in units of milliseconds.
It is possible to break the lock early if you know the client is gone for good
(e.g. blocklisted). This allows restoring database access to clients
immediately. For example:
.. code:: sh
$ rados --pool=foo --namespace bar lock info baz.db.0000000000000000 striper.lock
{"name":"striper.lock","type":"exclusive","tag":"","lockers":[{"name":"client.4463","cookie":"555c7208-db39-48e8-a4d7-3ba92433a41a","description":"SimpleRADOSStriper","expiration":"0.000000","addr":"127.0.0.1:0/1831418345"}]}
$ rados --pool=foo --namespace bar lock break baz.db.0000000000000000 striper.lock client.4463 --lock-cookie 555c7208-db39-48e8-a4d7-3ba92433a41a
.. _libcephsqlite-corrupt:
How to Corrupt Your Database
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There is the usual reading on `How to Corrupt Your SQLite Database`_ that you
should review before using this tool. To add to that, the most likely way you
may corrupt your database is by a rogue process transiently losing network
connectivity and then resuming its work. The exclusive RADOS lock it held will
be lost but it cannot know that immediately. Any work it might do after
regaining network connectivity could corrupt the database.
The *ceph* VFS library defaults do not allow for this scenario to occur. The Ceph
VFS will blocklist the last owner of the exclusive lock on the database if it
detects incomplete cleanup.
By blocklisting the old client, it's no longer possible for the old client to
resume its work on the database when it returns (subject to blocklist
expiration, 3600 seconds by default). To turn off blocklisting the prior client, change::
cephsqlite_blocklist_dead_locker = false
Do NOT do this unless you know database corruption cannot result due to other
guarantees. If this config is true (the default), the *ceph* VFS will cowardly
fail if it cannot blocklist the prior instance (due to lack of authorization,
for example).
One example where out-of-band mechanisms exist to blocklist the last dead
holder of the exclusive lock on the database is in the ``ceph-mgr``. The
monitors are made aware of the RADOS connection used for the *ceph* VFS and will
blocklist the instance during ``ceph-mgr`` failover. This prevents a zombie
``ceph-mgr`` from continuing work and potentially corrupting the database. For
this reason, it is not necessary for the *ceph* VFS to do the blocklist command
in the new instance of the ``ceph-mgr`` (but it still does so, harmlessly).
To blocklist the *ceph* VFS manually, you may see the instance address of the
*ceph* VFS using the ``ceph_status`` SQL function:
.. code:: sql
SELECT ceph_status();
.. code::
{"id":788461300,"addr":"172.21.10.4:0/1472139388"}
You may easily manipulate that information using the `JSON1 extension`_:
.. code:: sql
SELECT json_extract(ceph_status(), '$.addr');
.. code::
172.21.10.4:0/3563721180
This is the address you would pass to the ceph blocklist command:
.. code:: sh
ceph osd blocklist add 172.21.10.4:0/3082314560
Performance Statistics
^^^^^^^^^^^^^^^^^^^^^^
The *ceph* VFS provides a SQLite function, ``ceph_perf``, for querying the
performance statistics of the VFS. The data is from "performance counters" as
in other Ceph services normally queried via an admin socket.
.. code:: sql
SELECT ceph_perf();
.. code::
{"libcephsqlite_vfs":{"op_open":{"avgcount":2,"sum":0.150001291,"avgtime":0.075000645},"op_delete":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"op_access":{"avgcount":1,"sum":0.003000026,"avgtime":0.003000026},"op_fullpathname":{"avgcount":1,"sum":0.064000551,"avgtime":0.064000551},"op_currenttime":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"opf_close":{"avgcount":1,"sum":0.000000000,"avgtime":0.000000000},"opf_read":{"avgcount":3,"sum":0.036000310,"avgtime":0.012000103},"opf_write":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"opf_truncate":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"opf_sync":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"opf_filesize":{"avgcount":2,"sum":0.000000000,"avgtime":0.000000000},"opf_lock":{"avgcount":1,"sum":0.158001360,"avgtime":0.158001360},"opf_unlock":{"avgcount":1,"sum":0.101000871,"avgtime":0.101000871},"opf_checkreservedlock":{"avgcount":1,"sum":0.002000017,"avgtime":0.002000017},"opf_filecontrol":{"avgcount":4,"sum":0.000000000,"avgtime":0.000000000},"opf_sectorsize":{"avgcount":0,"sum":0.000000000,"avgtime":0.000000000},"opf_devicecharacteristics":{"avgcount":4,"sum":0.000000000,"avgtime":0.000000000}},"libcephsqlite_striper":{"update_metadata":0,"update_allocated":0,"update_size":0,"update_version":0,"shrink":0,"shrink_bytes":0,"lock":1,"unlock":1}}
You may easily manipulate that information using the `JSON1 extension`_:
.. code:: sql
SELECT json_extract(ceph_perf(), '$.libcephsqlite_vfs.opf_sync.avgcount');
.. code::
776
That tells you the number of times SQLite has called the xSync method of the
`SQLite IO Methods`_ of the VFS (for **all** open database connections in the
process). You could analyze the performance stats before and after a number of
queries to see the number of file system syncs required (this would just be
proportional to the number of transactions). Alternatively, you may be more
interested in the average latency to complete a write:
.. code:: sql
SELECT json_extract(ceph_perf(), '$.libcephsqlite_vfs.opf_write');
.. code::
{"avgcount":7873,"sum":0.675005797,"avgtime":0.000085736}
Which would tell you there have been 7873 writes with an average
time-to-complete of 85 microseconds. That clearly shows the calls are executed
asynchronously. Returning to sync:
.. code:: sql
SELECT json_extract(ceph_perf(), '$.libcephsqlite_vfs.opf_sync');
.. code::
{"avgcount":776,"sum":4.802041199,"avgtime":0.006188197}
6 milliseconds were spent on average executing a sync call. This gathers all of
the asynchronous writes as well as an asynchronous update to the size of the
striped file.
.. _SQLite: https://sqlite.org/index.html
.. _SQLite VFS: https://www.sqlite.org/vfs.html
.. _SQLite Backup: https://www.sqlite.org/backup.html
.. _SQLite Limits: https://www.sqlite.org/limits.html
.. _SQLite Extension Loading API: https://sqlite.org/c3ref/load_extension.html
.. _In-Memory Database: https://www.sqlite.org/inmemorydb.html
.. _WAL Journal Mode: https://sqlite.org/wal.html
.. _How to Corrupt Your SQLite Database: https://www.sqlite.org/howtocorrupt.html
.. _JSON1 Extension: https://www.sqlite.org/json1.html
.. _SQLite IO Methods: https://www.sqlite.org/c3ref/io_methods.html


@ -295,6 +295,13 @@ The following entries describe valid capability profiles:
:Description: Gives a user read-only permissions for monitor, OSD, and PG data.
Intended for use by direct librados client applications.
``profile simple-rados-client-with-blocklist`` (Monitor only)
:Description: Gives a user read-only permissions for monitor, OSD, and PG data.
Intended for use by direct librados client applications. Also
includes permission to add blocklist entries to build HA
applications.
``profile fs-client`` (Monitor only)
:Description: Gives a user read-only permissions for monitor, OSD, PG, and MDS


@ -10,4 +10,9 @@ overrides:
mon osdmap full prune txsize: 2
tasks:
- install:
extra_system_packages:
rpm:
- sqlite-devel
deb:
- sqlite3
- ceph:


@ -0,0 +1,24 @@
overrides:
  ceph:
    conf:
      client:
        debug ms: 1
        debug client: 20
        debug cephsqlite: 20
    log-ignorelist:
      - POOL_APP_NOT_ENABLED
      - do not have an application enabled
tasks:
- exec:
    client.0:
      - ceph osd pool create cephsqlite
      - ceph auth get-or-create client.libcephsqlite mon 'profile simple-rados-client-with-blocklist' osd 'allow rwx pool=cephsqlite' >> /etc/ceph/ceph.keyring
- exec:
    client.0:
      - ceph_test_libcephsqlite --id libcephsqlite --no-log-to-stderr
- workunit:
    clients:
      client.0:
        - rados/test_libcephsqlite.sh cephsqlite
    env:
      CEPH_ARGS: --id libcephsqlite --no-log-to-stderr


@ -0,0 +1,136 @@
#!/bin/bash -ex
# The main point of these tests beyond ceph_test_libcephsqlite is to:
#
# - Ensure you can load the Ceph VFS via the dynamic load extension mechanism
# in SQLite.
# - Check the behavior of a dead application, that it does not hold locks
# indefinitely.
pool="$1"
ns="$(basename $0)"
function sqlite {
background="$1"
if [ "$background" = b ]; then
shift
fi
a=$(cat)
printf "%s" "$a" >&2
# We're doing job control gymnastics here to make sure that sqlite3 is the
# main process (i.e. the process group leader) in the background, not a bash
# function or job pipeline.
sqlite3 -cmd '.output /dev/null' -cmd '.load libcephsqlite.so' -cmd 'pragma journal_mode = PERSIST' -cmd ".open file:///$pool:$ns/baz.db?vfs=ceph" -cmd '.output stdout' <<<"$a" &
if [ "$background" != b ]; then
wait
fi
}
function striper {
rados --pool=$pool --namespace="$ns" --striper "$@"
}
function repeat {
n=$1
shift
for ((i = 0; i < "$n"; ++i)); do
echo "$*"
done
}
striper rm baz.db || true
time sqlite <<EOF
create table if not exists foo (a INT);
insert into foo (a) values (RANDOM());
drop table foo;
EOF
striper stat baz.db
striper rm baz.db
time sqlite <<EOF
CREATE TABLE IF NOT EXISTS rand(text BLOB NOT NULL);
$(repeat 10 'INSERT INTO rand (text) VALUES (RANDOMBLOB(4096));')
SELECT LENGTH(text) FROM rand;
DROP TABLE rand;
EOF
time sqlite <<EOF
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS rand(text BLOB NOT NULL);
$(repeat 100 'INSERT INTO rand (text) VALUES (RANDOMBLOB(4096));')
COMMIT;
SELECT LENGTH(text) FROM rand;
DROP TABLE rand;
EOF
# Connection death drops the lock:
striper rm baz.db
date
sqlite b <<EOF
CREATE TABLE foo (a BLOB);
INSERT INTO foo VALUES ("start");
WITH RECURSIVE c(x) AS
(
VALUES(1)
UNION ALL
SELECT x+1
FROM c
)
INSERT INTO foo (a)
SELECT RANDOMBLOB(1<<20)
FROM c
LIMIT (1<<20);
EOF
# Let it chew on that INSERT for a while so it writes data, it will not finish as it's trying to write 2^40 bytes...
sleep 10
echo done
jobs -l
kill -KILL -- $(jobs -p)
date
wait
date
n=$(sqlite <<<"SELECT COUNT(*) FROM foo;")
[ "$n" -eq 1 ]
# Connection "hang" loses the lock and cannot reacquire it:
striper rm baz.db
date
sqlite b <<EOF
CREATE TABLE foo (a BLOB);
INSERT INTO foo VALUES ("start");
WITH RECURSIVE c(x) AS
(
VALUES(1)
UNION ALL
SELECT x+1
FROM c
)
INSERT INTO foo (a)
SELECT RANDOMBLOB(1<<20)
FROM c
LIMIT (1<<20);
EOF
# Same thing, let it chew on the INSERT for a while...
sleep 20
jobs -l
kill -STOP -- $(jobs -p)
# cephsqlite_lock_renewal_timeout is 30s
sleep 45
date
kill -CONT -- $(jobs -p)
sleep 10
date
# it should exit with an error as it lost the lock
wait
date
n=$(sqlite <<<"SELECT COUNT(*) FROM foo;")
[ "$n" -eq 1 ]


@ -807,6 +807,13 @@ if(WITH_LIBCEPHFS)
endif()
endif(WITH_LIBCEPHFS)
if(WITH_LIBCEPHSQLITE)
set(cephsqlite_srcs libcephsqlite.cc SimpleRADOSStriper.cc)
add_library(cephsqlite ${CEPH_SHARED} ${cephsqlite_srcs})
target_link_libraries(cephsqlite PRIVATE cls_lock_client librados ceph-common SQLite3::SQLite3 ${EXTRALIBS})
install(TARGETS cephsqlite DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif(WITH_LIBCEPHSQLITE)
if(WITH_FUSE)
set(ceph_fuse_srcs
ceph_fuse.cc

src/SimpleRADOSStriper.cc

@ -0,0 +1,772 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2021 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License version 2.1, as published by
* the Free Software Foundation. See file COPYING.
*
*/
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iomanip>
#include <iostream>
#include <regex>
#include <sstream>
#include <string_view>
#include <limits.h>
#include <string.h>
#include "include/ceph_assert.h"
#include "include/rados/librados.hpp"
#include "cls/lock/cls_lock_client.h"
#include "common/ceph_argparse.h"
#include "common/ceph_mutex.h"
#include "common/common_init.h"
#include "common/config.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/version.h"
#include "SimpleRADOSStriper.h"
using ceph::bufferlist;
#define dout_subsys ceph_subsys_client
#undef dout_prefix
#define dout_prefix *_dout << "client." << ioctx.get_instance_id() << ": SimpleRADOSStriper: " << __func__ << ": " << oid << ": "
#define d(lvl) ldout((CephContext*)ioctx.cct(), (lvl))
enum {
P_FIRST = 0xe0000,
P_UPDATE_METADATA,
P_UPDATE_ALLOCATED,
P_UPDATE_SIZE,
P_UPDATE_VERSION,
P_SHRINK,
P_SHRINK_BYTES,
P_LOCK,
P_UNLOCK,
P_LAST,
};
int SimpleRADOSStriper::config_logger(CephContext* cct, std::string_view name, std::shared_ptr<PerfCounters>* l)
{
PerfCountersBuilder plb(cct, name.data(), P_FIRST, P_LAST);
plb.add_u64_counter(P_UPDATE_METADATA, "update_metadata", "Number of metadata updates");
plb.add_u64_counter(P_UPDATE_ALLOCATED, "update_allocated", "Number of allocated updates");
plb.add_u64_counter(P_UPDATE_SIZE, "update_size", "Number of size updates");
plb.add_u64_counter(P_UPDATE_VERSION, "update_version", "Number of version updates");
plb.add_u64_counter(P_SHRINK, "shrink", "Number of allocation shrinks");
plb.add_u64_counter(P_SHRINK_BYTES, "shrink_bytes", "Bytes shrunk");
plb.add_u64_counter(P_LOCK, "lock", "Number of locks");
plb.add_u64_counter(P_UNLOCK, "unlock", "Number of unlocks");
l->reset(plb.create_perf_counters());
return 0;
}
SimpleRADOSStriper::~SimpleRADOSStriper()
{
if (lock_keeper.joinable()) {
shutdown = true;
lock_keeper_cvar.notify_all();
lock_keeper.join();
}
if (ioctx.is_valid()) {
d(5) << dendl;
if (is_locked()) {
unlock();
}
}
}
SimpleRADOSStriper::extent SimpleRADOSStriper::get_next_extent(uint64_t off, size_t len) const
{
extent e;
{
uint64_t stripe = (off>>object_size);
CachedStackStringStream css;
*css << oid;
*css << ".";
*css << std::setw(16) << std::setfill('0') << std::hex << stripe;
e.soid = css->str();
}
e.off = off & ((1<<object_size)-1);
e.len = std::min<size_t>(len, (1<<object_size)-e.off);
return e;
}
int SimpleRADOSStriper::remove()
{
d(5) << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
if (int rc = wait_for_aios(true); rc < 0) {
aios_failure = 0;
return rc;
}
if (int rc = set_metadata(0, true); rc < 0) {
return rc;
}
auto ext = get_first_extent();
if (int rc = ioctx.remove(ext.soid); rc < 0) {
d(5) << " remove failed: " << cpp_strerror(rc) << dendl;
return rc;
}
locked = false;
return 0;
}
int SimpleRADOSStriper::truncate(uint64_t size)
{
d(5) << size << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
/* TODO: (not currently used by SQLite) handle growth + sparse */
if (int rc = set_metadata(size, true); rc < 0) {
return rc;
}
return 0;
}
int SimpleRADOSStriper::wait_for_aios(bool block)
{
while (!aios.empty()) {
auto& aiocp = aios.front();
int rc;
if (block) {
rc = aiocp->wait_for_complete();
} else {
if (aiocp->is_complete()) {
rc = aiocp->get_return_value();
} else {
return 0;
}
}
if (rc) {
d(5) << " aio failed: " << cpp_strerror(rc) << dendl;
if (aios_failure == 0) {
aios_failure = rc;
}
}
aios.pop();
}
return aios_failure;
}
int SimpleRADOSStriper::flush()
{
d(5) << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
if (size_dirty) {
if (int rc = set_metadata(size, true); rc < 0) {
return rc;
}
}
if (int rc = wait_for_aios(true); rc < 0) {
aios_failure = 0;
return rc;
}
return 0;
}
int SimpleRADOSStriper::stat(uint64_t* s)
{
d(5) << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
*s = size;
return 0;
}
int SimpleRADOSStriper::create()
{
d(5) << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
auto ext = get_first_extent();
auto op = librados::ObjectWriteOperation();
/* exclusive create ensures that none of these setxattrs happen if it fails */
op.create(1);
op.setxattr(XATTR_VERSION, uint2bl(0));
op.setxattr(XATTR_EXCL, bufferlist());
op.setxattr(XATTR_SIZE, uint2bl(0));
op.setxattr(XATTR_ALLOCATED, uint2bl(0));
op.setxattr(XATTR_LAYOUT_STRIPE_UNIT, uint2bl(1));
op.setxattr(XATTR_LAYOUT_STRIPE_COUNT, uint2bl(1));
op.setxattr(XATTR_LAYOUT_OBJECT_SIZE, uint2bl(1<<object_size));
if (int rc = ioctx.operate(ext.soid, &op); rc < 0) {
return rc; /* including EEXIST */
}
return 0;
}
int SimpleRADOSStriper::open()
{
d(5) << oid << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
auto ext = get_first_extent();
auto op = librados::ObjectReadOperation();
bufferlist bl_excl, bl_size, bl_alloc, bl_version, pbl;
int prval_excl, prval_size, prval_alloc, prval_version;
op.getxattr(XATTR_EXCL, &bl_excl, &prval_excl);
op.getxattr(XATTR_SIZE, &bl_size, &prval_size);
op.getxattr(XATTR_ALLOCATED, &bl_alloc, &prval_alloc);
op.getxattr(XATTR_VERSION, &bl_version, &prval_version);
if (int rc = ioctx.operate(ext.soid, &op, &pbl); rc < 0) {
d(5) << " getxattr failed: " << cpp_strerror(rc) << dendl;
return rc;
}
exclusive_holder = bl_excl.to_str();
{
auto sstr = bl_size.to_str();
std::string err;
size = strict_strtoll(sstr.c_str(), 10, &err);
ceph_assert(err.empty());
}
{
auto sstr = bl_alloc.to_str();
std::string err;
allocated = strict_strtoll(sstr.c_str(), 10, &err);
ceph_assert(err.empty());
}
{
auto sstr = bl_version.to_str();
std::string err;
version = strict_strtoll(sstr.c_str(), 10, &err);
ceph_assert(err.empty());
}
d(15) << " size: " << size << " allocated: " << allocated << " version: " << version << dendl;
return 0;
}
int SimpleRADOSStriper::shrink_alloc(uint64_t a)
{
d(5) << dendl;
std::vector<aiocompletionptr> removes;
ceph_assert(a <= allocated);
uint64_t prune = std::max<uint64_t>(a, (1u << object_size)); /* never delete first extent here */
uint64_t len = allocated - prune;
const uint64_t bytes_removed = len;
uint64_t offset = prune;
while (len > 0) {
auto ext = get_next_extent(offset, len);
auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion());
if (int rc = ioctx.aio_remove(ext.soid, aiocp.get()); rc < 0) {
d(5) << " aio_remove failed: " << cpp_strerror(rc) << dendl;
return rc;
}
removes.emplace_back(std::move(aiocp));
len -= ext.len;
offset += ext.len;
}
for (auto& aiocp : removes) {
if (int rc = aiocp->wait_for_complete(); rc < 0 && rc != -ENOENT) {
d(5) << " aio_remove failed: " << cpp_strerror(rc) << dendl;
return rc;
}
}
auto ext = get_first_extent();
auto op = librados::ObjectWriteOperation();
auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion());
op.setxattr(XATTR_ALLOCATED, uint2bl(a));
d(15) << " updating allocated to " << a << dendl;
op.setxattr(XATTR_VERSION, uint2bl(version+1));
d(15) << " updating version to " << (version+1) << dendl;
if (int rc = ioctx.aio_operate(ext.soid, aiocp.get(), &op); rc < 0) {
d(5) << " update failed: " << cpp_strerror(rc) << dendl;
return rc;
}
/* we need to wait so we don't have dangling extents */
d(10) << " waiting for allocated update" << dendl;
if (int rc = aiocp->wait_for_complete(); rc < 0) {
d(1) << " update failure: " << cpp_strerror(rc) << dendl;
return rc;
}
if (logger) {
logger->inc(P_UPDATE_METADATA);
logger->inc(P_UPDATE_ALLOCATED);
logger->inc(P_UPDATE_VERSION);
logger->inc(P_SHRINK);
logger->inc(P_SHRINK_BYTES, bytes_removed);
}
version += 1;
allocated = a;
return 0;
}
int SimpleRADOSStriper::maybe_shrink_alloc()
{
d(15) << dendl;
if (size == 0) {
if (allocated > 0) {
d(10) << "allocation shrink to 0" << dendl;
return shrink_alloc(0);
} else {
return 0;
}
}
uint64_t mask = (1<<object_size)-1;
uint64_t new_allocated = min_growth + ((size + mask) & ~mask); /* round up base 2 */
if (allocated > new_allocated && ((allocated-new_allocated) > min_growth)) {
d(10) << "allocation shrink to " << new_allocated << dendl;
return shrink_alloc(new_allocated);
}
return 0;
}
bufferlist SimpleRADOSStriper::str2bl(std::string_view sv)
{
bufferlist bl;
bl.append(sv);
return bl;
}
bufferlist SimpleRADOSStriper::uint2bl(uint64_t v)
{
CachedStackStringStream css;
*css << std::dec << std::setw(16) << std::setfill('0') << v;
bufferlist bl;
bl.append(css->strv());
return bl;
}
int SimpleRADOSStriper::set_metadata(uint64_t new_size, bool update_size)
{
d(10) << " new_size: " << new_size
<< " update_size: " << update_size
<< " allocated: " << allocated
<< " size: " << size
<< " version: " << version
<< dendl;
bool do_op = false;
auto new_allocated = allocated;
auto ext = get_first_extent();
auto op = librados::ObjectWriteOperation();
if (new_size > allocated) {
uint64_t mask = (1<<object_size)-1;
new_allocated = min_growth + ((size + mask) & ~mask); /* round up base 2 */
op.setxattr(XATTR_ALLOCATED, uint2bl(new_allocated));
do_op = true;
if (logger) logger->inc(P_UPDATE_ALLOCATED);
d(15) << " updating allocated to " << new_allocated << dendl;
}
if (update_size) {
op.setxattr(XATTR_SIZE, uint2bl(new_size));
do_op = true;
if (logger) logger->inc(P_UPDATE_SIZE);
d(15) << " updating size to " << new_size << dendl;
}
if (do_op) {
if (logger) logger->inc(P_UPDATE_METADATA);
if (logger) logger->inc(P_UPDATE_VERSION);
op.setxattr(XATTR_VERSION, uint2bl(version+1));
d(15) << " updating version to " << (version+1) << dendl;
auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion());
if (int rc = ioctx.aio_operate(ext.soid, aiocp.get(), &op); rc < 0) {
d(1) << " update failure: " << cpp_strerror(rc) << dendl;
return rc;
}
version += 1;
if (allocated != new_allocated) {
/* we need to wait so we don't have dangling extents */
d(10) << "waiting for allocated update" << dendl;
if (int rc = aiocp->wait_for_complete(); rc < 0) {
d(1) << " update failure: " << cpp_strerror(rc) << dendl;
return rc;
}
aiocp.reset();
allocated = new_allocated;
}
if (aiocp) {
aios.emplace(std::move(aiocp));
}
if (update_size) {
size = new_size;
size_dirty = false;
return maybe_shrink_alloc();
}
}
return 0;
}
ssize_t SimpleRADOSStriper::write(const void* data, size_t len, uint64_t off)
{
d(5) << off << "~" << len << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
if (allocated < (len+off)) {
if (int rc = set_metadata(len+off, false); rc < 0) {
return rc;
}
}
size_t w = 0;
while ((len-w) > 0) {
auto ext = get_next_extent(off+w, len-w);
auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion());
bufferlist bl;
bl.append((const char*)data+w, ext.len);
if (int rc = ioctx.aio_write(ext.soid, aiocp.get(), bl, ext.len, ext.off); rc < 0) {
break;
}
aios.emplace(std::move(aiocp));
w += ext.len;
}
wait_for_aios(false); // clean up finished completions
if (size < (len+off)) {
size = len+off;
size_dirty = true;
d(10) << " dirty size: " << size << dendl;
}
return (ssize_t)w;
}
ssize_t SimpleRADOSStriper::read(void* data, size_t len, uint64_t off)
{
d(5) << off << "~" << len << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
size_t r = 0;
std::vector<std::pair<bufferlist, aiocompletionptr>> reads;
while ((len-r) > 0) {
auto ext = get_next_extent(off+r, len-r);
auto& [bl, aiocp] = reads.emplace_back();
aiocp = aiocompletionptr(librados::Rados::aio_create_completion());
if (int rc = ioctx.aio_read(ext.soid, aiocp.get(), &bl, ext.len, ext.off); rc < 0) {
d(1) << " read failure: " << cpp_strerror(rc) << dendl;
return rc;
}
r += ext.len;
}
r = 0;
for (auto& [bl, aiocp] : reads) {
if (int rc = aiocp->wait_for_complete(); rc < 0) {
d(1) << " read failure: " << cpp_strerror(rc) << dendl;
return rc;
}
bl.begin().copy(bl.length(), ((char*)data)+r);
r += bl.length();
}
ceph_assert(r <= len);
return r;
}
int SimpleRADOSStriper::print_lockers(std::ostream& out)
{
int exclusive;
std::string tag;
std::list<librados::locker_t> lockers;
auto ext = get_first_extent();
if (int rc = ioctx.list_lockers(ext.soid, biglock, &exclusive, &tag, &lockers); rc < 0) {
d(1) << " list_lockers failure: " << cpp_strerror(rc) << dendl;
return rc;
}
if (lockers.empty()) {
out << " lockers none";
} else {
out << " lockers exclusive=" << exclusive << " tag=" << tag << " lockers=[";
bool first = true;
for (const auto& l : lockers) {
if (!first) out << ",";
first = false;
out << l.client << ":" << l.cookie << ":" << l.address;
}
out << "]";
}
return 0;
}
/* Do lock renewal in a separate thread: while it's unlikely sqlite chews on
* something for multiple seconds without calling into the VFS (where we could
* initiate a lock renewal), it's not impossible with complex queries. Also, we
* want to allow "PRAGMA locking_mode = exclusive" where the application may
* not use the sqlite3 database connection for an indeterminate amount of time.
*/
void SimpleRADOSStriper::lock_keeper_main(void)
{
d(20) << dendl;
const auto ext = get_first_extent();
while (!shutdown) {
d(20) << "tick" << dendl;
std::unique_lock lock(lock_keeper_mutex);
auto now = clock::now();
auto since = now-last_renewal;
if (since >= lock_keeper_interval && locked) {
d(10) << "renewing lock" << dendl;
auto tv = ceph::to_timeval(lock_keeper_timeout);
int rc = ioctx.lock_exclusive(ext.soid, biglock, cookie.to_string(), lockdesc, &tv, LIBRADOS_LOCK_FLAG_MUST_RENEW);
if (rc) {
/* If lock renewal fails, we cannot continue the application. Return
* -EBLOCKLISTED for all calls into the striper for this instance, even
* if we're not actually blocklisted.
*/
d(-1) << "lock renewal failed: " << cpp_strerror(rc) << dendl;
blocklisted = true;
break;
}
last_renewal = clock::now();
}
lock_keeper_cvar.wait_for(lock, lock_keeper_interval);
}
}
int SimpleRADOSStriper::recover_lock()
{
d(5) << "attempting to recover lock" << dendl;
std::string addrs;
const auto ext = get_first_extent();
{
auto tv = ceph::to_timeval(lock_keeper_timeout);
if (int rc = ioctx.lock_exclusive(ext.soid, biglock, cookie.to_string(), lockdesc, &tv, 0); rc < 0) {
return rc;
}
locked = true;
last_renewal = clock::now();
}
d(5) << "acquired lock, fetching last owner" << dendl;
{
bufferlist bl_excl;
if (int rc = ioctx.getxattr(ext.soid, XATTR_EXCL, bl_excl); rc < 0) {
if (rc == -ENOENT) {
/* someone removed it? ok... */
goto setowner;
} else {
d(-1) << "could not recover exclusive locker" << dendl;
locked = false; /* it will drop eventually */
return -EIO;
}
}
addrs = bl_excl.to_str();
}
if (addrs.empty()) {
d(5) << "someone else cleaned up" << dendl;
goto setowner;
} else {
d(5) << "exclusive lock holder was " << addrs << dendl;
}
if (blocklist_the_dead) {
entity_addrvec_t addrv;
addrv.parse(addrs.c_str());
auto R = librados::Rados(ioctx);
auto b = "blocklist"sv;
retry:
for (auto& a : addrv.v) {
CachedStackStringStream css;
*css << "{\"prefix\":\"osd " << b << "\", \"" << b << "op\":\"add\",";
*css << "\"addr\":\"";
*css << a;
*css << "\"}";
std::vector<std::string> cmd = {css->str()};
d(5) << "sending blocklist command: " << cmd << dendl;
std::string out;
if (int rc = R.mon_command(css->str(), bufferlist(), nullptr, &out); rc < 0) {
if (rc == -EINVAL && b == "blocklist"sv) {
b = "blacklist"sv;
goto retry;
}
d(-1) << "Cannot proceed with recovery because I have failed to blocklist the old client: " << cpp_strerror(rc) << ", out = " << out << dendl;
locked = false; /* it will drop eventually */
return -EIO;
}
}
/* Ensure our osd_op requests have the latest epoch. */
R.wait_for_latest_osdmap();
}
setowner:
d(5) << "setting new owner to myself, " << myaddrs << dendl;
{
auto myaddrbl = str2bl(myaddrs);
if (int rc = ioctx.setxattr(ext.soid, XATTR_EXCL, myaddrbl); rc < 0) {
d(-1) << "could not set lock owner" << dendl;
locked = false; /* it will drop eventually */
return -EIO;
}
}
return 0;
}
int SimpleRADOSStriper::lock(uint64_t timeoutms)
{
/* XXX: timeoutms is unused */
d(5) << "timeout=" << timeoutms << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
std::scoped_lock lock(lock_keeper_mutex);
ceph_assert(!is_locked());
/* We're going to be very lazy here in implementation: only exclusive locks
* are allowed. That even ensures a single reader.
*/
uint64_t slept = 0;
auto ext = get_first_extent();
while (true) {
/* The general fast path in one compound operation: obtain the lock,
* confirm the past locker cleaned up after themselves (set XATTR_EXCL to
* ""), then finally set XATTR_EXCL to our address vector as the new
* exclusive locker.
*/
auto op = librados::ObjectWriteOperation();
auto tv = ceph::to_timeval(lock_keeper_timeout);
utime_t duration;
duration.set_from_timeval(&tv);
rados::cls::lock::lock(&op, biglock, ClsLockType::EXCLUSIVE, cookie.to_string(), "", lockdesc, duration, 0);
op.cmpxattr(XATTR_EXCL, LIBRADOS_CMPXATTR_OP_EQ, bufferlist());
op.setxattr(XATTR_EXCL, str2bl(myaddrs));
int rc = ioctx.operate(ext.soid, &op);
if (rc == 0) {
locked = true;
last_renewal = clock::now();
break;
} else if (rc == -EBUSY) {
if ((slept % 500000) == 0) {
d(-1) << "waiting for locks: ";
print_lockers(*_dout);
*_dout << dendl;
}
usleep(5000);
slept += 5000;
continue;
} else if (rc == -ECANCELED) {
/* CMPXATTR failed, a locker didn't cleanup. Try to recover! */
if (rc = recover_lock(); rc < 0) {
if (rc == -EBUSY) {
continue; /* try again */
}
return rc;
}
break;
} else {
d(-1) << " lock failed: " << cpp_strerror(rc) << dendl;
return rc;
}
}
if (!lock_keeper.joinable()) {
lock_keeper = std::thread(&SimpleRADOSStriper::lock_keeper_main, this);
}
if (int rc = open(); rc < 0) {
d(5) << " open failed: " << cpp_strerror(rc) << dendl;
return rc;
}
d(5) << " = 0" << dendl;
if (logger) {
logger->inc(P_LOCK);
}
return 0;
}
int SimpleRADOSStriper::unlock()
{
d(5) << dendl;
if (blocklisted.load()) {
return -EBLOCKLISTED;
}
std::scoped_lock lock(lock_keeper_mutex);
ceph_assert(is_locked());
/* wait for flush of metadata */
if (int rc = flush(); rc < 0) {
return rc;
}
const auto ext = get_first_extent();
auto op = librados::ObjectWriteOperation();
op.cmpxattr(XATTR_EXCL, LIBRADOS_CMPXATTR_OP_EQ, str2bl(myaddrs));
op.setxattr(XATTR_EXCL, bufferlist());
rados::cls::lock::unlock(&op, biglock, cookie.to_string());
if (int rc = ioctx.operate(ext.soid, &op); rc < 0) {
d(-1) << " unlock failed: " << cpp_strerror(rc) << dendl;
return rc;
}
locked = false;
d(5) << " = 0" << dendl;
if (logger) {
logger->inc(P_UNLOCK);
}
return 0;
}

src/SimpleRADOSStriper.h

@ -0,0 +1,139 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2021 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License version 2.1, as published by
* the Free Software Foundation. See file COPYING.
*
*/
#ifndef _SIMPLERADOSSTRIPER_H
#define _SIMPLERADOSSTRIPER_H
#include <queue>
#include <string_view>
#include <thread>
#include "include/buffer.h"
#include "include/rados/librados.hpp"
#include "include/uuid.h"
#include "include/types.h"
#include "common/ceph_time.h"
#include "common/perf_counters.h"
class SimpleRADOSStriper
{
public:
using aiocompletionptr = std::unique_ptr<librados::AioCompletion>;
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;
static inline const uint64_t object_size = 22; /* power of 2 */
static inline const uint64_t min_growth = (1<<27); /* 128 MB */
static int config_logger(CephContext* cct, std::string_view name, std::shared_ptr<PerfCounters>* l);
SimpleRADOSStriper() = default;
SimpleRADOSStriper(librados::IoCtx _ioctx, std::string _oid)
: ioctx(std::move(_ioctx))
, oid(std::move(_oid))
{
cookie.generate_random();
auto r = librados::Rados(ioctx);
myaddrs = r.get_addrs();
}
SimpleRADOSStriper(const SimpleRADOSStriper&) = delete;
SimpleRADOSStriper& operator=(const SimpleRADOSStriper&) = delete;
SimpleRADOSStriper& operator=(SimpleRADOSStriper&&) = delete;
SimpleRADOSStriper(SimpleRADOSStriper&&) = delete;
~SimpleRADOSStriper();
int create();
int open();
int remove();
int stat(uint64_t* size);
ssize_t write(const void* data, size_t len, uint64_t off);
ssize_t read(void* data, size_t len, uint64_t off);
int truncate(size_t size);
int flush();
int lock(uint64_t timeoutms);
int unlock();
int is_locked() const {
return locked;
}
int print_lockers(std::ostream& out);
void set_logger(std::shared_ptr<PerfCounters> l) {
logger = std::move(l);
}
void set_lock_interval(std::chrono::milliseconds t) {
lock_keeper_interval = t;
}
void set_lock_timeout(std::chrono::milliseconds t) {
lock_keeper_timeout = t;
}
void set_blocklist_the_dead(bool b) {
blocklist_the_dead = b;
}
protected:
struct extent {
std::string soid;
size_t len;
size_t off;
};
ceph::bufferlist str2bl(std::string_view sv);
ceph::bufferlist uint2bl(uint64_t v);
int set_metadata(uint64_t new_size, bool update_size);
int shrink_alloc(uint64_t a);
int maybe_shrink_alloc();
int wait_for_aios(bool block);
int recover_lock();
extent get_next_extent(uint64_t off, size_t len) const;
extent get_first_extent() const {
return get_next_extent(0, 0);
}
private:
static inline const char XATTR_EXCL[] = "striper.excl";
static inline const char XATTR_SIZE[] = "striper.size";
static inline const char XATTR_ALLOCATED[] = "striper.allocated";
static inline const char XATTR_VERSION[] = "striper.version";
static inline const char XATTR_LAYOUT_STRIPE_UNIT[] = "striper.layout.stripe_unit";
static inline const char XATTR_LAYOUT_STRIPE_COUNT[] = "striper.layout.stripe_count";
static inline const char XATTR_LAYOUT_OBJECT_SIZE[] = "striper.layout.object_size";
static inline const std::string biglock = "striper.lock";
static inline const std::string lockdesc = "SimpleRADOSStriper";
void lock_keeper_main();
librados::IoCtx ioctx;
std::shared_ptr<PerfCounters> logger;
std::string oid;
std::thread lock_keeper;
std::condition_variable lock_keeper_cvar;
std::mutex lock_keeper_mutex;
time last_renewal = time::min();
std::chrono::milliseconds lock_keeper_interval = 2000ms;
std::chrono::milliseconds lock_keeper_timeout = 30000ms;
std::atomic<bool> blocklisted = false;
bool shutdown = false;
version_t version = 0;
std::string exclusive_holder;
uint64_t size = 0;
uint64_t allocated = 0;
uuid_d cookie{};
bool locked = false;
bool size_dirty = false;
bool blocklist_the_dead = true;
std::queue<aiocompletionptr> aios;
int aios_failure = 0;
std::string myaddrs;
};
#endif /* _SIMPLERADOSSTRIPER_H */
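For orientation, here is an illustrative sketch (not part of the commit) of how the SimpleRADOSStriper interface declared above might be exercised. The striper is an internal building block of libcephsqlite rather than a supported public API; the pool name, object id, and credential are examples and error handling is omitted.

// Illustrative sketch: drive the SimpleRADOSStriper API shown above.
#include "include/rados/librados.hpp"
#include "SimpleRADOSStriper.h"

int striper_demo()
{
  librados::Rados cluster;
  cluster.init2("client.admin", "ceph", 0);  // example credential
  cluster.conf_read_file(nullptr);
  cluster.connect();

  librados::IoCtx io;
  cluster.ioctx_create("foo", io);           // example pool

  SimpleRADOSStriper striper(io, "baz.db");
  striper.create();                          // exclusive create of the first extent; -EEXIST if it already exists
  striper.lock(0);                           // take the exclusive lock; starts the lock keeper thread

  const char data[] = "hello";
  striper.write(data, sizeof(data), 0);      // striped, asynchronous write
  striper.flush();                           // wait for in-flight writes and the size update

  uint64_t size = 0;
  striper.stat(&size);
  striper.unlock();
  cluster.shutdown();
  return 0;
}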


@ -64,6 +64,16 @@ typedef int64_t signed_rep;
// differences between now and a time point in the past.
typedef std::chrono::duration<signed_rep, std::nano> signedspan;
template<typename Duration>
struct timeval to_timeval(Duration d) {
struct timeval tv;
auto sec = std::chrono::duration_cast<std::chrono::seconds>(d);
tv.tv_sec = sec.count();
auto usec = std::chrono::duration_cast<std::chrono::microseconds>(d-sec);
tv.tv_usec = usec.count();
return tv;
}
// We define our own clocks so we can have our choice of all time
// sources supported by the operating system. With the standard
// library the resolution and cost are unspecified. (For example,


@ -5594,6 +5594,28 @@ std::vector<Option> get_global_options() {
.set_description("Size of thread pool for ASIO completions")
.add_tag("osd"),
Option("cephsqlite_lock_renewal_interval", Option::TYPE_MILLISECS, Option::LEVEL_ADVANCED)
.add_see_also("cephsqlite_lock_renewal_timeout")
.add_tag("client")
.set_default(2000)
.set_description("number of milliseconds before lock is renewed")
.set_min(100)
,
Option("cephsqlite_lock_renewal_timeout", Option::TYPE_MILLISECS, Option::LEVEL_ADVANCED)
.add_see_also("cephsqlite_lock_renewal_interval")
.add_tag("client")
.set_default(30000)
.set_description("number of milliseconds before transaction lock times out")
.set_long_description("The amount of time before a running libcephsqlite VFS connection has to renew a lock on the database before the lock is automatically lost. If the lock is lost, the VFS will abort the process to prevent database corruption.")
.set_min(100),
Option("cephsqlite_blocklist_dead_locker", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.add_tag("client")
.set_default(true)
.set_description("blocklist the last dead owner of the database lock")
.set_long_description("Require that the Ceph SQLite VFS blocklist the last dead owner of the database when cleanup was incomplete. DO NOT CHANGE THIS UNLESS YOU UNDERSTAND THE RAMIFICATIONS. CORRUPTION MAY RESULT."),
// ----------------------------
// Crimson specific options


@ -81,3 +81,4 @@ SUBSYS(eventtrace, 1, 5)
SUBSYS(prioritycache, 1, 5)
SUBSYS(test, 0, 5)
SUBSYS(cephfs_mirror, 0, 5)
SUBSYS(cephsqlite, 0, 5)


@ -1,3 +1,7 @@
install(FILES
libcephsqlite.h
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})
install(FILES
rados/librados.h
rados/rados_types.h


@ -363,6 +363,9 @@
/* Define if RBD QCOW migration format is enabled */
#cmakedefine WITH_RBD_MIGRATION_FORMAT_QCOW_V1
/* Define if libcephsqlite is enabled */
#cmakedefine WITH_LIBCEPHSQLITE
/* Define if RWL is enabled */
#cmakedefine WITH_RBD_RWL


@ -0,0 +1,67 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2021 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License version 2.1, as published by
* the Free Software Foundation. See file COPYING.
*
*/
#ifndef LIBCEPHSQLITE_H
#define LIBCEPHSQLITE_H
/* This loadable extension does not generally require using this header. It is
* here to allow controlling which version of the library is linked in. See
* also sqlite3_cephsqlite_init below. Additionally, you may specify which
* CephContext to use rather than the library instantiating its own and using
* whatever the default credential is.
*/
#include <sqlite3.h>
#ifdef _WIN32
# define LIBCEPHSQLITE_API __declspec(dllexport)
#else
# define LIBCEPHSQLITE_API extern "C"
#endif
/* This is the SQLite entry point when loaded as a dynamic library. You also
* need to ensure SQLite calls this method when using libcephsqlite as a static
* library or a dynamic library linked at compile time. For the latter case,
* you can do this by:
*
* sqlite3_auto_extension((void (*)())sqlite3_cephsqlite_init);
* sqlite3* db = nullptr;
* int rc = sqlite3_open_v2(":memory:", &db, SQLITE_OPEN_READWRITE, nullptr);
* if (rc == SQLITE_OK) {
* sqlite3_close(db);
* } else {
* // failure
* }
*
* The throwaway database created (name == "") is a memory database opened so
* that SQLite runs the libcephsqlite initialization routine to register the
* VFS. AFter that's done, the VFS is available for a future database open with
* the VFS set to "ceph":
*
* sqlite3_open_v2("foo:bar/baz.db", &db, SQLITE_OPEN_READWRITE, "ceph");
*
* You MUST do this before calling any other libcephsqlite routine so that
* sqlite3 can pass its API routines to the libcephsqlite extension.
*/
LIBCEPHSQLITE_API int sqlite3_cephsqlite_init(sqlite3* db, char** err, const sqlite3_api_routines* api);
/* If you prefer to have libcephsqlite use a CephContext managed by your
* application, use this routine to set that. libcephsqlite can only have one
* context globally.
*/
LIBCEPHSQLITE_API int cephsqlite_setcct(class CephContext* cct, char** ident);
#endif
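As a reader aid, here is a minimal sketch, not part of the patch, of the linked-at-compile-time usage the header comment above describes. The helper name open_ceph_db is hypothetical, and the pool "foo", namespace "bar", and database name "baz.db" are placeholders.

#include <sqlite3.h>
#include <libcephsqlite.h>

// Open a database striped over RADOS using the "ceph" VFS.
// Returns SQLITE_OK on success; *db holds the opened connection.
int open_ceph_db(sqlite3** db)
{
  // Make SQLite run sqlite3_cephsqlite_init for every new connection.
  if (sqlite3_auto_extension((void (*)())sqlite3_cephsqlite_init) != SQLITE_OK) {
    return SQLITE_ERROR;
  }

  // Throwaway in-memory open so the extension registers the "ceph" VFS.
  sqlite3* tmp = nullptr;
  if (sqlite3_open_v2(":memory:", &tmp, SQLITE_OPEN_READWRITE, nullptr) != SQLITE_OK) {
    return SQLITE_ERROR;
  }
  sqlite3_close(tmp);

  // Path format "<pool>:<namespace>/<name>": pool "foo", namespace "bar", file "baz.db".
  return sqlite3_open_v2("foo:bar/baz.db", db,
                         SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE, "ceph");
}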

View File

@ -55,7 +55,9 @@ extern "C" {
/* RADOS lock flags
* They are also defined in cls_lock_types.h. Keep them in sync!
*/
#define LIBRADOS_LOCK_FLAG_RENEW 0x1
#define LIBRADOS_LOCK_FLAG_RENEW (1u<<0)
#define LIBRADOS_LOCK_FLAG_MAY_RENEW LIBRADOS_LOCK_FLAG_RENEW
#define LIBRADOS_LOCK_FLAG_MUST_RENEW (1u<<1)
/*
* Constants for rados_write_op_create().

src/libcephsqlite.cc (new file, 822 lines)
View File

@ -0,0 +1,822 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2021 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License version 2.1, as published by
* the Free Software Foundation. See file COPYING.
*
*/
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <fmt/format.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iomanip>
#include <iostream>
#include <regex>
#include <sstream>
#include <string_view>
#include <limits.h>
#include <string.h>
#include <sqlite3ext.h>
SQLITE_EXTENSION_INIT1
#include "include/ceph_assert.h"
#include "include/rados/librados.hpp"
#include "common/Clock.h"
#include "common/Formatter.h"
#include "common/ceph_argparse.h"
#include "common/ceph_mutex.h"
#include "common/common_init.h"
#include "common/config.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/perf_counters.h"
#include "common/version.h"
#include "include/libcephsqlite.h"
#include "SimpleRADOSStriper.h"
#define dout_subsys ceph_subsys_cephsqlite
#undef dout_prefix
#define dout_prefix *_dout << "cephsqlite: " << __func__ << ": "
#define d(vfs,lvl) ldout(getcct(vfs), (lvl)) << "(client." << getdata(vfs).cluster.get_instance_id() << ") "
#define dv(lvl) d(vfs,(lvl))
#define df(lvl) d(f->vfs,(lvl)) << f->loc << " "
enum {
P_FIRST = 0xf0000,
P_OP_OPEN,
P_OP_DELETE,
P_OP_ACCESS,
P_OP_FULLPATHNAME,
P_OP_CURRENTTIME,
P_OPF_CLOSE,
P_OPF_READ,
P_OPF_WRITE,
P_OPF_TRUNCATE,
P_OPF_SYNC,
P_OPF_FILESIZE,
P_OPF_LOCK,
P_OPF_UNLOCK,
P_OPF_CHECKRESERVEDLOCK,
P_OPF_FILECONTROL,
P_OPF_SECTORSIZE,
P_OPF_DEVICECHARACTERISTICS,
P_LAST,
};
struct cephsqlite_appdata {
~cephsqlite_appdata() {
if (logger) {
cct->get_perfcounters_collection()->remove(logger.get());
}
if (striper_logger) {
cct->get_perfcounters_collection()->remove(striper_logger.get());
}
}
int setup_perf() {
ceph_assert(cct);
PerfCountersBuilder plb(cct.get(), "libcephsqlite_vfs", P_FIRST, P_LAST);
plb.add_time_avg(P_OP_OPEN, "op_open", "Time average of Open operations");
plb.add_time_avg(P_OP_DELETE, "op_delete", "Time average of Delete operations");
plb.add_time_avg(P_OP_ACCESS, "op_access", "Time average of Access operations");
plb.add_time_avg(P_OP_FULLPATHNAME, "op_fullpathname", "Time average of FullPathname operations");
plb.add_time_avg(P_OP_CURRENTTIME, "op_currenttime", "Time average of Currenttime operations");
plb.add_time_avg(P_OPF_CLOSE, "opf_close", "Time average of Close file operations");
plb.add_time_avg(P_OPF_READ, "opf_read", "Time average of Read file operations");
plb.add_time_avg(P_OPF_WRITE, "opf_write", "Time average of Write file operations");
plb.add_time_avg(P_OPF_TRUNCATE, "opf_truncate", "Time average of Truncate file operations");
plb.add_time_avg(P_OPF_SYNC, "opf_sync", "Time average of Sync file operations");
plb.add_time_avg(P_OPF_FILESIZE, "opf_filesize", "Time average of FileSize file operations");
plb.add_time_avg(P_OPF_LOCK, "opf_lock", "Time average of Lock file operations");
plb.add_time_avg(P_OPF_UNLOCK, "opf_unlock", "Time average of Unlock file operations");
plb.add_time_avg(P_OPF_CHECKRESERVEDLOCK, "opf_checkreservedlock", "Time average of CheckReservedLock file operations");
plb.add_time_avg(P_OPF_FILECONTROL, "opf_filecontrol", "Time average of FileControl file operations");
plb.add_time_avg(P_OPF_SECTORSIZE, "opf_sectorsize", "Time average of SectorSize file operations");
plb.add_time_avg(P_OPF_DEVICECHARACTERISTICS, "opf_devicecharacteristics", "Time average of DeviceCharacteristics file operations");
logger.reset(plb.create_perf_counters());
if (int rc = SimpleRADOSStriper::config_logger(cct.get(), "libcephsqlite_striper", &striper_logger); rc < 0) {
return rc;
}
cct->get_perfcounters_collection()->add(logger.get());
cct->get_perfcounters_collection()->add(striper_logger.get());
return 0;
}
int init_cluster() {
ceph_assert(cct);
ldout(cct, 5) << "initializing RADOS handle as " << cct->_conf->name << dendl;
if (int rc = cluster.init_with_context(cct.get()); rc < 0) {
lderr(cct) << "cannot initialize RADOS: " << cpp_strerror(rc) << dendl;
return rc;
}
if (int rc = cluster.connect(); rc < 0) {
lderr(cct) << "cannot connect: " << cpp_strerror(rc) << dendl;
return rc;
}
auto s = cluster.get_addrs();
ldout(cct, 5) << "completed connection to RADOS with address " << s << dendl;
return 0;
}
boost::intrusive_ptr<CephContext> cct;
std::unique_ptr<PerfCounters> logger;
std::shared_ptr<PerfCounters> striper_logger;
librados::Rados cluster;
struct sqlite3_vfs vfs{};
};
struct cephsqlite_fileloc {
std::string pool;
std::string radosns;
std::string name;
};
struct cephsqlite_fileio {
librados::IoCtx ioctx;
std::unique_ptr<SimpleRADOSStriper> rs;
};
std::ostream& operator<<(std::ostream &out, const cephsqlite_fileloc& fileloc) {
return out
<< "["
<< fileloc.pool
<< ":"
<< fileloc.radosns
<< "/"
<< fileloc.name
<< "]"
;
}
struct cephsqlite_file {
sqlite3_file base;
struct sqlite3_vfs* vfs = nullptr;
int flags = 0;
// There are 5 lock states: https://sqlite.org/c3ref/c_lock_exclusive.html
int lock = 0;
struct cephsqlite_fileloc loc{};
struct cephsqlite_fileio io{};
};
#define getdata(vfs) (*((cephsqlite_appdata*)((vfs)->pAppData)))
static CephContext* getcct(sqlite3_vfs* vfs)
{
auto&& appd = getdata(vfs);
auto& cct = appd.cct;
if (cct) {
return cct.get();
}
/* bootstrap cct */
std::vector<const char*> env_args;
env_to_vec(env_args, "CEPH_ARGS");
std::string cluster, conf_file_list; // unused
CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
cct = boost::intrusive_ptr<CephContext>(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
cct->_conf.parse_env(cct->get_module_type()); // environment variables override
cct->_conf.apply_changes(nullptr);
common_init_finish(cct.get());
if (int rc = appd.setup_perf(); rc < 0) {
ceph_abort("cannot setup perf counters");
}
if (int rc = appd.init_cluster(); rc < 0) {
ceph_abort("cannot setup RADOS cluster handle");
}
return cct.get();
}
static int Lock(sqlite3_file *file, int ilock)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << std::hex << ilock << dendl;
auto& lock = f->lock;
ceph_assert(!f->io.rs->is_locked() || lock > SQLITE_LOCK_NONE);
ceph_assert(lock <= ilock);
if (!f->io.rs->is_locked() && ilock > SQLITE_LOCK_NONE) {
if (int rc = f->io.rs->lock(0); rc < 0) {
df(5) << "failed: " << rc << dendl;
return SQLITE_IOERR;
}
}
lock = ilock;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_LOCK, end-start);
return SQLITE_OK;
}
static int Unlock(sqlite3_file *file, int ilock)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << std::hex << ilock << dendl;
auto& lock = f->lock;
ceph_assert(lock == SQLITE_LOCK_NONE || (lock > SQLITE_LOCK_NONE && f->io.rs->is_locked()));
ceph_assert(lock >= ilock);
if (ilock <= SQLITE_LOCK_NONE && SQLITE_LOCK_NONE < lock) {
if (int rc = f->io.rs->unlock(); rc < 0) {
df(5) << "failed: " << rc << dendl;
return SQLITE_IOERR;
}
}
lock = ilock;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_UNLOCK, end-start);
return SQLITE_OK;
}
static int CheckReservedLock(sqlite3_file *file, int *result)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << dendl;
auto& lock = f->lock;
*result = 0;
if (lock > SQLITE_LOCK_SHARED) {
*result = 1;
}
df(10);
f->io.rs->print_lockers(*_dout);
*_dout << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_CHECKRESERVEDLOCK, end-start);
return SQLITE_OK;
}
static int Close(sqlite3_file *file)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << dendl;
f->~cephsqlite_file();
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_CLOSE, end-start);
return SQLITE_OK;
}
static int Read(sqlite3_file *file, void *buf, int len, sqlite_int64 off)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << buf << " " << off << "~" << len << dendl;
if (int rc = f->io.rs->read(buf, len, off); rc < 0) {
df(5) << "read failed: " << cpp_strerror(rc) << dendl;
return SQLITE_IOERR_READ;
} else {
df(5) << "= " << rc << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_READ, end-start);
if (rc < len) {
memset((char*)buf+rc, 0, len-rc); /* zero-fill the unread tail as SQLite expects on a short read */
return SQLITE_IOERR_SHORT_READ;
} else {
return SQLITE_OK;
}
}
}
static int Write(sqlite3_file *file, const void *buf, int len, sqlite_int64 off)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << off << "~" << len << dendl;
if (int rc = f->io.rs->write(buf, len, off); rc < 0) {
df(5) << "write failed: " << cpp_strerror(rc) << dendl;
return SQLITE_IOERR_WRITE;
} else {
df(5) << "= " << rc << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_WRITE, end-start);
return SQLITE_OK;
}
}
static int Truncate(sqlite3_file *file, sqlite_int64 size)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << size << dendl;
if (int rc = f->io.rs->truncate(size); rc < 0) {
df(5) << "truncate failed: " << cpp_strerror(rc) << dendl;
return SQLITE_IOERR;
}
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_TRUNCATE, end-start);
return SQLITE_OK;
}
static int Sync(sqlite3_file *file, int flags)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << flags << dendl;
if (int rc = f->io.rs->flush(); rc < 0) {
df(5) << "failed: " << cpp_strerror(rc) << dendl;
return SQLITE_IOERR;
}
df(5) << " = 0" << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_SYNC, end-start);
return SQLITE_OK;
}
static int FileSize(sqlite3_file *file, sqlite_int64 *osize)
{
auto f = (cephsqlite_file*)file;
auto start = ceph::coarse_mono_clock::now();
df(5) << dendl;
uint64_t size = 0;
if (int rc = f->io.rs->stat(&size); rc < 0) {
df(5) << "stat failed: " << cpp_strerror(rc) << dendl;
return SQLITE_NOTFOUND;
}
*osize = (sqlite_int64)size;
df(5) << "= " << size << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_FILESIZE, end-start);
return SQLITE_OK;
}
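/* A database location has the form "<pool-name>:<namespace>/<file-name>" or,
* by pool id, "*<pool-id>:<namespace>/<file-name>"; the namespace may be empty,
* e.g. "foo:bar/baz.db" or "*2:/foo.db".
*/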
static bool parsepath(std::string_view path, struct cephsqlite_fileloc* fileloc)
{
static const std::regex re1{"^/*(\\*[[:digit:]]+):([[:alnum:]-_.]*)/([[:alnum:]-._]+)$"};
static const std::regex re2{"^/*([[:alnum:]-_.]+):([[:alnum:]-_.]*)/([[:alnum:]-._]+)$"};
std::cmatch cm;
if (!std::regex_match(path.data(), cm, re1)) {
if (!std::regex_match(path.data(), cm, re2)) {
return false;
}
}
fileloc->pool = cm[1];
fileloc->radosns = cm[2];
fileloc->name = cm[3];
return true;
}
static int makestriper(sqlite3_vfs* vfs, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
{
auto&& appd = getdata(vfs);
auto& cct = appd.cct;
auto& cluster = appd.cluster;
bool gotmap = false;
dv(10) << loc << dendl;
enoent_retry:
if (loc.pool[0] == '*') {
std::string err;
int64_t id = strict_strtoll(loc.pool.c_str()+1, 10, &err);
ceph_assert(err.empty());
if (int rc = cluster.ioctx_create2(id, io->ioctx); rc < 0) {
if (rc == -ENOENT && !gotmap) {
cluster.wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
return rc;
}
} else {
if (int rc = cluster.ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
if (rc == -ENOENT && !gotmap) {
cluster.wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
return rc;
}
}
if (!loc.radosns.empty())
io->ioctx.set_namespace(loc.radosns);
io->rs = std::make_unique<SimpleRADOSStriper>(io->ioctx, loc.name);
io->rs->set_logger(appd.striper_logger);
io->rs->set_lock_timeout(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_timeout"));
io->rs->set_lock_interval(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_interval"));
io->rs->set_blocklist_the_dead(cct->_conf.get_val<bool>("cephsqlite_blocklist_dead_locker"));
return 0;
}
static int SectorSize(sqlite3_file* sf)
{
static const int size = 65536;
auto start = ceph::coarse_mono_clock::now();
auto f = (cephsqlite_file*)sf;
df(5) << " = " << size << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_SECTORSIZE, end-start);
return size;
}
static int FileControl(sqlite3_file* sf, int op, void *arg)
{
auto f = (cephsqlite_file*)sf;
auto start = ceph::coarse_mono_clock::now();
df(5) << op << ", " << arg << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_FILECONTROL, end-start);
return SQLITE_NOTFOUND;
}
static int DeviceCharacteristics(sqlite3_file* sf)
{
auto f = (cephsqlite_file*)sf;
auto start = ceph::coarse_mono_clock::now();
df(5) << dendl;
static const int c = 0
|SQLITE_IOCAP_ATOMIC
|SQLITE_IOCAP_POWERSAFE_OVERWRITE
|SQLITE_IOCAP_UNDELETABLE_WHEN_OPEN
|SQLITE_IOCAP_SAFE_APPEND
;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_DEVICECHARACTERISTICS, end-start);
return c;
}
static int Open(sqlite3_vfs *vfs, const char *name, sqlite3_file *file,
int flags, int *oflags)
{
static const sqlite3_io_methods io = {
1, /* iVersion */
Close, /* xClose */
Read, /* xRead */
Write, /* xWrite */
Truncate, /* xTruncate */
Sync, /* xSync */
FileSize, /* xFileSize */
Lock, /* xLock */
Unlock, /* xUnlock */
CheckReservedLock, /* xCheckReservedLock */
FileControl, /* xFileControl */
SectorSize, /* xSectorSize */
DeviceCharacteristics /* xDeviceCharacteristics */
};
auto start = ceph::coarse_mono_clock::now();
bool gotmap = false;
auto& cluster = getdata(vfs).cluster;
/* we are not going to create temporary files */
if (name == NULL) {
dv(-1) << " cannot open temporary database" << dendl;
return SQLITE_CANTOPEN;
}
auto path = std::string_view(name);
if (path == ":memory:"sv) {
dv(-1) << " cannot open temporary database" << dendl;
return SQLITE_IOERR;
}
dv(5) << path << " flags=" << std::hex << flags << dendl;
auto f = new (file)cephsqlite_file();
f->vfs = vfs;
if (!parsepath(path, &f->loc)) {
ceph_assert(0); /* xFullPathname validates! */
}
f->flags = flags;
enoent_retry:
if (int rc = makestriper(vfs, f->loc, &f->io); rc < 0) {
f->~cephsqlite_file();
dv(5) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
if (flags & SQLITE_OPEN_CREATE) {
dv(10) << "OPEN_CREATE" << dendl;
if (int rc = f->io.rs->create(); rc < 0 && rc != -EEXIST) {
if (rc == -ENOENT && !gotmap) {
/* we may have an out of date OSDMap which cancels the op in the
* Objecter. Try to get a new one and retry. This is mostly noticeable
* in testing when pools are getting created/deleted left and right.
*/
dv(5) << "retrying create after getting latest OSDMap" << dendl;
cluster.wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
dv(5) << "file cannot be created: " << cpp_strerror(rc) << dendl;
return SQLITE_IOERR;
}
}
if (int rc = f->io.rs->open(); rc < 0) {
if (rc == -ENOENT && !gotmap) {
/* See comment above for create case. */
dv(5) << "retrying open after getting latest OSDMap" << dendl;
cluster.wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
dv(10) << "cannot open striper: " << cpp_strerror(rc) << dendl;
return rc;
}
if (oflags) {
*oflags = flags;
}
f->base.pMethods = &io;
auto end = ceph::coarse_mono_clock::now();
getdata(vfs).logger->tinc(P_OP_OPEN, end-start);
return SQLITE_OK;
}
/*
** Delete the file identified by argument path. If the dsync parameter
** is non-zero, then ensure the file-system modification to delete the
** file has been synced to disk before returning.
*/
static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
{
auto start = ceph::coarse_mono_clock::now();
dv(5) << "'" << path << "', " << dsync << dendl;
cephsqlite_fileloc fileloc;
if (!parsepath(path, &fileloc)) {
dv(5) << "path does not parse!" << dendl;
return SQLITE_NOTFOUND;
}
cephsqlite_fileio io;
if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
dv(5) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
if (int rc = io.rs->lock(0); rc < 0) {
return SQLITE_IOERR;
}
if (int rc = io.rs->remove(); rc < 0) {
dv(5) << "= " << rc << dendl;
return SQLITE_IOERR_DELETE;
}
/* No need to unlock */
dv(5) << "= 0" << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(vfs).logger->tinc(P_OP_DELETE, end-start);
return SQLITE_OK;
}
/*
** Query the file-system to see if the named file exists, is readable or
** is both readable and writable.
*/
static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
{
auto start = ceph::coarse_mono_clock::now();
dv(5) << path << " " << std::hex << flags << dendl;
cephsqlite_fileloc fileloc;
if (!parsepath(path, &fileloc)) {
dv(5) << "path does not parse!" << dendl;
return SQLITE_NOTFOUND;
}
cephsqlite_fileio io;
if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
dv(5) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
if (int rc = io.rs->open(); rc < 0) {
if (rc == -ENOENT) {
*result = 0;
return SQLITE_OK;
} else {
dv(10) << "cannot open striper: " << cpp_strerror(rc) << dendl;
*result = 0;
return SQLITE_IOERR;
}
}
uint64_t size = 0;
if (int rc = io.rs->stat(&size); rc < 0) {
dv(5) << "= " << rc << " (" << cpp_strerror(rc) << ")" << dendl;
*result = 0;
} else {
dv(5) << "= 0" << dendl;
*result = 1;
}
auto end = ceph::coarse_mono_clock::now();
getdata(vfs).logger->tinc(P_OP_ACCESS, end-start);
return SQLITE_OK;
}
/* This method is only called once for each database. It provides a chance to
* reformat the path into a canonical format.
*/
static int FullPathname(sqlite3_vfs* vfs, const char* ipath, int opathlen, char* opath)
{
auto start = ceph::coarse_mono_clock::now();
auto path = std::string_view(ipath);
dv(5) << "1: " << path << dendl;
cephsqlite_fileloc fileloc;
if (!parsepath(path, &fileloc)) {
dv(5) << "path does not parse!" << dendl;
return SQLITE_NOTFOUND;
}
dv(5) << " parsed " << fileloc << dendl;
auto p = fmt::format("{}:{}/{}", fileloc.pool, fileloc.radosns, fileloc.name);
if (p.size() >= (size_t)opathlen) {
dv(5) << "path too long!" << dendl;
return SQLITE_CANTOPEN;
}
strcpy(opath, p.c_str());
dv(5) << " output " << p << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(vfs).logger->tinc(P_OP_FULLPATHNAME, end-start);
return SQLITE_OK;
}
static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
{
auto start = ceph::coarse_mono_clock::now();
dv(5) << time << dendl;
auto t = ceph_clock_now();
*time = t.to_msec() + 2440587.5*86400000; /* milliseconds since the Unix epoch shifted to the Julian Day Number epoch, as xCurrentTimeInt64 expects */
auto end = ceph::coarse_mono_clock::now();
getdata(vfs).logger->tinc(P_OP_CURRENTTIME, end-start);
return SQLITE_OK;
}
LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
{
ldout(cct, 1) << "cct: " << cct << dendl;
if (sqlite3_api == nullptr) {
lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
return -EINVAL;
}
auto vfs = sqlite3_vfs_find("ceph");
if (!vfs) {
lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
return -EINVAL;
}
auto& appd = getdata(vfs);
appd.cct = cct;
if (int rc = appd.setup_perf(); rc < 0) {
appd.cct = nullptr;
return rc;
}
if (int rc = appd.init_cluster(); rc < 0) {
appd.cct = nullptr;
return rc;
}
auto s = appd.cluster.get_addrs();
if (ident) {
*ident = strdup(s.c_str());
}
ldout(cct, 1) << "complete" << dendl;
return 0;
}
static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
{
auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
dv(10) << dendl;
auto&& appd = getdata(vfs);
JSONFormatter f(false);
f.open_object_section("ceph_perf");
appd.logger->dump_formatted(&f, false);
appd.striper_logger->dump_formatted(&f, false);
f.close_section();
{
CachedStackStringStream css;
f.flush(*css);
auto sv = css->strv();
dv(20) << " = " << sv << dendl;
sqlite3_result_text(ctx, sv.data(), sv.size(), SQLITE_TRANSIENT);
}
}
static void f_status(sqlite3_context* ctx, int argc, sqlite3_value** argv)
{
auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
dv(10) << dendl;
auto&& appd = getdata(vfs);
JSONFormatter f(false);
f.open_object_section("ceph_status");
f.dump_int("id", appd.cluster.get_instance_id());
f.dump_string("addr", appd.cluster.get_addrs());
f.close_section();
{
CachedStackStringStream css;
f.flush(*css);
auto sv = css->strv();
dv(20) << " = " << sv << dendl;
sqlite3_result_text(ctx, sv.data(), sv.size(), SQLITE_TRANSIENT);
}
}
static int autoreg(sqlite3* db, char** err, const struct sqlite3_api_routines* thunk)
{
auto vfs = sqlite3_vfs_find("ceph");
if (!vfs) {
ceph_abort("ceph vfs not found");
}
if (int rc = sqlite3_create_function(db, "ceph_perf", 0, SQLITE_UTF8, vfs, f_perf, nullptr, nullptr); rc) {
return rc;
}
if (int rc = sqlite3_create_function(db, "ceph_status", 0, SQLITE_UTF8, vfs, f_status, nullptr, nullptr); rc) {
return rc;
}
return SQLITE_OK;
}
LIBCEPHSQLITE_API int sqlite3_cephsqlite_init(sqlite3* db, char** err, const sqlite3_api_routines* api)
{
SQLITE_EXTENSION_INIT2(api);
auto vfs = sqlite3_vfs_find("ceph");
if (!vfs) {
auto appd = new cephsqlite_appdata;
vfs = &appd->vfs;
vfs->iVersion = 2;
vfs->szOsFile = sizeof(struct cephsqlite_file);
vfs->mxPathname = 4096;
vfs->zName = "ceph";
vfs->pAppData = appd;
vfs->xOpen = Open;
vfs->xDelete = Delete;
vfs->xAccess = Access;
vfs->xFullPathname = FullPathname;
vfs->xCurrentTimeInt64 = CurrentTime;
appd->cct = nullptr;
sqlite3_vfs_register(vfs, 0);
}
if (int rc = sqlite3_auto_extension((void(*)(void))autoreg); rc) {
return rc;
}
if (int rc = autoreg(db, err, api); rc) {
return rc;
}
return SQLITE_OK_LOAD_PERMANENTLY;
}
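Below is a hedged sketch, not part of the patch, of how the ceph_status and ceph_perf SQL functions registered by autoreg above could be queried from an application once a connection uses the "ceph" VFS; the function and callback names are illustrative.

#include <cstdio>
#include <sqlite3.h>

// Print each column of each result row; used as the sqlite3_exec callback.
static int print_row(void*, int argc, char** argv, char**)
{
  for (int i = 0; i < argc; ++i) {
    std::printf("%s\n", argv[i] ? argv[i] : "NULL");
  }
  return 0;
}

// Dump the JSON produced by the ceph_status() and ceph_perf() SQL functions.
void dump_ceph_stats(sqlite3* db)
{
  char* err = nullptr;
  if (sqlite3_exec(db, "SELECT ceph_status(); SELECT ceph_perf();",
                   print_row, nullptr, &err) != SQLITE_OK) {
    std::fprintf(stderr, "query failed: %s\n", err ? err : "unknown");
    sqlite3_free(err);
  }
}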

View File

@ -33,6 +33,9 @@ if(WITH_MGR)
mgr_commands.cc
$<TARGET_OBJECTS:mgr_cap_obj>)
add_executable(ceph-mgr ${mgr_srcs})
if(WITH_LIBCEPHSQLITE)
target_link_libraries(ceph-mgr cephsqlite SQLite3::SQLite3)
endif()
target_link_libraries(ceph-mgr
osdc client heap_profiler
global-static ceph-common

View File

@ -13,6 +13,8 @@
#include <Python.h>
#include <sqlite3.h>
#include "osdc/Objecter.h"
#include "client/Client.h"
#include "common/errno.h"
@ -21,6 +23,10 @@
#include "global/global_context.h"
#include "global/signal_handler.h"
#ifdef WITH_LIBCEPHSQLITE
# include "include/libcephsqlite.h"
#endif
#include "mgr/MgrContext.h"
#include "DaemonServer.h"
@ -353,6 +359,32 @@ void Mgr::init()
"Dump mgr status");
ceph_assert(r == 0);
#ifdef WITH_LIBCEPHSQLITE
dout(4) << "Using sqlite3 version: " << sqlite3_libversion() << dendl;
/* See libcephsqlite.h for rationale of this code. */
sqlite3_auto_extension((void (*)())sqlite3_cephsqlite_init);
{
sqlite3* db = nullptr;
if (int rc = sqlite3_open_v2(":memory:", &db, SQLITE_OPEN_READWRITE, nullptr); rc == SQLITE_OK) {
sqlite3_close(db);
} else {
derr << "could not open sqlite3: " << rc << dendl;
ceph_abort();
}
}
{
char *ident = nullptr;
if (int rc = cephsqlite_setcct(g_ceph_context, &ident); rc < 0) {
derr << "could not set libcephsqlite cct: " << rc << dendl;
ceph_abort();
}
entity_addrvec_t addrv;
addrv.parse(ident);
ident = (char*)realloc(ident, 0);
py_module_registry->register_client("libcephsqlite", addrv);
}
#endif
dout(4) << "Complete." << dendl;
initializing = false;
initialized = true;

View File

@ -290,6 +290,17 @@ void MonCapGrant::expand_profile(const EntityName& name) const
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R));
profile_grants.push_back(MonCapGrant("pg", MON_CAP_R));
}
if (profile == "simple-rados-client-with-blocklist") {
profile_grants.push_back(MonCapGrant("mon", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R));
profile_grants.push_back(MonCapGrant("pg", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd blocklist"));
profile_grants.back().command_args["blocklistop"] = StringConstraint(
StringConstraint::MATCH_TYPE_EQUAL, "add");
profile_grants.back().command_args["addr"] = StringConstraint(
StringConstraint::MATCH_TYPE_REGEX, "^[^/]+/[0-9]+$");
}
if (boost::starts_with(profile, "rbd")) {
profile_grants.push_back(MonCapGrant("mon", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R));

View File

@ -157,14 +157,14 @@ class Schedule(object):
return json.dumps({'path': self.path, 'schedule': self.schedule,
'retention': dump_retention(self.retention)})
CREATE_TABLES = '''CREATE TABLE schedules(
CREATE_TABLES = '''CREATE TABLE IF NOT EXISTS schedules(
id INTEGER PRIMARY KEY ASC,
path TEXT NOT NULL UNIQUE,
subvol TEXT,
retention TEXT DEFAULT '{}',
rel_path TEXT NOT NULL
);
CREATE TABLE schedules_meta(
CREATE TABLE IF NOT EXISTS schedules_meta(
id INTEGER PRIMARY KEY ASC,
schedule_id INT,
start TEXT NOT NULL,

View File

@ -124,42 +124,30 @@ class SnapSchedClient(CephfsClient):
def get_schedule_db(self, fs: str) -> sqlite3.Connection:
if fs not in self.sqlite_connections:
self.sqlite_connections[fs] = sqlite3.connect(
':memory:',
check_same_thread=False)
with self.sqlite_connections[fs] as con:
con.row_factory = sqlite3.Row
con.execute("PRAGMA FOREIGN_KEYS = 1")
pool = self.get_metadata_pool(fs)
assert pool, f'fs "{fs}" not found'
with open_ioctx(self, pool) as ioctx:
try:
size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
db = ioctx.read(SNAP_DB_OBJECT_NAME,
size).decode('utf-8')
con.executescript(db)
except rados.ObjectNotFound:
log.debug((f'No schedule DB found in {fs}, '
'creating one.'))
con.executescript(Schedule.CREATE_TABLES)
poolid = self.get_metadata_pool(fs)
assert poolid, f'fs "{fs}" not found'
uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph";
log.debug(f"using uri {uri}")
db = sqlite3.connect(uri, check_same_thread=False, uri=True)
db.execute('PRAGMA FOREIGN_KEYS = 1')
db.execute('PRAGMA JOURNAL_MODE = PERSIST')
db.execute('PRAGMA PAGE_SIZE = 65536')
db.execute('PRAGMA CACHE_SIZE = 256')
db.row_factory = sqlite3.Row
# check for legacy dump store
with open_ioctx(self, poolid) as ioctx:
try:
size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
db.executescript(dump)
ioctx.remove(SNAP_DB_OBJECT_NAME)
except rados.ObjectNotFound:
log.debug(f'No legacy schedule DB found in {fs}')
db.executescript(Schedule.CREATE_TABLES)
self.sqlite_connections[fs] = db
return db
return self.sqlite_connections[fs]
def store_schedule_db(self, fs: str) -> None:
# only store db is it exists, otherwise nothing to do
metadata_pool = self.get_metadata_pool(fs)
if not metadata_pool:
raise CephfsConnectionException(
-errno.ENOENT, "Filesystem {} does not exist".format(fs))
if fs in self.sqlite_connections:
db_content = []
db = self.sqlite_connections[fs]
with db:
for row in db.iterdump():
db_content.append(row)
with open_ioctx(self, metadata_pool) as ioctx:
ioctx.write_full(SNAP_DB_OBJECT_NAME,
'\n'.join(db_content).encode('utf-8'))
def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
if self.allow_minute_snaps:
@ -293,7 +281,6 @@ class SnapSchedClient(CephfsClient):
log.debug(f'attempting to add schedule {sched}')
db = self.get_schedule_db(fs)
sched.store_schedule(db)
self.store_schedule_db(sched.fs)
@updates_schedule_db
def rm_snap_schedule(self,

View File

@ -61,6 +61,7 @@ if(NOT WIN32)
add_subdirectory(filestore)
add_subdirectory(fs)
add_subdirectory(libcephfs)
add_subdirectory(libcephsqlite)
add_subdirectory(client)
add_subdirectory(mon)
add_subdirectory(mgr)

View File

@ -0,0 +1,15 @@
if(WITH_LIBCEPHSQLITE)
add_executable(ceph_test_libcephsqlite
main.cc
)
target_link_libraries(ceph_test_libcephsqlite
cephsqlite
librados
ceph-common
SQLite3::SQLite3
${UNITTEST_LIBS}
${EXTRALIBS}
${CMAKE_DL_LIBS}
)
install(TARGETS ceph_test_libcephsqlite DESTINATION ${CMAKE_INSTALL_BINDIR})
endif(WITH_LIBCEPHSQLITE)

File diff suppressed because it is too large