diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index fa53cf215f..8ef6ae0eb4 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -48,17 +48,106 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/FindDuckDB.cmake) set(CMAKE_PREFIX_PATH ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api ${CMAKE_PREFIX_PATH}) find_package(DeepLakeAPI REQUIRED PATHS ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api/lib/cmake/deeplake_api NO_DEFAULT_PATH) +# AWS SDK - required by deeplake_api (symbols not bundled in the prebuilt library) +find_package(AWSSDK COMPONENTS core s3 identity-management) +if(AWSSDK_FOUND) + message(STATUS "Found AWS SDK: ${AWSSDK_LIBRARIES}") + list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${AWSSDK_LIBRARIES}) +else() + message(STATUS "AWS SDK not found via find_package, trying manual discovery...") + # Try to find AWS SDK libraries in common vcpkg locations + set(_AWS_SEARCH_PATHS + "$ENV{VCPKG_ROOT}/installed/arm64-linux/lib" + "$ENV{VCPKG_ROOT}/packages/aws-sdk-cpp_arm64-linux/lib" + ) + # Also check for AWS libs in the build's vcpkg_installed + file(GLOB _VCPKG_INSTALLED_DIRS "${CMAKE_BINARY_DIR}/vcpkg_installed/*/lib") + list(APPEND _AWS_SEARCH_PATHS ${_VCPKG_INSTALLED_DIRS}) + + set(_AWS_LIBS + aws-cpp-sdk-s3 + aws-cpp-sdk-core + aws-cpp-sdk-identity-management + aws-cpp-sdk-cognito-identity + aws-cpp-sdk-sts + aws-crt-cpp + aws-c-s3 + aws-c-auth + aws-c-http + aws-c-mqtt + aws-c-event-stream + aws-c-io + aws-c-cal + aws-c-compression + aws-c-sdkutils + aws-c-common + aws-checksums + s2n + ) + set(_FOUND_AWS_LIBS) + foreach(_lib ${_AWS_LIBS}) + find_library(_LIB_${_lib} NAMES ${_lib} PATHS ${_AWS_SEARCH_PATHS} NO_DEFAULT_PATH) + if(_LIB_${_lib}) + list(APPEND _FOUND_AWS_LIBS ${_LIB_${_lib}}) + message(STATUS " Found: ${_LIB_${_lib}}") + endif() + endforeach() + if(_FOUND_AWS_LIBS) + list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${_FOUND_AWS_LIBS}) + list(LENGTH _FOUND_AWS_LIBS _FOUND_AWS_LIB_COUNT) + message(STATUS "Linked ${_FOUND_AWS_LIB_COUNT} AWS SDK libraries manually") + 
else() + message(WARNING "AWS SDK libraries not found. pg_deeplake may fail to load at runtime.") + endif() +endif() + # } include_directories(${DEFAULT_PARENT_DIR}/.ext/duckdb/src/include) set(POSTGRES_DIR "${DEFAULT_PARENT_DIR}/../postgres") +# Build fingerprint: git hash + dirty state for hot-reload verification. +# Always recomputed at configure time so the .so reflects the current source. +# -c safe.directory=* is needed because the Docker container runs as root +# but the bind-mounted /deeplake is owned by the host user. +execute_process( + COMMAND git -c safe.directory=* rev-parse --short=12 HEAD + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + OUTPUT_VARIABLE _GIT_HASH + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + RESULT_VARIABLE _GIT_RESULT) +if(_GIT_RESULT EQUAL 0) + execute_process( + COMMAND git -c safe.directory=* diff --quiet + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + RESULT_VARIABLE _GIT_DIRTY) + if(_GIT_DIRTY) + set(PG_DEEPLAKE_BUILD_HASH "${_GIT_HASH}-dirty") + else() + set(PG_DEEPLAKE_BUILD_HASH "${_GIT_HASH}") + endif() +else() + set(PG_DEEPLAKE_BUILD_HASH "unknown") +endif() +message(STATUS "PG_DEEPLAKE_BUILD_HASH: ${PG_DEEPLAKE_BUILD_HASH}") + foreach(PG_VERSION ${PG_VERSIONS}) set(PG_LIB "pg_deeplake_${PG_VERSION}") message(STATUS "Creating library ${PG_LIB} with sources: ${PG_SOURCES}") ADD_LIBRARY(${PG_LIB} SHARED ${PG_SOURCES}) + # Embed build fingerprint for hot-reload verification. + # Isolated in a generated .cpp so hash changes only recompile one file + # instead of every source file in the target (configure_file only + # rewrites when content actually changes, so same-hash = zero work). 
+ configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/deeplake_pg/build_info.cpp.in" + "${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp" + @ONLY) + target_sources(${PG_LIB} PRIVATE + "${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp") + set(PG_TARGET_NAME "configure_postgres_REL_${PG_VERSION}_0") if(TARGET ${PG_TARGET_NAME}) @@ -151,8 +240,14 @@ foreach(PG_VERSION ${PG_VERSIONS}) DESTINATION ${PG_SHAREDIR}/extension ) + # Symlink instead of copy: always points at the build output, zero I/O, + # and survives incremental builds where the target is up-to-date (the + # existing symlink still resolves to the current .so). add_custom_command(TARGET ${PG_LIB} POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy ${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} "${POSTGRES_DIR}/" - COMMENT "Copied ${CMAKE_BINARY_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} to ${POSTGRES_DIR}/" + COMMAND ${CMAKE_COMMAND} -E rm -f "${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}" + COMMAND ${CMAKE_COMMAND} -E create_symlink + "$<TARGET_FILE:${PG_LIB}>" + "${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}" + COMMENT "Symlinked ${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} -> $<TARGET_FILE:${PG_LIB}>" ) endforeach() diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 00ef436ad4..3fa323c23f 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -28,7 +28,7 @@ "inherits": "template-unix", "cacheVariables": { "AL_PG": "ON", - "AL_ASSERTIONS": "ON" + "AL_ASSERTIONS": "OFF" } }, { @@ -36,7 +36,7 @@ "inherits": "template-windows", "cacheVariables": { "AL_PG": "ON", - "AL_ASSERTIONS": "ON" + "AL_ASSERTIONS": "OFF" } }, { diff --git a/cpp/bifrost/async_prefetcher.hpp b/cpp/bifrost/async_prefetcher.hpp index 6d0244cd38..8e30c6999f 100644 --- a/cpp/bifrost/async_prefetcher.hpp +++ b/cpp/bifrost/async_prefetcher.hpp @@ -38,6 +38,16 @@ class async_prefetcher void start(); void stop() noexcept; + /** + * @brief Wait for the first batch to be ready (for cold run optimization). 
+ * @param timeout_ms Maximum time to wait in milliseconds. + * + * This method is used for eager prefetching during cold runs. + * It fetches and caches the first batch so that subsequent next_batch() + * calls return immediately without blocking. + */ + void wait_for_first_batch(int64_t timeout_ms = 30000); + bool is_started() const noexcept; heimdall::dataset_view_ptr dataset() const noexcept; diff --git a/cpp/bifrost/column_streamer.hpp b/cpp/bifrost/column_streamer.hpp index dc4f7c7186..69ca7aed8c 100644 --- a/cpp/bifrost/column_streamer.hpp +++ b/cpp/bifrost/column_streamer.hpp @@ -35,6 +35,24 @@ class column_streamer return b.columns()[0].array(); } + async::promise next_batch_async() + { + return prefetcher_.next_batch_async(); + } + + /** + * @brief Pre-fetch and cache the first batch for cold run optimization. + * @param timeout_ms Maximum time to wait in milliseconds. + * + * This method waits for the first batch to be downloaded and cached + * internally. Subsequent calls to next_batch() will return immediately + * for the first batch. 
+ */ + void ensure_first_batch_ready(int64_t timeout_ms = 30000) + { + prefetcher_.wait_for_first_batch(timeout_ms); + } + bool empty() const noexcept { return prefetcher_.size() == 0; diff --git a/cpp/cmake/modules/FindPostgres.cmake b/cpp/cmake/modules/FindPostgres.cmake index 62531f6250..815da0b209 100644 --- a/cpp/cmake/modules/FindPostgres.cmake +++ b/cpp/cmake/modules/FindPostgres.cmake @@ -1,21 +1,28 @@ include(FetchContent) include(ExternalProject) -# Define PostgreSQL versions -set(postgres_versions - "REL_16_0" - "REL_17_0" - "REL_18_0" -) - -# Define corresponding SHA256 checksums for each version -set(postgres_SHA256_CHECKSUMS - "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf" - "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34" - "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc" -) - -# Loop through each PostgreSQL version +# Map BUILD_PG_* options to version tags and SHA256 checksums. +# Only download/build PostgreSQL versions that are actually enabled, +# avoiding unnecessary network downloads and compilation. 
+set(postgres_versions) +set(postgres_SHA256_CHECKSUMS) + +if(BUILD_PG_16) + list(APPEND postgres_versions "REL_16_0") + list(APPEND postgres_SHA256_CHECKSUMS "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf") +endif() + +if(BUILD_PG_17) + list(APPEND postgres_versions "REL_17_0") + list(APPEND postgres_SHA256_CHECKSUMS "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34") +endif() + +if(BUILD_PG_18) + list(APPEND postgres_versions "REL_18_0") + list(APPEND postgres_SHA256_CHECKSUMS "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc") +endif() + +# Loop through each enabled PostgreSQL version foreach(postgres_version IN LISTS postgres_versions) # Find the index of the current version list(FIND postgres_versions ${postgres_version} postgres_index) diff --git a/cpp/deeplake_pg/build_info.cpp.in b/cpp/deeplake_pg/build_info.cpp.in new file mode 100644 index 0000000000..ecf48e52d4 --- /dev/null +++ b/cpp/deeplake_pg/build_info.cpp.in @@ -0,0 +1,8 @@ +// Auto-generated at CMake configure time — do not edit manually. +// Isolated so that build-hash changes only recompile this single file +// instead of all pg_deeplake source files (18+). +// configure_file() only rewrites when the content actually changes, +// so an unchanged hash means zero recompilation. + +__attribute__((visibility("default"), used)) +const char pg_deeplake_build_hash[] = "PG_DEEPLAKE_BUILD:@PG_DEEPLAKE_BUILD_HASH@"; diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 5c89ecc4a2..3f968ea3e2 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -85,8 +85,26 @@ void analyze_plan(PlannedStmt* plan) } } } + + // Warm first batches for all streamers in parallel for cold run optimization. + // This blocks until all first batches are downloaded but overlaps I/O across columns. 
+ if (pg::eager_batch_prefetch) { + try { + table_data->get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during analyze_plan: %s", e.what()); + } catch (...) { + elog(WARNING, "Eager batch prefetch failed during analyze_plan with unknown exception"); + } + } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); + + // Pre-initialize DuckDB connection early so it's ready when query execution starts. + // This reduces cold run latency by front-loading DuckDB init. + if (pg::eager_batch_prefetch && pg::query_info::current().is_deeplake_table_referenced()) { + pg::ensure_duckdb_initialized(); + } } } // namespace pg diff --git a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp index 19f00b02ed..a91066777b 100644 --- a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp +++ b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp @@ -955,6 +955,22 @@ class deeplake_scan_function_helper } ASSERT(output_.ColumnCount() == global_state_.column_ids.size()); + + // Pre-trigger parallel batch initialization for all streaming columns. + // Without this, each column's batch download would block sequentially, + // serializing I/O waits. This overlaps all column batch downloads. 
+ if (!has_index_search() && pg::eager_batch_prefetch) { + std::vector streaming_cols; + for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) { + const auto col_idx = global_state_.column_ids[i]; + if (bind_data_.table_data.is_column_requested(col_idx) && + bind_data_.table_data.column_has_streamer(col_idx)) { + streaming_cols.push_back(col_idx); + } + } + bind_data_.table_data.get_streamers().prefetch_batches_for_row(streaming_cols, current_row); + } + icm::vector> column_promises; // Fill output vectors column by column using table_data streamers for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) { @@ -988,10 +1004,47 @@ void deeplake_scan_function(duckdb::ClientContext& context, duckdb::TableFunctio deeplake_scan_function_helper helper(context, data, output); try { helper.scan(); + } catch (const duckdb::OutOfMemoryException& e) { + // Provide helpful error message with configuration hints for OOM + elog(ERROR, + "DuckDB out of memory during Deeplake scan: %s. " + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); } catch (const duckdb::Exception& e) { - elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what()); + // Check if the error message indicates memory issues + std::string msg = e.what(); + std::string msg_lower; + msg_lower.reserve(msg.size()); + for (char c : msg) { + msg_lower.push_back(static_cast(std::tolower(static_cast(c)))); + } + if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) { + elog(ERROR, + "DuckDB memory error during Deeplake scan: %s. 
" + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); + } else { + elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what()); + } } catch (const std::exception& e) { - elog(ERROR, "STD exception during Deeplake scan: %s", e.what()); + // Check if the error message indicates memory issues + std::string msg = e.what(); + std::string msg_lower; + msg_lower.reserve(msg.size()); + for (char c : msg) { + msg_lower.push_back(static_cast(std::tolower(static_cast(c)))); + } + if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) { + elog(ERROR, + "Memory error during Deeplake scan: %s. " + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); + } else { + elog(ERROR, "STD exception during Deeplake scan: %s", e.what()); + } } catch (...) { elog(ERROR, "Unknown exception during Deeplake scan"); } diff --git a/cpp/deeplake_pg/duckdb_executor.cpp b/cpp/deeplake_pg/duckdb_executor.cpp index 0b58d60eb0..cb3359ca8b 100644 --- a/cpp/deeplake_pg/duckdb_executor.cpp +++ b/cpp/deeplake_pg/duckdb_executor.cpp @@ -17,6 +17,8 @@ #include "table_storage.hpp" #include "utils.hpp" +#include + #include #include #include @@ -74,6 +76,62 @@ std::unique_ptr create_connections() // Register the deeplake_scan table function for zero-copy access pg::register_deeplake_scan_function(*(conns->con_cpp)); + // Configure DuckDB memory management for large operations (e.g., JOINs at SF100+) + // This prevents segfaults during memory-intensive operations by enabling disk spilling + // + // Memory configuration: + // - If duckdb_memory_limit_mb > 0, use the explicit setting (in MB) + // - Otherwise, auto-detect using 80% of system memory with 256MB minimum floor + // - For containerized environments with cgroup limits, auto-detection may use host + // memory instead of container 
limits; set pg_deeplake.duckdb_memory_limit_mb explicitly + // + // All memory values use MB units consistently throughout this codebase + uint64_t mem_limit_mb = 0; + if (pg::duckdb_memory_limit_mb > 0) { + mem_limit_mb = static_cast<uint64_t>(pg::duckdb_memory_limit_mb); + } else { + // Auto-detect: use 80% of system memory + uint64_t total_bytes = base::system_report::total_memory(); + mem_limit_mb = static_cast<uint64_t>(total_bytes * 0.8 / (1024ULL * 1024ULL)); + if (mem_limit_mb < 256) { + mem_limit_mb = 256; // Minimum floor of 256MB + } + } + + // Apply memory limit to DuckDB + auto mem_result = conns->con_cpp->Query(fmt::format("SET memory_limit='{}MB'", mem_limit_mb)); + if (!mem_result || mem_result->HasError()) { + elog(WARNING, "Failed to set DuckDB memory_limit: %s", + mem_result ? mem_result->GetError().c_str() : "null result"); + } + + // Configure temp directory for disk spilling (if specified) + if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) { + // Sanitize the directory path to prevent SQL injection + std::string safe_dir(pg::duckdb_temp_directory); + // Escape single quotes by doubling them (standard SQL escaping) + for (size_t pos = 0; (pos = safe_dir.find('\'', pos)) != std::string::npos; pos += 2) { + safe_dir.insert(pos, 1, '\''); + } + auto temp_result = conns->con_cpp->Query( + fmt::format("SET temp_directory='{}'", safe_dir)); + if (!temp_result || temp_result->HasError()) { + elog(WARNING, "Failed to set DuckDB temp_directory: %s", + temp_result ? 
temp_result->GetError().c_str() : "null result"); + } + } + + // Log DuckDB settings at INFO level for operational visibility + auto verify_mem = conns->con_cpp->Query("SELECT current_setting('memory_limit')"); + if (verify_mem && !verify_mem->HasError() && verify_mem->RowCount() > 0) { + elog(INFO, "DuckDB memory_limit configured: %s", verify_mem->GetValue(0, 0).ToString().c_str()); + } + + auto verify_temp = conns->con_cpp->Query("SELECT current_setting('temp_directory')"); + if (verify_temp && !verify_temp->HasError() && verify_temp->RowCount() > 0) { + elog(INFO, "DuckDB temp_directory configured: %s", verify_temp->GetValue(0, 0).ToString().c_str()); + } + // For now, we'll use C++ API for queries since table functions require it // The C API connection will be used later when we can restructure to avoid table functions // or when DuckDB provides a way to register table functions via C API @@ -374,7 +432,7 @@ void* duckdb_result_holder::get_chunk_ptr(size_t chunk_idx) const // Execute SQL query and return DuckDB results using C API // Note: Currently we still use C++ API for execution due to table function requirements // This function converts C++ results to C API format for processing -duckdb_result_holder execute_sql_query_direct(const std::string& query_string) +static std::unique_ptr& get_duckdb_conns() { static std::unique_ptr conns; if (conns == nullptr || !pg::table_storage::instance().is_up_to_date()) { @@ -386,6 +444,25 @@ duckdb_result_holder execute_sql_query_direct(const std::string& query_string) register_views(conns.get()); pg::table_storage::instance().set_up_to_date(true); } + return conns; +} + +} // unnamed namespace + +namespace pg { + +void ensure_duckdb_initialized() +{ + get_duckdb_conns(); +} + +} // namespace pg + +namespace pg { + +duckdb_result_holder execute_sql_query_direct(const std::string& query_string) +{ + auto& conns = get_duckdb_conns(); std::string duckdb_query = pg_to_duckdb_translator::translate(query_string); diff --git 
a/cpp/deeplake_pg/duckdb_executor.hpp b/cpp/deeplake_pg/duckdb_executor.hpp index b4ff441d23..8a90767dd2 100644 --- a/cpp/deeplake_pg/duckdb_executor.hpp +++ b/cpp/deeplake_pg/duckdb_executor.hpp @@ -54,6 +54,10 @@ struct duckdb_result_holder void* get_chunk_ptr(size_t chunk_idx) const; }; +// Pre-initialize DuckDB connection (for cold run optimization). +// Can be called early (e.g., during analyze_plan) to overlap DuckDB init with batch prefetching. +void ensure_duckdb_initialized(); + // Execute SQL query using DuckDB and return results directly without conversion duckdb_result_holder execute_sql_query_direct(const std::string& query_string); diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index 5444a07499..1f65118b79 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -8,6 +8,11 @@ extern "C" { #include +// Build fingerprint for hot-reload verification. +// Defined in the generated build_info_.cpp (see build_info.cpp.in). +// Isolated there so that hash changes only recompile one file. 
+// Extract with: strings pg_deeplake_18.so | grep PG_DEEPLAKE_BUILD: + #include #include #include @@ -64,6 +69,11 @@ bool use_shared_mem_for_refresh = false; bool enable_dataset_logging = false; // Enable dataset operation logging for debugging bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options bool stateless_enabled = false; // Enable stateless catalog sync across instances +bool eager_batch_prefetch = true; // Enable eager prefetch of first batch for cold run optimization + +// DuckDB memory management - controls memory_limit and temp_directory for large operations +int32_t duckdb_memory_limit_mb = 0; // 0 = auto-detect (80% of system memory) +char* duckdb_temp_directory = nullptr; // nullptr/empty = DuckDB's default temp location } // namespace pg @@ -245,6 +255,19 @@ void initialize_guc_parameters() nullptr, nullptr); + DefineCustomBoolVariable("pg_deeplake.eager_batch_prefetch", + "Enable eager prefetch of first batch for cold run optimization.", + "When enabled, the first batch of data for all columns is prefetched in parallel " + "when a scan begins. This significantly improves cold run performance by overlapping " + "the initial data fetches for multiple columns.", + &pg::eager_batch_prefetch, + true, + PGC_USERSET, + 0, + nullptr, + nullptr, + nullptr); + DefineCustomBoolVariable("pg_deeplake.enable_dataset_logging", "Enable operation logging for deeplake datasets.", "When enabled, all dataset operations (append_row, update_row, delete_row, etc.) " @@ -277,6 +300,42 @@ void initialize_guc_parameters() ); + // DuckDB memory management GUC parameters + // These control DuckDB's internal memory budget for large operations like JOINs + DefineCustomIntVariable( + "pg_deeplake.duckdb_memory_limit_mb", + "Memory limit for DuckDB's internal operations in MB.", + "Controls DuckDB's memory budget for large operations like JOINs and aggregations. " + "Set to 0 (default) for auto-detection using 80% of system memory. 
" + "When the limit is exceeded, DuckDB spills to disk using temp_directory. " + "For containerized environments with cgroup limits, set this explicitly as " + "auto-detection may use host memory instead of container limits.", + &pg::duckdb_memory_limit_mb, // linked C variable + 0, // default value (0 = auto-detect) + 0, // min value (0 = unlimited/auto) + INT_MAX, // max value + PGC_USERSET, // context - can be set by any user + GUC_UNIT_MB, // flags - treat as MB + nullptr, // check_hook + nullptr, // assign_hook + nullptr // show_hook + ); + + DefineCustomStringVariable( + "pg_deeplake.duckdb_temp_directory", + "Temporary directory for DuckDB disk spilling during large operations.", + "Specifies where DuckDB writes temporary files when memory_limit is exceeded. " + "Empty string (default) uses DuckDB's default temp location. " + "DuckDB will validate the path at runtime and fail gracefully if invalid.", + &pg::duckdb_temp_directory, // linked C variable + "", // default value (empty = DuckDB default) + PGC_USERSET, // context - can be set by any user + 0, // flags + nullptr, // check_hook + nullptr, // assign_hook + nullptr // show_hook + ); + // Initialize PostgreSQL memory tracking pg::memory_tracker::initialize_guc_parameters(); diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index c9d071e392..12746a9944 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -294,8 +294,20 @@ bool deeplake_scan_analyze_next_tuple( return false; } - if (!scan_data->scan_state.get_next_tuple(slot)) { - return false; // no more tuples + try { + if (!scan_data->scan_state.get_next_tuple(slot)) { + return false; // no more tuples + } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake ANALYZE scan failed: %s", e.what()))); + return false; + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake ANALYZE scan failed: unknown exception"))); + return false; } /* @@ -373,6 +385,18 @@ double deeplake_index_build_range_scan(Relation heap_rel, td.create_streamer(attnum, -1); } } + + // Warm all streamers in parallel for cold run optimization + if (pg::eager_batch_prefetch) { + try { + td.get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during index_build_range_scan: %s", e.what()); + } catch (...) { + elog(WARNING, "Eager batch prefetch failed during index_build_range_scan with unknown exception"); + } + } + std::vector values(nkeys, 0); std::vector nulls(nkeys, 0); pg::table_scan tscan(table_id, false, false); @@ -728,6 +752,17 @@ TableScanDesc deeplake_table_am_routine::scan_begin(Relation relation, } } + // Warm all streamers in parallel for cold run optimization + if (pg::eager_batch_prefetch) { + try { + td.get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during scan_begin: %s", e.what()); + } catch (...) { + elog(WARNING, "Eager batch prefetch failed during scan_begin with unknown exception"); + } + } + if (nkeys > 0) { extended_scan->scan_state.nkeys = nkeys; // copy ScanKeyData because Postgres only gave us a pointer @@ -786,7 +821,18 @@ bool deeplake_table_am_routine::scan_getnextslot(TableScanDesc scan, ScanDirecti ++scan_data->progress_bar; } - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake scan failed: %s", e.what()))); + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake scan failed: unknown exception"))); + } + return false; } void deeplake_table_am_routine::scan_set_tidrange(TableScanDesc scan, ItemPointer mintid, ItemPointer maxtid) @@ -833,7 +879,18 @@ bool deeplake_table_am_routine::scan_getnextslot_tidrange(TableScanDesc scan, // Switch to the dedicated memory context for this scan pg::utils::memory_context_switcher context_switcher(scan_data->memory_context); - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake TID range scan failed: %s", e.what()))); + } catch (...) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake TID range scan failed: unknown exception"))); + } + return false; } #if PG_VERSION_NUM >= PG_VERSION_NUM_18 @@ -886,7 +943,18 @@ bool deeplake_table_am_routine::scan_bitmap_next_tuple( // Get next row number int64_t row_num = scan_data->bitmap_row_numbers[scan_data->current_offset++]; scan_data->scan_state.set_current_position(row_num); - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake bitmap scan failed: %s", e.what()))); + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake bitmap scan failed: unknown exception"))); + } + return false; } #endif @@ -934,13 +1002,23 @@ bool deeplake_table_am_routine::scan_sample_next_tuple(TableScanDesc scan, sample_fraction = sampler->fraction; } - while (scan_data->scan_state.get_next_tuple(slot)) { - // random fraction, should come from scanstate->tsm_state - const double random_value = (double)random() / RAND_MAX; + try { + while (scan_data->scan_state.get_next_tuple(slot)) { + // random fraction, should come from scanstate->tsm_state + const double random_value = (double)random() / RAND_MAX; - if (random_value <= sample_fraction) { - return true; + if (random_value <= sample_fraction) { + return true; + } } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake sample scan failed: %s", e.what()))); + } catch (...) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake sample scan failed: unknown exception"))); } return false; @@ -986,10 +1064,22 @@ bool deeplake_table_am_routine::index_fetch_tuple(struct IndexFetchTableData* sc pg::utils::memory_context_switcher context_switcher(idx_scan->memory_context); idx_scan->scan_state.set_current_position(utils::tid_to_row_number(tid)); - if (!idx_scan->scan_state.get_next_tuple(slot)) { - if (all_dead != nullptr) { - *all_dead = true; + try { + if (!idx_scan->scan_state.get_next_tuple(slot)) { + if (all_dead != nullptr) { + *all_dead = true; + } + return false; } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake index fetch failed: %s", e.what()))); + return false; + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake index fetch failed: unknown exception"))); return false; } diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index be7b419625..ce51f3d5f8 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -130,13 +131,39 @@ struct table_data std::vector column_to_batches; std::vector> streamers; + std::vector> first_batch_cache_; + bool warmed_ = false; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); + first_batch_cache_.clear(); + warmed_ = false; } + /** + * @brief Pre-warm all streamers by triggering parallel first batch downloads. + * + * This method initiates the download of the first batch for all active + * streamers in parallel, then waits for all downloads to complete. + * This significantly improves cold run performance by overlapping the + * initial data fetches. + */ + inline void warm_all_streamers(); + + + /** + * @brief Pre-initialize batches for all given columns at the specified row in parallel. + * + * For a cold run, batch initialization blocks on I/O. Without this method, + * the scan processes columns sequentially, serializing I/O waits. + * This method triggers batch downloads for all columns that need it concurrently, + * then waits for all to complete, so the subsequent sequential column processing + * finds all batches already initialized. 
+ */ + inline void prefetch_batches_for_row(const std::vector& column_indices, int64_t row_number); + inline nd::array get_sample(int32_t column_number, int64_t row_number); template diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 9f90517fe1..a0a37669e9 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -7,8 +7,12 @@ #include "table_version.hpp" #include "utils.hpp" +#include #include +#include +#include + // Inline implementation functions for table_data // This file should be included at the end of table_data.hpp @@ -112,6 +116,20 @@ inline void table_data::open_dataset(bool create) ASSERT(dataset_ != nullptr); num_total_rows_ = dataset_->num_rows(); + // Validate row count against TID conversion limits + // With TUPLES_PER_BLOCK=256 and BlockNumber=uint32_t, max is ~1.1 trillion rows + // which is safe for foreseeable scale factors, but provide early warning + constexpr int64_t MAX_SUPPORTED_ROWS = + static_cast(UINT32_MAX) * static_cast(pg::DEEPLAKE_TUPLES_PER_BLOCK); + if (num_total_rows_ > MAX_SUPPORTED_ROWS) { + elog(WARNING, + "Table '%s' has %ld rows, exceeding max supported %ld for TID conversion. 
" + "Consider partitioning or sharding.", + table_name_.c_str(), + num_total_rows_, + MAX_SUPPORTED_ROWS); + } + // Enable logging if GUC parameter is set if (pg::enable_dataset_logging && dataset_ && !dataset_->is_logging_enabled()) { dataset_->start_logging(); @@ -562,6 +580,59 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) streamers_.column_to_batches[idx].batches.resize(batch_count); } +inline void table_data::streamer_info::warm_all_streamers() +{ + if (warmed_) { + return; + } + warmed_ = true; + + first_batch_cache_.resize(streamers.size()); + + icm::vector indices; + icm::vector> promises; + + for (size_t i = 0; i < streamers.size(); ++i) { + if (streamers[i]) { + promises.push_back(streamers[i]->next_batch_async()); + indices.push_back(i); + } + } + + if (promises.empty()) { + return; + } + + try { + auto results = async::combine(std::move(promises)).get_future().get(); + + for (size_t j = 0; j < indices.size(); ++j) { + auto& batch = results[j]; + first_batch_cache_[indices[j]] = batch.columns()[0].array(); + } + } catch (const std::exception& e) { + elog(WARNING, "warm_all_streamers failed: %s", e.what()); + first_batch_cache_.clear(); + } catch (...) { + elog(WARNING, "warm_all_streamers failed with unknown exception"); + first_batch_cache_.clear(); + } +} + +inline void table_data::streamer_info::prefetch_batches_for_row( + const std::vector& /*column_indices*/, int64_t /*row_number*/) +{ + // The async_prefetcher already downloads batches ahead of time in the + // background. Each column_streamer's prefetcher queues up to max_task_count_ + // batch requests on construction. By the time the scan reaches a batch + // boundary, the download is typically already complete or nearly ready. + // + // Previously this method spawned OS threads per column per batch boundary + // to overlap I/O, but thread creation (pthread_create/clone3) was the + // dominant overhead, as shown by profiling. 
The async prefetcher provides + // the same I/O overlap without thread creation cost. +} + inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) { const int64_t batch_index = row_number >> batch_size_log2_; @@ -569,6 +640,18 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -595,11 +678,25 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); + first_batch_cache_[column_number].reset(); + batch.data_ = batch.owner_.data().data(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if 
(!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - col_data.batches[i].owner_ = utils::eval_with_nones(streamers[column_number]->next_batch()); + nd::array raw_batch = streamers[column_number]->next_batch(); + col_data.batches[i].owner_ = utils::eval_with_nones(std::move(raw_batch)); col_data.batches[i].data_ = col_data.batches[i].owner_.data().data(); col_data.batches[i].initialized_.store(true, std::memory_order_release); } @@ -617,6 +714,19 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast<size_t>(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.holder_ = impl::string_stream_array_holder(batch.owner_); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { diff --git a/cpp/deeplake_pg/table_scan.hpp b/cpp/deeplake_pg/table_scan.hpp index afe38f1a42..028b58eb60 100644 --- a/cpp/deeplake_pg/table_scan.hpp +++ b/cpp/deeplake_pg/table_scan.hpp @@ -32,8 +32,8 @@ class table_scan inline table_scan& operator=(table_scan&&) = delete; inline ~table_scan() = default; - inline std::pair<Datum, bool> get_datum(int32_t column_number, int64_t row_number) const noexcept; - inline void convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const noexcept; + inline std::pair<Datum, bool> get_datum(int32_t column_number, int64_t row_number) const; + inline void convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const; + inline bool 
get_next_tuple(TupleTableSlot* slot); diff --git a/cpp/deeplake_pg/table_scan_impl.hpp b/cpp/deeplake_pg/table_scan_impl.hpp index caca926b25..088302d3e6 100644 --- a/cpp/deeplake_pg/table_scan_impl.hpp +++ b/cpp/deeplake_pg/table_scan_impl.hpp @@ -57,7 +57,7 @@ inline table_scan::table_scan(Oid table_id, bool is_parallel, bool streamer_only } } -inline std::pair<Datum, bool> table_scan::get_datum(int32_t column_number, int64_t row_number) const noexcept +inline std::pair<Datum, bool> table_scan::get_datum(int32_t column_number, int64_t row_number) const { const auto base_typeid = table_data_.get_base_atttypid(column_number); const auto column_typmod = table_data_.get_atttypmod(column_number); @@ -155,7 +155,7 @@ inline std::pair<Datum, bool> table_scan::get_datum(int32_t column_number, int64 return {(Datum)0, true}; } -inline void table_scan::convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const noexcept +inline void table_scan::convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const { for (auto col : null_columns_) { nulls[col] = true; diff --git a/cpp/deeplake_pg/utils.hpp b/cpp/deeplake_pg/utils.hpp index 3cde8d0a2d..84e91d386e 100644 --- a/cpp/deeplake_pg/utils.hpp +++ b/cpp/deeplake_pg/utils.hpp @@ -57,6 +57,12 @@ extern bool use_shared_mem_for_refresh; extern bool enable_dataset_logging; extern bool allow_custom_paths; extern bool stateless_enabled; +extern bool eager_batch_prefetch; + +// DuckDB memory management GUC variables - defined in extension_init.cpp +// These control DuckDB's internal memory limit and temp directory for disk spilling +extern int32_t duckdb_memory_limit_mb; +extern char* duckdb_temp_directory; namespace utils { diff --git a/postgres/scripts/run_pg_server.sh b/postgres/scripts/run_pg_server.sh index b6fc812218..5f0304e6cf 100755 --- a/postgres/scripts/run_pg_server.sh +++ b/postgres/scripts/run_pg_server.sh @@ -78,12 +78,31 @@ if [ -d "$POSTGRES_DATA" ]; then fi install_extension -# Initialize database -"$POSTGRES_INSTALL/bin/initdb" -D 
"$POSTGRES_DATA" -U "$USER" +# Initialize database with 'postgres' superuser (matches Docker POSTGRES_USER=postgres) +"$POSTGRES_INSTALL/bin/initdb" -D "$POSTGRES_DATA" -U postgres echo "shared_preload_libraries = 'pg_deeplake'" >> "$POSTGRES_DATA/postgresql.conf" echo "max_connections = 300" >> "$POSTGRES_DATA/postgresql.conf" echo "shared_buffers = 128MB" >> "$POSTGRES_DATA/postgresql.conf" #echo "log_min_messages = debug1" >> "$POSTGRES_DATA/postgresql.conf" -# Start PostgreSQL -"$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" start +# Configure pg_hba.conf: use scram-sha-256 for TCP connections (matches Docker behavior) +# Keep trust for local socket connections (for admin convenience) +if [[ "$(uname)" == "Darwin" ]]; then + sed -i '' 's/^\(host.*all.*all.*127\.0\.0\.1\/32\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" + sed -i '' 's/^\(host.*all.*all.*::1\/128\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" +else + sed -i 's/^\(host.*all.*all.*127\.0\.0\.1\/32\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" + sed -i 's/^\(host.*all.*all.*::1\/128\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" +fi + +# Start PostgreSQL temporarily to set password +"$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" -t 120 start + +# Set postgres password (matches Docker POSTGRES_PASSWORD=password) +"$POSTGRES_INSTALL/bin/psql" -U postgres -c "ALTER USER postgres PASSWORD 'password';" + +# Stop PostgreSQL and restart in foreground +stop_postgres + +echo "Starting PostgreSQL in foreground..." 
+exec "$POSTGRES_INSTALL/bin/postgres" -D "$POSTGRES_DATA" diff --git a/postgres/tests/sql/tpch/create_schema.sql b/postgres/tests/sql/tpch/create_schema.sql index e0a408ec78..40e090cdfc 100644 --- a/postgres/tests/sql/tpch/create_schema.sql +++ b/postgres/tests/sql/tpch/create_schema.sql @@ -1,3 +1,9 @@ +-- TPC-H Schema for pg_deeplake (v2: BIGINT keys for SF1000+ support, 2026-02) +-- +-- MIGRATION: Existing SF1000 data with INT overflow issues is already corrupted. +-- Re-ingestion with this schema is required to recover from overflow errors. +-- SF10 and SF100 data will continue to work since o_orderkey maxes at ~600M for SF100. + DROP TABLE IF EXISTS customer; DROP TABLE IF EXISTS lineitem; DROP TABLE IF EXISTS nation; @@ -18,8 +24,9 @@ CREATE TABLE customer ( c_comment VARCHAR(117) NOT NULL ) USING deeplake; +-- Note: l_orderkey uses BIGINT to support SF1000+ (values exceed 2.1B INT4 limit) CREATE TABLE lineitem ( - l_orderkey int NOT NULL, + l_orderkey bigint NOT NULL, l_partkey int NOT NULL, l_suppkey int not null, l_linenumber int not null, @@ -44,8 +51,9 @@ CREATE TABLE nation ( n_comment varchar(152) NULL ) USING deeplake; +-- Note: o_orderkey uses BIGINT to support SF1000+ (values exceed 2.1B INT4 limit) CREATE TABLE orders ( - o_orderkey int NOT NULL, + o_orderkey bigint NOT NULL, o_custkey int NOT NULL, o_orderstatus VARCHAR(1) NOT NULL, o_totalprice decimal(15, 2) NOT NULL, diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index 01e2e62198..673f31e02e 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -1,9 +1,21 @@ -import distutils.sysconfig as sysconfig +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "requests>=2.28", +# ] +# /// + import json import os import sys import platform -import requests + +try: + import requests +except ImportError: + os.system("pip install requests --user --break-system-packages") + import requests """ Usage: python3 scripts/build_pg_ext.py 
debug #Debug build @@ -14,8 +26,12 @@ Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16,17,18 #Build for PostgreSQL 16, 17, and 18 Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16 #Build for PostgreSQL 16 only Usage: python3 scripts/build_pg_ext.py prod --pg-versions all #Build for all supported PostgreSQL versions +Usage: python3 scripts/build_pg_ext.py dev --local-api /path/to/package #Use local deeplake API package instead of downloading """ + + + def get_pinned_version(): """ Read the pinned deeplake API version from DEEPLAKE_API_VERSION file. @@ -77,6 +93,7 @@ def download_api_lib(api_root_dir, overwrite=True): print(f"Downloading prebuilt api libraries from {asset_url} ...") + response = requests.get(asset_url) if response.status_code != 200: raise Exception(f"Failed to download api libraries from {asset_url}. Status code: {response.status_code}") @@ -114,7 +131,38 @@ def download_api_lib(api_root_dir, overwrite=True): print(f"Successfully installed deeplake API library version {version}") -def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_versions: list = None): + +def install_local_api_lib(api_root_dir, local_path): + """ + Install the deeplake API library from a local package directory (e.g. from an indra build). + This copies include/, lib/, and cmake/ from the local package into .ext/deeplake_api/. 
+ """ + if not os.path.isdir(local_path): + raise Exception(f"Local API path does not exist: {local_path}") + + # Verify the local package has the expected structure + local_lib = os.path.join(local_path, "lib") + local_include = os.path.join(local_path, "include") + if not os.path.isdir(local_lib) or not os.path.isdir(local_include): + raise Exception(f"Local API path missing lib/ or include/ directories: {local_path}") + + os.makedirs(api_root_dir, exist_ok=True) + + print(f"Installing local deeplake API from {local_path} ...") + + # Remove existing and copy from local + err = os.system(f"rm -rf {api_root_dir}/*") + if err: + raise Exception(f"Failed to clean {api_root_dir}. Command exited with code {err}.") + + err = os.system(f"cp -r {local_path}/* {api_root_dir}/") + if err: + raise Exception(f"Failed to copy local API library. Command exited with code {err}.") + + print(f"Successfully installed local deeplake API from {local_path}") + + +def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_versions: list = None, local_api_path: str = None): modes = ["debug", "dev", "prod"] if mode not in modes: raise Exception(f"Invalid mode - '{mode}'. 
Possible values - {', '.join(modes)}") @@ -128,9 +176,12 @@ def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_version try: if not incremental: - # Download DeepLake API library before CMake configuration + # Install DeepLake API library before CMake configuration # (CMake's find_package needs it to exist during configuration) - download_api_lib(".ext/deeplake_api") + if local_api_path: + install_local_api_lib(".ext/deeplake_api", local_api_path) + else: + download_api_lib(".ext/deeplake_api") cmake_cmd = (f"cmake " f"{preset} ") @@ -221,6 +272,7 @@ def write_mode(mode: str): mode = sys.argv[1] deeplake_link_type = None pg_versions = None + local_api_path = None # Parse optional flags i = 2 @@ -232,6 +284,11 @@ def write_mode(mode: str): elif arg == "--deeplake-static": deeplake_link_type = "static" i += 1 + elif arg == "--local-api": + if i + 1 >= len(sys.argv): + raise Exception("--local-api requires a path to the local deeplake API package directory") + local_api_path = os.path.abspath(sys.argv[i + 1]) + i += 2 elif arg == "--pg-versions": if i + 1 >= len(sys.argv): raise Exception("--pg-versions requires a value (e.g., '16,17,18' or 'all')") @@ -250,6 +307,6 @@ def write_mode(mode: str): raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '16,17,18') or 'all'") i += 2 else: - raise Exception(f"Invalid option '{arg}'. Use --deeplake-shared, --deeplake-static, or --pg-versions") + raise Exception(f"Invalid option '{arg}'. Use --deeplake-shared, --deeplake-static, --local-api, or --pg-versions") - run(mode=mode, incremental=False, deeplake_link_type=deeplake_link_type, pg_versions=pg_versions) + run(mode=mode, incremental=False, deeplake_link_type=deeplake_link_type, pg_versions=pg_versions, local_api_path=local_api_path)