mirror of https://github.com/ceph/ceph
Merge pull request #59465 from galsalomon66/limit_mem_usage_on_parquet_flow
rgw/s3select: limit memory usage on Parquet flow
commit 20007ea4a7
@@ -59,6 +59,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_parquet_buffer_size
+  type: size
+  level: advanced
+  desc: The maximum Parquet buffer size; a limit on memory consumption for Parquet reading operations.
+  default: 16_M
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_rados_tracing
   type: bool
   level: advanced
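The new option is a size-typed setting with a 16_M default. Below is a minimal, self-contained sketch (not Ceph code; the helper name and the binary reading of the 16_M suffix as 16 MiB are assumptions) of how such a limit caps per-read buffer allocation:

    // hypothetical helper, not part of RGW: clamp every read to the configured limit
    #include <algorithm>
    #include <cstdint>
    #include <vector>

    std::vector<char> read_chunk_capped(uint64_t requested_len,
                                        uint64_t buffer_limit = 16ULL << 20 /* 16 MiB */) {
      const uint64_t len = std::min(requested_len, buffer_limit);
      return std::vector<char>(len); // the allocation never exceeds the limit
    }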
@@ -344,7 +344,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
 
 int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
 {
-  if(m_s3select_query.empty() == false) {
+  if (m_s3select_query.empty() == false) {
     return 0;
   }
 #ifndef _ARROW_EXIST
@@ -416,14 +416,14 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (output_escape_char.size()) {
     csv.output_escape_char = *output_escape_char.c_str();
   }
-  if(output_quote_fields.compare("ALWAYS") == 0) {
+  if (output_quote_fields.compare("ALWAYS") == 0) {
     csv.quote_fields_always = true;
-  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+  } else if (output_quote_fields.compare("ASNEEDED") == 0) {
     csv.quote_fields_asneeded = true;
   }
-  if(m_header_info.compare("IGNORE")==0) {
+  if (m_header_info.compare("IGNORE")==0) {
     csv.ignore_header_info=true;
-  } else if(m_header_info.compare("USE")==0) {
+  } else if (m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
 
@@ -478,6 +478,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     //parsing the SQL statement.
     s3select_syntax.parse_query(m_sql_query.c_str());
+    parquet_object::csv_definitions parquet;
 
     m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
                                                       fp_s3select_result_format,
@@ -485,8 +486,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
                                                       fp_debug_mesg);
 
     try {
+      //setting the Parquet-reader properties, i.e. the buffer size for the Parquet-reader
+      parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
       //at this stage the Parquet-processing requires the meta-data that resides on the Parquet object
-      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
     } catch(base_s3select_exception& e) {
       ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
       fp_result_header_format(m_aws_response_handler.get_sql_result());
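The try-block above pushes the configured rgw_parquet_buffer_size into the Parquet reader through a process-wide singleton. The real definition of parquet::ceph::S3select_Config is not part of this diff; the following is a hypothetical stand-in (ReaderConfig and its method names are invented) sketching the Meyers-singleton pattern such a configuration object typically uses:

    #include <cstdint>

    class ReaderConfig {                 // hypothetical stand-in, not the real S3select_Config
    public:
      static ReaderConfig& getInstance() {
        static ReaderConfig instance;    // constructed once; thread-safe since C++11
        return instance;
      }
      void set_reader_buffer_size(uint64_t sz) { m_buffer_size = sz; }
      uint64_t get_reader_buffer_size() const { return m_buffer_size; }
    private:
      ReaderConfig() = default;
      uint64_t m_buffer_size = 16ULL << 20; // default 16 MiB, matching 16_M
    };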
@@ -524,6 +527,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
                                                   fp_s3select_result_format,
                                                   fp_result_header_format,
                                                   fp_debug_mesg);
+  json_object::csv_definitions json;
 
   m_aws_response_handler.init_response();
 
@@ -547,8 +551,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   }
 
   //initializing json processor
-  json_object::csv_definitions output_definition;
-  m_s3_json_object.set_json_query(&s3select_syntax,output_definition);
+  m_s3_json_object.set_json_query(&s3select_syntax, json);
 
   if (input == nullptr) {
     input = "";
@@ -706,6 +709,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
   RGWGetObj::parse_range();
   requested_buffer.clear();
   m_request_range = len;
+  m_aws_response_handler.update_processed_size(len);
   ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
   RGWGetObj::execute(y);
   if (buff) {
@@ -730,7 +734,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
 
-  if(s->cct->_conf->rgw_disable_s3select == true)
+  if (s->cct->_conf->rgw_disable_s3select == true)
   {
     std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
     ldpp_dout(this, 10) << error_msg << dendl;
@@ -749,12 +753,24 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
       return;
     }
     s3select_syntax.parse_query(m_sql_query.c_str());
+    //run_s3select_on_parquet() calls the s3select query-engine, which reads and processes the parquet object via RGW::range_request();
+    //once the query-engine finishes processing, control returns to execute().
+    //the parquet-reader indicates the end of the parquet object.
    status = run_s3select_on_parquet(m_sql_query.c_str());
    if (status) {
      ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
      op_ret = -ERR_INVALID_REQUEST;
    } else {
-      ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
+      //status per amount of processed data
+      m_aws_response_handler.update_total_bytes_returned(m_s3_parquet_object.get_return_result_size());
+      m_aws_response_handler.init_stats_response();
+      m_aws_response_handler.send_stats_response();
+      m_aws_response_handler.init_end_response();
+      ldpp_dout(this, 10) << "s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() "
+      << m_aws_response_handler.get_processed_size()
+      << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
+
+      ldout(s->cct, 10) << "S3select: complete parquet query with success " << dendl;
    }
   } else {
     //CSV or JSON processing
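The success branch above reports two separate counters before closing the response: how much of the object was processed and how much SQL result data was returned. A hedged, self-contained sketch of that bookkeeping pattern (the type and method names are hypothetical, not the RGW response handler):

    #include <cstdint>
    #include <iostream>

    struct ResponseStats {               // hypothetical stand-in for the response handler's counters
      uint64_t processed_bytes = 0;      // bytes of the object that were read/scanned
      uint64_t returned_bytes  = 0;      // bytes of SQL results sent back to the client

      void on_range_read(uint64_t len)   { processed_bytes += len; }
      void on_result_chunk(uint64_t len) { returned_bytes  += len; }
      void report() const {
        std::cout << "processed=" << processed_bytes
                  << " returned=" << returned_bytes << "\n";
      }
    };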
@@ -762,7 +778,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 
       m_requested_range = (m_end_scan_sz - m_start_scan_sz);
 
-      if(m_is_trino_request){
+      if (m_is_trino_request){
       // fetch more than requested (m_scan_offset); the additional bytes are scanned for the end of row,
       // thus the additional length will be processed, and no row is broken for Trino.
       // assumption: a row is smaller than m_scan_offset. (a different approach is to request an additional range)
@@ -778,7 +794,8 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 }
 
 int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
-{
+{//purpose: process the buffer returned by the range-request and send it to the Parquet-reader.
+ //range_request() is called by arrow::ReadAt; upon request completion, control returns to RGWSelectObj_ObjStore_S3::execute()
   fp_chunked_transfer_encoding();
   size_t append_in_callback = 0;
   int part_no = 1;
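Per the comments above, the Parquet reader pulls byte ranges on demand (arrow::ReadAt drives range_request()), and each returned buffer is fed back to the reader. The sketch below is a hypothetical, self-contained illustration of that pull-style flow under a bounded per-read buffer; none of these types are the Arrow or RGW APIs:

    #include <algorithm>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    struct RangeSource {                 // hypothetical stand-in for the object-store side
      std::string object;                // pretend object contents
      // answer one range request; the caller decides how much to ask for per call
      int64_t read_at(int64_t ofs, int64_t len, char* out) const {
        if (ofs >= static_cast<int64_t>(object.size())) return 0;
        const int64_t n = std::min(len, static_cast<int64_t>(object.size()) - ofs);
        std::memcpy(out, object.data() + ofs, n);
        return n;
      }
    };

    // the "reader" never asks for more than its configured buffer size per call
    std::vector<char> read_all(const RangeSource& src, int64_t total, int64_t buf_limit) {
      std::vector<char> out;
      std::vector<char> buf(buf_limit);
      for (int64_t ofs = 0; ofs < total;) {
        const int64_t n = src.read_at(ofs, buf_limit, buf.data());
        if (n <= 0) break;
        out.insert(out.end(), buf.begin(), buf.begin() + n);
        ofs += n;
      }
      return out;
    }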
@@ -809,7 +826,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
   //the purpose is to return "perfect" results, with no broken or missing lines.
 
   off_t new_offset = 0;
-  if(m_scan_range_ind){//only upon range-scan
+  if (m_scan_range_ind){//only upon range-scan
   int64_t sc=0;
   int64_t start =0;
   const char* row_delimiter = m_row_delimiter.c_str();
@@ -817,10 +834,10 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
   ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
 
   //chop the head of the first chunk, but only when the slice does not include the head of the object.
-  if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+  if (m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
     char* p = const_cast<char*>(it_cp+ofs);
     while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
-    if(!strncmp(row_delimiter,p,1)){
+    if (!strncmp(row_delimiter,p,1)){
       new_offset += (p - (it_cp+ofs))+1;
     }
   }
@@ -831,14 +848,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 
   //chop the end of the last chunk for this request
   //if it's the last chunk, search for the first row-delimiter per the following use-cases
-  if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+  if ((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
     //having passed the requested range, start searching for the first delimiter
-    if(m_aws_response_handler.get_processed_size()>m_requested_range){
+    if (m_aws_response_handler.get_processed_size()>m_requested_range){
       //the previous chunk contains the complete request (all data) plus extra bytes.
       //thus, search for the first row-delimiter
       //[:previous (RR) ... ][:current (RD) ]
       start = 0;
-    } else if(m_aws_response_handler.get_processed_size()){
+    } else if (m_aws_response_handler.get_processed_size()){
      //the *current* chunk contains the complete request in the middle of the chunk.
      //thus, search for the first row-delimiter after the complete request position
      //[:current (RR) .... (RD) ]
@@ -852,7 +869,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
     for(sc=start;sc<len;sc++)//assumption: a row-delimiter must exist, or it is the end of the object
     {
       char* p = const_cast<char*>(it_cp) + ofs + sc;
-      if(!strncmp(row_delimiter,p,1)){
+      if (!strncmp(row_delimiter,p,1)){
        ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
        len = sc + 1;//+1 is for the delimiter. TODO: what about m_object_size_for_processing (update according to len)
        //the end of the row exists in the current chunk.
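The loop above completes the row-alignment described in the earlier comments: the head of the first chunk and the tail of the last chunk are cut at row delimiters so Trino never sees a broken row. A self-contained sketch of the same idea with hypothetical helpers (chop_head and chop_tail are not RGW functions):

    #include <cstddef>
    #include <string_view>

    // drop a partial row at the head of a chunk -- mirrors the "chop the head of the first chunk" step
    std::string_view chop_head(std::string_view chunk, char row_delimiter = '\n') {
      const size_t pos = chunk.find(row_delimiter);
      return (pos == std::string_view::npos) ? std::string_view{}
                                             : chunk.substr(pos + 1);
    }

    // cut the chunk just after the first delimiter at or past `start`
    // -- mirrors the "chop the end of the last chunk" step
    std::string_view chop_tail(std::string_view chunk, size_t start,
                               char row_delimiter = '\n') {
      const size_t pos = chunk.find(row_delimiter, start);
      return (pos == std::string_view::npos) ? chunk : chunk.substr(0, pos + 1);
    }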
@@ -872,7 +889,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-  if(m_skip_next_chunk == true){
+  if (m_skip_next_chunk == true){
     return status;
   }
 
@@ -894,13 +911,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
       }
 
 
-      if(ofs > it.length()){
+      if (ofs > it.length()){
       //safety check
         ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
         ofs = 0;
       }
 
-      if(m_is_trino_request){
+      if (m_is_trino_request){
      //TODO replace len with it.length() ? ; test Trino flow with compressed objects.
      //is it possible to send get-by-ranges? in parallel?
        shape_chunk_per_trino_requests(&(it)[0], ofs, len);
@@ -964,7 +981,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
       continue;
     }
 
-    if((ofs + len) > it.length()){
+    if ((ofs + len) > it.length()){
       ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
       ofs = 0;
       len = it.length();
@@ -1025,7 +1042,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
   if (len == 0 && s->obj_size != 0) {
     return 0;
   }
-  if (m_parquet_type) {
+  if (m_parquet_type) {//bufferlist sent back upon range-request
     return parquet_processing(bl,ofs,len);
   }
   if (m_json_type) {
@@ -1 +1 @@
-Subproject commit f333ec82e6e8a3f7eb9ba1041d1442b2c7cd0f05
+Subproject commit 0a0f6d439441f5b121ed1052dac54542e4f1d89b