mirror of
https://github.com/ceph/ceph
synced 2025-03-22 02:08:13 +00:00
Merge pull request #48729 from ivancich/wip-flight-select-add-flight
rgw: initial commit adding Arrow Flight functionality Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
commit
933a42f9af
@ -439,6 +439,7 @@ option(WITH_RADOSGW_DBSTORE "DBStore backend for Rados Gateway" ON)
|
||||
option(WITH_RADOSGW_MOTR "CORTX-Motr backend for Rados Gateway" OFF)
|
||||
option(WITH_RADOSGW_DAOS "DAOS backend for RADOS Gateway" OFF)
|
||||
option(WITH_RADOSGW_SELECT_PARQUET "Support for s3 select on parquet objects" ON)
|
||||
option(WITH_RADOSGW_ARROW_FLIGHT "Build arrow flight when not using system-provided arrow" OFF)
|
||||
|
||||
option(WITH_SYSTEM_ARROW "Use system-provided arrow" OFF)
|
||||
option(WITH_SYSTEM_UTF8PROC "Use system-provided utf8proc" OFF)
|
||||
|
@ -42,6 +42,15 @@ function(build_arrow)
|
||||
list(APPEND arrow_CMAKE_ARGS -DARROW_WITH_SNAPPY=ON) # required
|
||||
list(APPEND arrow_INTERFACE_LINK_LIBRARIES snappy::snappy)
|
||||
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
message("building arrow flight; make sure grpc-plugins is installed on the system")
|
||||
list(APPEND arrow_CMAKE_ARGS
|
||||
-DARROW_FLIGHT=ON -DARROW_WITH_RE2=OFF)
|
||||
find_package(gRPC REQUIRED)
|
||||
find_package(Protobuf REQUIRED)
|
||||
find_package(c-ares 1.13.0 QUIET REQUIRED)
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
list(APPEND arrow_CMAKE_ARGS -DARROW_WITH_ZLIB=ON) # required
|
||||
list(APPEND arrow_INTERFACE_LINK_LIBRARIES ZLIB::ZLIB)
|
||||
|
||||
@ -102,6 +111,11 @@ function(build_arrow)
|
||||
set(arrow_BYPRODUCTS ${arrow_LIBRARY})
|
||||
list(APPEND arrow_BYPRODUCTS ${parquet_LIBRARY})
|
||||
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
set(arrow_flight_LIBRARY "${arrow_LIBRARY_DIR}/libarrow_flight.a")
|
||||
list(APPEND arrow_BYPRODUCTS ${arrow_flight_LIBRARY})
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
if(CMAKE_MAKE_PROGRAM MATCHES "make")
|
||||
# try to inherit command line arguments passed by parent "make" job
|
||||
set(make_cmd $(MAKE))
|
||||
@ -140,4 +154,14 @@ function(build_arrow)
|
||||
set_target_properties(Arrow::Parquet PROPERTIES
|
||||
IMPORTED_LINK_INTERFACE_LANGUAGES "CXX"
|
||||
IMPORTED_LOCATION "${parquet_LIBRARY}")
|
||||
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
add_library(Arrow::Flight STATIC IMPORTED)
|
||||
add_dependencies(Arrow::Flight arrow_ext)
|
||||
target_link_libraries(Arrow::Flight INTERFACE Arrow::Arrow gRPC::grpc++)
|
||||
set_target_properties(Arrow::Flight PROPERTIES
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${arrow_INCLUDE_DIR}" # flight is accessed via "arrow/flight"
|
||||
IMPORTED_LINK_INTERFACE_LANGUAGES "CXX"
|
||||
IMPORTED_LOCATION "${arrow_flight_LIBRARY}")
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
endfunction()
|
||||
|
@ -21,12 +21,19 @@ find_package_handle_standard_args(c-ares
|
||||
c-ares_LIBRARY
|
||||
VERSION_VAR c-ares_VERSION)
|
||||
|
||||
if(c-ares_FOUND AND NOT (TARGET c-ares::cares))
|
||||
add_library(c-ares::cares UNKNOWN IMPORTED GLOBAL)
|
||||
set_target_properties(c-ares::cares PROPERTIES
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${c-ares_INCLUDE_DIR}"
|
||||
IMPORTED_LINK_INTERFACE_LANGUAGES "C"
|
||||
IMPORTED_LOCATION "${c-ares_LIBRARY}")
|
||||
if(c-ares_FOUND)
|
||||
if(NOT TARGET c-ares::cares)
|
||||
add_library(c-ares::cares UNKNOWN IMPORTED GLOBAL)
|
||||
set_target_properties(c-ares::cares PROPERTIES
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${c-ares_INCLUDE_DIR}"
|
||||
IMPORTED_LINK_INTERFACE_LANGUAGES "C"
|
||||
IMPORTED_LOCATION "${c-ares_LIBRARY}")
|
||||
endif()
|
||||
|
||||
# to be compatible with old Seastar
|
||||
add_library(c-ares::c-ares ALIAS c-ares::cares)
|
||||
|
||||
if(NOT TARGET c-ares::c-ares)
|
||||
add_library(c-ares::c-ares ALIAS c-ares::cares)
|
||||
endif()
|
||||
endif()
|
||||
|
@ -893,7 +893,7 @@ if(WITH_KVS)
|
||||
endif(WITH_KVS)
|
||||
|
||||
if(WITH_RADOSGW)
|
||||
if(WITH_RADOSGW_SELECT_PARQUET)
|
||||
if(WITH_RADOSGW_SELECT_PARQUET OR WITH_RADOSGW_ARROW_FLIGHT)
|
||||
if(WITH_SYSTEM_ARROW)
|
||||
find_package(Arrow 4 REQUIRED QUIET)
|
||||
find_package(Parquet 4 REQUIRED QUIET)
|
||||
@ -909,8 +909,8 @@ if(WITH_RADOSGW)
|
||||
|
||||
include(BuildArrow)
|
||||
build_arrow()
|
||||
endif()
|
||||
endif()
|
||||
endif(WITH_SYSTEM_ARROW)
|
||||
endif(WITH_RADOSGW_SELECT_PARQUET OR WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
add_subdirectory(libkmip)
|
||||
add_subdirectory(rgw)
|
||||
|
@ -63,6 +63,7 @@ SUBSYS(rgw_sync, 1, 5)
|
||||
SUBSYS(rgw_datacache, 1, 5)
|
||||
SUBSYS(rgw_access, 1, 5)
|
||||
SUBSYS(rgw_dbstore, 1, 5)
|
||||
SUBSYS(rgw_flight, 1, 5)
|
||||
SUBSYS(javaclient, 1, 5)
|
||||
SUBSYS(asok, 1, 5)
|
||||
SUBSYS(throttle, 1, 1)
|
||||
|
@ -9,6 +9,12 @@ if(WITH_RADOSGW_SELECT_PARQUET)
|
||||
message("-- arrow is installed, radosgw/s3select-op is able to process parquet objects")
|
||||
endif(WITH_RADOSGW_SELECT_PARQUET)
|
||||
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
set(ARROW_FLIGHT_LIBRARIES Arrow::Arrow Arrow::Parquet Arrow::Flight utf8proc::utf8proc) # order is important
|
||||
add_definitions(-D_ARROW_EXIST)
|
||||
message("-- arrow flight is installed")
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
function(gperf_generate input output)
|
||||
add_custom_command(
|
||||
OUTPUT ${output}
|
||||
@ -213,6 +219,12 @@ endif()
|
||||
if(WITH_JAEGER)
|
||||
list(APPEND librgw_common_srcs rgw_tracer.cc)
|
||||
endif()
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
# NOTE: eventually don't want this in common but just in radosgw daemon
|
||||
# list(APPEND radosgw_srcs rgw_flight.cc rgw_flight_frontend.cc)
|
||||
list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc)
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
|
||||
add_library(rgw_common STATIC ${librgw_common_srcs})
|
||||
target_compile_definitions(rgw_common
|
||||
@ -249,6 +261,7 @@ target_link_libraries(rgw_common
|
||||
${CURL_LIBRARIES}
|
||||
${EXPAT_LIBRARIES}
|
||||
${ARROW_LIBRARIES}
|
||||
${ARROW_FLIGHT_LIBRARIES}
|
||||
${ALLOC_LIBS}
|
||||
PUBLIC
|
||||
${LUA_LIBRARIES}
|
||||
@ -377,6 +390,7 @@ target_link_libraries(rgw_a
|
||||
common_utf8 global
|
||||
${CRYPTO_LIBS}
|
||||
${ARROW_LIBRARIES}
|
||||
${ARROW_FLIGHT_LIBRARIES}
|
||||
OATH::OATH
|
||||
PUBLIC
|
||||
rgw_common
|
||||
@ -403,6 +417,14 @@ set(radosgw_srcs
|
||||
|
||||
add_executable(radosgw ${radosgw_srcs})
|
||||
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
# target_compile_definitions(radosgw PUBLIC WITH_ARROW_FLIGHT)
|
||||
target_compile_definitions(rgw_common PUBLIC WITH_ARROW_FLIGHT)
|
||||
target_include_directories(rgw_common
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/src/arrow/cpp/src")
|
||||
# target_include_directories(radosgw PUBLIC Arrow::Arrow)
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
target_compile_definitions(radosgw PUBLIC "-DCLS_CLIENT_HIDE_IOCTX")
|
||||
target_include_directories(radosgw
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/src/dmclock/support/src"
|
||||
@ -423,6 +445,13 @@ set(radosgw_admin_srcs
|
||||
rgw_admin.cc
|
||||
rgw_sync_checkpoint.cc
|
||||
rgw_orphan.cc)
|
||||
|
||||
# this is unsatisfying and hopefully temporary; ARROW should not be
|
||||
# part of radosgw_admin
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
list(APPEND radosgw_admin_srcs rgw_flight.cc)
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
add_executable(radosgw-admin ${radosgw_admin_srcs})
|
||||
target_link_libraries(radosgw-admin ${rgw_libs} librados
|
||||
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
|
||||
@ -431,6 +460,13 @@ target_link_libraries(radosgw-admin ${rgw_libs} librados
|
||||
global ${LIB_RESOLV}
|
||||
OATH::OATH
|
||||
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
|
||||
|
||||
# this is unsatisfying and hopefully temporary; ARROW should not be
|
||||
# part of radosgw_admin
|
||||
if(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
target_link_libraries(radosgw-admin ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES})
|
||||
endif(WITH_RADOSGW_ARROW_FLIGHT)
|
||||
|
||||
install(TARGETS radosgw-admin DESTINATION bin)
|
||||
|
||||
set(radosgw_es_srcs
|
||||
|
@ -62,6 +62,9 @@
|
||||
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
|
||||
#include "rgw_kafka.h"
|
||||
#endif
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
#include "rgw_flight_frontend.h"
|
||||
#endif
|
||||
#include "rgw_asio_frontend.h"
|
||||
#include "rgw_dmclock_scheduler_ctx.h"
|
||||
#include "rgw_lua.h"
|
||||
@ -433,6 +436,16 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
|
||||
rgwlib->set_fe(static_cast<RGWLibFrontend*>(fe));
|
||||
}
|
||||
}
|
||||
else if (framework == "arrow_flight") {
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
int port;
|
||||
config->get_val("port", 8077, &port);
|
||||
fe = new rgw::flight::FlightFrontend(env, config, port);
|
||||
#else
|
||||
derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl;
|
||||
continue;
|
||||
#endif
|
||||
}
|
||||
|
||||
service_map_meta["frontend_type#" + stringify(fe_count)] = framework;
|
||||
service_map_meta["frontend_config#" + stringify(fe_count)] = config->get_config();
|
||||
|
716
src/rgw/rgw_flight.cc
Normal file
716
src/rgw/rgw_flight.cc
Normal file
@ -0,0 +1,716 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <mutex>
|
||||
#include <map>
|
||||
#include <algorithm>
|
||||
|
||||
#include "arrow/type.h"
|
||||
#include "arrow/buffer.h"
|
||||
#include "arrow/util/string_view.h"
|
||||
#include "arrow/io/interfaces.h"
|
||||
#include "arrow/ipc/reader.h"
|
||||
#include "arrow/table.h"
|
||||
|
||||
#include "arrow/flight/server.h"
|
||||
|
||||
#include "parquet/arrow/reader.h"
|
||||
|
||||
#include "common/dout.h"
|
||||
#include "rgw_op.h"
|
||||
|
||||
#include "rgw_flight.h"
|
||||
#include "rgw_flight_frontend.h"
|
||||
|
||||
|
||||
namespace rgw::flight {
|
||||
|
||||
// Ticket and FlightKey
|
||||
|
||||
std::atomic<FlightKey> next_flight_key = null_flight_key;
|
||||
|
||||
flt::Ticket FlightKeyToTicket(const FlightKey& key) {
|
||||
flt::Ticket result;
|
||||
result.ticket = std::to_string(key);
|
||||
return result;
|
||||
}
|
||||
|
||||
arw::Result<FlightKey> TicketToFlightKey(const flt::Ticket& t) {
|
||||
try {
|
||||
return (FlightKey) std::stoul(t.ticket);
|
||||
} catch (std::invalid_argument const& ex) {
|
||||
return arw::Status::Invalid(
|
||||
"could not convert Ticket containing \"%s\" into a Flight Key",
|
||||
t.ticket);
|
||||
} catch (const std::out_of_range& ex) {
|
||||
return arw::Status::Invalid(
|
||||
"could not convert Ticket containing \"%s\" into a Flight Key due to range",
|
||||
t.ticket);
|
||||
}
|
||||
}
|
||||
|
||||
// FlightData
|
||||
|
||||
FlightData::FlightData(const std::string& _uri,
|
||||
const std::string& _tenant_name,
|
||||
const std::string& _bucket_name,
|
||||
const rgw_obj_key& _object_key,
|
||||
uint64_t _num_records,
|
||||
uint64_t _obj_size,
|
||||
std::shared_ptr<arw::Schema>& _schema,
|
||||
std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
|
||||
rgw_user _user_id) :
|
||||
key(++next_flight_key),
|
||||
/* expires(coarse_real_clock::now() + lifespan), */
|
||||
uri(_uri),
|
||||
tenant_name(_tenant_name),
|
||||
bucket_name(_bucket_name),
|
||||
object_key(_object_key),
|
||||
num_records(_num_records),
|
||||
obj_size(_obj_size),
|
||||
schema(_schema),
|
||||
kv_metadata(_kv_metadata),
|
||||
user_id(_user_id)
|
||||
{ }
|
||||
|
||||
/**** FlightStore ****/
|
||||
|
||||
FlightStore::FlightStore(const DoutPrefix& _dp) :
|
||||
dp(_dp)
|
||||
{ }
|
||||
|
||||
FlightStore::~FlightStore() { }
|
||||
|
||||
/**** MemoryFlightStore ****/
|
||||
|
||||
MemoryFlightStore::MemoryFlightStore(const DoutPrefix& _dp) :
|
||||
FlightStore(_dp)
|
||||
{ }
|
||||
|
||||
MemoryFlightStore::~MemoryFlightStore() { }
|
||||
|
||||
FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
|
||||
std::pair<decltype(map)::iterator,bool> result;
|
||||
{
|
||||
const std::lock_guard lock(mtx);
|
||||
result = map.insert( {flight.key, std::move(flight)} );
|
||||
}
|
||||
ceph_assertf(result.second,
|
||||
"unable to add FlightData to MemoryFlightStore"); // temporary until error handling
|
||||
|
||||
return result.first->second.key;
|
||||
}
|
||||
|
||||
arw::Result<FlightData> MemoryFlightStore::get_flight(const FlightKey& key) const {
|
||||
const std::lock_guard lock(mtx);
|
||||
auto i = map.find(key);
|
||||
if (i == map.cend()) {
|
||||
return arw::Status::KeyError("could not find Flight with Key %" PRIu32,
|
||||
key);
|
||||
} else {
|
||||
return i->second;
|
||||
}
|
||||
}
|
||||
|
||||
// returns either the next FilghtData or, if at end, empty optional
|
||||
std::optional<FlightData> MemoryFlightStore::after_key(const FlightKey& key) const {
|
||||
std::optional<FlightData> result;
|
||||
{
|
||||
const std::lock_guard lock(mtx);
|
||||
auto i = map.upper_bound(key);
|
||||
if (i != map.end()) {
|
||||
result = i->second;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int MemoryFlightStore::remove_flight(const FlightKey& key) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemoryFlightStore::expire_flights() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**** FlightServer ****/
|
||||
|
||||
FlightServer::FlightServer(RGWProcessEnv& _env,
|
||||
FlightStore* _flight_store,
|
||||
const DoutPrefix& _dp) :
|
||||
env(_env),
|
||||
driver(env.driver),
|
||||
dp(_dp),
|
||||
flight_store(_flight_store)
|
||||
{ }
|
||||
|
||||
FlightServer::~FlightServer()
|
||||
{ }
|
||||
|
||||
|
||||
arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
|
||||
const flt::Criteria* criteria,
|
||||
std::unique_ptr<flt::FlightListing>* listings) {
|
||||
|
||||
// function local class to implement FlightListing interface
|
||||
class RGWFlightListing : public flt::FlightListing {
|
||||
|
||||
FlightStore* flight_store;
|
||||
FlightKey previous_key;
|
||||
|
||||
public:
|
||||
|
||||
RGWFlightListing(FlightStore* flight_store) :
|
||||
flight_store(flight_store),
|
||||
previous_key(null_flight_key)
|
||||
{ }
|
||||
|
||||
arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
|
||||
std::optional<FlightData> fd = flight_store->after_key(previous_key);
|
||||
if (fd) {
|
||||
previous_key = fd->key;
|
||||
auto descriptor =
|
||||
flt::FlightDescriptor::Path(
|
||||
{ fd->tenant_name, fd->bucket_name, fd->object_key.name, fd->object_key.instance, fd->object_key.ns });
|
||||
flt::FlightEndpoint endpoint;
|
||||
endpoint.ticket = FlightKeyToTicket(fd->key);
|
||||
std::vector<flt::FlightEndpoint> endpoints { endpoint };
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
|
||||
flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
|
||||
*info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
|
||||
return arw::Status::OK();
|
||||
} else {
|
||||
*info = nullptr;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
}
|
||||
}; // class RGWFlightListing
|
||||
|
||||
*listings = std::make_unique<RGWFlightListing>(flight_store);
|
||||
return arw::Status::OK();
|
||||
} // FlightServer::ListFlights
|
||||
|
||||
|
||||
arw::Status FlightServer::GetFlightInfo(const flt::ServerCallContext &context,
|
||||
const flt::FlightDescriptor &request,
|
||||
std::unique_ptr<flt::FlightInfo> *info) {
|
||||
return arw::Status::OK();
|
||||
} // FlightServer::GetFlightInfo
|
||||
|
||||
|
||||
arw::Status FlightServer::GetSchema(const flt::ServerCallContext &context,
|
||||
const flt::FlightDescriptor &request,
|
||||
std::unique_ptr<flt::SchemaResult> *schema) {
|
||||
return arw::Status::OK();
|
||||
} // FlightServer::GetSchema
|
||||
|
||||
// A Buffer that owns its memory and frees it when the Buffer is
|
||||
// destructed
|
||||
class OwnedBuffer : public arw::Buffer {
|
||||
|
||||
uint8_t* buffer;
|
||||
|
||||
protected:
|
||||
|
||||
OwnedBuffer(uint8_t* _buffer, int64_t _size) :
|
||||
Buffer(_buffer, _size),
|
||||
buffer(_buffer)
|
||||
{ }
|
||||
|
||||
public:
|
||||
|
||||
~OwnedBuffer() override {
|
||||
delete[] buffer;
|
||||
}
|
||||
|
||||
static arw::Result<std::shared_ptr<OwnedBuffer>> make(int64_t size) {
|
||||
uint8_t* buffer = new (std::nothrow) uint8_t[size];
|
||||
if (!buffer) {
|
||||
return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
|
||||
}
|
||||
|
||||
OwnedBuffer* ptr = new OwnedBuffer(buffer, size);
|
||||
std::shared_ptr<OwnedBuffer> result;
|
||||
result.reset(ptr);
|
||||
return result;
|
||||
}
|
||||
|
||||
// if what's read in is less than capacity
|
||||
void set_size(int64_t size) {
|
||||
size_ = size;
|
||||
}
|
||||
|
||||
// pointer that can be used to write into buffer
|
||||
uint8_t* writeable_data() {
|
||||
return buffer;
|
||||
}
|
||||
}; // class OwnedBuffer
|
||||
|
||||
#if 0 // remove classes used for testing and incrementally building
|
||||
|
||||
// make local to DoGet eventually
|
||||
class LocalInputStream : public arw::io::InputStream {
|
||||
|
||||
std::iostream::pos_type position;
|
||||
std::fstream file;
|
||||
std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
|
||||
const DoutPrefix dp;
|
||||
|
||||
public:
|
||||
|
||||
LocalInputStream(std::shared_ptr<const arw::KeyValueMetadata> _kv_metadata,
|
||||
const DoutPrefix _dp) :
|
||||
kv_metadata(_kv_metadata),
|
||||
dp(_dp)
|
||||
{}
|
||||
|
||||
arw::Status Open() {
|
||||
file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
|
||||
if (!file.good()) {
|
||||
return arw::Status::IOError("unable to open file");
|
||||
}
|
||||
|
||||
INFO << "file opened successfully" << dendl;
|
||||
position = file.tellg();
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
arw::Status Close() override {
|
||||
file.close();
|
||||
INFO << "file closed" << dendl;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Tell() const override {
|
||||
if (position < 0) {
|
||||
return arw::Status::IOError(
|
||||
"could not query file implementaiton with tellg");
|
||||
} else {
|
||||
return int64_t(position);
|
||||
}
|
||||
}
|
||||
|
||||
bool closed() const override {
|
||||
return file.is_open();
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
|
||||
INFO << "entered: asking for " << nbytes << " bytes" << dendl;
|
||||
if (file.read(reinterpret_cast<char*>(out),
|
||||
reinterpret_cast<std::streamsize>(nbytes))) {
|
||||
const std::streamsize bytes_read = file.gcount();
|
||||
INFO << "Point A: read bytes " << bytes_read << dendl;
|
||||
position = file.tellg();
|
||||
return bytes_read;
|
||||
} else {
|
||||
ERROR << "unable to read from file" << dendl;
|
||||
return arw::Status::IOError("unable to read from offset %" PRId64,
|
||||
int64_t(position));
|
||||
}
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
|
||||
INFO << "entered: " << ": asking for " << nbytes << " bytes" << dendl;
|
||||
|
||||
std::shared_ptr<OwnedBuffer> buffer;
|
||||
ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
|
||||
|
||||
if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
|
||||
reinterpret_cast<std::streamsize>(nbytes))) {
|
||||
const auto bytes_read = file.gcount();
|
||||
INFO << "Point B: read bytes " << bytes_read << dendl;
|
||||
// buffer->set_size(bytes_read);
|
||||
position = file.tellg();
|
||||
return buffer;
|
||||
} else if (file.rdstate() & std::ifstream::failbit &&
|
||||
file.rdstate() & std::ifstream::eofbit) {
|
||||
const auto bytes_read = file.gcount();
|
||||
INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
|
||||
// buffer->set_size(bytes_read);
|
||||
position = file.tellg();
|
||||
return buffer;
|
||||
} else {
|
||||
ERROR << "unable to read from file" << dendl;
|
||||
return arw::Status::IOError("unable to read from offset %ld", position);
|
||||
}
|
||||
}
|
||||
|
||||
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
|
||||
INFO << "called, not implemented" << dendl;
|
||||
return arw::Status::NotImplemented("peek not currently allowed");
|
||||
}
|
||||
|
||||
bool supports_zero_copy() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() override {
|
||||
INFO << "called" << dendl;
|
||||
return kv_metadata;
|
||||
}
|
||||
}; // class LocalInputStream
|
||||
|
||||
class LocalRandomAccessFile : public arw::io::RandomAccessFile {
|
||||
|
||||
FlightData flight_data;
|
||||
const DoutPrefix dp;
|
||||
|
||||
std::iostream::pos_type position;
|
||||
std::fstream file;
|
||||
|
||||
public:
|
||||
LocalRandomAccessFile(const FlightData& _flight_data, const DoutPrefix _dp) :
|
||||
flight_data(_flight_data),
|
||||
dp(_dp)
|
||||
{ }
|
||||
|
||||
// implement InputStream
|
||||
|
||||
arw::Status Open() {
|
||||
file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
|
||||
if (!file.good()) {
|
||||
return arw::Status::IOError("unable to open file");
|
||||
}
|
||||
|
||||
INFO << "file opened successfully" << dendl;
|
||||
position = file.tellg();
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
arw::Status Close() override {
|
||||
file.close();
|
||||
INFO << "file closed" << dendl;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Tell() const override {
|
||||
if (position < 0) {
|
||||
return arw::Status::IOError(
|
||||
"could not query file implementaiton with tellg");
|
||||
} else {
|
||||
return int64_t(position);
|
||||
}
|
||||
}
|
||||
|
||||
bool closed() const override {
|
||||
return file.is_open();
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
|
||||
INFO << "entered: asking for " << nbytes << " bytes" << dendl;
|
||||
if (file.read(reinterpret_cast<char*>(out),
|
||||
reinterpret_cast<std::streamsize>(nbytes))) {
|
||||
const std::streamsize bytes_read = file.gcount();
|
||||
INFO << "Point A: read bytes " << bytes_read << dendl;
|
||||
position = file.tellg();
|
||||
return bytes_read;
|
||||
} else {
|
||||
ERROR << "unable to read from file" << dendl;
|
||||
return arw::Status::IOError("unable to read from offset %" PRId64,
|
||||
int64_t(position));
|
||||
}
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
|
||||
INFO << "entered: asking for " << nbytes << " bytes" << dendl;
|
||||
|
||||
std::shared_ptr<OwnedBuffer> buffer;
|
||||
ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
|
||||
|
||||
if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
|
||||
reinterpret_cast<std::streamsize>(nbytes))) {
|
||||
const auto bytes_read = file.gcount();
|
||||
INFO << "Point B: read bytes " << bytes_read << dendl;
|
||||
// buffer->set_size(bytes_read);
|
||||
position = file.tellg();
|
||||
return buffer;
|
||||
} else if (file.rdstate() & std::ifstream::failbit &&
|
||||
file.rdstate() & std::ifstream::eofbit) {
|
||||
const auto bytes_read = file.gcount();
|
||||
INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
|
||||
// buffer->set_size(bytes_read);
|
||||
position = file.tellg();
|
||||
return buffer;
|
||||
} else {
|
||||
ERROR << "unable to read from file" << dendl;
|
||||
return arw::Status::IOError("unable to read from offset %ld", position);
|
||||
}
|
||||
}
|
||||
|
||||
bool supports_zero_copy() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
// implement Seekable
|
||||
|
||||
arw::Result<int64_t> GetSize() override {
|
||||
return flight_data.obj_size;
|
||||
}
|
||||
|
||||
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
|
||||
std::iostream::pos_type here = file.tellg();
|
||||
if (here == -1) {
|
||||
return arw::Status::IOError(
|
||||
"unable to determine current position ahead of peek");
|
||||
}
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(OwningStringView result,
|
||||
OwningStringView::make(nbytes));
|
||||
|
||||
// read
|
||||
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
|
||||
Read(nbytes, (void*) result.writeable_data()));
|
||||
(void) bytes_read; // silence unused variable warnings
|
||||
|
||||
// return offset to original
|
||||
ARROW_RETURN_NOT_OK(Seek(here));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
|
||||
return flight_data.kv_metadata;
|
||||
}
|
||||
|
||||
arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
|
||||
const arw::io::IOContext& io_context) override {
|
||||
return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
|
||||
}
|
||||
|
||||
// implement Seekable interface
|
||||
|
||||
arw::Status Seek(int64_t position) {
|
||||
file.seekg(position);
|
||||
if (file.fail()) {
|
||||
return arw::Status::IOError(
|
||||
"error encountered during seek to %" PRId64, position);
|
||||
} else {
|
||||
return arw::Status::OK();
|
||||
}
|
||||
}
|
||||
}; // class LocalRandomAccessFile
|
||||
#endif
|
||||
|
||||
class RandomAccessObject : public arw::io::RandomAccessFile {
|
||||
|
||||
FlightData flight_data;
|
||||
const DoutPrefix dp;
|
||||
|
||||
int64_t position;
|
||||
bool is_closed;
|
||||
std::unique_ptr<rgw::sal::Object::ReadOp> op;
|
||||
|
||||
public:
|
||||
|
||||
RandomAccessObject(const FlightData& _flight_data,
|
||||
std::unique_ptr<rgw::sal::Object>& obj,
|
||||
const DoutPrefix _dp) :
|
||||
flight_data(_flight_data),
|
||||
dp(_dp),
|
||||
position(-1),
|
||||
is_closed(false)
|
||||
{
|
||||
op = obj->get_read_op();
|
||||
}
|
||||
|
||||
arw::Status Open() {
|
||||
int ret = op->prepare(null_yield, &dp);
|
||||
if (ret < 0) {
|
||||
return arw::Status::IOError(
|
||||
"unable to prepare object with error %d", ret);
|
||||
}
|
||||
INFO << "file opened successfully" << dendl;
|
||||
position = 0;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
// implement InputStream
|
||||
|
||||
arw::Status Close() override {
|
||||
position = -1;
|
||||
is_closed = true;
|
||||
(void) op.reset();
|
||||
INFO << "object closed" << dendl;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Tell() const override {
|
||||
if (position < 0) {
|
||||
return arw::Status::IOError("could not determine position");
|
||||
} else {
|
||||
return position;
|
||||
}
|
||||
}
|
||||
|
||||
bool closed() const override {
|
||||
return is_closed;
|
||||
}
|
||||
|
||||
arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
|
||||
INFO << "entered: asking for " << nbytes << " bytes" << dendl;
|
||||
|
||||
if (position < 0) {
|
||||
ERROR << "error, position indicated error" << dendl;
|
||||
return arw::Status::IOError("object read op is in bad state");
|
||||
}
|
||||
|
||||
// note: read function reads through end_position inclusive
|
||||
int64_t end_position = position + nbytes - 1;
|
||||
|
||||
bufferlist bl;
|
||||
|
||||
const int64_t bytes_read =
|
||||
op->read(position, end_position, bl, null_yield, &dp);
|
||||
if (bytes_read < 0) {
|
||||
const int64_t former_position = position;
|
||||
position = -1;
|
||||
ERROR << "read operation returned " << bytes_read << dendl;
|
||||
return arw::Status::IOError(
|
||||
"unable to read object at position %" PRId64 ", error code: %" PRId64,
|
||||
former_position,
|
||||
bytes_read);
|
||||
}
|
||||
|
||||
// TODO: see if there's a way to get rid of this copy, perhaps
|
||||
// updating rgw::sal::read_op
|
||||
bl.cbegin().copy(bytes_read, reinterpret_cast<char*>(out));
|
||||
|
||||
position += bytes_read;
|
||||
|
||||
if (nbytes != bytes_read) {
|
||||
INFO << "partial read: nbytes=" << nbytes <<
|
||||
", bytes_read=" << bytes_read << dendl;
|
||||
}
|
||||
INFO << bytes_read << " bytes read" << dendl;
|
||||
return bytes_read;
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
|
||||
INFO << "entered: asking for " << nbytes << " bytes" << dendl;
|
||||
|
||||
std::shared_ptr<OwnedBuffer> buffer;
|
||||
ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
|
||||
Read(nbytes, buffer->writeable_data()));
|
||||
buffer->set_size(bytes_read);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
bool supports_zero_copy() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
// implement Seekable
|
||||
|
||||
arw::Result<int64_t> GetSize() override {
|
||||
INFO << "entered: " << flight_data.obj_size << " returned" << dendl;
|
||||
return flight_data.obj_size;
|
||||
}
|
||||
|
||||
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
|
||||
INFO << "entered: " << nbytes << " bytes" << dendl;
|
||||
|
||||
int64_t saved_position = position;
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(OwningStringView buffer,
|
||||
OwningStringView::make(nbytes));
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
|
||||
Read(nbytes, (void*) buffer.writeable_data()));
|
||||
|
||||
// restore position for a peek
|
||||
position = saved_position;
|
||||
|
||||
if (bytes_read < nbytes) {
|
||||
// create new OwningStringView with moved buffer
|
||||
return OwningStringView::shrink(std::move(buffer), bytes_read);
|
||||
} else {
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
|
||||
return flight_data.kv_metadata;
|
||||
}
|
||||
|
||||
arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
|
||||
const arw::io::IOContext& io_context) override {
|
||||
return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
|
||||
}
|
||||
|
||||
// implement Seekable interface
|
||||
|
||||
arw::Status Seek(int64_t new_position) {
|
||||
INFO << "entered: position: " << new_position << dendl;
|
||||
if (position < 0) {
|
||||
ERROR << "error, position indicated error" << dendl;
|
||||
return arw::Status::IOError("object read op is in bad state");
|
||||
} else {
|
||||
position = new_position;
|
||||
return arw::Status::OK();
|
||||
}
|
||||
}
|
||||
}; // class RandomAccessObject
|
||||
|
||||
arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
|
||||
const flt::Ticket &request,
|
||||
std::unique_ptr<flt::FlightDataStream> *stream) {
|
||||
int ret;
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(FlightKey key, TicketToFlightKey(request));
|
||||
ARROW_ASSIGN_OR_RAISE(FlightData fd, get_flight_store()->get_flight(key));
|
||||
|
||||
std::unique_ptr<rgw::sal::User> user = driver->get_user(fd.user_id);
|
||||
if (user->empty()) {
|
||||
INFO << "user is empty" << dendl;
|
||||
} else {
|
||||
// TODO: test what happens if user is not loaded
|
||||
ret = user->load_user(&dp, null_yield);
|
||||
if (ret < 0) {
|
||||
ERROR << "load_user returned " << ret << dendl;
|
||||
// TODO return something
|
||||
}
|
||||
INFO << "user is " << user->get_display_name() << dendl;
|
||||
}
|
||||
|
||||
std::unique_ptr<rgw::sal::Bucket> bucket;
|
||||
|
||||
ret = driver->get_bucket(&dp, &(*user), fd.tenant_name, fd.bucket_name,
|
||||
&bucket, null_yield);
|
||||
if (ret < 0) {
|
||||
ERROR << "get_bucket returned " << ret << dendl;
|
||||
// TODO return something
|
||||
}
|
||||
|
||||
std::unique_ptr<rgw::sal::Object> object = bucket->get_object(fd.object_key);
|
||||
|
||||
auto input = std::make_shared<RandomAccessObject>(fd, object, dp);
|
||||
ARROW_RETURN_NOT_OK(input->Open());
|
||||
|
||||
std::unique_ptr<parquet::arrow::FileReader> reader;
|
||||
ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input,
|
||||
arw::default_memory_pool(),
|
||||
&reader));
|
||||
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
|
||||
|
||||
std::vector<std::shared_ptr<arw::RecordBatch>> batches;
|
||||
arw::TableBatchReader batch_reader(*table);
|
||||
ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
|
||||
|
||||
ARROW_ASSIGN_OR_RAISE(auto owning_reader,
|
||||
arw::RecordBatchReader::Make(
|
||||
std::move(batches), table->schema()));
|
||||
*stream = std::unique_ptr<flt::FlightDataStream>(
|
||||
new flt::RecordBatchStream(owning_reader));
|
||||
|
||||
return arw::Status::OK();
|
||||
} // flightServer::DoGet
|
||||
|
||||
} // namespace rgw::flight
|
213
src/rgw/rgw_flight.h
Normal file
213
src/rgw/rgw_flight.h
Normal file
@ -0,0 +1,213 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
|
||||
#include "include/common_fwd.h"
|
||||
#include "common/ceph_context.h"
|
||||
#include "common/Thread.h"
|
||||
#include "common/ceph_time.h"
|
||||
#include "rgw_frontend.h"
|
||||
#include "arrow/type.h"
|
||||
#include "arrow/flight/server.h"
|
||||
#include "arrow/util/string_view.h"
|
||||
|
||||
#include "rgw_flight_frontend.h"
|
||||
|
||||
|
||||
#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
|
||||
#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
|
||||
#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": "
|
||||
#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": "
|
||||
|
||||
#define INFO INFO_F(dp)
|
||||
#define STATUS STATUS_F(dp)
|
||||
#define WARN WARN_F(dp)
|
||||
#define ERROR ERROR_F(dp)
|
||||
|
||||
|
||||
namespace arw = arrow;
|
||||
namespace flt = arrow::flight;
|
||||
|
||||
|
||||
struct req_state;
|
||||
|
||||
namespace rgw::flight {
|
||||
|
||||
static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
|
||||
|
||||
struct FlightData {
|
||||
FlightKey key;
|
||||
// coarse_real_clock::time_point expires;
|
||||
std::string uri;
|
||||
std::string tenant_name;
|
||||
std::string bucket_name;
|
||||
rgw_obj_key object_key;
|
||||
// NB: what about object's namespace and instance?
|
||||
uint64_t num_records;
|
||||
uint64_t obj_size;
|
||||
std::shared_ptr<arw::Schema> schema;
|
||||
std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
|
||||
|
||||
rgw_user user_id; // TODO: this should be removed when we do
|
||||
// proper flight authentication
|
||||
|
||||
FlightData(const std::string& _uri,
|
||||
const std::string& _tenant_name,
|
||||
const std::string& _bucket_name,
|
||||
const rgw_obj_key& _object_key,
|
||||
uint64_t _num_records,
|
||||
uint64_t _obj_size,
|
||||
std::shared_ptr<arw::Schema>& _schema,
|
||||
std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
|
||||
rgw_user _user_id);
|
||||
};
|
||||
|
||||
// stores flights that have been created and helps expire them
|
||||
class FlightStore {
|
||||
|
||||
protected:
|
||||
|
||||
const DoutPrefix& dp;
|
||||
|
||||
public:
|
||||
|
||||
FlightStore(const DoutPrefix& dp);
|
||||
virtual ~FlightStore();
|
||||
virtual FlightKey add_flight(FlightData&& flight) = 0;
|
||||
|
||||
// TODO consider returning const shared pointers to FlightData in
|
||||
// the following two functions
|
||||
virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0;
|
||||
virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0;
|
||||
|
||||
virtual int remove_flight(const FlightKey& key) = 0;
|
||||
virtual int expire_flights() = 0;
|
||||
};
|
||||
|
||||
class MemoryFlightStore : public FlightStore {
|
||||
std::map<FlightKey, FlightData> map;
|
||||
mutable std::mutex mtx; // for map
|
||||
|
||||
public:
|
||||
|
||||
MemoryFlightStore(const DoutPrefix& dp);
|
||||
virtual ~MemoryFlightStore();
|
||||
FlightKey add_flight(FlightData&& flight) override;
|
||||
arw::Result<FlightData> get_flight(const FlightKey& key) const override;
|
||||
std::optional<FlightData> after_key(const FlightKey& key) const override;
|
||||
int remove_flight(const FlightKey& key) override;
|
||||
int expire_flights() override;
|
||||
};
|
||||
|
||||
class FlightServer : public flt::FlightServerBase {
|
||||
|
||||
using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
|
||||
|
||||
RGWProcessEnv& env;
|
||||
rgw::sal::Driver* driver;
|
||||
const DoutPrefix& dp;
|
||||
FlightStore* flight_store;
|
||||
|
||||
std::map<std::string, Data1> data;
|
||||
|
||||
public:
|
||||
|
||||
static constexpr int default_port = 8077;
|
||||
|
||||
FlightServer(RGWProcessEnv& env,
|
||||
FlightStore* flight_store,
|
||||
const DoutPrefix& dp);
|
||||
~FlightServer() override;
|
||||
|
||||
FlightStore* get_flight_store() {
|
||||
return flight_store;
|
||||
}
|
||||
|
||||
arw::Status ListFlights(const flt::ServerCallContext& context,
|
||||
const flt::Criteria* criteria,
|
||||
std::unique_ptr<flt::FlightListing>* listings) override;
|
||||
|
||||
arw::Status GetFlightInfo(const flt::ServerCallContext &context,
|
||||
const flt::FlightDescriptor &request,
|
||||
std::unique_ptr<flt::FlightInfo> *info) override;
|
||||
|
||||
arw::Status GetSchema(const flt::ServerCallContext &context,
|
||||
const flt::FlightDescriptor &request,
|
||||
std::unique_ptr<flt::SchemaResult> *schema) override;
|
||||
|
||||
arw::Status DoGet(const flt::ServerCallContext &context,
|
||||
const flt::Ticket &request,
|
||||
std::unique_ptr<flt::FlightDataStream> *stream) override;
|
||||
}; // class FlightServer
|
||||
|
||||
class OwningStringView : public arw::util::string_view {
|
||||
|
||||
uint8_t* buffer;
|
||||
int64_t capacity;
|
||||
int64_t consumed;
|
||||
|
||||
OwningStringView(uint8_t* _buffer, int64_t _size) :
|
||||
arw::util::string_view((const char*) _buffer, _size),
|
||||
buffer(_buffer),
|
||||
capacity(_size),
|
||||
consumed(_size)
|
||||
{ }
|
||||
|
||||
OwningStringView(OwningStringView&& from, int64_t new_size) :
|
||||
buffer(nullptr),
|
||||
capacity(from.capacity),
|
||||
consumed(new_size)
|
||||
{
|
||||
// should be impossible due to static function check
|
||||
ceph_assertf(consumed <= capacity, "new size cannot exceed capacity");
|
||||
|
||||
std::swap(buffer, from.buffer);
|
||||
from.capacity = 0;
|
||||
from.consumed = 0;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
OwningStringView(OwningStringView&&) = default;
|
||||
OwningStringView& operator=(OwningStringView&&) = default;
|
||||
|
||||
uint8_t* writeable_data() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
~OwningStringView() {
|
||||
if (buffer) {
|
||||
delete[] buffer;
|
||||
}
|
||||
}
|
||||
|
||||
static arw::Result<OwningStringView> make(int64_t size) {
|
||||
uint8_t* buffer = new uint8_t[size];
|
||||
if (!buffer) {
|
||||
return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
|
||||
}
|
||||
return OwningStringView(buffer, size);
|
||||
}
|
||||
|
||||
static arw::Result<OwningStringView> shrink(OwningStringView&& from,
|
||||
int64_t new_size) {
|
||||
if (new_size > from.capacity) {
|
||||
return arw::Status::Invalid("new size cannot exceed capacity");
|
||||
} else {
|
||||
return OwningStringView(std::move(from), new_size);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// GLOBAL
|
||||
|
||||
flt::Ticket FlightKeyToTicket(const FlightKey& key);
|
||||
arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key);
|
||||
|
||||
} // namespace rgw::flight
|
235
src/rgw/rgw_flight_frontend.cc
Normal file
235
src/rgw/rgw_flight_frontend.cc
Normal file
@ -0,0 +1,235 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
|
||||
#include <cstdio>
|
||||
#include <filesystem>
|
||||
#include <sstream>
|
||||
|
||||
#include "arrow/type.h"
|
||||
#include "arrow/flight/server.h"
|
||||
#include "arrow/io/file.h"
|
||||
|
||||
#include "parquet/arrow/reader.h"
|
||||
#include "parquet/arrow/schema.h"
|
||||
#include "parquet/stream_reader.h"
|
||||
|
||||
#include "rgw_flight_frontend.h"
|
||||
#include "rgw_flight.h"
|
||||
|
||||
|
||||
// logging
|
||||
constexpr unsigned dout_subsys = ceph_subsys_rgw_flight;
|
||||
constexpr const char* dout_prefix_str = "rgw arrow_flight: ";
|
||||
|
||||
|
||||
namespace rgw::flight {
|
||||
|
||||
const FlightKey null_flight_key = 0;
|
||||
|
||||
FlightFrontend::FlightFrontend(RGWProcessEnv& _env,
|
||||
RGWFrontendConfig* _config,
|
||||
int _port) :
|
||||
env(_env),
|
||||
config(_config),
|
||||
port(_port),
|
||||
dp(env.driver->ctx(), dout_subsys, dout_prefix_str)
|
||||
{
|
||||
env.flight_store = new MemoryFlightStore(dp);
|
||||
env.flight_server = new FlightServer(env, env.flight_store, dp);
|
||||
INFO << "flight server started" << dendl;
|
||||
}
|
||||
|
||||
FlightFrontend::~FlightFrontend() {
|
||||
delete env.flight_server;
|
||||
delete env.flight_store;
|
||||
INFO << "flight server shut down" << dendl;
|
||||
}
|
||||
|
||||
int FlightFrontend::init() {
|
||||
if (port <= 0) {
|
||||
port = FlightServer::default_port;
|
||||
}
|
||||
const std::string url =
|
||||
std::string("grpc+tcp://localhost:") + std::to_string(port);
|
||||
flt::Location location;
|
||||
arw::Status s = flt::Location::Parse(url, &location);
|
||||
if (!s.ok()) {
|
||||
ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
flt::FlightServerOptions options(location);
|
||||
options.verify_client = false;
|
||||
s = env.flight_server->Init(options);
|
||||
if (!s.ok()) {
|
||||
ERROR << "couldn't init flight server; status=" << s << dendl;
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
INFO << "FlightServer inited; will use port " << port << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int FlightFrontend::run() {
|
||||
try {
|
||||
flight_thread = make_named_thread(server_thread_name,
|
||||
&FlightServer::Serve,
|
||||
env.flight_server);
|
||||
|
||||
INFO << "FlightServer thread started, id=" <<
|
||||
flight_thread.get_id() <<
|
||||
", joinable=" << flight_thread.joinable() << dendl;
|
||||
return 0;
|
||||
} catch (std::system_error& e) {
|
||||
ERROR << "FlightServer thread failed to start" << dendl;
|
||||
return -e.code().value();
|
||||
}
|
||||
}
|
||||
|
||||
void FlightFrontend::stop() {
|
||||
env.flight_server->Shutdown();
|
||||
env.flight_server->Wait();
|
||||
INFO << "FlightServer shut down" << dendl;
|
||||
}
|
||||
|
||||
void FlightFrontend::join() {
|
||||
flight_thread.join();
|
||||
INFO << "FlightServer thread joined" << dendl;
|
||||
}
|
||||
|
||||
void FlightFrontend::pause_for_new_config() {
|
||||
// ignore since config changes won't alter flight_server
|
||||
}
|
||||
|
||||
void FlightFrontend::unpause_with_new_config() {
|
||||
// ignore since config changes won't alter flight_server
|
||||
}
|
||||
|
||||
/* ************************************************************ */
|
||||
|
||||
FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request,
|
||||
RGWGetObj_Filter* next) :
|
||||
RGWGetObj_Filter(next),
|
||||
penv(request->penv),
|
||||
dp(request->cct->get(), dout_subsys, dout_prefix_str),
|
||||
current_offset(0),
|
||||
expected_size(request->obj_size),
|
||||
uri(request->decoded_uri),
|
||||
tenant_name(request->bucket->get_tenant()),
|
||||
bucket_name(request->bucket->get_name()),
|
||||
object_key(request->object->get_key()),
|
||||
// note: what about object namespace and instance?
|
||||
schema_status(arrow::StatusCode::Cancelled,
|
||||
"schema determination incomplete"),
|
||||
user_id(request->user->get_id())
|
||||
{
|
||||
#warning "TODO: fix use of tmpnam"
|
||||
char name[L_tmpnam];
|
||||
const char* namep = std::tmpnam(name);
|
||||
if (!namep) {
|
||||
//
|
||||
}
|
||||
temp_file_name = namep;
|
||||
|
||||
temp_file.open(temp_file_name);
|
||||
}
|
||||
|
||||
FlightGetObj_Filter::~FlightGetObj_Filter() {
|
||||
if (temp_file.is_open()) {
|
||||
temp_file.close();
|
||||
}
|
||||
std::error_code error;
|
||||
std::filesystem::remove(temp_file_name, error);
|
||||
if (error) {
|
||||
ERROR << "FlightGetObj_Filter got error when removing temp file; "
|
||||
"error=" << error.value() <<
|
||||
", temp_file_name=" << temp_file_name << dendl;
|
||||
} else {
|
||||
INFO << "parquet/arrow schema determination status: " <<
|
||||
schema_status << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
int FlightGetObj_Filter::handle_data(bufferlist& bl,
|
||||
off_t bl_ofs, off_t bl_len) {
|
||||
INFO << "flight handling data from offset " <<
|
||||
current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl;
|
||||
|
||||
current_offset += bl_len;
|
||||
|
||||
if (temp_file.is_open()) {
|
||||
bl.write_stream(temp_file);
|
||||
|
||||
if (current_offset >= expected_size) {
|
||||
INFO << "data read is completed, current_offset=" <<
|
||||
current_offset << ", expected_size=" << expected_size << dendl;
|
||||
temp_file.close();
|
||||
|
||||
std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
|
||||
std::shared_ptr<arw::Schema> aw_schema;
|
||||
int64_t num_rows = 0;
|
||||
|
||||
auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status {
|
||||
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file,
|
||||
arrow::io::ReadableFile::Open(temp_file_name));
|
||||
const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
|
||||
|
||||
file->Close();
|
||||
|
||||
num_rows = metadata->num_rows();
|
||||
kv_metadata = metadata->key_value_metadata();
|
||||
const parquet::SchemaDescriptor* pq_schema = metadata->schema();
|
||||
ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema));
|
||||
|
||||
return arrow::Status::OK();
|
||||
};
|
||||
|
||||
schema_status = process_metadata();
|
||||
if (!schema_status.ok()) {
|
||||
ERROR << "reading metadata to access schema, error=" << schema_status << dendl;
|
||||
} else {
|
||||
// INFO << "arrow_schema=" << *aw_schema << dendl;
|
||||
FlightStore* store = penv.flight_store;
|
||||
auto key =
|
||||
store->add_flight(FlightData(uri, tenant_name, bucket_name,
|
||||
object_key, num_rows,
|
||||
expected_size, aw_schema,
|
||||
kv_metadata, user_id));
|
||||
(void) key; // suppress unused variable warning
|
||||
}
|
||||
} // if last block
|
||||
} // if file opened
|
||||
|
||||
// chain to next filter in stream
|
||||
int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#if 0
|
||||
void code_snippets() {
|
||||
INFO << "num_columns:" << md->num_columns() <<
|
||||
" num_schema_elements:" << md->num_schema_elements() <<
|
||||
" num_rows:" << md->num_rows() <<
|
||||
" num_row_groups:" << md->num_row_groups() << dendl;
|
||||
|
||||
|
||||
INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl;
|
||||
for (int c = 0; c < schema1->num_columns(); ++c) {
|
||||
const parquet::ColumnDescriptor* cd = schema1->Column(c);
|
||||
// const parquet::ConvertedType::type t = cd->converted_type;
|
||||
const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type();
|
||||
INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl;
|
||||
}
|
||||
|
||||
INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl;
|
||||
for (int rg = 0; rg < md->num_row_groups(); ++rg) {
|
||||
INFO << "Row Group " << rg << dendl;
|
||||
auto rg_md = md->RowGroup(rg);
|
||||
auto schema2 = rg_md->schema();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace rgw::flight
|
78
src/rgw/rgw_flight_frontend.h
Normal file
78
src/rgw/rgw_flight_frontend.h
Normal file
@ -0,0 +1,78 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab ft=cpp
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "include/common_fwd.h"
|
||||
#include "common/Thread.h"
|
||||
#include "rgw_frontend.h"
|
||||
#include "rgw_op.h"
|
||||
|
||||
#include "arrow/status.h"
|
||||
|
||||
|
||||
namespace rgw::flight {
|
||||
|
||||
using FlightKey = uint32_t;
|
||||
extern const FlightKey null_flight_key;
|
||||
|
||||
class FlightServer;
|
||||
|
||||
class FlightFrontend : public RGWFrontend {
|
||||
|
||||
static constexpr std::string_view server_thread_name =
|
||||
"Arrow Flight Server thread";
|
||||
|
||||
RGWProcessEnv& env;
|
||||
std::thread flight_thread;
|
||||
RGWFrontendConfig* config;
|
||||
int port;
|
||||
|
||||
const DoutPrefix dp;
|
||||
|
||||
public:
|
||||
|
||||
// port <= 0 means let server decide; typically 8077
|
||||
FlightFrontend(RGWProcessEnv& env,
|
||||
RGWFrontendConfig* config,
|
||||
int port = -1);
|
||||
~FlightFrontend() override;
|
||||
int init() override;
|
||||
int run() override;
|
||||
void stop() override;
|
||||
void join() override;
|
||||
|
||||
void pause_for_new_config() override;
|
||||
void unpause_with_new_config() override;
|
||||
}; // class FlightFrontend
|
||||
|
||||
class FlightGetObj_Filter : public RGWGetObj_Filter {
|
||||
|
||||
const RGWProcessEnv& penv;
|
||||
const DoutPrefix dp;
|
||||
FlightKey key;
|
||||
uint64_t current_offset;
|
||||
uint64_t expected_size;
|
||||
std::string uri;
|
||||
std::string tenant_name;
|
||||
std::string bucket_name;
|
||||
rgw_obj_key object_key;
|
||||
std::string temp_file_name;
|
||||
std::ofstream temp_file;
|
||||
arrow::Status schema_status;
|
||||
rgw_user user_id; // TODO: this should be removed when we do
|
||||
// proper flight authentication
|
||||
|
||||
public:
|
||||
|
||||
FlightGetObj_Filter(const req_state* request, RGWGetObj_Filter* next);
|
||||
~FlightGetObj_Filter();
|
||||
|
||||
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
|
||||
#if 0
|
||||
// this would allow the range to be modified if necessary;
|
||||
int fixup_range(off_t& ofs, off_t& end) override;
|
||||
#endif
|
||||
};
|
||||
|
||||
} // namespace rgw::flight
|
@ -67,6 +67,11 @@
|
||||
|
||||
#include "compressor/Compressor.h"
|
||||
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
#include "rgw_flight.h"
|
||||
#include "rgw_flight_frontend.h"
|
||||
#endif
|
||||
|
||||
#ifdef WITH_LTTNG
|
||||
#define TRACEPOINT_DEFINE
|
||||
#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
|
||||
@ -2150,6 +2155,9 @@ void RGWGetObj::execute(optional_yield y)
|
||||
RGWGetObj_CB cb(this);
|
||||
RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
|
||||
boost::optional<RGWGetObj_Decompress> decompress;
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
boost::optional<rgw::flight::FlightGetObj_Filter> flight_filter;
|
||||
#endif
|
||||
std::unique_ptr<RGWGetObj_Filter> decrypt;
|
||||
std::unique_ptr<RGWGetObj_Filter> run_lua;
|
||||
map<string, bufferlist>::iterator attr_iter;
|
||||
@ -2228,6 +2236,14 @@ void RGWGetObj::execute(optional_yield y)
|
||||
goto done_err;
|
||||
}
|
||||
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
if (ofs == 0) {
|
||||
// insert a GetObj_Filter to monitor and create flight
|
||||
flight_filter.emplace(s, filter);
|
||||
filter = &*flight_filter;
|
||||
}
|
||||
#endif
|
||||
|
||||
op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
|
||||
if (op_ret < 0) {
|
||||
ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
|
||||
|
@ -20,6 +20,13 @@ namespace rgw::sal {
|
||||
class LuaManager;
|
||||
}
|
||||
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
namespace rgw::flight {
|
||||
class FlightServer;
|
||||
class FlightStore;
|
||||
}
|
||||
#endif
|
||||
|
||||
struct RGWLuaProcessEnv {
|
||||
std::string luarocks_path;
|
||||
rgw::lua::Background* background = nullptr;
|
||||
@ -33,4 +40,11 @@ struct RGWProcessEnv {
|
||||
OpsLogSink *olog = nullptr;
|
||||
std::unique_ptr<rgw::auth::StrategyRegistry> auth_registry;
|
||||
ActiveRateLimiter* ratelimiting = nullptr;
|
||||
|
||||
#ifdef WITH_ARROW_FLIGHT
|
||||
// managed by rgw:flight::FlightFrontend in rgw_flight_frontend.cc
|
||||
rgw::flight::FlightServer* flight_server;
|
||||
rgw::flight::FlightStore* flight_store;
|
||||
#endif
|
||||
};
|
||||
|
||||
|
@ -224,6 +224,7 @@ options:
|
||||
-o config add extra config parameters to all sections
|
||||
--rgw_port specify ceph rgw http listen port
|
||||
--rgw_frontend specify the rgw frontend configuration
|
||||
--rgw_arrow_flight start arrow flight frontend
|
||||
--rgw_compression specify the rgw compression plugin
|
||||
--seastore use seastore as crimson osd backend
|
||||
-b, --bluestore use bluestore as the osd objectstore backend (default)
|
||||
@ -415,6 +416,9 @@ case $1 in
|
||||
rgw_frontend=$2
|
||||
shift
|
||||
;;
|
||||
--rgw_arrow_flight)
|
||||
rgw_flight_frontend="yes"
|
||||
;;
|
||||
--rgw_compression)
|
||||
rgw_compression=$2
|
||||
shift
|
||||
@ -626,13 +630,17 @@ do_rgw_conf() {
|
||||
# setup each rgw on a sequential port, starting at $CEPH_RGW_PORT.
|
||||
# individual rgw's ids will be their ports.
|
||||
current_port=$CEPH_RGW_PORT
|
||||
# allow only first rgw to start arrow_flight server/port
|
||||
local flight_conf=$rgw_flight_frontend
|
||||
for n in $(seq 1 $CEPH_NUM_RGW); do
|
||||
wconf << EOF
|
||||
[client.rgw.${current_port}]
|
||||
rgw frontends = $rgw_frontend port=${current_port}
|
||||
rgw frontends = $rgw_frontend port=${current_port}${flight_conf:+,arrow_flight}
|
||||
admin socket = ${CEPH_OUT_DIR}/radosgw.${current_port}.asok
|
||||
debug rgw_flight = 20
|
||||
EOF
|
||||
current_port=$((current_port + 1))
|
||||
unset flight_conf
|
||||
done
|
||||
|
||||
}
|
||||
@ -1655,6 +1663,11 @@ do_rgw()
|
||||
$CEPH_BIN/radosgw-admin zone placement modify -c $conf_fn --rgw-zone=default --placement-id=default-placement --compression=$rgw_compression > /dev/null
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -n "$rgw_flight_frontend" ] ;then
|
||||
debug echo "starting arrow_flight frontend on first rgw"
|
||||
fi
|
||||
|
||||
# Start server
|
||||
if [ "$cephadm" -gt 0 ]; then
|
||||
ceph_adm orch apply rgw rgwTest
|
||||
@ -1677,6 +1690,8 @@ do_rgw()
|
||||
[ $CEPH_RGW_PORT_NUM -lt 1024 ] && RGWSUDO=sudo
|
||||
|
||||
current_port=$CEPH_RGW_PORT
|
||||
# allow only first rgw to start arrow_flight server/port
|
||||
local flight_conf=$rgw_flight_frontend
|
||||
for n in $(seq 1 $CEPH_NUM_RGW); do
|
||||
rgw_name="client.rgw.${current_port}"
|
||||
|
||||
@ -1694,12 +1709,13 @@ do_rgw()
|
||||
--rgw_luarocks_location=${CEPH_OUT_DIR}/luarocks \
|
||||
${RGWDEBUG} \
|
||||
-n ${rgw_name} \
|
||||
"--rgw_frontends=${rgw_frontend} port=${current_port}${CEPH_RGW_HTTPS}"
|
||||
"--rgw_frontends=${rgw_frontend} port=${current_port}${CEPH_RGW_HTTPS}${flight_conf:+,arrow_flight}"
|
||||
|
||||
i=$(($i + 1))
|
||||
[ $i -eq $CEPH_NUM_RGW ] && break
|
||||
|
||||
current_port=$((current_port+1))
|
||||
unset flight_conf
|
||||
done
|
||||
}
|
||||
if [ "$CEPH_NUM_RGW" -gt 0 ]; then
|
||||
|
Loading…
Reference in New Issue
Block a user