diff --git a/cmake/modules/BuildArrow.cmake b/cmake/modules/BuildArrow.cmake
index 8a16b9b8899..0ee1d85b49f 100644
--- a/cmake/modules/BuildArrow.cmake
+++ b/cmake/modules/BuildArrow.cmake
@@ -69,6 +69,10 @@ function(build_arrow)
     list(APPEND arrow_DEPENDS Boost)
   endif()
 
+  # since Arrow 15.0.0 needs xsimd>=8.1.0 and since Ubuntu Jammy
+  # Jellyfish only provides 7.6.0, we'll have arrow build it from source
+  list(APPEND arrow_CMAKE_ARGS -Dxsimd_SOURCE=BUNDLED)
+
   # cmake doesn't properly handle arguments containing ";", such as
   # CMAKE_PREFIX_PATH, for which reason we'll have to use some other separator.
   string(REPLACE ";" "!" CMAKE_PREFIX_PATH_ALT_SEP "${CMAKE_PREFIX_PATH}")
diff --git a/src/arrow b/src/arrow
index 347a88ff9d2..a61f4af724c 160000
--- a/src/arrow
+++ b/src/arrow
@@ -1 +1 @@
-Subproject commit 347a88ff9d20e2a4061eec0b455b8ea1aa8335dc
+Subproject commit a61f4af724cd06c3a9b4abd20491345997e532c0
diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc
index 955edcced89..4aaaa462689 100644
--- a/src/rgw/rgw_flight.cc
+++ b/src/rgw/rgw_flight.cc
@@ -17,7 +17,6 @@
 
 #include "arrow/type.h"
 #include "arrow/buffer.h"
-#include "arrow/util/string_view.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/table.h"
@@ -175,7 +174,7 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
       previous_key(null_flight_key)
     { }
 
-    arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+    arrow::Result<std::unique_ptr<flt::FlightInfo>> Next() override {
       std::optional fd = flight_store->after_key(previous_key);
       if (fd) {
         previous_key = fd->key;
@@ -188,11 +187,9 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
         ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
                               flt::FlightInfo::Make(*fd->schema, descriptor, endpoints,
                                                     fd->num_records, fd->obj_size));
-        *info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
-        return arw::Status::OK();
+        return std::make_unique<flt::FlightInfo>(std::move(info_obj));
       } else {
-        *info = nullptr;
-        return arw::Status::OK();
+        return nullptr;
       }
     }
   }; // class RGWFlightListing
@@ -346,7 +343,7 @@ public:
     }
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     INFO << "called, not implemented" << dendl;
     return arw::Status::NotImplemented("peek not currently allowed");
   }
@@ -458,7 +455,7 @@ public:
     return flight_data.obj_size;
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     std::iostream::pos_type here = file.tellg();
     if (here == -1) {
       return arw::Status::IOError(
@@ -620,7 +617,7 @@ public:
     return flight_data.obj_size;
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     INFO << "entered: " << nbytes << " bytes" << dendl;
 
     int64_t saved_position = position;
@@ -716,7 +713,17 @@ arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
 
   std::vector<std::shared_ptr<arw::RecordBatch>> batches;
   arw::TableBatchReader batch_reader(*table);
-  ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+  // ReadNext() reports end-of-stream by returning an OK status with a
+  // null batch, so terminate on the null pointer and propagate any
+  // real read error to the caller instead of silently discarding it
+  while (true) {
+    std::shared_ptr<arw::RecordBatch> p;
+    ARROW_RETURN_NOT_OK(batch_reader.ReadNext(&p));
+    if (!p) {
+      break;
+    }
+    batches.push_back(std::move(p));
+  }
 
   ARROW_ASSIGN_OR_RAISE(auto owning_reader,
                         arw::RecordBatchReader::Make(
diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h
index bb0a987d0a1..d2f65d9a5b5 100644
--- a/src/rgw/rgw_flight.h
+++ b/src/rgw/rgw_flight.h
@@ -22,7 +22,6 @@
 #include "rgw_frontend.h"
 #include "arrow/type.h"
 #include "arrow/flight/server.h"
-#include "arrow/util/string_view.h"
 
 #include "rgw_flight_frontend.h"
 
@@ -122,6 +121,7 @@ class FlightServer : public flt::FlightServerBase {
 
   FlightStore* flight_store;
   std::map data;
+  arw::Status serve_return_value;
 
 public:
 
@@ -132,6 +132,12 @@ public:
                const DoutPrefix& dp);
   ~FlightServer() override;
 
+  // provides a version of Serve that has no return value, to avoid
+  // warnings when launching in a thread
+  void ServeAlt() {
+    serve_return_value = Serve();
+  }
+
   FlightStore* get_flight_store() {
     return flight_store;
   }
@@ -153,14 +159,14 @@ public:
                     std::unique_ptr<flt::FlightDataStream> *stream) override;
 }; // class FlightServer
 
-class OwningStringView : public arw::util::string_view {
+class OwningStringView : public std::string_view {
 
   uint8_t* buffer;
   int64_t capacity;
   int64_t consumed;
 
   OwningStringView(uint8_t* _buffer, int64_t _size) :
-    arw::util::string_view((const char*) _buffer, _size),
+    std::string_view((const char*) _buffer, _size),
     buffer(_buffer),
     capacity(_size),
     consumed(_size)
diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc
index c29703fe513..a673dbe3afb 100644
--- a/src/rgw/rgw_flight_frontend.cc
+++ b/src/rgw/rgw_flight_frontend.cc
@@ -63,16 +63,16 @@ int FlightFrontend::init() {
   }
 
   const std::string url = std::string("grpc+tcp://localhost:") + std::to_string(port);
-  flt::Location location;
-  arw::Status s = flt::Location::Parse(url, &location);
-  if (!s.ok()) {
-    ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
+  auto r = flt::Location::Parse(url);
+  if (!r.ok()) {
+    ERROR << "could not parse server uri: " << url << ", status=" << r.status() << dendl;
     return -EINVAL;
   }
+  flt::Location location = *r;
 
   flt::FlightServerOptions options(location);
   options.verify_client = false;
-  s = env.flight_server->Init(options);
+  auto s = env.flight_server->Init(options);
   if (!s.ok()) {
     ERROR << "couldn't init flight server; status=" << s << dendl;
     return -EINVAL;
@@ -85,7 +85,7 @@ int FlightFrontend::init() {
 int FlightFrontend::run() {
   try {
     flight_thread = make_named_thread(server_thread_name,
-                                      &FlightServer::Serve,
+                                      &FlightServer::ServeAlt,
                                       env.flight_server);
 
     INFO << "FlightServer thread started, id=" <<
@@ -99,8 +99,19 @@ int FlightFrontend::run() {
 }
 
 void FlightFrontend::stop() {
-  env.flight_server->Shutdown();
-  env.flight_server->Wait();
+  arw::Status s;
+  s = env.flight_server->Shutdown();
+  if (!s.ok()) {
+    ERROR << "call to Shutdown failed; status=" << s << dendl;
+    return;
+  }
+
+  s = env.flight_server->Wait();
+  if (!s.ok()) {
+    ERROR << "call to Wait failed; status=" << s << dendl;
+    return;
+  }
+
   INFO << "FlightServer shut down" << dendl;
 }
 
@@ -186,7 +197,7 @@ int FlightGetObj_Filter::handle_data(bufferlist& bl,
                                        arrow::io::ReadableFile::Open(temp_file_name));
       const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
 
-      file->Close();
+      ARROW_RETURN_NOT_OK(file->Close());
 
       num_rows = metadata->num_rows();
       kv_metadata = metadata->key_value_metadata();
diff --git a/src/s3select b/src/s3select
index eb40d36e109..f333ec82e6e 160000
--- a/src/s3select
+++ b/src/s3select
@@ -1 +1 @@
-Subproject commit eb40d36e1090a8e02b610f67b7edb902d59f2bbe
+Subproject commit f333ec82e6e8a3f7eb9ba1041d1442b2c7cd0f05