diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5eb26e06b..c60f42b1d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -41,12 +41,24 @@ jobs: name: AMD64 Ubuntu 24.04 runs-on: ubuntu-24.04 timeout-minutes: 30 + strategy: + fail-fast: false + env: + ICEBERG_TEST_S3_URI: s3://iceberg-test + AWS_ACCESS_KEY_ID: minio + AWS_SECRET_ACCESS_KEY: minio123 + AWS_DEFAULT_REGION: us-east-1 + AWS_ENDPOINT_URL: http://127.0.0.1:9000 + AWS_EC2_METADATA_DISABLED: "TRUE" steps: - name: Checkout iceberg-cpp uses: actions/checkout@v6 - name: Install dependencies shell: bash run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev + - name: Start MinIO + shell: bash + run: bash ci/scripts/start_minio.sh - name: Build Iceberg shell: bash env: @@ -63,9 +75,21 @@ jobs: name: AArch64 macOS 26 runs-on: macos-26 timeout-minutes: 30 + strategy: + fail-fast: false + env: + ICEBERG_TEST_S3_URI: s3://iceberg-test + AWS_ACCESS_KEY_ID: minio + AWS_SECRET_ACCESS_KEY: minio123 + AWS_DEFAULT_REGION: us-east-1 + AWS_ENDPOINT_URL: http://127.0.0.1:9000 + AWS_EC2_METADATA_DISABLED: "TRUE" steps: - name: Checkout iceberg-cpp uses: actions/checkout@v6 + - name: Start MinIO + shell: bash + run: bash ci/scripts/start_minio.sh - name: Build Iceberg shell: bash run: ci/scripts/build_iceberg.sh $(pwd) @@ -76,6 +100,15 @@ jobs: name: AMD64 Windows 2025 runs-on: windows-2025 timeout-minutes: 60 + strategy: + fail-fast: false + env: + ICEBERG_TEST_S3_URI: s3://iceberg-test + AWS_ACCESS_KEY_ID: minio + AWS_SECRET_ACCESS_KEY: minio123 + AWS_DEFAULT_REGION: us-east-1 + AWS_ENDPOINT_URL: http://127.0.0.1:9000 + AWS_EC2_METADATA_DISABLED: "TRUE" steps: - name: Checkout iceberg-cpp uses: actions/checkout@v6 @@ -85,6 +118,9 @@ jobs: vcpkg install zlib:x64-windows nlohmann-json:x64-windows nanoarrow:x64-windows roaring:x64-windows cpr:x64-windows - name: Setup sccache uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # 
v0.0.9 + - name: Start MinIO + shell: bash + run: bash ci/scripts/start_minio.sh - name: Build Iceberg shell: cmd env: diff --git a/CMakeLists.txt b/CMakeLists.txt index e7281fb11..c0f2eb73f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ option(ICEBERG_BUILD_TESTS "Build tests" ON) option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON) option(ICEBERG_BUILD_REST "Build rest catalog client" ON) option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF) +option(ICEBERG_S3 "Build with S3 support" OFF) option(ICEBERG_ENABLE_ASAN "Enable Address Sanitizer" OFF) option(ICEBERG_ENABLE_UBSAN "Enable Undefined Behavior Sanitizer" OFF) @@ -68,6 +69,12 @@ if(ICEBERG_BUILD_REST_INTEGRATION_TESTS AND WIN32) message(WARNING "Cannot build rest integration test on Windows, turning it off.") endif() +# ICEBERG_S3 requires ICEBERG_BUILD_BUNDLE +if(NOT ICEBERG_BUILD_BUNDLE AND ICEBERG_S3) + set(ICEBERG_S3 OFF) + message(STATUS "ICEBERG_S3 is disabled because ICEBERG_BUILD_BUNDLE is OFF") +endif() + include(CMakeParseArguments) include(IcebergBuildUtils) include(IcebergSanitizer) diff --git a/ci/scripts/build_iceberg.sh b/ci/scripts/build_iceberg.sh index 102c8ec11..cf332c21c 100755 --- a/ci/scripts/build_iceberg.sh +++ b/ci/scripts/build_iceberg.sh @@ -36,6 +36,7 @@ CMAKE_ARGS=( "-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX:-${ICEBERG_HOME}}" "-DICEBERG_BUILD_STATIC=ON" "-DICEBERG_BUILD_SHARED=ON" + "-DICEBERG_S3=ON" "-DICEBERG_BUILD_REST_INTEGRATION_TESTS=${build_rest_integration_test}" ) diff --git a/ci/scripts/start_minio.sh b/ci/scripts/start_minio.sh new file mode 100644 index 000000000..026be670c --- /dev/null +++ b/ci/scripts/start_minio.sh @@ -0,0 +1,153 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -eux + +MINIO_ROOT_USER="${MINIO_ROOT_USER:-minio}" +MINIO_ROOT_PASSWORD="${MINIO_ROOT_PASSWORD:-minio123}" +MINIO_IMAGE="${MINIO_IMAGE:-minio/minio:latest}" +MINIO_CONTAINER_NAME="${MINIO_CONTAINER_NAME:-iceberg-minio}" +MINIO_PORT="${MINIO_PORT:-9000}" +MINIO_CONSOLE_PORT="${MINIO_CONSOLE_PORT:-9001}" +MINIO_BUCKET="${MINIO_BUCKET:-iceberg-test}" +MINIO_ENDPOINT="${MINIO_ENDPOINT:-http://127.0.0.1:${MINIO_PORT}}" + +wait_for_minio() { + for i in {1..30}; do + if curl -fsS "${MINIO_ENDPOINT}/minio/health/ready" >/dev/null; then + return 0 + fi + sleep 1 + done + echo "MinIO did not become ready after 30 seconds." >&2 + echo "Endpoint: ${MINIO_ENDPOINT}" >&2 + if command -v docker >/dev/null 2>&1; then + docker logs "${MINIO_CONTAINER_NAME}" 2>&1 || true + fi + return 1 +} + +start_minio_docker() { + if ! command -v docker >/dev/null 2>&1; then + return 1 + fi + + if docker ps -a --format '{{.Names}}' | grep -q "^${MINIO_CONTAINER_NAME}\$"; then + docker rm -f "${MINIO_CONTAINER_NAME}" + fi + + docker run -d --name "${MINIO_CONTAINER_NAME}" \ + -p "${MINIO_PORT}:9000" -p "${MINIO_CONSOLE_PORT}:9001" \ + -e "MINIO_ROOT_USER=${MINIO_ROOT_USER}" \ + -e "MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}" \ + "${MINIO_IMAGE}" \ + server /data --console-address ":${MINIO_CONSOLE_PORT}" + + wait_for_minio +} + +start_minio_macos() { + if ! 
command -v brew >/dev/null 2>&1; then + echo "brew is required to start MinIO on macOS without Docker" >&2 + return 1 + fi + + brew install minio + MINIO_ROOT_USER="${MINIO_ROOT_USER}" MINIO_ROOT_PASSWORD="${MINIO_ROOT_PASSWORD}" \ + minio server /tmp/minio --console-address ":${MINIO_CONSOLE_PORT}" & + wait_for_minio +} + +download_mc() { + local uname_out + uname_out="$(uname -s)" + + local mc_dir + mc_dir="${RUNNER_TEMP:-/tmp}" + mkdir -p "${mc_dir}" + + case "${uname_out}" in + Linux*) + MC_BIN="${mc_dir}/mc" + curl -sSL "https://dl.min.io/client/mc/release/linux-amd64/mc" -o "${MC_BIN}" + chmod +x "${MC_BIN}" + ;; + Darwin*) + MC_BIN="${mc_dir}/mc" + local arch + arch="$(uname -m)" + if [ "${arch}" = "arm64" ]; then + curl -sSL "https://dl.min.io/client/mc/release/darwin-arm64/mc" -o "${MC_BIN}" + else + curl -sSL "https://dl.min.io/client/mc/release/darwin-amd64/mc" -o "${MC_BIN}" + fi + chmod +x "${MC_BIN}" + ;; + MINGW*|MSYS*|CYGWIN*) + MC_BIN="${mc_dir}/mc.exe" + curl -sSL "https://dl.min.io/client/mc/release/windows-amd64/mc.exe" -o "${MC_BIN}" + ;; + *) + echo "Unsupported OS for mc: ${uname_out}" >&2 + return 1 + ;; + esac +} + +create_bucket() { + download_mc + for i in {1..30}; do + if "${MC_BIN}" alias set local "${MINIO_ENDPOINT}" "${MINIO_ROOT_USER}" "${MINIO_ROOT_PASSWORD}"; then + break + fi + sleep 1 + done + "${MC_BIN}" mb --ignore-existing "local/${MINIO_BUCKET}" +} + +start_minio_windows() { + local minio_dir="${RUNNER_TEMP:-/tmp}" + local minio_bin="${minio_dir}/minio.exe" + curl -sSL "https://dl.min.io/server/minio/release/windows-amd64/minio.exe" -o "${minio_bin}" + MINIO_ROOT_USER="${MINIO_ROOT_USER}" MINIO_ROOT_PASSWORD="${MINIO_ROOT_PASSWORD}" \ + "${minio_bin}" server "${minio_dir}/minio-data" --console-address ":${MINIO_CONSOLE_PORT}" & + wait_for_minio +} + +case "$(uname -s)" in + Darwin*) + if ! start_minio_docker; then + start_minio_macos + fi + ;; + MINGW*|MSYS*|CYGWIN*) + if ! 
start_minio_docker; then + start_minio_windows + fi + ;; + Linux*) + start_minio_docker + ;; + *) + echo "Unsupported OS: $(uname -s)" >&2 + exit 1 + ;; +esac + +create_bucket diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index 8b32eb749..d4f837d67 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -102,6 +102,7 @@ function(resolve_arrow_dependency) # Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*) set(ARROW_IPC ON) set(ARROW_FILESYSTEM ON) + set(ARROW_S3 ${ICEBERG_S3}) set(ARROW_JSON ON) set(ARROW_PARQUET ON) set(ARROW_SIMD_LEVEL "NONE") @@ -164,6 +165,13 @@ function(resolve_arrow_dependency) install(FILES ${arrow_bundled_dependencies_location} DESTINATION ${ICEBERG_INSTALL_LIBDIR}) endif() + + # Arrow's exported static target interface may reference system libraries + # (e.g. OpenSSL, CURL, ZLIB) that consumers need to find. 
+ list(APPEND ICEBERG_SYSTEM_DEPENDENCIES ZLIB) + if(ARROW_S3) + list(APPEND ICEBERG_SYSTEM_DEPENDENCIES OpenSSL CURL) + endif() else() set(ARROW_VENDORED FALSE) find_package(Arrow CONFIG REQUIRED) diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 4cd5e92c5..1c6a13cb9 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -43,6 +43,7 @@ set(ICEBERG_SOURCES expression/rewrite_not.cc expression/strict_metrics_evaluator.cc expression/term.cc + file_io_registry.cc file_reader.cc file_writer.cc inheritable_metadata.cc @@ -181,6 +182,8 @@ add_subdirectory(util) if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES arrow/arrow_fs_file_io.cc + arrow/s3/arrow_s3_file_io.cc + arrow/file_io_register.cc arrow/metadata_column_util.cc avro/avro_data_util.cc avro/avro_direct_decoder.cc @@ -247,6 +250,18 @@ if(ICEBERG_BUILD_BUNDLE) OUTPUTS ICEBERG_BUNDLE_LIBRARIES) + foreach(target iceberg_bundle_static iceberg_bundle_shared) + if(TARGET ${target}) + if(ICEBERG_S3) + target_compile_definitions(${target} + PUBLIC "$") + else() + target_compile_definitions(${target} + PUBLIC "$") + endif() + endif() + endforeach() + add_subdirectory(arrow) add_subdirectory(avro) add_subdirectory(parquet) diff --git a/src/iceberg/arrow/CMakeLists.txt b/src/iceberg/arrow/CMakeLists.txt index 3416d5e95..71c161f02 100644 --- a/src/iceberg/arrow/CMakeLists.txt +++ b/src/iceberg/arrow/CMakeLists.txt @@ -16,3 +16,5 @@ # under the License. 
iceberg_install_all_headers(iceberg/arrow) + +add_subdirectory(s3) diff --git a/src/iceberg/arrow/arrow_file_io.h b/src/iceberg/arrow/arrow_file_io.h index 12a9b2303..e33826dc6 100644 --- a/src/iceberg/arrow/arrow_file_io.h +++ b/src/iceberg/arrow/arrow_file_io.h @@ -20,9 +20,12 @@ #pragma once #include +#include +#include #include "iceberg/file_io.h" #include "iceberg/iceberg_bundle_export.h" +#include "iceberg/result.h" namespace iceberg::arrow { @@ -30,4 +33,21 @@ ICEBERG_BUNDLE_EXPORT std::unique_ptr MakeMockFileIO(); ICEBERG_BUNDLE_EXPORT std::unique_ptr MakeLocalFileIO(); +/// \brief Create an S3 FileIO backed by Arrow's S3FileSystem. +/// +/// This function initializes the S3 subsystem if not already initialized (thread-safe). +/// The S3 initialization is cached once per process. +/// +/// \param properties Configuration properties for S3 access. See S3Properties +/// for available keys (credentials, region, endpoint, timeouts, etc.). +/// \return A FileIO instance for S3 operations, or an error if S3 is not supported. +ICEBERG_BUNDLE_EXPORT Result> MakeS3FileIO( + const std::unordered_map& properties = {}); + +/// \brief Finalize (clean up) the Arrow S3 subsystem. +/// +/// Must be called before process exit if S3 was initialized, otherwise Arrow's +/// static destructors may cause a non-zero exit. 
+ICEBERG_BUNDLE_EXPORT Status FinalizeS3(); + } // namespace iceberg::arrow diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc index be62b79af..769fcfb13 100644 --- a/src/iceberg/arrow/arrow_fs_file_io.cc +++ b/src/iceberg/arrow/arrow_fs_file_io.cc @@ -25,13 +25,23 @@ #include "iceberg/arrow/arrow_file_io.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/util/macros.h" namespace iceberg::arrow { +Result ArrowFileSystemFileIO::ResolvePath(const std::string& file_location) { + if (file_location.find("://") != std::string::npos) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto path, arrow_fs_->PathFromUri(file_location)); + return path; + } + return file_location; +} + /// \brief Read the content of the file at the given location. Result ArrowFileSystemFileIO::ReadFile(const std::string& file_location, std::optional length) { - ::arrow::fs::FileInfo file_info(file_location); + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ::arrow::fs::FileInfo file_info(path); if (length.has_value()) { file_info.set_size(length.value()); } @@ -47,6 +57,10 @@ Result ArrowFileSystemFileIO::ReadFile(const std::string& file_loca ICEBERG_ARROW_ASSIGN_OR_RETURN( auto read_bytes, file->Read(read_length, reinterpret_cast(&content[offset]))); + if (read_bytes == 0) { + return IOError("Unexpected EOF reading {}: got {} of {} bytes", file_location, + offset, file_size); + } remain -= read_bytes; offset += read_bytes; } @@ -57,7 +71,8 @@ Result ArrowFileSystemFileIO::ReadFile(const std::string& file_loca /// \brief Write the given content to the file at the given location. 
Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location, std::string_view content) { - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(file_location)); + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path)); ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size())); ICEBERG_ARROW_RETURN_NOT_OK(file->Flush()); ICEBERG_ARROW_RETURN_NOT_OK(file->Close()); @@ -66,7 +81,8 @@ Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location, /// \brief Delete a file at the given location. Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) { - ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(file_location)); + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFile(path)); return {}; } diff --git a/src/iceberg/arrow/arrow_fs_file_io_internal.h b/src/iceberg/arrow/arrow_fs_file_io_internal.h index f151c7a5b..92a991501 100644 --- a/src/iceberg/arrow/arrow_fs_file_io_internal.h +++ b/src/iceberg/arrow/arrow_fs_file_io_internal.h @@ -56,6 +56,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; } private: + /// \brief Resolve a file location to a filesystem path. + Result ResolvePath(const std::string& file_location); + std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_; }; diff --git a/src/iceberg/arrow/file_io_register.cc b/src/iceberg/arrow/file_io_register.cc new file mode 100644 index 000000000..1140e49b8 --- /dev/null +++ b/src/iceberg/arrow/file_io_register.cc @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "iceberg/arrow/file_io_register.h" + +#include +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/file_io_registry.h" + +namespace iceberg::arrow { + +namespace { + +void RegisterLocalFileIO() { + FileIORegistry::Register( + std::string(FileIORegistry::kArrowLocalFileIO), + [](const std::unordered_map& /*properties*/) + -> Result> { return MakeLocalFileIO(); }); +} + +void RegisterS3FileIO() { +#if ICEBERG_S3_ENABLED + FileIORegistry::Register( + std::string(FileIORegistry::kArrowS3FileIO), + [](const std::unordered_map& properties) + -> Result> { return MakeS3FileIO(properties); }); +#endif +} + +} // namespace + +void EnsureArrowFileIOsRegistered() { + static std::once_flag flag; + std::call_once(flag, []() { + RegisterLocalFileIO(); + RegisterS3FileIO(); + }); +} + +[[maybe_unused]] const bool kArrowFileIOsRegistered = []() { + EnsureArrowFileIOsRegistered(); + return true; +}(); + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/file_io_register.h b/src/iceberg/arrow/file_io_register.h new file mode 100644 index 000000000..1b4622bd7 --- /dev/null +++ b/src/iceberg/arrow/file_io_register.h @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/arrow/file_io_register.h +/// \brief Provide functions to register Arrow FileIO implementations. + +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::arrow { + +/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry. +/// +/// This operation is idempotent and safe to call multiple times. +ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered(); + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/s3/CMakeLists.txt b/src/iceberg/arrow/s3/CMakeLists.txt new file mode 100644 index 000000000..27eda89ed --- /dev/null +++ b/src/iceberg/arrow/s3/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/arrow/s3) diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc b/src/iceberg/arrow/s3/arrow_s3_file_io.cc new file mode 100644 index 000000000..808415d0a --- /dev/null +++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#if ICEBERG_S3_ENABLED +# include +#endif + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3/s3_properties.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" + +namespace iceberg::arrow { + +namespace { + +#if ICEBERG_S3_ENABLED +const std::string* FindProperty( + const std::unordered_map& properties, + std::string_view key) { + auto it = properties.find(std::string(key)); + return it == properties.end() ? 
nullptr : &it->second; +} + +Result> ParseOptionalBool( + const std::unordered_map& properties, + std::string_view key) { + const auto* value = FindProperty(properties, key); + if (value == nullptr) { + return std::nullopt; + } + if (*value == "true") { + return true; + } + if (*value == "false") { + return false; + } + return InvalidArgument(R"("{}" must be "true" or "false")", key); +} + +Status EnsureS3Initialized() { + static const ::arrow::Status init_status = []() { + auto options = ::arrow::fs::S3GlobalOptions::Defaults(); + return ::arrow::fs::InitializeS3(options); + }(); + if (!init_status.ok()) { + return std::unexpected(Error{.kind = ::iceberg::arrow::ToErrorKind(init_status), + .message = init_status.ToString()}); + } + return {}; +} + +/// \brief Configure S3Options from a properties map. +/// +/// \param properties The configuration properties map. +/// \return Configured S3Options. +Result<::arrow::fs::S3Options> ConfigureS3Options( + const std::unordered_map& properties) { + auto options = ::arrow::fs::S3Options::Defaults(); + + // Configure credentials + const auto* access_key = FindProperty(properties, S3Properties::kAccessKeyId); + const auto* secret_key = FindProperty(properties, S3Properties::kSecretAccessKey); + const auto* session_token = FindProperty(properties, S3Properties::kSessionToken); + + if ((access_key == nullptr) != (secret_key == nullptr)) { + return InvalidArgument( + "S3 client access key ID and secret access key must be set at the same time"); + } + if (access_key != nullptr) { + if (session_token != nullptr) { + options.ConfigureAccessKey(*access_key, *secret_key, *session_token); + } else { + options.ConfigureAccessKey(*access_key, *secret_key); + } + } + + // Configure region + if (const auto* region = FindProperty(properties, S3Properties::kRegion); + region != nullptr) { + options.region = *region; + } + + // Configure endpoint (for MinIO, LocalStack, etc.) 
+ if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint); + endpoint != nullptr) { + options.endpoint_override = *endpoint; + } else { + // Fall back to AWS standard environment variables for endpoint override + const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); + if (s3_endpoint_env != nullptr) { + options.endpoint_override = s3_endpoint_env; + } else { + const char* endpoint_env = std::getenv("AWS_ENDPOINT_URL"); + if (endpoint_env != nullptr) { + options.endpoint_override = endpoint_env; + } + } + } + + ICEBERG_ASSIGN_OR_RAISE(const auto path_style_access, + ParseOptionalBool(properties, S3Properties::kPathStyleAccess)); + if (path_style_access.has_value()) { + options.force_virtual_addressing = !*path_style_access; + } + + // Configure SSL + ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled, + ParseOptionalBool(properties, S3Properties::kSslEnabled)); + if (ssl_enabled.has_value() && !*ssl_enabled) { + options.scheme = "http"; + } + + // Configure timeouts + auto connect_timeout_it = properties.find(std::string(S3Properties::kConnectTimeoutMs)); + if (connect_timeout_it != properties.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto timeout_ms, + StringUtils::ParseNumber(connect_timeout_it->second)); + options.connect_timeout = timeout_ms / 1000.0; + } + + auto socket_timeout_it = properties.find(std::string(S3Properties::kSocketTimeoutMs)); + if (socket_timeout_it != properties.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto timeout_ms, + StringUtils::ParseNumber(socket_timeout_it->second)); + options.request_timeout = timeout_ms / 1000.0; + } + + return options; +} +#endif + +} // namespace + +Result> MakeS3FileIO( + const std::unordered_map& properties) { +#if ICEBERG_S3_ENABLED + ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized()); + + // Configure S3 options from properties (uses default credentials if empty) + ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, 
::arrow::fs::S3FileSystem::Make(options)); + + return std::make_unique(std::move(fs)); +#else + return NotSupported("Arrow S3 support is not enabled"); +#endif +} + +Status FinalizeS3() { +#if ICEBERG_S3_ENABLED + auto status = ::arrow::fs::FinalizeS3(); + ICEBERG_ARROW_RETURN_NOT_OK(status); + return {}; +#else + return NotSupported("Arrow S3 support is not enabled"); +#endif +} + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/s3/s3_properties.h b/src/iceberg/arrow/s3/s3_properties.h new file mode 100644 index 000000000..53657743d --- /dev/null +++ b/src/iceberg/arrow/s3/s3_properties.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +namespace iceberg::arrow { + +/// \brief S3 configuration property keys for ArrowS3FileIO. +/// +/// These constants define the property keys used to configure S3 access +/// via the Arrow filesystem integration, following the Iceberg spec for +/// S3 configuration properties. 
+struct S3Properties { + /// S3 URI scheme + static constexpr std::string_view kS3Schema = "s3"; + /// AWS access key ID + static constexpr std::string_view kAccessKeyId = "s3.access-key-id"; + /// AWS secret access key + static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key"; + /// AWS session token (for temporary credentials) + static constexpr std::string_view kSessionToken = "s3.session-token"; + /// AWS region + static constexpr std::string_view kRegion = "s3.region"; + /// Custom endpoint override (for MinIO, LocalStack, etc.) + static constexpr std::string_view kEndpoint = "s3.endpoint"; + /// Whether to use path-style access (needed for MinIO) + static constexpr std::string_view kPathStyleAccess = "s3.path-style-access"; + /// Whether SSL is enabled + static constexpr std::string_view kSslEnabled = "s3.ssl.enabled"; + /// Connection timeout in milliseconds + static constexpr std::string_view kConnectTimeoutMs = "s3.connect-timeout-ms"; + /// Socket timeout in milliseconds + static constexpr std::string_view kSocketTimeoutMs = "s3.socket-timeout-ms"; +}; + +} // namespace iceberg::arrow diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt index e91b12962..b862bc869 100644 --- a/src/iceberg/catalog/rest/CMakeLists.txt +++ b/src/iceberg/catalog/rest/CMakeLists.txt @@ -30,6 +30,7 @@ set(ICEBERG_REST_SOURCES json_serde.cc resource_paths.cc rest_catalog.cc + rest_file_io.cc rest_util.cc types.cc) diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h index 41d7da796..0515926c7 100644 --- a/src/iceberg/catalog/rest/catalog_properties.h +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -47,6 +47,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties inline static Entry kName{"name", ""}; /// \brief The warehouse path. inline static Entry kWarehouse{"warehouse", ""}; + /// \brief The FileIO implementation name. 
+ inline static Entry kIOImpl{"io-impl", ""}; /// \brief The optional prefix for REST API paths. inline static Entry kPrefix{"prefix", ""}; /// \brief The encoded separator used to join namespace levels in REST paths. diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build index ef2500456..a1f8ce973 100644 --- a/src/iceberg/catalog/rest/meson.build +++ b/src/iceberg/catalog/rest/meson.build @@ -28,6 +28,7 @@ iceberg_rest_sources = files( 'json_serde.cc', 'resource_paths.cc', 'rest_catalog.cc', + 'rest_file_io.cc', 'rest_util.cc', 'types.cc', ) @@ -71,6 +72,7 @@ install_headers( 'iceberg_rest_export.h', 'resource_paths.h', 'rest_catalog.h', + 'rest_file_io.h', 'rest_util.h', 'type_fwd.h', 'types.h', diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index ebb03bf84..a0267adcb 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -34,6 +34,7 @@ #include "iceberg/catalog/rest/http_client.h" #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/resource_paths.h" +#include "iceberg/catalog/rest/rest_file_io.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" #include "iceberg/json_serde_internal.h" @@ -121,11 +122,8 @@ Result CaptureNoSuchNamespace(const auto& status) { RestCatalog::~RestCatalog() = default; Result> RestCatalog::Make( - const RestCatalogProperties& config, std::shared_ptr file_io) { + const RestCatalogProperties& config) { ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri()); - if (!file_io) { - return InvalidArgument("FileIO is required to create RestCatalog"); - } std::string catalog_name = config.Get(RestCatalogProperties::kName); ICEBERG_ASSIGN_OR_RAISE(auto auth_manager, @@ -172,6 +170,9 @@ Result> RestCatalog::Make( ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); + // Create FileIO with the final 
configuration + ICEBERG_ASSIGN_OR_RAISE(auto file_io, MakeCatalogFileIO(final_config)); + return std::shared_ptr( new RestCatalog(std::move(final_config), std::move(file_io), std::move(client), std::move(paths), std::move(endpoints), std::move(auth_manager), @@ -473,6 +474,7 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + /// FIXME: support per-table FileIO creation return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, shared_from_this()); diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 38230a5e2..4fd4db5b8 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -46,13 +46,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, RestCatalog(RestCatalog&&) = delete; RestCatalog& operator=(RestCatalog&&) = delete; - /// \brief Create a RestCatalog instance - /// - /// \param config the configuration for the RestCatalog - /// \param file_io the FileIO instance to use for table operations - /// \return a shared_ptr to RestCatalog instance - static Result> Make(const RestCatalogProperties& config, - std::shared_ptr file_io); + /// \brief Create a RestCatalog instance. + static Result> Make(const RestCatalogProperties& config); std::string_view name() const override; diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc new file mode 100644 index 000000000..f08a03353 --- /dev/null +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/rest_file_io.h"
+
+#include <utility>
+
+#include "iceberg/file_io_registry.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::rest {
+
+namespace {
+
+bool IsBuiltinImpl(std::string_view io_impl) {
+  return io_impl == FileIORegistry::kArrowLocalFileIO ||
+         io_impl == FileIORegistry::kArrowS3FileIO;
+}
+
+}  // namespace
+
+Result<BuiltinFileIOKind> DetectBuiltinFileIO(std::string_view location) {
+  const auto pos = location.find("://");
+  if (pos == std::string_view::npos) {  // no scheme: treated as a local path
+    return BuiltinFileIOKind::kArrowLocal;
+  }
+
+  const auto scheme = location.substr(0, pos);
+  if (scheme == "file") {
+    return BuiltinFileIOKind::kArrowLocal;
+  }
+  if (scheme == "s3") {
+    return BuiltinFileIOKind::kArrowS3;
+  }
+
+  return NotSupported("URI scheme '{}' is not supported for automatic FileIO resolution",
+                      scheme);
+}
+
+std::string_view BuiltinFileIOName(BuiltinFileIOKind kind) {
+  switch (kind) {
+    case BuiltinFileIOKind::kArrowLocal:
+      return FileIORegistry::kArrowLocalFileIO;
+    case BuiltinFileIOKind::kArrowS3:
+      return FileIORegistry::kArrowS3FileIO;
+  }
+  std::unreachable();  // every enumerator returns above
+}
+
+// Picks "io-impl" when set (a built-in one must agree with the warehouse scheme);
+// otherwise infers the implementation from the warehouse location.
+Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& config) {
+  std::string io_impl = config.Get(RestCatalogProperties::kIOImpl);
+  std::string warehouse = config.Get(RestCatalogProperties::kWarehouse);
+
+  if (io_impl.empty()) {
+    if (warehouse.empty()) {
+      return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)",
+                             RestCatalogProperties::kIOImpl.key(),
+                             RestCatalogProperties::kWarehouse.key());
+    }
+    ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
+    io_impl = std::string(BuiltinFileIOName(detected_kind));
+  } else if (!warehouse.empty() && IsBuiltinImpl(io_impl)) {
+    ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
+    const auto detected_name = BuiltinFileIOName(detected_kind);
+    if (io_impl != detected_name) {
+      return InvalidArgument(
+          R"("io-impl" value '{}' is incompatible with warehouse '{}')", io_impl,
+          warehouse);
+    }
+  }
+
+  // TODO(gangwu): Support Java-style customized FileIO creation flows instead of
+  // resolving a single catalog-scoped FileIO instance only from properties.
+  return FileIORegistry::Load(io_impl, config.configs());
+}
+
+}  // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h
new file mode 100644
index 000000000..68482521a
--- /dev/null
+++ b/src/iceberg/catalog/rest/rest_file_io.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string_view>
+
+#include "iceberg/catalog/rest/catalog_properties.h"
+#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/file_io.h"
+#include "iceberg/file_io_registry.h"
+#include "iceberg/result.h"
+
+namespace iceberg::rest {
+
+/// \brief FileIO implementations that can be selected from a location's URI scheme.
+enum class BuiltinFileIOKind : uint8_t {
+  kArrowLocal,
+  kArrowS3,
+};
+
+/// \brief Infer the built-in FileIO kind from a location.
+///
+/// A location without "://" or with the "file" scheme maps to kArrowLocal and the
+/// "s3" scheme maps to kArrowS3. Any other scheme yields a NotSupported error.
+ICEBERG_REST_EXPORT Result<BuiltinFileIOKind> DetectBuiltinFileIO(
+    std::string_view location);
+
+/// \brief Return the FileIORegistry name of a built-in FileIO kind.
+ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind);
+
+/// \brief Create the catalog-scoped FileIO from the catalog properties.
+///
+/// Uses "io-impl" when set, otherwise detects a built-in implementation from
+/// "warehouse"; fails with InvalidArgument when both are empty or when a built-in
+/// "io-impl" does not match the warehouse scheme.
+ICEBERG_REST_EXPORT Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(
+    const RestCatalogProperties& config);
+
+}  // namespace iceberg::rest
diff --git a/src/iceberg/file_io_registry.cc b/src/iceberg/file_io_registry.cc
new file mode 100644
index 000000000..ffba8677a
--- /dev/null
+++ b/src/iceberg/file_io_registry.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg/file_io_registry.h"
+
+#include <mutex>
+#include <unordered_map>
+
+namespace iceberg {
+
+namespace {
+
+struct RegistryState {
+  std::mutex mutex;
+  std::unordered_map<std::string, FileIORegistry::Factory> registry;
+};
+
+RegistryState& State() {
+  static RegistryState state;  // function-local static: constructed on first use
+  return state;
+}
+
+}  // namespace
+
+void FileIORegistry::Register(const std::string& name, Factory factory) {
+  auto& state = State();
+  std::lock_guard lock(state.mutex);
+  state.registry[name] = std::move(factory);  // replaces any earlier registration
+}
+
+Result<std::unique_ptr<FileIO>> FileIORegistry::Load(
+    const std::string& name,
+    const std::unordered_map<std::string, std::string>& properties) {
+  Factory factory;
+  {
+    auto& state = State();
+    std::lock_guard lock(state.mutex);
+    auto it = state.registry.find(name);
+    if (it == state.registry.end()) {
+      return NotFound("FileIO implementation not found: {}", name);
+    }
+    factory = it->second;  // copy out so the factory runs without the lock held
+  }
+  return factory(properties);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/file_io_registry.h b/src/iceberg/file_io_registry.h
new file mode 100644
index 000000000..a0be1b563
--- /dev/null
+++ b/src/iceberg/file_io_registry.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+
+#include "iceberg/file_io.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+
+/// \brief Registry for FileIO implementations.
+///
+/// Provides a mechanism to register and load FileIO implementations by name.
+/// This allows the REST catalog (and others) to resolve FileIO implementations
+/// at runtime based on configuration properties like "io-impl".
+class ICEBERG_EXPORT FileIORegistry {
+ public:
+  static constexpr std::string_view kArrowLocalFileIO = "arrow-fs-local";
+  static constexpr std::string_view kArrowS3FileIO = "arrow-fs-s3";
+
+  /// Factory function type for creating FileIO instances.
+  using Factory = std::function<Result<std::unique_ptr<FileIO>>(
+      const std::unordered_map<std::string, std::string>& properties)>;
+
+  /// \brief Register a FileIO factory under the given name.
+  ///
+  /// An existing registration with the same name is replaced.
+  ///
+  /// \param name The implementation name (e.g., "arrow-fs-local", "arrow-fs-s3")
+  /// \param factory The factory function that creates the FileIO instance.
+  static void Register(const std::string& name, Factory factory);
+
+  /// \brief Load a FileIO implementation by name.
+  ///
+  /// \param name The implementation name to look up.
+  /// \param properties Configuration properties to pass to the factory.
+  /// \return A unique_ptr to the FileIO instance, or an error if not found.
+  static Result<std::unique_ptr<FileIO>> Load(
+      const std::string& name,
+      const std::unordered_map<std::string, std::string>& properties);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 99ac01f6d..1e6953c80 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -65,6 +65,7 @@ iceberg_sources = files(
     'expression/rewrite_not.cc',
     'expression/strict_metrics_evaluator.cc',
     'expression/term.cc',
+    'file_io_registry.cc',
     'file_reader.cc',
     'file_writer.cc',
     'inheritable_metadata.cc',
@@ -148,8 +149,8 @@ iceberg_sources = files(
 # CRoaring does not export symbols, so on Windows it must
 # be used as a static lib
 croaring_needs_static = (
-    get_option('default_library') == 'static' or
-    host_machine.system() == 'windows'
+    get_option('default_library') == 'static'
+    or host_machine.system() == 'windows'
 )
 croaring_dep = dependency('croaring', static: croaring_needs_static)
 nanoarrow_dep = dependency('nanoarrow')
@@ -193,6 +194,7 @@ install_headers(
         'exception.h',
         'file_format.h',
         'file_io.h',
+        'file_io_registry.h',
         'file_reader.h',
         'file_writer.h',
         'iceberg_export.h',
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 2dc90da64..58edcf7e0 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -145,6 +145,10 @@ if(ICEBERG_BUILD_BUNDLE)
                    metadata_io_test.cc
                    struct_like_test.cc)
 
+  if(ICEBERG_S3)
+    add_iceberg_test(file_io_test USE_BUNDLE SOURCES arrow_s3_file_io_test.cc)
+  endif()
+
   add_iceberg_test(catalog_test USE_BUNDLE SOURCES in_memory_catalog_test.cc)
 
   add_iceberg_test(eval_expr_test
@@ -227,6 +231,7 @@ if(ICEBERG_BUILD_REST)
                         SOURCES
                         auth_manager_test.cc
                         endpoint_test.cc
+                        rest_file_io_test.cc
                         rest_json_serde_test.cc
                         rest_util_test.cc)
 
diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc
new file mode 100644
index 000000000..d890ad10e
--- /dev/null
+++ b/src/iceberg/test/arrow_s3_file_io_test.cc
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <iostream>
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/s3/s3_properties.h"
+#include "iceberg/test/matchers.h"
+
+namespace {
+
+std::optional<std::string> GetEnvIfSet(const char* key) {
+  const char* value = std::getenv(key);
+  if (value == nullptr || std::string_view(value).empty()) {
+    return std::nullopt;
+  }
+  return std::string(value);
+}
+
+std::string MakeObjectUri(std::string_view base_uri, std::string_view object_name) {
+  std::string object_uri(base_uri);
+  if (!object_uri.ends_with('/')) {
+    object_uri += '/';
+  }
+  object_uri += object_name;
+  return object_uri;
+}
+
+std::unordered_map<std::string, std::string> PropertiesFromEnv() {
+  std::unordered_map<std::string, std::string> properties;
+
+  if (const auto access_key = GetEnvIfSet("AWS_ACCESS_KEY_ID")) {
+    properties[std::string(iceberg::arrow::S3Properties::kAccessKeyId)] = *access_key;
+  }
+  if (const auto secret_key = GetEnvIfSet("AWS_SECRET_ACCESS_KEY")) {
+    properties[std::string(iceberg::arrow::S3Properties::kSecretAccessKey)] = *secret_key;
+  }
+  if (const auto endpoint = GetEnvIfSet("ICEBERG_TEST_S3_ENDPOINT")) {
+    properties[std::string(iceberg::arrow::S3Properties::kEndpoint)] = *endpoint;
+  }
+  if (const auto region = GetEnvIfSet("AWS_REGION")) {
+    properties[std::string(iceberg::arrow::S3Properties::kRegion)] = *region;
+  }
+
+  return properties;
+}
+
+}  // namespace
+
+namespace iceberg::arrow {
+
+namespace {
+
+class ArrowS3FileIOTest : public ::testing::Test {
+ protected:
+  static void TearDownTestSuite() {
+    auto status = FinalizeS3();
+    if (!status.has_value()) {
+      std::cerr << "Warning: FinalizeS3 failed: " << status.error().message << std::endl;
+    }
+  }
+
+  void SetUp() override { base_uri_ = GetEnvIfSet("ICEBERG_TEST_S3_URI"); }
+
+  std::string ObjectUri(std::string_view object_name) const {
+    return MakeObjectUri(*base_uri_, object_name);
+  }
+
+  /// Whether a test bucket is configured. GTEST_SKIP() only returns from the function
+  /// it expands in, so tests must skip from their own body rather than from a helper.
+  bool HasIntegrationEnv() const { return base_uri_.has_value(); }
+
+  static constexpr auto kSkipReason = "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+
+ private:
+  std::optional<std::string> base_uri_;
+};
+
+}  // namespace
+
+TEST_F(ArrowS3FileIOTest, CreateWithDefaultProperties) {
+  auto result = MakeS3FileIO({});
+  ASSERT_THAT(result, IsOk());
+  EXPECT_NE(result.value(), nullptr);
+}
+
+TEST_F(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
+  auto result = MakeS3FileIO();
+  ASSERT_THAT(result, IsOk());
+}
+
+TEST_F(ArrowS3FileIOTest, RejectsIncompleteStaticCredentials) {
+  auto result =
+      MakeS3FileIO({{std::string(S3Properties::kAccessKeyId), "access-key-only"}});
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage(
+                          "S3 client access key ID and secret access key must be set"));
+}
+
+TEST_F(ArrowS3FileIOTest, RejectsInvalidBooleanProperties) {
+  auto result =
+      MakeS3FileIO({{std::string(S3Properties::kPathStyleAccess), "not-a-bool"}});
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(ArrowS3FileIOTest, ReadWriteFile) {
+  if (!HasIntegrationEnv()) GTEST_SKIP() << kSkipReason;
+  auto io_res = MakeS3FileIO();
+  ASSERT_THAT(io_res, IsOk());
+  auto io = std::move(io_res).value();
+
+  auto object_uri = ObjectUri("iceberg_s3_io_test.txt");
+  auto write_res = io->WriteFile(object_uri, "hello s3");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto read_res = io->ReadFile(object_uri, std::nullopt);
+  ASSERT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3")));
+
+  auto del_res = io->DeleteFile(object_uri);
+  EXPECT_THAT(del_res, IsOk());
+}
+
+TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithProperties) {
+  if (!HasIntegrationEnv()) GTEST_SKIP() << kSkipReason;
+  auto io_res = MakeS3FileIO(PropertiesFromEnv());
+  ASSERT_THAT(io_res, IsOk());
+  auto io = std::move(io_res).value();
+
+  auto object_uri = ObjectUri("iceberg_s3_io_props_test.txt");
+  auto write_res = io->WriteFile(object_uri, "hello s3 with properties");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto read_res = io->ReadFile(object_uri, std::nullopt);
+  ASSERT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3 with properties")));
+
+  auto del_res = io->DeleteFile(object_uri);
+  EXPECT_THAT(del_res, IsOk());
+}
+
+TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithSslDisabled) {
+  if (!HasIntegrationEnv()) GTEST_SKIP() << kSkipReason;
+  std::unordered_map<std::string, std::string> properties;
+  properties[std::string(S3Properties::kSslEnabled)] = "false";
+
+  auto io_res = MakeS3FileIO(properties);
+  ASSERT_THAT(io_res, IsOk());
+}
+
+TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) {
+  if (!HasIntegrationEnv()) GTEST_SKIP() << kSkipReason;
+  std::unordered_map<std::string, std::string> properties;
+  properties[std::string(S3Properties::kConnectTimeoutMs)] = "5000";
+  properties[std::string(S3Properties::kSocketTimeoutMs)] = "10000";
+
+  auto io_res = MakeS3FileIO(properties);
+  ASSERT_THAT(io_res, IsOk());
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 9a8da9dd5..9e6b16809 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -110,6 +110,7 @@ if get_option('rest').enabled()
             'sources': files(
                 'auth_manager_test.cc',
                 'endpoint_test.cc',
+                'rest_file_io_test.cc',
                 'rest_json_serde_test.cc',
                 'rest_util_test.cc',
             ),
diff --git
a/src/iceberg/test/rest_catalog_integration_test.cc b/src/iceberg/test/rest_catalog_integration_test.cc index b364ffd36..3de7e722a 100644 --- a/src/iceberg/test/rest_catalog_integration_test.cc +++ b/src/iceberg/test/rest_catalog_integration_test.cc @@ -39,6 +39,7 @@ #include "iceberg/catalog/rest/http_client.h" #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/rest_catalog.h" +#include "iceberg/file_io_registry.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -65,6 +66,7 @@ constexpr std::string_view kDockerProjectName = "iceberg-rest-catalog-service"; constexpr std::string_view kCatalogName = "test_catalog"; constexpr std::string_view kWarehouseName = "default"; constexpr std::string_view kLocalhostUri = "http://localhost"; +constexpr std::string_view kStdFileIOImpl = "test.StdFileIO"; /// \brief Check if a localhost port is ready to accept connections. bool CheckServiceReady(uint16_t port) { @@ -96,6 +98,12 @@ std::string CatalogUri() { return std::format("{}:{}", kLocalhostUri, kRestCatal class RestCatalogIntegrationTest : public ::testing::Test { protected: static void SetUpTestSuite() { + FileIORegistry::Register( + std::string(kStdFileIOImpl), + [](const std::unordered_map& /*properties*/) + -> Result> { + return std::make_unique(); + }); docker_compose_ = std::make_unique( std::string{kDockerProjectName}, GetResourcePath("iceberg-rest-fixture")); docker_compose_->Up(); @@ -126,10 +134,12 @@ class RestCatalogIntegrationTest : public ::testing::Test { config.Set(RestCatalogProperties::kUri, CatalogUri()) .Set(RestCatalogProperties::kName, std::string(kCatalogName)) .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName)); + config.mutable_configs()[std::string(RestCatalogProperties::kIOImpl.key())] = + std::string(kStdFileIOImpl); for (const auto& [k, v] : extra) { config.mutable_configs()[k] = v; } - return RestCatalog::Make(config, std::make_shared()); + return 
RestCatalog::Make(config);
   }
 
   /// Create a catalog configured with a specific snapshot loading mode.
diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc
new file mode 100644
index 000000000..e9131da3f
--- /dev/null
+++ b/src/iceberg/test/rest_file_io_test.cc
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg/catalog/rest/rest_file_io.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/file_io_registry.h"
+#include "iceberg/test/matchers.h"
+
+namespace iceberg::rest {
+
+namespace {
+
+/// FileIO stub returned by the factories these tests register; performs no real I/O.
+class MockFileIO : public FileIO {
+ public:
+  Result<std::string> ReadFile(const std::string& /*file_location*/,
+                               std::optional<size_t> /*length*/) override {
+    return std::string("mock");
+  }
+
+  Status WriteFile(const std::string& /*file_location*/,
+                   std::string_view /*content*/) override {
+    return {};
+  }
+
+  Status DeleteFile(const std::string& /*file_location*/) override { return {}; }
+};
+
+}  // namespace
+
+TEST(RestFileIOTest, DetectBuiltinKindFromScheme) {
+  EXPECT_THAT(DetectBuiltinFileIO("s3://bucket/path"),
+              HasValue(::testing::Eq(BuiltinFileIOKind::kArrowS3)));
+  EXPECT_THAT(DetectBuiltinFileIO("/tmp/warehouse"),
+              HasValue(::testing::Eq(BuiltinFileIOKind::kArrowLocal)));
+  EXPECT_THAT(DetectBuiltinFileIO("file:///tmp/warehouse"),
+              HasValue(::testing::Eq(BuiltinFileIOKind::kArrowLocal)));
+}
+
+TEST(RestFileIOTest, DetectBuiltinKindRejectsUnsupportedScheme) {
+  auto result = DetectBuiltinFileIO("gs://bucket/warehouse");
+  EXPECT_THAT(result, IsError(ErrorKind::kNotSupported));
+  EXPECT_THAT(result, HasErrorMessage("not supported for automatic FileIO resolution"));
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOMissingImplAndWarehouse) {
+  auto result = MakeCatalogFileIO(RestCatalogProperties::default_properties());
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIORejectsIncompatibleWarehouse) {
+  FileIORegistry::Register(
+      std::string(FileIORegistry::kArrowS3FileIO),
+      [](const std::unordered_map<std::string, std::string>& /*properties*/)
+          -> Result<std::unique_ptr<FileIO>> { return std::make_unique<MockFileIO>(); });
+
+  auto config = RestCatalogProperties::FromMap(
+      {{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)},
+       {"warehouse", "/tmp/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("incompatible"));
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOAutoDetectsFromWarehouse) {
+  FileIORegistry::Register(
+      std::string(FileIORegistry::kArrowLocalFileIO),
+      [](const std::unordered_map<std::string, std::string>& /*properties*/)
+          -> Result<std::unique_ptr<FileIO>> { return std::make_unique<MockFileIO>(); });
+
+  auto config = RestCatalogProperties::FromMap({{"warehouse", "/tmp/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  ASSERT_THAT(result, IsOk());
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIORejectsUnsupportedWarehouseScheme) {
+  auto config = RestCatalogProperties::FromMap({{"warehouse", "gs://bucket/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  EXPECT_THAT(result, IsError(ErrorKind::kNotSupported));
+  EXPECT_THAT(result, HasErrorMessage("not supported for automatic FileIO resolution"));
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOAllowsCompatibleWarehouse) {
+  FileIORegistry::Register(
+      std::string(FileIORegistry::kArrowS3FileIO),
+      [](const std::unordered_map<std::string, std::string>& /*properties*/)
+          -> Result<std::unique_ptr<FileIO>> { return std::make_unique<MockFileIO>(); });
+
+  auto config = RestCatalogProperties::FromMap(
+      {{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)},
+       {"warehouse", "s3://my-bucket/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  ASSERT_THAT(result, IsOk());
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOPassesThroughCustomImpl) {
+  const std::string custom_impl = "com.mycompany.CustomFileIO";
+  FileIORegistry::Register(
+      custom_impl,
+      [](const std::unordered_map<std::string, std::string>& /*properties*/)
+          -> Result<std::unique_ptr<FileIO>> { return std::make_unique<MockFileIO>(); });
+
+  auto config = RestCatalogProperties::FromMap(
+      {{"io-impl", custom_impl}, {"warehouse", "/tmp/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  ASSERT_THAT(result, IsOk());
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOUnregisteredCustomImplReturnsNotFound) {
+  auto config = RestCatalogProperties::FromMap(
+      {{"io-impl", "com.nonexistent.FileIO"}, {"warehouse", "/tmp/warehouse"}});
+  auto result = MakeCatalogFileIO(config);
+  EXPECT_THAT(result, IsError(ErrorKind::kNotFound));
+}
+
+TEST(RestFileIOTest, MakeCatalogFileIOSkipsCheckWhenWarehouseAbsent) {
+  FileIORegistry::Register(
+      std::string(FileIORegistry::kArrowLocalFileIO),
+      [](const std::unordered_map<std::string, std::string>& /*properties*/)
+          -> Result<std::unique_ptr<FileIO>> { return std::make_unique<MockFileIO>(); });
+
+  auto config = RestCatalogProperties::FromMap(
+      {{"io-impl", std::string(FileIORegistry::kArrowLocalFileIO)}});
+  auto result = MakeCatalogFileIO(config);
+  ASSERT_THAT(result, IsOk());
+}
+
+}  // namespace iceberg::rest