diff --git a/be/src/service/http/action/http_stream.cpp b/be/src/service/http/action/http_stream.cpp index 9e98199ac46504..e519a62779f405 100644 --- a/be/src/service/http/action/http_stream.cpp +++ b/be/src/service/http/action/http_stream.cpp @@ -17,6 +17,7 @@ #include "service/http/action/http_stream.h" +#include #include #include #include @@ -55,6 +56,7 @@ #include "storage/storage_engine.h" #include "util/byte_buffer.h" #include "util/client_cache.h" +#include "util/load_util.h" #include "util/string_util.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -63,6 +65,26 @@ namespace doris { using namespace ErrorCode; +namespace { + +bool is_compressed_file_scan(const TPipelineFragmentParams& params) { + if (!params.__isset.file_scan_params) { + return false; + } + return std::ranges::any_of(params.file_scan_params, [](const auto& file_scan_param) { + const auto& file_scan_params = file_scan_param.second; + TFileCompressType::type compress_type = file_scan_params.__isset.compress_type + ? file_scan_params.compress_type + : TFileCompressType::UNKNOWN; + TFileFormatType::type format_type = file_scan_params.__isset.format_type + ? file_scan_params.format_type + : TFileFormatType::FORMAT_UNKNOWN; + return LoadUtil::is_compressed_load(compress_type, format_type); + }); +} + +} // namespace + DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS); @@ -387,13 +409,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); } - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + if (is_compressed_file_scan(ctx->put_result.pipeline_params)) { content_length *= 3; } } diff --git a/be/src/service/http/action/stream_load.cpp b/be/src/service/http/action/stream_load.cpp index 7e0f4ba0977121..7cfcccaa547170 100644 --- a/be/src/service/http/action/stream_load.cpp +++ b/be/src/service/http/action/stream_load.cpp @@ -812,13 +812,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); } - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + if (LoadUtil::is_compressed_load(ctx->compress_type, ctx->format)) { content_length *= 3; } } diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index ab6e3e887f8a19..0a7bf155d13778 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -37,6 +37,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *compress_type = TFileCompressType::LZO; } else if (iequal(compress_type_str, "BZ2")) { *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "ZSTD")) { + *compress_type = TFileCompressType::ZSTD; } else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) { *compress_type = TFileCompressType::LZ4FRAME; } else if (iequal(compress_type_str, "LZ4_BLOCK")) { @@ -62,6 +64,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_CSV_LZO; } else if (iequal(compress_type_str, "BZ2")) { *format_type = TFileFormatType::FORMAT_CSV_BZ2; + } else if (iequal(compress_type_str, "ZSTD")) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; } else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) { *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; } else if (iequal(compress_type_str, "LZ4_BLOCK")) { @@ -108,4 +112,25 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { } return false; } + +bool LoadUtil::is_compressed_load(TFileCompressType::type compress_type, + TFileFormatType::type format_type) { + if (compress_type != TFileCompressType::UNKNOWN && compress_type != TFileCompressType::PLAIN) { + return true; + } + + switch (format_type) { + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: + return true; + default: + return false; + } +} } // namespace doris diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h index 60bd79ab1bea9b..e84222c8fa2d92 100644 --- a/be/src/util/load_util.h +++ b/be/src/util/load_util.h @@ -29,5 +29,8 @@ class LoadUtil { TFileCompressType::type* compress_type); static bool is_format_support_streaming(TFileFormatType::type format); + + static bool is_compressed_load(TFileCompressType::type compress_type, + TFileFormatType::type format_type); }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp index 724e9f3cfc9093..c8c4cc201de83d 100644 --- a/be/test/util/load_util_test.cpp +++ b/be/test/util/load_util_test.cpp @@ -106,6 +106,22 @@ TEST_F(LoadUtilTest, ParseTest) { EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type); EXPECT_EQ(TFileCompressType::DEFLATE, compress_type); } + { + // CSV, ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } + { + // "", ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } { // JSON, "" TFileFormatType::type format_type; @@ -178,6 +194,14 @@ TEST_F(LoadUtilTest, ParseTest) { EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type); EXPECT_EQ(TFileCompressType::DEFLATE, compress_type); } + { + // JSON, ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("JSON", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } { // JSON, unkonw TFileFormatType::type format_type; @@ -204,4 +228,21 @@ TEST_F(LoadUtilTest, ParseTest) { } } +TEST_F(LoadUtilTest, IsCompressedLoadTest) { + EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::PLAIN, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::ZSTD, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_TRUE( + LoadUtil::is_compressed_load(TFileCompressType::PLAIN, TFileFormatType::FORMAT_CSV_GZ)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN, + TFileFormatType::FORMAT_CSV_DEFLATE)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::SNAPPYBLOCK, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_FALSE( + LoadUtil::is_compressed_load(TFileCompressType::PLAIN, TFileFormatType::FORMAT_JSON)); +} + } // namespace doris diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.zst b/regression-test/data/load_p0/http_stream/test_compress.csv.zst new file mode 100644 index 00000000000000..34c3da9c54ef04 Binary files /dev/null and b/regression-test/data/load_p0/http_stream/test_compress.csv.zst differ diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out index 99954d0853c950..d0f44b2055f87a 100644 --- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out @@ -3,12 +3,16 @@ 1 a 10 1 a 10 1 a 10 +1 a 10 +2 b 20 2 b 20 2 b 20 2 b 20 3 c 30 3 c 30 3 c 30 +3 c 30 +4 d \N 4 d \N 4 d \N 4 d \N diff --git a/regression-test/data/load_p0/stream_load/basic_data.csv.zst b/regression-test/data/load_p0/stream_load/basic_data.csv.zst new file mode 100644 index 00000000000000..b4a541578bd7bc Binary files /dev/null and b/regression-test/data/load_p0/stream_load/basic_data.csv.zst differ diff --git a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst new file mode 100644 index 00000000000000..bed19ec3346f66 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst differ diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.zst b/regression-test/data/load_p0/stream_load/test_compress.csv.zst new file mode 100644 index 00000000000000..34c3da9c54ef04 Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_compress.csv.zst differ diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out index 56d195c569edca..92a8f1b82a6c0f 100644 --- a/regression-test/data/load_p0/stream_load/test_compress_type.out +++ b/regression-test/data/load_p0/stream_load/test_compress_type.out @@ -1,4 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -160 +200 diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out index 3e5d73452d08b3..fac63335a7e58d 100644 --- a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out +++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out @@ -3,12 +3,16 @@ 1 a 10 1 a 10 1 a 10 +1 a 10 +2 b 20 2 b 20 2 b 20 2 b 20 3 c 30 3 c 30 3 c 30 +3 c 30 +4 d \N 4 d \N 4 d \N 4 d \N diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index b160b478c8e06c..ad07ee55d74971 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -94,9 +94,11 @@ suite("test_group_commit_http_stream") { """ // stream load with compress file - String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame"} //, "deflate"} + String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame", "zstd"} //, "deflate"} for (final def compressionType in compressionTypes) { - def fileName = "test_compress.csv." + (compressionType.equals("lz4frame") ? "lz4" : compressionType) + def fileSuffix = compressionType.equals("lz4frame") ? "lz4" : compressionType + fileSuffix = fileSuffix.equals("zstd") ? "zst" : fileSuffix + def fileName = "test_compress.csv." + fileSuffix streamLoad { set 'version', '1' set 'sql', """ @@ -249,7 +251,7 @@ suite("test_group_commit_http_stream") { } } - getRowCount(19) + getRowCount(26) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " // group commit http stream (SELECT * FROM http_stream(...)) should not register load jobs diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy index eeab7e809754b1..5e2870fd4c010b 100644 --- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy +++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy @@ -196,6 +196,49 @@ suite("test_stream_load_compress_type", "load_p0") { } } + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', 'csv' + set 'compress_type', 'zstd' + + file "basic_data.csv.zst" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'compress_type', 'zstd' + set 'read_json_by_line', 'true' + + file "basic_data_by_line.json.zst" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + // no compress_type streamLoad { table "${tableName}" diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index 342b87f13960c0..7d7afc71f7ee34 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -98,9 +98,9 @@ suite("test_group_commit_stream_load") { """ // stream load with compress file - String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"} + String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4", "zstd"} //, "deflate"} for (final def compressionType in compressionTypes) { - def fileName = "test_compress.csv." + compressionType + def fileName = "test_compress.csv." + (compressionType.equals("zstd") ? "zst" : compressionType) streamLoad { table "${tableName}" @@ -227,7 +227,7 @@ suite("test_group_commit_stream_load") { } } - getRowCount(21) + getRowCount(25) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " } finally { // try_sql("DROP TABLE ${tableName}")