Merge pull request #56585 from ivancich/wip-upgrade-arrow+flight-15.0.2

rgw: upgrade Apache Arrow submodule to 15.0.0

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
J. Eric Ivancich 2024-05-10 12:22:43 -04:00 committed by GitHub
commit 1dd82d996a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 49 additions and 24 deletions

View File

@ -69,6 +69,10 @@ function(build_arrow)
list(APPEND arrow_DEPENDS Boost)
endif()
# since Arrow 15.0.0 needs xsimd>=8.1.0 and since Ubuntu Jammy
# Jellyfish only provides 7.6.0, we'll have arrow build it as source
list(APPEND arrow_CMAKE_ARGS -Dxsimd_SOURCE=BUNDLED)
# cmake doesn't properly handle arguments containing ";", such as
# CMAKE_PREFIX_PATH, for which reason we'll have to use some other separator.
string(REPLACE ";" "!" CMAKE_PREFIX_PATH_ALT_SEP "${CMAKE_PREFIX_PATH}")

@ -1 +1 @@
Subproject commit 347a88ff9d20e2a4061eec0b455b8ea1aa8335dc
Subproject commit a61f4af724cd06c3a9b4abd20491345997e532c0

View File

@ -17,7 +17,6 @@
#include "arrow/type.h"
#include "arrow/buffer.h"
#include "arrow/util/string_view.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/table.h"
@ -175,7 +174,7 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
previous_key(null_flight_key)
{ }
arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
arrow::Result<std::unique_ptr<flt::FlightInfo>> Next() override {
std::optional<FlightData> fd = flight_store->after_key(previous_key);
if (fd) {
previous_key = fd->key;
@ -188,11 +187,9 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
*info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
return arw::Status::OK();
return std::make_unique<flt::FlightInfo>(std::move(info_obj));
} else {
*info = nullptr;
return arw::Status::OK();
return nullptr;
}
}
}; // class RGWFlightListing
@ -346,7 +343,7 @@ public:
}
}
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
arw::Result<std::string_view> Peek(int64_t nbytes) override {
INFO << "called, not implemented" << dendl;
return arw::Status::NotImplemented("peek not currently allowed");
}
@ -458,7 +455,7 @@ public:
return flight_data.obj_size;
}
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
arw::Result<std::string_view> Peek(int64_t nbytes) override {
std::iostream::pos_type here = file.tellg();
if (here == -1) {
return arw::Status::IOError(
@ -620,7 +617,7 @@ public:
return flight_data.obj_size;
}
arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
arw::Result<std::string_view> Peek(int64_t nbytes) override {
INFO << "entered: " << nbytes << " bytes" << dendl;
int64_t saved_position = position;
@ -716,7 +713,14 @@ arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
std::vector<std::shared_ptr<arw::RecordBatch>> batches;
arw::TableBatchReader batch_reader(*table);
ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
while (true) {
std::shared_ptr<arw::RecordBatch> p;
auto s = batch_reader.ReadNext(&p);
if (!s.ok()) {
break;
}
batches.push_back(p);
}
ARROW_ASSIGN_OR_RAISE(auto owning_reader,
arw::RecordBatchReader::Make(

View File

@ -22,7 +22,6 @@
#include "rgw_frontend.h"
#include "arrow/type.h"
#include "arrow/flight/server.h"
#include "arrow/util/string_view.h"
#include "rgw_flight_frontend.h"
@ -122,6 +121,7 @@ class FlightServer : public flt::FlightServerBase {
FlightStore* flight_store;
std::map<std::string, Data1> data;
arw::Status serve_return_value;
public:
@ -132,6 +132,12 @@ public:
const DoutPrefix& dp);
~FlightServer() override;
// provides a version of Serve that has no return value, to avoid
// warnings when launching in a thread
void ServeAlt() {
serve_return_value = Serve();
}
FlightStore* get_flight_store() {
return flight_store;
}
@ -153,14 +159,14 @@ public:
std::unique_ptr<flt::FlightDataStream> *stream) override;
}; // class FlightServer
class OwningStringView : public arw::util::string_view {
class OwningStringView : public std::string_view {
uint8_t* buffer;
int64_t capacity;
int64_t consumed;
OwningStringView(uint8_t* _buffer, int64_t _size) :
arw::util::string_view((const char*) _buffer, _size),
std::string_view((const char*) _buffer, _size),
buffer(_buffer),
capacity(_size),
consumed(_size)

View File

@ -63,16 +63,16 @@ int FlightFrontend::init() {
}
const std::string url =
std::string("grpc+tcp://localhost:") + std::to_string(port);
flt::Location location;
arw::Status s = flt::Location::Parse(url, &location);
if (!s.ok()) {
ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
auto r = flt::Location::Parse(url);
if (!r.ok()) {
ERROR << "could not parse server uri: " << url << dendl;
return -EINVAL;
}
flt::Location location = *r;
flt::FlightServerOptions options(location);
options.verify_client = false;
s = env.flight_server->Init(options);
auto s = env.flight_server->Init(options);
if (!s.ok()) {
ERROR << "couldn't init flight server; status=" << s << dendl;
return -EINVAL;
@ -85,7 +85,7 @@ int FlightFrontend::init() {
int FlightFrontend::run() {
try {
flight_thread = make_named_thread(server_thread_name,
&FlightServer::Serve,
&FlightServer::ServeAlt,
env.flight_server);
INFO << "FlightServer thread started, id=" <<
@ -99,8 +99,19 @@ int FlightFrontend::run() {
}
void FlightFrontend::stop() {
env.flight_server->Shutdown();
env.flight_server->Wait();
arw::Status s;
s = env.flight_server->Shutdown();
if (!s.ok()) {
ERROR << "call to Shutdown failed; status=" << s << dendl;
return;
}
s = env.flight_server->Wait();
if (!s.ok()) {
ERROR << "call to Wait failed; status=" << s << dendl;
return;
}
INFO << "FlightServer shut down" << dendl;
}
@ -186,7 +197,7 @@ int FlightGetObj_Filter::handle_data(bufferlist& bl,
arrow::io::ReadableFile::Open(temp_file_name));
const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
file->Close();
ARROW_RETURN_NOT_OK(file->Close());
num_rows = metadata->num_rows();
kv_metadata = metadata->key_value_metadata();

@ -1 +1 @@
Subproject commit eb40d36e1090a8e02b610f67b7edb902d59f2bbe
Subproject commit f333ec82e6e8a3f7eb9ba1041d1442b2c7cd0f05