Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ function(resolve_cpr_dependency)
if(DEFINED ENV{ICEBERG_CPR_URL})
set(CPR_URL "$ENV{ICEBERG_CPR_URL}")
else()
set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.12.0.tar.gz")
set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.14.1.tar.gz")
endif()

fetchcontent_declare(cpr
Expand Down
97 changes: 21 additions & 76 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response,

} // namespace

// Configures the shared session_ for a single request: sets the URL and query
// parameters, drops any previously attached content, selects the HTTP method, and
// installs the merged default + per-request headers.
//
// \param path    Request URL, handed to the session as a cpr::Url.
// \param method  HTTP verb; selects which session_->Prepare*() call is made.
// \param params  Query parameters, converted with GetParameters().
// \param headers Per-request headers, combined with default_headers_ through
//                MergeHeaders().
//
// NOTE(review): the callers visible in this file take session_mutex_ before calling
// this function; presumably that is required because session_ is shared -- confirm.
void HttpClient::PrepareSession(
const std::string& path, HttpMethod method,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers) {
session_->SetUrl(cpr::Url{path});
session_->SetParameters(GetParameters(params));
// Drop any body/payload left on the session by an earlier request.
session_->RemoveContent();
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
// manually enforce HTTP GET to clear it.
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
// Select the verb only after the reset above, so the requested method is the one
// that ends up configured on the handle.
switch (method) {
case HttpMethod::kGet:
session_->PrepareGet();
break;
case HttpMethod::kPost:
session_->PreparePost();
break;
case HttpMethod::kPut:
session_->PreparePut();
break;
case HttpMethod::kDelete:
session_->PrepareDelete();
break;
case HttpMethod::kHead:
session_->PrepareHead();
break;
}
// Combine the client-wide defaults with the headers supplied for this request.
auto final_headers = MergeHeaders(default_headers_, headers);
session_->SetHeader(final_headers);
}

HttpClient::HttpClient(std::unordered_map<std::string, std::string> default_headers)
: default_headers_{std::move(default_headers)},
session_{std::make_unique<cpr::Session>()} {
connection_pool_{std::make_unique<cpr::ConnectionPool>()} {
// Set default Content-Type for all requests (including GET/HEAD/DELETE).
// Many systems require that content type is set regardless and will fail,
// even on an empty bodied request.
Expand All @@ -182,12 +150,9 @@ Result<HttpResponse> HttpClient::Get(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, HttpMethod::kGet, params, headers);
response = session_->Get();
}
auto final_headers = MergeHeaders(default_headers_, headers);
cpr::Response response =
cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_);

ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
Expand All @@ -199,13 +164,9 @@ Result<HttpResponse> HttpClient::Post(
const std::string& path, const std::string& body,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
session_->SetBody(cpr::Body{body});
response = session_->Post();
}
auto final_headers = MergeHeaders(default_headers_, headers);
cpr::Response response =
cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_);

ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
Expand All @@ -218,25 +179,16 @@ Result<HttpResponse> HttpClient::PostForm(
const std::unordered_map<std::string, std::string>& form_data,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;

{
std::lock_guard guard(session_mutex_);

// Override default Content-Type (application/json) with form-urlencoded
auto form_headers = headers;
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;

PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
std::vector<cpr::Pair> pair_list;
pair_list.reserve(form_data.size());
for (const auto& [key, val] : form_data) {
pair_list.emplace_back(key, val);
}
session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));

response = session_->Post();
auto final_headers = MergeHeaders(default_headers_, headers);
final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded);
std::vector<cpr::Pair> pair_list;
pair_list.reserve(form_data.size());
for (const auto& [key, val] : form_data) {
pair_list.emplace_back(key, val);
}
cpr::Response response =
cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()),
final_headers, *connection_pool_);

ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
Expand All @@ -247,12 +199,8 @@ Result<HttpResponse> HttpClient::PostForm(
Result<HttpResponse> HttpClient::Head(
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
response = session_->Head();
}
auto final_headers = MergeHeaders(default_headers_, headers);
cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_);

ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
Expand All @@ -264,12 +212,9 @@ Result<HttpResponse> HttpClient::Delete(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, HttpMethod::kDelete, params, headers);
response = session_->Delete();
}
auto final_headers = MergeHeaders(default_headers_, headers);
cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params),
final_headers, *connection_pool_);

ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
Expand Down
14 changes: 2 additions & 12 deletions src/iceberg/catalog/rest/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"
Expand All @@ -34,7 +32,7 @@
/// \brief Http client for Iceberg REST API.

namespace cpr {
class Session;
class ConnectionPool;
} // namespace cpr

namespace iceberg::rest {
Expand Down Expand Up @@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient {
const ErrorHandler& error_handler);

private:
void PrepareSession(const std::string& path, HttpMethod method,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers);

std::unordered_map<std::string, std::string> default_headers_;

// TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent
// calls
std::unique_ptr<cpr::Session> session_;
mutable std::mutex session_mutex_;
std::unique_ptr<cpr::ConnectionPool> connection_pool_;
};

} // namespace iceberg::rest
21 changes: 11 additions & 10 deletions subprojects/cpr.wrap
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
# under the License.

[wrap-file]
directory = cpr-1.12.0
source_url = https://github.com/libcpr/cpr/archive/1.12.0.tar.gz
source_filename = cpr-1.12.0.tar.gz
source_hash = f64b501de66e163d6a278fbb6a95f395ee873b7a66c905dd785eae107266a709
patch_filename = cpr_1.12.0-1_patch.zip
patch_url = https://wrapdb.mesonbuild.com/v2/cpr_1.12.0-1/get_patch
patch_hash = 16404431dd8b2dbb49afc78a07b3bbe3c84c9f83ce1f45c3510934fadab99e72
source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/cpr_1.12.0-1/cpr-1.12.0.tar.gz
wrapdb_version = 1.12.0-1
directory = cpr-1.14.1
source_url = https://github.com/libcpr/cpr/archive/1.14.1.tar.gz
source_filename = cpr-1.14.1.tar.gz
source_hash = 213ccc7c98683d2ca6304d9760005effa12ec51d664bababf114566cb2b1e23c
source_fallback_url = https://wrapdb.mesonbuild.com/v2/cpr_1.14.1-1/get_source/cpr-1.14.1.tar.gz
patch_filename = cpr_1.14.1-1_patch.zip
patch_url = https://wrapdb.mesonbuild.com/v2/cpr_1.14.1-1/get_patch
patch_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/cpr_1.14.1-1/cpr_1.14.1-1_patch.zip
patch_hash = e5930186aa8cfb9383a468a80d177b3a4c4fcc5f38deb6fca13d96263ce36459
wrapdb_version = 1.14.1-1

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the bump of version is due to the fact that 1.12 did not support connection pooling? I would probably add a little bit of a comment on that in the summary just to help people get more context.

I assume integration tests are going to help catch any potential regression not directly related to this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, it would be better to split this into two separate PRs, one for bumping the cpr version, the other for connection pool refactoring.

[provide]
cpr = cpr_dep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this?

dependency_names = cpr
Loading