From aba9bb728d20c44f12a9aec685376257fd39e3ab Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Mon, 2 May 2022 22:23:23 -0400 Subject: [PATCH 1/5] rgw: allow Arrow Flight to be built and linked into ceph components Arrow Flight integration is triggered by defining WITH_RADOSGW_ARROW_FLIGHT=ON with the cmake invocation. For now this assumes that grpc-plugins is installed on the system and won't be built internally. Signed-off-by: J. Eric Ivancich --- CMakeLists.txt | 1 + cmake/modules/BuildArrow.cmake | 24 ++++++++++++++++++++++++ cmake/modules/Findc-ares.cmake | 19 +++++++++++++------ src/CMakeLists.txt | 4 ++-- 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 683c806cbcf..3aab8c8a453 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -439,6 +439,7 @@ option(WITH_RADOSGW_DBSTORE "DBStore backend for Rados Gateway" ON) option(WITH_RADOSGW_MOTR "CORTX-Motr backend for Rados Gateway" OFF) option(WITH_RADOSGW_DAOS "DAOS backend for RADOS Gateway" OFF) option(WITH_RADOSGW_SELECT_PARQUET "Support for s3 select on parquet objects" ON) +option(WITH_RADOSGW_ARROW_FLIGHT "Build arrow flight when not using system-provided arrow" OFF) option(WITH_SYSTEM_ARROW "Use system-provided arrow" OFF) option(WITH_SYSTEM_UTF8PROC "Use system-provided utf8proc" OFF) diff --git a/cmake/modules/BuildArrow.cmake b/cmake/modules/BuildArrow.cmake index 45ebb697446..691108a40c5 100644 --- a/cmake/modules/BuildArrow.cmake +++ b/cmake/modules/BuildArrow.cmake @@ -42,6 +42,15 @@ function(build_arrow) list(APPEND arrow_CMAKE_ARGS -DARROW_WITH_SNAPPY=ON) # required list(APPEND arrow_INTERFACE_LINK_LIBRARIES snappy::snappy) + if(WITH_RADOSGW_ARROW_FLIGHT) + message("building arrow flight; make sure grpc-plugins is installed on the system") + list(APPEND arrow_CMAKE_ARGS + -DARROW_FLIGHT=ON -DARROW_WITH_RE2=OFF) + find_package(gRPC REQUIRED) + find_package(Protobuf REQUIRED) + find_package(c-ares 1.13.0 QUIET REQUIRED) + 
endif(WITH_RADOSGW_ARROW_FLIGHT) + list(APPEND arrow_CMAKE_ARGS -DARROW_WITH_ZLIB=ON) # required list(APPEND arrow_INTERFACE_LINK_LIBRARIES ZLIB::ZLIB) @@ -102,6 +111,11 @@ function(build_arrow) set(arrow_BYPRODUCTS ${arrow_LIBRARY}) list(APPEND arrow_BYPRODUCTS ${parquet_LIBRARY}) + if(WITH_RADOSGW_ARROW_FLIGHT) + set(arrow_flight_LIBRARY "${arrow_LIBRARY_DIR}/libarrow_flight.a") + list(APPEND arrow_BYPRODUCTS ${arrow_flight_LIBRARY}) + endif(WITH_RADOSGW_ARROW_FLIGHT) + if(CMAKE_MAKE_PROGRAM MATCHES "make") # try to inherit command line arguments passed by parent "make" job set(make_cmd $(MAKE)) @@ -140,4 +154,14 @@ function(build_arrow) set_target_properties(Arrow::Parquet PROPERTIES IMPORTED_LINK_INTERFACE_LANGUAGES "CXX" IMPORTED_LOCATION "${parquet_LIBRARY}") + + if(WITH_RADOSGW_ARROW_FLIGHT) + add_library(Arrow::Flight STATIC IMPORTED) + add_dependencies(Arrow::Flight arrow_ext) + target_link_libraries(Arrow::Flight INTERFACE Arrow::Arrow gRPC::grpc++) + set_target_properties(Arrow::Flight PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${arrow_INCLUDE_DIR}" # flight is accessed via "arrow/flight" + IMPORTED_LINK_INTERFACE_LANGUAGES "CXX" + IMPORTED_LOCATION "${arrow_flight_LIBRARY}") + endif(WITH_RADOSGW_ARROW_FLIGHT) endfunction() diff --git a/cmake/modules/Findc-ares.cmake b/cmake/modules/Findc-ares.cmake index 56311c7c3eb..93554ed2814 100644 --- a/cmake/modules/Findc-ares.cmake +++ b/cmake/modules/Findc-ares.cmake @@ -21,12 +21,19 @@ find_package_handle_standard_args(c-ares c-ares_LIBRARY VERSION_VAR c-ares_VERSION) -if(c-ares_FOUND AND NOT (TARGET c-ares::cares)) - add_library(c-ares::cares UNKNOWN IMPORTED GLOBAL) - set_target_properties(c-ares::cares PROPERTIES - INTERFACE_INCLUDE_DIRECTORIES "${c-ares_INCLUDE_DIR}" - IMPORTED_LINK_INTERFACE_LANGUAGES "C" - IMPORTED_LOCATION "${c-ares_LIBRARY}") +if(c-ares_FOUND) + if(NOT TARGET c-ares::cares) + add_library(c-ares::cares UNKNOWN IMPORTED GLOBAL) + set_target_properties(c-ares::cares PROPERTIES + 
INTERFACE_INCLUDE_DIRECTORIES "${c-ares_INCLUDE_DIR}" + IMPORTED_LINK_INTERFACE_LANGUAGES "C" + IMPORTED_LOCATION "${c-ares_LIBRARY}") + endif() + # to be compatible with old Seastar add_library(c-ares::c-ares ALIAS c-ares::cares) + + if(NOT TARGET c-ares::c-ares) + add_library(c-ares::c-ares ALIAS c-ares::cares) + endif() endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 42972391210..a8e427c3eeb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -907,8 +907,8 @@ if(WITH_RADOSGW) include(BuildArrow) build_arrow() - endif() - endif() + endif(WITH_SYSTEM_ARROW) + endif(WITH_RADOSGW_SELECT_PARQUET) add_subdirectory(libkmip) add_subdirectory(rgw) From 00bbd7b988011e3d47c6ed44e2f62d6f7af1e34b Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Mon, 2 May 2022 22:23:23 -0400 Subject: [PATCH 2/5] rgw: allow Arrow Flight to be built and linked into ceph components Arrow Flight integration is triggered by defining WITH_RADOSGW_ARROW_FLIGHT=ON with the cmake invocation. For now this assumes that grpc-plugins is installed on the system and won't be built internally. Signed-off-by: J. 
Eric Ivancich --- src/CMakeLists.txt | 4 ++-- src/rgw/CMakeLists.txt | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a8e427c3eeb..1901a74ddbc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -891,7 +891,7 @@ if(WITH_KVS) endif(WITH_KVS) if(WITH_RADOSGW) - if(WITH_RADOSGW_SELECT_PARQUET) + if(WITH_RADOSGW_SELECT_PARQUET OR WITH_RADOSGW_ARROW_FLIGHT) if(WITH_SYSTEM_ARROW) find_package(Arrow 4 REQUIRED QUIET) find_package(Parquet 4 REQUIRED QUIET) @@ -908,7 +908,7 @@ if(WITH_RADOSGW) include(BuildArrow) build_arrow() endif(WITH_SYSTEM_ARROW) - endif(WITH_RADOSGW_SELECT_PARQUET) + endif(WITH_RADOSGW_SELECT_PARQUET OR WITH_RADOSGW_ARROW_FLIGHT) add_subdirectory(libkmip) add_subdirectory(rgw) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 15c4d4c1d78..25404632bb6 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -9,6 +9,12 @@ if(WITH_RADOSGW_SELECT_PARQUET) message("-- arrow is installed, radosgw/s3select-op is able to process parquet objects") endif(WITH_RADOSGW_SELECT_PARQUET) +if(WITH_RADOSGW_ARROW_FLIGHT) + set(ARROW_FLIGHT_LIBRARIES Arrow::Arrow Arrow::Parquet Arrow::Flight utf8proc::utf8proc) # order is important + add_definitions(-D_ARROW_EXIST) + message("-- arrow flight is installed") +endif(WITH_RADOSGW_ARROW_FLIGHT) + function(gperf_generate input output) add_custom_command( OUTPUT ${output} From b4b8b57d2c7bdcd8a3530e60878e6deaf12a9581 Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Tue, 10 May 2022 20:07:38 -0400 Subject: [PATCH 3/5] rgw: allow vstart.sh to start an RGW with Arrow Flight Adds "--rgw_arrow_flight" as a command-line option to vstart.sh that will enable arrow_flight on the first RGW created, but not for subsequent RGWs (so as not to create port conflicts). It adds a frontend to ceph.conf and as an option to the command-line that starts the radosgw daemon. Signed-off-by: J. 
Eric Ivancich --- src/vstart.sh | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/vstart.sh b/src/vstart.sh index e18184ed1e8..6fcb3f919a9 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -224,6 +224,7 @@ options: -o config add extra config parameters to all sections --rgw_port specify ceph rgw http listen port --rgw_frontend specify the rgw frontend configuration + --rgw_arrow_flight start arrow flight frontend --rgw_compression specify the rgw compression plugin --seastore use seastore as crimson osd backend -b, --bluestore use bluestore as the osd objectstore backend (default) @@ -415,6 +416,9 @@ case $1 in rgw_frontend=$2 shift ;; + --rgw_arrow_flight) + rgw_flight_frontend="yes" + ;; --rgw_compression) rgw_compression=$2 shift @@ -626,13 +630,17 @@ do_rgw_conf() { # setup each rgw on a sequential port, starting at $CEPH_RGW_PORT. # individual rgw's ids will be their ports. current_port=$CEPH_RGW_PORT + # allow only first rgw to start arrow_flight server/port + local flight_conf=$rgw_flight_frontend for n in $(seq 1 $CEPH_NUM_RGW); do wconf << EOF [client.rgw.${current_port}] - rgw frontends = $rgw_frontend port=${current_port} + rgw frontends = $rgw_frontend port=${current_port}${flight_conf:+,arrow_flight} admin socket = ${CEPH_OUT_DIR}/radosgw.${current_port}.asok + debug rgw_flight = 20 EOF current_port=$((current_port + 1)) + unset flight_conf done } @@ -1651,6 +1659,11 @@ do_rgw() $CEPH_BIN/radosgw-admin zone placement modify -c $conf_fn --rgw-zone=default --placement-id=default-placement --compression=$rgw_compression > /dev/null fi fi + + if [ -n "$rgw_flight_frontend" ] ;then + debug echo "starting arrow_flight frontend on first rgw" + fi + # Start server if [ "$cephadm" -gt 0 ]; then ceph_adm orch apply rgw rgwTest @@ -1673,6 +1686,8 @@ do_rgw() [ $CEPH_RGW_PORT_NUM -lt 1024 ] && RGWSUDO=sudo current_port=$CEPH_RGW_PORT + # allow only first rgw to start arrow_flight server/port + local 
flight_conf=$rgw_flight_frontend for n in $(seq 1 $CEPH_NUM_RGW); do rgw_name="client.rgw.${current_port}" @@ -1690,12 +1705,13 @@ do_rgw() --rgw_luarocks_location=${CEPH_OUT_DIR}/luarocks \ ${RGWDEBUG} \ -n ${rgw_name} \ - "--rgw_frontends=${rgw_frontend} port=${current_port}${CEPH_RGW_HTTPS}" + "--rgw_frontends=${rgw_frontend} port=${current_port}${CEPH_RGW_HTTPS}${flight_conf:+,arrow_flight}" i=$(($i + 1)) [ $i -eq $CEPH_NUM_RGW ] && break current_port=$((current_port+1)) + unset flight_conf done } if [ "$CEPH_NUM_RGW" -gt 0 ]; then From 34f38970de7fdf8e1e4b24cfbfa595e15ec2b95f Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Tue, 10 May 2022 20:02:05 -0400 Subject: [PATCH 4/5] rgw: initial integration of Arrow Flight code into RGW Add base files rgw_flight.h and rgw_flight.cc. Additionally handle initialization and shutdown of a Flight Server in radosgw. Integration requires WITH_RADOSGW_ARROW_FLIGHT to be defined. Signed-off-by: J. Eric Ivancich --- src/common/subsys.h | 1 + src/rgw/CMakeLists.txt | 30 ++++++++ src/rgw/rgw_appmain.cc | 13 ++++ src/rgw/rgw_flight.cc | 131 +++++++++++++++++++++++++++++++++ src/rgw/rgw_flight.h | 101 +++++++++++++++++++++++++ src/rgw/rgw_flight_frontend.cc | 103 ++++++++++++++++++++++++++ src/rgw/rgw_flight_frontend.h | 69 +++++++++++++++++ src/rgw/rgw_op.cc | 18 +++++ 8 files changed, 466 insertions(+) create mode 100644 src/rgw/rgw_flight.cc create mode 100644 src/rgw/rgw_flight.h create mode 100644 src/rgw/rgw_flight_frontend.cc create mode 100644 src/rgw/rgw_flight_frontend.h diff --git a/src/common/subsys.h b/src/common/subsys.h index a6f4b8500d3..3e558b44092 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -63,6 +63,7 @@ SUBSYS(rgw_sync, 1, 5) SUBSYS(rgw_datacache, 1, 5) SUBSYS(rgw_access, 1, 5) SUBSYS(rgw_dbstore, 1, 5) +SUBSYS(rgw_flight, 1, 5) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) SUBSYS(throttle, 1, 1) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 25404632bb6..22c7ded4fb8 
100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -219,6 +219,12 @@ endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) endif() +if(WITH_RADOSGW_ARROW_FLIGHT) + # NOTE: eventually don't want this in common but just in radosgw daemon + # list(APPEND radosgw_srcs rgw_flight.cc rgw_flight_frontend.cc) + list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc) +endif(WITH_RADOSGW_ARROW_FLIGHT) + add_library(rgw_common STATIC ${librgw_common_srcs}) target_compile_definitions(rgw_common @@ -255,6 +261,7 @@ target_link_libraries(rgw_common ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${ARROW_LIBRARIES} + ${ARROW_FLIGHT_LIBRARIES} ${ALLOC_LIBS} PUBLIC ${LUA_LIBRARIES} @@ -383,6 +390,7 @@ target_link_libraries(rgw_a common_utf8 global ${CRYPTO_LIBS} ${ARROW_LIBRARIES} + ${ARROW_FLIGHT_LIBRARIES} OATH::OATH PUBLIC rgw_common @@ -409,6 +417,14 @@ set(radosgw_srcs add_executable(radosgw ${radosgw_srcs}) +if(WITH_RADOSGW_ARROW_FLIGHT) + # target_compile_definitions(radosgw PUBLIC WITH_ARROW_FLIGHT) + target_compile_definitions(rgw_common PUBLIC WITH_ARROW_FLIGHT) + target_include_directories(rgw_common + PUBLIC "${CMAKE_SOURCE_DIR}/src/arrow/cpp/src") + # target_include_directories(radosgw PUBLIC Arrow::Arrow) +endif(WITH_RADOSGW_ARROW_FLIGHT) + target_compile_definitions(radosgw PUBLIC "-DCLS_CLIENT_HIDE_IOCTX") target_include_directories(radosgw PUBLIC "${CMAKE_SOURCE_DIR}/src/dmclock/support/src" @@ -429,6 +445,13 @@ set(radosgw_admin_srcs rgw_admin.cc rgw_sync_checkpoint.cc rgw_orphan.cc) + +# this is unsatisfying and hopefully temporary; ARROW should not be +# part of radosgw_admin +if(WITH_RADOSGW_ARROW_FLIGHT) + list(APPEND radosgw_admin_srcs rgw_flight.cc) +endif(WITH_RADOSGW_ARROW_FLIGHT) + add_executable(radosgw-admin ${radosgw_admin_srcs}) target_link_libraries(radosgw-admin ${rgw_libs} librados cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client @@ -437,6 +460,13 @@ target_link_libraries(radosgw-admin ${rgw_libs} 
librados global ${LIB_RESOLV} OATH::OATH ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES}) + +# this is unsatisfying and hopefully temporary; ARROW should not be +# part of radosgw_admin +if(WITH_RADOSGW_ARROW_FLIGHT) + target_link_libraries(radosgw-admin ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES}) +endif(WITH_RADOSGW_ARROW_FLIGHT) + install(TARGETS radosgw-admin DESTINATION bin) set(radosgw_es_srcs diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 69672fa807d..77e29d55954 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -62,6 +62,9 @@ #ifdef WITH_RADOSGW_KAFKA_ENDPOINT #include "rgw_kafka.h" #endif +#ifdef WITH_ARROW_FLIGHT +#include "rgw_flight_frontend.h" +#endif #include "rgw_asio_frontend.h" #include "rgw_dmclock_scheduler_ctx.h" #include "rgw_lua.h" @@ -433,6 +436,16 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib) rgwlib->set_fe(static_cast(fe)); } } + else if (framework == "arrow_flight") { +#ifdef WITH_ARROW_FLIGHT + int port; + config->get_val("port", 8077, &port); + fe = new rgw::flight::FlightFrontend(cct, config, store, port); +#else + derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl; + continue; +#endif + } service_map_meta["frontend_type#" + stringify(fe_count)] = framework; service_map_meta["frontend_config#" + stringify(fe_count)] = config->get_config(); diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc new file mode 100644 index 00000000000..be0a17c8f32 --- /dev/null +++ b/src/rgw/rgw_flight.cc @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "common/dout.h" +#include "rgw_op.h" + +#include "rgw_flight.h" +#include "rgw_flight_frontend.h" + + +#define dout_subsys ceph_subsys_rgw_flight + +#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " +#define 
STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " +#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " +#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " + +#define INFO INFO_F(dp) +#define STATUS STATUS_F(dp) +#define WARN WARN_F(dp) +#define ERROR ERROR_F(dp) + + +namespace rgw::flight { + + std::atomic next_flight_key = 0; + + FlightData::FlightData(const req_state* state) : + key(next_flight_key++), + expires(coarse_real_clock::now() + lifespan) + { +#if 0 + bucket = new rgw::sal::Bucket(*state->bucket); +#endif + } + + FlightStore::~FlightStore() { + // empty + } + + MemoryFlightStore::~MemoryFlightStore() { + // empty + } + + FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { + FlightKey key = flight.key; + + auto p = map.insert( {key, flight} ); + ceph_assertf(p.second, + "unable to add FlightData to MemoryFlightStore"); // temporary until error handling + + return key; + } + // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::expire_flights() { return 0; } + + FlightServer::FlightServer(boost::intrusive_ptr& _cct, + rgw::sal::Store* _store, + FlightStore* _flight_store) : + cct(_cct), + dp(cct.get(), dout_subsys, "rgw arrow_flight: "), + store(_store), + flight_store(_flight_store) + { + INFO << "FlightServer constructed" << dendl; + } + + FlightServer::~FlightServer() + { + INFO << "FlightServer destructed" << dendl; + } + + +class RGWFlightListing : public flt::FlightListing { +public: + + RGWFlightListing() { +#if 0 + const int64_t total_records = 2; + const int64_t total_bytes = 2048; + const std::vector endpoints; + const auto descriptor = flt::FlightDescriptor::Command("fake-cmd"); + arw::FieldVector fields; + const arw::Schema schema(fields, nullptr); + auto info1 = 
flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes); +#endif + } + + arw::Status Next(std::unique_ptr* info) { + *info = nullptr; + return arw::Status::OK(); + } +}; + + + arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) { + *listings = std::make_unique(); + return arw::Status::OK(); + } + + static FlightServer* fs; + + void set_flight_server(FlightServer* _server) { + fs = _server; + } + + FlightServer* get_flight_server() { + return fs; + } + + FlightStore* get_flight_store() { + return fs ? fs->get_flight_store() : nullptr; + } + + FlightKey propose_flight(const req_state* request) { + FlightKey key = get_flight_store()->add_flight(FlightData(request)); + return key; + } + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h new file mode 100644 index 00000000000..1f4d3721376 --- /dev/null +++ b/src/rgw/rgw_flight.h @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include + +#include "include/common_fwd.h" +#include "common/ceph_context.h" +#include "common/Thread.h" +#include "common/ceph_time.h" +#include "rgw_frontend.h" +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "rgw_flight_frontend.h" + +namespace arw = arrow; +namespace flt = arrow::flight; + +struct req_state; + +namespace rgw::flight { + + static const coarse_real_clock::duration lifespan = std::chrono::hours(1); + + struct FlightData { + FlightKey key; + coarse_real_clock::time_point expires; + + rgw::sal::Bucket* bucket; +#if 0 + rgw::sal::Object object; + rgw::sal::User user; +#endif + + + FlightData(const req_state* state); + }; + + // stores flights that have been created and helps expire them + class FlightStore { + public: + virtual ~FlightStore(); + virtual FlightKey add_flight(FlightData&& flight) = 0; + 
virtual int get_flight(const FlightKey& key) = 0; + virtual int remove_flight(const FlightKey& key) = 0; + virtual int expire_flights() = 0; + }; + + class MemoryFlightStore : public FlightStore { + std::map map; + + public: + + virtual ~MemoryFlightStore(); + FlightKey add_flight(FlightData&& flight) override; + int get_flight(const FlightKey& key) override; + int remove_flight(const FlightKey& key) override; + int expire_flights() override; + }; + + class FlightServer : public flt::FlightServerBase { + + using Data1 = std::vector>; + + boost::intrusive_ptr cct; + const DoutPrefix dp; + rgw::sal::Store* store; + FlightStore* flight_store; + + std::map data; + + public: + + static constexpr int default_port = 8077; + + FlightServer(boost::intrusive_ptr& _cct, + rgw::sal::Store* _store, + FlightStore* _flight_store); + ~FlightServer() override; + + FlightStore* get_flight_store() { + return flight_store; + } + + arw::Status ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) override; + }; // class FlightServer + + // GLOBAL + + void set_flight_server(FlightServer* _server); + FlightServer* get_flight_server(); + FlightStore* get_flight_store(); + FlightKey propose_flight(const req_state* request); + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc new file mode 100644 index 00000000000..1751e2515d3 --- /dev/null +++ b/src/rgw/rgw_flight_frontend.cc @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + + +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "rgw_flight_frontend.h" +#include "rgw_flight.h" + +#define dout_subsys ceph_subsys_rgw_flight + +namespace rgw::flight { + + FlightFrontend::FlightFrontend(boost::intrusive_ptr& _cct, + RGWFrontendConfig* _config, + rgw::sal::Store* _store, + int _port) : + cct(_cct), + dp(cct.get(), dout_subsys, "rgw 
arrow_flight: "), + config(_config), + port(_port) + { + FlightStore* flight_store = new MemoryFlightStore(); + flight_server = new FlightServer(_cct, _store, flight_store); + } + + FlightFrontend::~FlightFrontend() { + delete flight_server; + } + + int FlightFrontend::init() { + if (port <= 0) { + port = FlightServer::default_port; + } + const std::string url = + std::string("grpc+tcp://localhost:") + std::to_string(port); + flt::Location location; + arw::Status s = flt::Location::Parse(url, &location); + if (!s.ok()) { + return -EINVAL; + } + + flt::FlightServerOptions options(location); + options.verify_client = false; + s = flight_server->Init(options); + if (!s.ok()) { + return -EINVAL; + } + + dout(20) << "STATUS: " << __func__ << + ": FlightServer inited; will use port " << port << dendl; + return 0; + } + + int FlightFrontend::run() { + try { + flight_thread = make_named_thread(server_thread_name, + &FlightServer::Serve, + flight_server); + set_flight_server(flight_server); + + dout(20) << "INFO: " << __func__ << + ": FlightServer thread started, id=" << flight_thread.get_id() << + ", joinable=" << flight_thread.joinable() << dendl; + return 0; + } catch (std::system_error& e) { + derr << "ERROR: " << __func__ << + ": FlightServer thread failed to start" << dendl; + return -ENOSPC; + } + } + + void FlightFrontend::stop() { + set_flight_server(nullptr); + flight_server->Shutdown(); + flight_server->Wait(); + dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl; + } + + void FlightFrontend::join() { + flight_thread.join(); + dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl; + } + + void FlightFrontend::pause_for_new_config() { + // ignore since config changes won't alter flight_server + } + + void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store, + rgw_auth_registry_ptr_t auth_registry) { + // ignore since config changes won't alter flight_server + } + + int 
FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { + // do work here + dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl; + + // chain upwards + return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + } + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.h b/src/rgw/rgw_flight_frontend.h new file mode 100644 index 00000000000..c74878a884b --- /dev/null +++ b/src/rgw/rgw_flight_frontend.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include "include/common_fwd.h" +#include "common/Thread.h" +#include "rgw_frontend.h" +#include "rgw_op.h" + + +namespace rgw::flight { + + using FlightKey = uint32_t; + + class FlightServer; + + class FlightFrontend : public RGWFrontend { + + static constexpr std::string_view server_thread_name = + "Arrow Flight Server thread"; + + boost::intrusive_ptr& cct; + const DoutPrefix dp; + FlightServer* flight_server; // pointer so header file doesn't need to pull in too much + std::thread flight_thread; + RGWFrontendConfig* config; + int port; + + public: + + // port <= 0 -> let server decide; typically 8077 + FlightFrontend(boost::intrusive_ptr& cct, + RGWFrontendConfig* config, + rgw::sal::Store* store, + int port = -1); + ~FlightFrontend() override; + int init() override; + int run() override; + void stop() override; + void join() override; + + void pause_for_new_config() override; + void unpause_with_new_config(rgw::sal::Store* store, + rgw_auth_registry_ptr_t auth_registry) override; + + }; // class FlightFrontend + + class FlightGetObj_Filter : public RGWGetObj_Filter { + + FlightKey key; + + public: + + FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) : + RGWGetObj_Filter(next), + key(_key) + { + // empty + } + + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; +#if 0 + // this would 
allow the range to be modified if necessary; + int fixup_range(off_t& ofs, off_t& end) override; +#endif + }; + +} // namespace rgw::flight diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index c847bfe2957..9c238827445 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -67,6 +67,11 @@ #include "compressor/Compressor.h" +#ifdef WITH_ARROW_FLIGHT +#include "rgw_flight.h" +#include "rgw_flight_frontend.h" +#endif + #ifdef WITH_LTTNG #define TRACEPOINT_DEFINE #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE @@ -2150,6 +2155,9 @@ void RGWGetObj::execute(optional_yield y) RGWGetObj_CB cb(this); RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb; boost::optional decompress; +#ifdef WITH_ARROW_FLIGHT + boost::optional flight_filter; +#endif std::unique_ptr decrypt; std::unique_ptr run_lua; map::iterator attr_iter; @@ -2228,6 +2236,16 @@ void RGWGetObj::execute(optional_yield y) goto done_err; } +#ifdef WITH_ARROW_FLIGHT + if (ofs == 0) { + rgw::flight::FlightKey key = rgw::flight::propose_flight(s); + ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl; + + flight_filter.emplace(key, filter); + filter = &*flight_filter; + } +#endif + op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info); if (op_ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl; From eac2905d9f64e174570ee25129112ed7ed53993c Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Tue, 28 Jun 2022 15:35:18 -0400 Subject: [PATCH 5/5] rgw: implement initial flight server functionality Implements the ability for a flight to be created when the object is retrieved by an S3 get. Adds FlightServer abilities ListFlights, GetFlightInfo, GetSchema, and DoGet. Adds an interface for a store for flight information and adds an in-memory implementation of it. This functionality is early-stage and lacks some planned efficiencies. Signed-off-by: J. 
Eric Ivancich --- src/rgw/rgw_appmain.cc | 2 +- src/rgw/rgw_flight.cc | 749 +++++++++++++++++++++++++++++---- src/rgw/rgw_flight.h | 218 +++++++--- src/rgw/rgw_flight_frontend.cc | 288 +++++++++---- src/rgw/rgw_flight_frontend.h | 87 ++-- src/rgw/rgw_op.cc | 6 +- src/rgw/rgw_process_env.h | 14 + 7 files changed, 1107 insertions(+), 257 deletions(-) diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 77e29d55954..361f622b992 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -440,7 +440,7 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib) #ifdef WITH_ARROW_FLIGHT int port; config->get_val("port", 8077, &port); - fe = new rgw::flight::FlightFrontend(cct, config, store, port); + fe = new rgw::flight::FlightFrontend(env, config, port); #else derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl; continue; diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc index be0a17c8f32..2299b741285 100644 --- a/src/rgw/rgw_flight.cc +++ b/src/rgw/rgw_flight.cc @@ -2,12 +2,22 @@ // vim: ts=8 sw=2 smarttab ft=cpp #include +#include #include #include +#include #include "arrow/type.h" +#include "arrow/buffer.h" +#include "arrow/util/string_view.h" +#include "arrow/io/interfaces.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" + #include "arrow/flight/server.h" +#include "parquet/arrow/reader.h" + #include "common/dout.h" #include "rgw_op.h" @@ -15,117 +25,692 @@ #include "rgw_flight_frontend.h" -#define dout_subsys ceph_subsys_rgw_flight - -#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " -#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " -#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " -#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " - -#define INFO INFO_F(dp) -#define STATUS STATUS_F(dp) -#define WARN WARN_F(dp) -#define ERROR ERROR_F(dp) - - namespace rgw::flight { - std::atomic next_flight_key = 
0; +// Ticket and FlightKey - FlightData::FlightData(const req_state* state) : - key(next_flight_key++), - expires(coarse_real_clock::now() + lifespan) +std::atomic next_flight_key = null_flight_key; + +flt::Ticket FlightKeyToTicket(const FlightKey& key) { + flt::Ticket result; + result.ticket = std::to_string(key); + return result; +} + +arw::Result TicketToFlightKey(const flt::Ticket& t) { + try { + return (FlightKey) std::stoul(t.ticket); + } catch (std::invalid_argument const& ex) { + return arw::Status::Invalid( + "could not convert Ticket containing \"%s\" into a Flight Key", + t.ticket); + } catch (const std::out_of_range& ex) { + return arw::Status::Invalid( + "could not convert Ticket containing \"%s\" into a Flight Key due to range", + t.ticket); + } +} + +// FlightData + +FlightData::FlightData(const std::string& _uri, + const std::string& _tenant_name, + const std::string& _bucket_name, + const rgw_obj_key& _object_key, + uint64_t _num_records, + uint64_t _obj_size, + std::shared_ptr& _schema, + std::shared_ptr& _kv_metadata, + rgw_user _user_id) : + key(++next_flight_key), + /* expires(coarse_real_clock::now() + lifespan), */ + uri(_uri), + tenant_name(_tenant_name), + bucket_name(_bucket_name), + object_key(_object_key), + num_records(_num_records), + obj_size(_obj_size), + schema(_schema), + kv_metadata(_kv_metadata), + user_id(_user_id) +{ } + +/**** FlightStore ****/ + +FlightStore::FlightStore(const DoutPrefix& _dp) : + dp(_dp) +{ } + +FlightStore::~FlightStore() { } + +/**** MemoryFlightStore ****/ + +MemoryFlightStore::MemoryFlightStore(const DoutPrefix& _dp) : + FlightStore(_dp) +{ } + +MemoryFlightStore::~MemoryFlightStore() { } + +FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { + std::pair result; { -#if 0 - bucket = new rgw::sal::Bucket(*state->bucket); -#endif + const std::lock_guard lock(mtx); + result = map.insert( {flight.key, std::move(flight)} ); } + ceph_assertf(result.second, + "unable to add FlightData to 
MemoryFlightStore"); // temporary until error handling - FlightStore::~FlightStore() { - // empty + return result.first->second.key; +} + +arw::Result MemoryFlightStore::get_flight(const FlightKey& key) const { + const std::lock_guard lock(mtx); + auto i = map.find(key); + if (i == map.cend()) { + return arw::Status::KeyError("could not find Flight with Key %" PRIu32, + key); + } else { + return i->second; } +} - MemoryFlightStore::~MemoryFlightStore() { - // empty - } - - FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { - FlightKey key = flight.key; - - auto p = map.insert( {key, flight} ); - ceph_assertf(p.second, - "unable to add FlightData to MemoryFlightStore"); // temporary until error handling - - return key; - } - // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; } - int MemoryFlightStore::expire_flights() { return 0; } - - FlightServer::FlightServer(boost::intrusive_ptr& _cct, - rgw::sal::Store* _store, - FlightStore* _flight_store) : - cct(_cct), - dp(cct.get(), dout_subsys, "rgw arrow_flight: "), - store(_store), - flight_store(_flight_store) +// returns either the next FlightData or, if at end, empty optional +std::optional MemoryFlightStore::after_key(const FlightKey& key) const { + std::optional result; { - INFO << "FlightServer constructed" << dendl; + const std::lock_guard lock(mtx); + auto i = map.upper_bound(key); + if (i != map.end()) { + result = i->second; + } } + return result; +} - FlightServer::~FlightServer() - { - INFO << "FlightServer destructed" << dendl; - } +int MemoryFlightStore::remove_flight(const FlightKey& key) { + return 0; +} + +int MemoryFlightStore::expire_flights() { + return 0; +} + +/**** FlightServer ****/ + +FlightServer::FlightServer(RGWProcessEnv& _env, + FlightStore* _flight_store, + const DoutPrefix& _dp) : + env(_env), + 
driver(env.driver), + dp(_dp), + flight_store(_flight_store) +{ } + +FlightServer::~FlightServer() +{ } -class RGWFlightListing : public flt::FlightListing { +arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) { + + // function local class to implement FlightListing interface + class RGWFlightListing : public flt::FlightListing { + + FlightStore* flight_store; + FlightKey previous_key; + + public: + + RGWFlightListing(FlightStore* flight_store) : + flight_store(flight_store), + previous_key(null_flight_key) + { } + + arw::Status Next(std::unique_ptr* info) { + std::optional fd = flight_store->after_key(previous_key); + if (fd) { + previous_key = fd->key; + auto descriptor = + flt::FlightDescriptor::Path( + { fd->tenant_name, fd->bucket_name, fd->object_key.name, fd->object_key.instance, fd->object_key.ns }); + flt::FlightEndpoint endpoint; + endpoint.ticket = FlightKeyToTicket(fd->key); + std::vector endpoints { endpoint }; + + ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj, + flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size)); + *info = std::make_unique(std::move(info_obj)); + return arw::Status::OK(); + } else { + *info = nullptr; + return arw::Status::OK(); + } + } + }; // class RGWFlightListing + + *listings = std::make_unique(flight_store); + return arw::Status::OK(); +} // FlightServer::ListFlights + + +arw::Status FlightServer::GetFlightInfo(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *info) { + return arw::Status::OK(); +} // FlightServer::GetFlightInfo + + +arw::Status FlightServer::GetSchema(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *schema) { + return arw::Status::OK(); +} // FlightServer::GetSchema + + // A Buffer that owns its memory and frees it when the Buffer is + // destructed +class OwnedBuffer : public arw::Buffer { 
+ + uint8_t* buffer; + +protected: + + OwnedBuffer(uint8_t* _buffer, int64_t _size) : + Buffer(_buffer, _size), + buffer(_buffer) + { } + public: - RGWFlightListing() { -#if 0 - const int64_t total_records = 2; - const int64_t total_bytes = 2048; - const std::vector endpoints; - const auto descriptor = flt::FlightDescriptor::Command("fake-cmd"); - arw::FieldVector fields; - const arw::Schema schema(fields, nullptr); - auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes); + ~OwnedBuffer() override { + delete[] buffer; + } + + static arw::Result> make(int64_t size) { + uint8_t* buffer = new (std::nothrow) uint8_t[size]; + if (!buffer) { + return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); + } + + OwnedBuffer* ptr = new OwnedBuffer(buffer, size); + std::shared_ptr result; + result.reset(ptr); + return result; + } + + // if what's read in is less than capacity + void set_size(int64_t size) { + size_ = size; + } + + // pointer that can be used to write into buffer + uint8_t* writeable_data() { + return buffer; + } +}; // class OwnedBuffer + +#if 0 // remove classes used for testing and incrementally building + +// make local to DoGet eventually +class LocalInputStream : public arw::io::InputStream { + + std::iostream::pos_type position; + std::fstream file; + std::shared_ptr kv_metadata; + const DoutPrefix dp; + +public: + + LocalInputStream(std::shared_ptr _kv_metadata, + const DoutPrefix _dp) : + kv_metadata(_kv_metadata), + dp(_dp) + {} + + arw::Status Open() { + file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in); + if (!file.good()) { + return arw::Status::IOError("unable to open file"); + } + + INFO << "file opened successfully" << dendl; + position = file.tellg(); + return arw::Status::OK(); + } + + arw::Status Close() override { + file.close(); + INFO << "file closed" << dendl; + return arw::Status::OK(); + } + + arw::Result Tell() const override { + if (position < 0) { + 
return arw::Status::IOError( + "could not query file implementaiton with tellg"); + } else { + return int64_t(position); + } + } + + bool closed() const override { + return file.is_open(); + } + + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + if (file.read(reinterpret_cast(out), + reinterpret_cast(nbytes))) { + const std::streamsize bytes_read = file.gcount(); + INFO << "Point A: read bytes " << bytes_read << dendl; + position = file.tellg(); + return bytes_read; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %" PRId64, + int64_t(position)); + } + } + + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: " << ": asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); + + if (file.read(reinterpret_cast(buffer->writeable_data()), + reinterpret_cast(nbytes))) { + const auto bytes_read = file.gcount(); + INFO << "Point B: read bytes " << bytes_read << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else if (file.rdstate() & std::ifstream::failbit && + file.rdstate() & std::ifstream::eofbit) { + const auto bytes_read = file.gcount(); + INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %ld", position); + } + } + + arw::Result Peek(int64_t nbytes) override { + INFO << "called, not implemented" << dendl; + return arw::Status::NotImplemented("peek not currently allowed"); + } + + bool supports_zero_copy() const override { + return false; + } + + arw::Result> ReadMetadata() override { + INFO << "called" << dendl; + return kv_metadata; + } +}; // class LocalInputStream + +class 
LocalRandomAccessFile : public arw::io::RandomAccessFile { + + FlightData flight_data; + const DoutPrefix dp; + + std::iostream::pos_type position; + std::fstream file; + +public: + LocalRandomAccessFile(const FlightData& _flight_data, const DoutPrefix _dp) : + flight_data(_flight_data), + dp(_dp) + { } + + // implement InputStream + + arw::Status Open() { + file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in); + if (!file.good()) { + return arw::Status::IOError("unable to open file"); + } + + INFO << "file opened successfully" << dendl; + position = file.tellg(); + return arw::Status::OK(); + } + + arw::Status Close() override { + file.close(); + INFO << "file closed" << dendl; + return arw::Status::OK(); + } + + arw::Result Tell() const override { + if (position < 0) { + return arw::Status::IOError( + "could not query file implementaiton with tellg"); + } else { + return int64_t(position); + } + } + + bool closed() const override { + return file.is_open(); + } + + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + if (file.read(reinterpret_cast(out), + reinterpret_cast(nbytes))) { + const std::streamsize bytes_read = file.gcount(); + INFO << "Point A: read bytes " << bytes_read << dendl; + position = file.tellg(); + return bytes_read; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %" PRId64, + int64_t(position)); + } + } + + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); + + if (file.read(reinterpret_cast(buffer->writeable_data()), + reinterpret_cast(nbytes))) { + const auto bytes_read = file.gcount(); + INFO << "Point B: read bytes " << bytes_read << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else if (file.rdstate() & 
std::ifstream::failbit && + file.rdstate() & std::ifstream::eofbit) { + const auto bytes_read = file.gcount(); + INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl; + // buffer->set_size(bytes_read); + position = file.tellg(); + return buffer; + } else { + ERROR << "unable to read from file" << dendl; + return arw::Status::IOError("unable to read from offset %ld", position); + } + } + + bool supports_zero_copy() const override { + return false; + } + + // implement Seekable + + arw::Result GetSize() override { + return flight_data.obj_size; + } + + arw::Result Peek(int64_t nbytes) override { + std::iostream::pos_type here = file.tellg(); + if (here == -1) { + return arw::Status::IOError( + "unable to determine current position ahead of peek"); + } + + ARROW_ASSIGN_OR_RAISE(OwningStringView result, + OwningStringView::make(nbytes)); + + // read + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + Read(nbytes, (void*) result.writeable_data())); + (void) bytes_read; // silence unused variable warnings + + // return offset to original + ARROW_RETURN_NOT_OK(Seek(here)); + + return result; + } + + arw::Result> ReadMetadata() { + return flight_data.kv_metadata; + } + + arw::Future> ReadMetadataAsync( + const arw::io::IOContext& io_context) override { + return arw::Future>::MakeFinished(ReadMetadata()); + } + + // implement Seekable interface + + arw::Status Seek(int64_t position) { + file.seekg(position); + if (file.fail()) { + return arw::Status::IOError( + "error encountered during seek to %" PRId64, position); + } else { + return arw::Status::OK(); + } + } +}; // class LocalRandomAccessFile #endif - } - arw::Status Next(std::unique_ptr* info) { - *info = nullptr; - return arw::Status::OK(); - } -}; +class RandomAccessObject : public arw::io::RandomAccessFile { + FlightData flight_data; + const DoutPrefix dp; - arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, - const flt::Criteria* criteria, - std::unique_ptr* listings) { - 
*listings = std::make_unique(); + int64_t position; + bool is_closed; + std::unique_ptr op; + +public: + + RandomAccessObject(const FlightData& _flight_data, + std::unique_ptr& obj, + const DoutPrefix _dp) : + flight_data(_flight_data), + dp(_dp), + position(-1), + is_closed(false) + { + op = obj->get_read_op(); + } + + arw::Status Open() { + int ret = op->prepare(null_yield, &dp); + if (ret < 0) { + return arw::Status::IOError( + "unable to prepare object with error %d", ret); + } + INFO << "file opened successfully" << dendl; + position = 0; return arw::Status::OK(); } - static FlightServer* fs; + // implement InputStream - void set_flight_server(FlightServer* _server) { - fs = _server; + arw::Status Close() override { + position = -1; + is_closed = true; + (void) op.reset(); + INFO << "object closed" << dendl; + return arw::Status::OK(); } - FlightServer* get_flight_server() { - return fs; + arw::Result Tell() const override { + if (position < 0) { + return arw::Status::IOError("could not determine position"); + } else { + return position; + } } - FlightStore* get_flight_store() { - return fs ? 
fs->get_flight_store() : nullptr; + bool closed() const override { + return is_closed; } - FlightKey propose_flight(const req_state* request) { - FlightKey key = get_flight_store()->add_flight(FlightData(request)); - return key; + arw::Result Read(int64_t nbytes, void* out) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + if (position < 0) { + ERROR << "error, position indicated error" << dendl; + return arw::Status::IOError("object read op is in bad state"); + } + + // note: read function reads through end_position inclusive + int64_t end_position = position + nbytes - 1; + + bufferlist bl; + + const int64_t bytes_read = + op->read(position, end_position, bl, null_yield, &dp); + if (bytes_read < 0) { + const int64_t former_position = position; + position = -1; + ERROR << "read operation returned " << bytes_read << dendl; + return arw::Status::IOError( + "unable to read object at position %" PRId64 ", error code: %" PRId64, + former_position, + bytes_read); + } + + // TODO: see if there's a way to get rid of this copy, perhaps + // updating rgw::sal::read_op + bl.cbegin().copy(bytes_read, reinterpret_cast(out)); + + position += bytes_read; + + if (nbytes != bytes_read) { + INFO << "partial read: nbytes=" << nbytes << + ", bytes_read=" << bytes_read << dendl; + } + INFO << bytes_read << " bytes read" << dendl; + return bytes_read; } + arw::Result> Read(int64_t nbytes) override { + INFO << "entered: asking for " << nbytes << " bytes" << dendl; + + std::shared_ptr buffer; + ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes)); + + ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read, + Read(nbytes, buffer->writeable_data())); + buffer->set_size(bytes_read); + + return buffer; + } + + bool supports_zero_copy() const override { + return false; + } + + // implement Seekable + + arw::Result GetSize() override { + INFO << "entered: " << flight_data.obj_size << " returned" << dendl; + return flight_data.obj_size; + } + + arw::Result Peek(int64_t 
nbytes) override { + INFO << "entered: " << nbytes << " bytes" << dendl; + + int64_t saved_position = position; + + ARROW_ASSIGN_OR_RAISE(OwningStringView buffer, + OwningStringView::make(nbytes)); + + ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read, + Read(nbytes, (void*) buffer.writeable_data())); + + // restore position for a peek + position = saved_position; + + if (bytes_read < nbytes) { + // create new OwningStringView with moved buffer + return OwningStringView::shrink(std::move(buffer), bytes_read); + } else { + return buffer; + } + } + + arw::Result> ReadMetadata() { + return flight_data.kv_metadata; + } + + arw::Future> ReadMetadataAsync( + const arw::io::IOContext& io_context) override { + return arw::Future>::MakeFinished(ReadMetadata()); + } + + // implement Seekable interface + + arw::Status Seek(int64_t new_position) { + INFO << "entered: position: " << new_position << dendl; + if (position < 0) { + ERROR << "error, position indicated error" << dendl; + return arw::Status::IOError("object read op is in bad state"); + } else { + position = new_position; + return arw::Status::OK(); + } + } +}; // class RandomAccessObject + +arw::Status FlightServer::DoGet(const flt::ServerCallContext &context, + const flt::Ticket &request, + std::unique_ptr *stream) { + int ret; + + ARROW_ASSIGN_OR_RAISE(FlightKey key, TicketToFlightKey(request)); + ARROW_ASSIGN_OR_RAISE(FlightData fd, get_flight_store()->get_flight(key)); + + std::unique_ptr user = driver->get_user(fd.user_id); + if (user->empty()) { + INFO << "user is empty" << dendl; + } else { + // TODO: test what happens if user is not loaded + ret = user->load_user(&dp, null_yield); + if (ret < 0) { + ERROR << "load_user returned " << ret << dendl; + // TODO return something + } + INFO << "user is " << user->get_display_name() << dendl; + } + + std::unique_ptr bucket; + + ret = driver->get_bucket(&dp, &(*user), fd.tenant_name, fd.bucket_name, + &bucket, null_yield); + if (ret < 0) { + ERROR << "get_bucket returned 
" << ret << dendl; + // TODO return something + } + + std::unique_ptr object = bucket->get_object(fd.object_key); + + auto input = std::make_shared(fd, object, dp); + ARROW_RETURN_NOT_OK(input->Open()); + + std::unique_ptr reader; + ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, + arw::default_memory_pool(), + &reader)); + + std::shared_ptr table; + ARROW_RETURN_NOT_OK(reader->ReadTable(&table)); + + std::vector> batches; + arw::TableBatchReader batch_reader(*table); + ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches)); + + ARROW_ASSIGN_OR_RAISE(auto owning_reader, + arw::RecordBatchReader::Make( + std::move(batches), table->schema())); + *stream = std::unique_ptr( + new flt::RecordBatchStream(owning_reader)); + + return arw::Status::OK(); +} // flightServer::DoGet + } // namespace rgw::flight diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h index 1f4d3721376..8f6c4ade7b7 100644 --- a/src/rgw/rgw_flight.h +++ b/src/rgw/rgw_flight.h @@ -14,88 +14,200 @@ #include "rgw_frontend.h" #include "arrow/type.h" #include "arrow/flight/server.h" +#include "arrow/util/string_view.h" #include "rgw_flight_frontend.h" + +#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " +#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " +#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " +#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " + +#define INFO INFO_F(dp) +#define STATUS STATUS_F(dp) +#define WARN WARN_F(dp) +#define ERROR ERROR_F(dp) + + namespace arw = arrow; namespace flt = arrow::flight; + struct req_state; namespace rgw::flight { - static const coarse_real_clock::duration lifespan = std::chrono::hours(1); +static const coarse_real_clock::duration lifespan = std::chrono::hours(1); - struct FlightData { - FlightKey key; - coarse_real_clock::time_point expires; +struct FlightData { + FlightKey key; + // coarse_real_clock::time_point expires; + std::string uri; + std::string tenant_name; + 
std::string bucket_name; + rgw_obj_key object_key; + // NB: what about object's namespace and instance? + uint64_t num_records; + uint64_t obj_size; + std::shared_ptr schema; + std::shared_ptr kv_metadata; - rgw::sal::Bucket* bucket; -#if 0 - rgw::sal::Object object; - rgw::sal::User user; -#endif + rgw_user user_id; // TODO: this should be removed when we do + // proper flight authentication + FlightData(const std::string& _uri, + const std::string& _tenant_name, + const std::string& _bucket_name, + const rgw_obj_key& _object_key, + uint64_t _num_records, + uint64_t _obj_size, + std::shared_ptr& _schema, + std::shared_ptr& _kv_metadata, + rgw_user _user_id); +}; - FlightData(const req_state* state); - }; +// stores flights that have been created and helps expire them +class FlightStore { - // stores flights that have been created and helps expire them - class FlightStore { - public: - virtual ~FlightStore(); - virtual FlightKey add_flight(FlightData&& flight) = 0; - virtual int get_flight(const FlightKey& key) = 0; - virtual int remove_flight(const FlightKey& key) = 0; - virtual int expire_flights() = 0; - }; +protected: - class MemoryFlightStore : public FlightStore { - std::map map; + const DoutPrefix& dp; - public: +public: - virtual ~MemoryFlightStore(); - FlightKey add_flight(FlightData&& flight) override; - int get_flight(const FlightKey& key) override; - int remove_flight(const FlightKey& key) override; - int expire_flights() override; - }; + FlightStore(const DoutPrefix& dp); + virtual ~FlightStore(); + virtual FlightKey add_flight(FlightData&& flight) = 0; - class FlightServer : public flt::FlightServerBase { + // TODO consider returning const shared pointers to FlightData in + // the following two functions + virtual arw::Result get_flight(const FlightKey& key) const = 0; + virtual std::optional after_key(const FlightKey& key) const = 0; - using Data1 = std::vector>; + virtual int remove_flight(const FlightKey& key) = 0; + virtual int expire_flights() = 
0; +}; - boost::intrusive_ptr cct; - const DoutPrefix dp; - rgw::sal::Store* store; - FlightStore* flight_store; +class MemoryFlightStore : public FlightStore { + std::map map; + mutable std::mutex mtx; // for map - std::map data; +public: - public: + MemoryFlightStore(const DoutPrefix& dp); + virtual ~MemoryFlightStore(); + FlightKey add_flight(FlightData&& flight) override; + arw::Result get_flight(const FlightKey& key) const override; + std::optional after_key(const FlightKey& key) const override; + int remove_flight(const FlightKey& key) override; + int expire_flights() override; +}; - static constexpr int default_port = 8077; +class FlightServer : public flt::FlightServerBase { - FlightServer(boost::intrusive_ptr& _cct, - rgw::sal::Store* _store, - FlightStore* _flight_store); - ~FlightServer() override; + using Data1 = std::vector>; - FlightStore* get_flight_store() { - return flight_store; + RGWProcessEnv& env; + rgw::sal::Driver* driver; + const DoutPrefix& dp; + FlightStore* flight_store; + + std::map data; + +public: + + static constexpr int default_port = 8077; + + FlightServer(RGWProcessEnv& env, + FlightStore* flight_store, + const DoutPrefix& dp); + ~FlightServer() override; + + FlightStore* get_flight_store() { + return flight_store; + } + + arw::Status ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) override; + + arw::Status GetFlightInfo(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *info) override; + + arw::Status GetSchema(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr *schema) override; + + arw::Status DoGet(const flt::ServerCallContext &context, + const flt::Ticket &request, + std::unique_ptr *stream) override; +}; // class FlightServer + +class OwningStringView : public arw::util::string_view { + + uint8_t* buffer; + int64_t capacity; + int64_t consumed; + + 
OwningStringView(uint8_t* _buffer, int64_t _size) : + arw::util::string_view((const char*) _buffer, _size), + buffer(_buffer), + capacity(_size), + consumed(_size) + { } + + OwningStringView(OwningStringView&& from, int64_t new_size) : + buffer(nullptr), + capacity(from.capacity), + consumed(new_size) + { + // should be impossible due to static function check + ceph_assertf(consumed <= capacity, "new size cannot exceed capacity"); + + std::swap(buffer, from.buffer); + from.capacity = 0; + from.consumed = 0; } - arw::Status ListFlights(const flt::ServerCallContext& context, - const flt::Criteria* criteria, - std::unique_ptr* listings) override; - }; // class FlightServer +public: - // GLOBAL + OwningStringView(OwningStringView&&) = default; + OwningStringView& operator=(OwningStringView&&) = default; - void set_flight_server(FlightServer* _server); - FlightServer* get_flight_server(); - FlightStore* get_flight_store(); - FlightKey propose_flight(const req_state* request); + uint8_t* writeable_data() { + return buffer; + } + + ~OwningStringView() { + if (buffer) { + delete[] buffer; + } + } + + static arw::Result make(int64_t size) { + uint8_t* buffer = new uint8_t[size]; + if (!buffer) { + return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); + } + return OwningStringView(buffer, size); + } + + static arw::Result shrink(OwningStringView&& from, + int64_t new_size) { + if (new_size > from.capacity) { + return arw::Status::Invalid("new size cannot exceed capacity"); + } else { + return OwningStringView(std::move(from), new_size); + } + } + +}; + +// GLOBAL + +flt::Ticket FlightKeyToTicket(const FlightKey& key); +arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key); } // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc index 1751e2515d3..3da0f5d15e7 100644 --- a/src/rgw/rgw_flight_frontend.cc +++ b/src/rgw/rgw_flight_frontend.cc @@ -2,102 +2,234 @@ // vim: ts=8 sw=2 
smarttab ft=cpp +#include +#include +#include + #include "arrow/type.h" #include "arrow/flight/server.h" +#include "arrow/io/file.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" +#include "parquet/stream_reader.h" #include "rgw_flight_frontend.h" #include "rgw_flight.h" -#define dout_subsys ceph_subsys_rgw_flight + +// logging +constexpr unsigned dout_subsys = ceph_subsys_rgw_flight; +constexpr const char* dout_prefix_str = "rgw arrow_flight: "; + namespace rgw::flight { - FlightFrontend::FlightFrontend(boost::intrusive_ptr& _cct, - RGWFrontendConfig* _config, - rgw::sal::Store* _store, - int _port) : - cct(_cct), - dp(cct.get(), dout_subsys, "rgw arrow_flight: "), - config(_config), - port(_port) - { - FlightStore* flight_store = new MemoryFlightStore(); - flight_server = new FlightServer(_cct, _store, flight_store); +const FlightKey null_flight_key = 0; + +FlightFrontend::FlightFrontend(RGWProcessEnv& _env, + RGWFrontendConfig* _config, + int _port) : + env(_env), + config(_config), + port(_port), + dp(env.driver->ctx(), dout_subsys, dout_prefix_str) +{ + env.flight_store = new MemoryFlightStore(dp); + env.flight_server = new FlightServer(env, env.flight_store, dp); + INFO << "flight server started" << dendl; +} + +FlightFrontend::~FlightFrontend() { + delete env.flight_server; + delete env.flight_store; + INFO << "flight server shut down" << dendl; +} + +int FlightFrontend::init() { + if (port <= 0) { + port = FlightServer::default_port; + } + const std::string url = + std::string("grpc+tcp://localhost:") + std::to_string(port); + flt::Location location; + arw::Status s = flt::Location::Parse(url, &location); + if (!s.ok()) { + ERROR << "couldn't parse url=" << url << ", status=" << s << dendl; + return -EINVAL; } - FlightFrontend::~FlightFrontend() { - delete flight_server; + flt::FlightServerOptions options(location); + options.verify_client = false; + s = env.flight_server->Init(options); + if (!s.ok()) { + ERROR << "couldn't init 
flight server; status=" << s << dendl; + return -EINVAL; } - int FlightFrontend::init() { - if (port <= 0) { - port = FlightServer::default_port; - } - const std::string url = - std::string("grpc+tcp://localhost:") + std::to_string(port); - flt::Location location; - arw::Status s = flt::Location::Parse(url, &location); - if (!s.ok()) { - return -EINVAL; - } + INFO << "FlightServer inited; will use port " << port << dendl; + return 0; +} - flt::FlightServerOptions options(location); - options.verify_client = false; - s = flight_server->Init(options); - if (!s.ok()) { - return -EINVAL; - } +int FlightFrontend::run() { + try { + flight_thread = make_named_thread(server_thread_name, + &FlightServer::Serve, + env.flight_server); - dout(20) << "STATUS: " << __func__ << - ": FlightServer inited; will use port " << port << dendl; + INFO << "FlightServer thread started, id=" << + flight_thread.get_id() << + ", joinable=" << flight_thread.joinable() << dendl; return 0; + } catch (std::system_error& e) { + ERROR << "FlightServer thread failed to start" << dendl; + return -e.code().value(); + } +} + +void FlightFrontend::stop() { + env.flight_server->Shutdown(); + env.flight_server->Wait(); + INFO << "FlightServer shut down" << dendl; +} + +void FlightFrontend::join() { + flight_thread.join(); + INFO << "FlightServer thread joined" << dendl; +} + +void FlightFrontend::pause_for_new_config() { + // ignore since config changes won't alter flight_server +} + +void FlightFrontend::unpause_with_new_config() { + // ignore since config changes won't alter flight_server +} + +/* ************************************************************ */ + +FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request, + RGWGetObj_Filter* next) : + RGWGetObj_Filter(next), + penv(request->penv), + dp(request->cct->get(), dout_subsys, dout_prefix_str), + current_offset(0), + expected_size(request->obj_size), + uri(request->decoded_uri), + tenant_name(request->bucket->get_tenant()), + 
bucket_name(request->bucket->get_name()), + object_key(request->object->get_key()), + // note: what about object namespace and instance? + schema_status(arrow::StatusCode::Cancelled, + "schema determination incomplete"), + user_id(request->user->get_id()) +{ +#warning "TODO: fix use of tmpnam" + char name[L_tmpnam]; + const char* namep = std::tmpnam(name); + if (!namep) { + // + } + temp_file_name = namep; + + temp_file.open(temp_file_name); +} + +FlightGetObj_Filter::~FlightGetObj_Filter() { + if (temp_file.is_open()) { + temp_file.close(); + } + std::error_code error; + std::filesystem::remove(temp_file_name, error); + if (error) { + ERROR << "FlightGetObj_Filter got error when removing temp file; " + "error=" << error.value() << + ", temp_file_name=" << temp_file_name << dendl; + } else { + INFO << "parquet/arrow schema determination status: " << + schema_status << dendl; + } +} + +int FlightGetObj_Filter::handle_data(bufferlist& bl, + off_t bl_ofs, off_t bl_len) { + INFO << "flight handling data from offset " << + current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl; + + current_offset += bl_len; + + if (temp_file.is_open()) { + bl.write_stream(temp_file); + + if (current_offset >= expected_size) { + INFO << "data read is completed, current_offset=" << + current_offset << ", expected_size=" << expected_size << dendl; + temp_file.close(); + + std::shared_ptr kv_metadata; + std::shared_ptr aw_schema; + int64_t num_rows = 0; + + auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr file, + arrow::io::ReadableFile::Open(temp_file_name)); + const std::shared_ptr metadata = parquet::ReadMetaData(file); + + file->Close(); + + num_rows = metadata->num_rows(); + kv_metadata = metadata->key_value_metadata(); + const parquet::SchemaDescriptor* pq_schema = metadata->schema(); + ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema)); + + return 
arrow::Status::OK(); + }; + + schema_status = process_metadata(); + if (!schema_status.ok()) { + ERROR << "reading metadata to access schema, error=" << schema_status << dendl; + } else { + // INFO << "arrow_schema=" << *aw_schema << dendl; + FlightStore* store = penv.flight_store; + auto key = + store->add_flight(FlightData(uri, tenant_name, bucket_name, + object_key, num_rows, + expected_size, aw_schema, + kv_metadata, user_id)); + (void) key; // suppress unused variable warning + } + } // if last block + } // if file opened + + // chain to next filter in stream + int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + + return ret; +} + +#if 0 +void code_snippets() { + INFO << "num_columns:" << md->num_columns() << + " num_schema_elements:" << md->num_schema_elements() << + " num_rows:" << md->num_rows() << + " num_row_groups:" << md->num_row_groups() << dendl; + + + INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl; + for (int c = 0; c < schema1->num_columns(); ++c) { + const parquet::ColumnDescriptor* cd = schema1->Column(c); + // const parquet::ConvertedType::type t = cd->converted_type; + const std::shared_ptr lt = cd->logical_type(); + INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl; } - int FlightFrontend::run() { - try { - flight_thread = make_named_thread(server_thread_name, - &FlightServer::Serve, - flight_server); - set_flight_server(flight_server); - - dout(20) << "INFO: " << __func__ << - ": FlightServer thread started, id=" << flight_thread.get_id() << - ", joinable=" << flight_thread.joinable() << dendl; - return 0; - } catch (std::system_error& e) { - derr << "ERROR: " << __func__ << - ": FlightServer thread failed to start" << dendl; - return -ENOSPC; - } - } - - void FlightFrontend::stop() { - set_flight_server(nullptr); - flight_server->Shutdown(); - 
flight_server->Wait(); - dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl; - } - - void FlightFrontend::join() { - flight_thread.join(); - dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl; - } - - void FlightFrontend::pause_for_new_config() { - // ignore since config changes won't alter flight_server - } - - void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store, - rgw_auth_registry_ptr_t auth_registry) { - // ignore since config changes won't alter flight_server - } - - int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { - // do work here - dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl; - - // chain upwards - return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl; + for (int rg = 0; rg < md->num_row_groups(); ++rg) { + INFO << "Row Group " << rg << dendl; + auto rg_md = md->RowGroup(rg); + auto schema2 = rg_md->schema(); } +} +#endif } // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.h b/src/rgw/rgw_flight_frontend.h index c74878a884b..b820ca22b06 100644 --- a/src/rgw/rgw_flight_frontend.h +++ b/src/rgw/rgw_flight_frontend.h @@ -8,62 +8,71 @@ #include "rgw_frontend.h" #include "rgw_op.h" +#include "arrow/status.h" + namespace rgw::flight { - using FlightKey = uint32_t; +using FlightKey = uint32_t; +extern const FlightKey null_flight_key; - class FlightServer; +class FlightServer; - class FlightFrontend : public RGWFrontend { +class FlightFrontend : public RGWFrontend { - static constexpr std::string_view server_thread_name = - "Arrow Flight Server thread"; + static constexpr std::string_view server_thread_name = + "Arrow Flight Server thread"; - boost::intrusive_ptr& cct; - const DoutPrefix dp; - FlightServer* flight_server; // pointer so header file doesn't need to pull in too 
much - std::thread flight_thread; - RGWFrontendConfig* config; - int port; + RGWProcessEnv& env; + std::thread flight_thread; + RGWFrontendConfig* config; + int port; - public: + const DoutPrefix dp; - // port <= 0 -> let server decide; typically 8077 - FlightFrontend(boost::intrusive_ptr& cct, - RGWFrontendConfig* config, - rgw::sal::Store* store, - int port = -1); - ~FlightFrontend() override; - int init() override; - int run() override; - void stop() override; - void join() override; +public: - void pause_for_new_config() override; - void unpause_with_new_config(rgw::sal::Store* store, - rgw_auth_registry_ptr_t auth_registry) override; + // port <= 0 means let server decide; typically 8077 + FlightFrontend(RGWProcessEnv& env, + RGWFrontendConfig* config, + int port = -1); + ~FlightFrontend() override; + int init() override; + int run() override; + void stop() override; + void join() override; - }; // class FlightFrontend + void pause_for_new_config() override; + void unpause_with_new_config() override; +}; // class FlightFrontend - class FlightGetObj_Filter : public RGWGetObj_Filter { +class FlightGetObj_Filter : public RGWGetObj_Filter { - FlightKey key; + const RGWProcessEnv& penv; + const DoutPrefix dp; + FlightKey key; + uint64_t current_offset; + uint64_t expected_size; + std::string uri; + std::string tenant_name; + std::string bucket_name; + rgw_obj_key object_key; + std::string temp_file_name; + std::ofstream temp_file; + arrow::Status schema_status; + rgw_user user_id; // TODO: this should be removed when we do + // proper flight authentication - public: +public: - FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) : - RGWGetObj_Filter(next), - key(_key) - { - // empty - } + FlightGetObj_Filter(const req_state* request, RGWGetObj_Filter* next); + ~FlightGetObj_Filter(); - int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; #if 0 - // this would 
allow the range to be modified if necessary; - int fixup_range(off_t& ofs, off_t& end) override; + // this would allow the range to be modified if necessary; + int fixup_range(off_t& ofs, off_t& end) override; #endif - }; +}; } // namespace rgw::flight diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 9c238827445..e5589ae1508 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2238,10 +2238,8 @@ void RGWGetObj::execute(optional_yield y) #ifdef WITH_ARROW_FLIGHT if (ofs == 0) { - rgw::flight::FlightKey key = rgw::flight::propose_flight(s); - ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl; - - flight_filter.emplace(key, filter); + // insert a GetObj_Filter to monitor and create flight + flight_filter.emplace(s, filter); filter = &*flight_filter; } #endif diff --git a/src/rgw/rgw_process_env.h b/src/rgw/rgw_process_env.h index 7e62b6afcd4..3193ff15121 100644 --- a/src/rgw/rgw_process_env.h +++ b/src/rgw/rgw_process_env.h @@ -20,6 +20,13 @@ namespace rgw::sal { class LuaManager; } +#ifdef WITH_ARROW_FLIGHT +namespace rgw::flight { + class FlightServer; + class FlightStore; +} +#endif + struct RGWLuaProcessEnv { std::string luarocks_path; rgw::lua::Background* background = nullptr; @@ -33,4 +40,11 @@ struct RGWProcessEnv { OpsLogSink *olog = nullptr; std::unique_ptr auth_registry; ActiveRateLimiter* ratelimiting = nullptr; + +#ifdef WITH_ARROW_FLIGHT + // managed by rgw::flight::FlightFrontend in rgw_flight_frontend.cc; null until assigned + rgw::flight::FlightServer* flight_server = nullptr; + rgw::flight::FlightStore* flight_store = nullptr; +#endif }; +