From f8161c604b77ac8819f82e3b43684bfb40954a01 Mon Sep 17 00:00:00 2001 From: Refrain Date: Tue, 23 Jun 2026 21:22:55 +0800 Subject: [PATCH] [feature](be) Support zstd stream load compression (#64711) Stream load does not support `compress_type=zstd` in the shared load format parser. Async group commit also checks only legacy compressed CSV format enum values when estimating compressed input size, so `compress_type` based compressed input is not handled consistently by stream load and HTTP stream load. This PR adds ZSTD parsing in `LoadUtil::parse_format`, adds a shared `LoadUtil::is_compressed_load` helper for `compress_type` and legacy compressed CSV format types, and uses it in stream load and HTTP stream group commit paths. This PR also adds BE UT and regression coverage for ZSTD CSV/JSON stream load and group commit stream/HTTP stream load. --- be/src/service/http/action/http_stream.cpp | 30 +++++++++--- be/src/service/http/action/stream_load.cpp | 8 +--- be/src/util/load_util.cpp | 25 ++++++++++ be/src/util/load_util.h | 5 +- be/test/util/load_util_test.cpp | 41 +++++++++++++++++ .../load_p0/http_stream/test_compress.csv.zst | Bin 0 -> 42 bytes .../test_group_commit_http_stream.out | 4 ++ .../load_p0/stream_load/basic_data.csv.zst | Bin 0 -> 4207 bytes .../stream_load/basic_data_by_line.json.zst | Bin 0 -> 3461 bytes .../load_p0/stream_load/test_compress.csv.zst | Bin 0 -> 42 bytes .../stream_load/test_compress_type.out | 2 +- .../test_group_commit_stream_load.out | 4 ++ .../test_group_commit_http_stream.groovy | 8 ++-- .../stream_load/test_compress_type.groovy | 43 ++++++++++++++++++ .../test_group_commit_stream_load.groovy | 6 +-- 15 files changed, 154 insertions(+), 22 deletions(-) create mode 100644 regression-test/data/load_p0/http_stream/test_compress.csv.zst create mode 100644 regression-test/data/load_p0/stream_load/basic_data.csv.zst create mode 100644 regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst create mode 100644 regression-test/data/load_p0/stream_load/test_compress.csv.zst diff --git a/be/src/service/http/action/http_stream.cpp b/be/src/service/http/action/http_stream.cpp index 9e98199ac46504..e519a62779f405 100644 --- a/be/src/service/http/action/http_stream.cpp +++ b/be/src/service/http/action/http_stream.cpp @@ -17,6 +17,7 @@ #include "service/http/action/http_stream.h" +#include #include #include #include @@ -55,6 +56,7 @@ #include "storage/storage_engine.h" #include "util/byte_buffer.h" #include "util/client_cache.h" +#include "util/load_util.h" #include "util/string_util.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -63,6 +65,26 @@ namespace doris { using namespace ErrorCode; +namespace { + +bool is_compressed_file_scan(const TPipelineFragmentParams& params) { + if (!params.__isset.file_scan_params) { + return false; + } + return std::ranges::any_of(params.file_scan_params, [](const auto& file_scan_param) { + const auto& file_scan_params = file_scan_param.second; + TFileCompressType::type compress_type = file_scan_params.__isset.compress_type + ? file_scan_params.compress_type + : TFileCompressType::UNKNOWN; + TFileFormatType::type format_type = file_scan_params.__isset.format_type + ? file_scan_params.format_type + : TFileFormatType::FORMAT_UNKNOWN; + return LoadUtil::is_compressed_load(compress_type, format_type); + }); +} + +} // namespace + DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS); @@ -387,13 +409,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); } - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + if (is_compressed_file_scan(ctx->put_result.pipeline_params)) { content_length *= 3; } } diff --git a/be/src/service/http/action/stream_load.cpp b/be/src/service/http/action/stream_load.cpp index 7e0f4ba0977121..7cfcccaa547170 100644 --- a/be/src/service/http/action/stream_load.cpp +++ b/be/src/service/http/action/stream_load.cpp @@ -812,13 +812,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); } - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + if (LoadUtil::is_compressed_load(ctx->compress_type, ctx->format)) { content_length *= 3; } } diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp index ab6e3e887f8a19..0a7bf155d13778 100644 --- a/be/src/util/load_util.cpp +++ b/be/src/util/load_util.cpp @@ -37,6 +37,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *compress_type = TFileCompressType::LZO; } else if (iequal(compress_type_str, "BZ2")) { *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "ZSTD")) { + *compress_type = TFileCompressType::ZSTD; } else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) { *compress_type = TFileCompressType::LZ4FRAME; } else if (iequal(compress_type_str, "LZ4_BLOCK")) { @@ -62,6 +64,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co *format_type = TFileFormatType::FORMAT_CSV_LZO; } else if (iequal(compress_type_str, "BZ2")) { *format_type = TFileFormatType::FORMAT_CSV_BZ2; + } else if (iequal(compress_type_str, "ZSTD")) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; } else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) { *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; } else if (iequal(compress_type_str, "LZ4_BLOCK")) { @@ -108,4 +112,25 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) { } return false; } + +bool LoadUtil::is_compressed_load(TFileCompressType::type compress_type, + TFileFormatType::type format_type) { + if (compress_type != TFileCompressType::UNKNOWN && compress_type != TFileCompressType::PLAIN) { + return true; + } + + switch (format_type) { + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4BLOCK: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: + return true; + default: + return false; + } +} } // namespace doris diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h index 60bd79ab1bea9b..e84222c8fa2d92 100644 --- a/be/src/util/load_util.h +++ b/be/src/util/load_util.h @@ -29,5 +29,8 @@ class LoadUtil { TFileCompressType::type* compress_type); static bool is_format_support_streaming(TFileFormatType::type format); + + static bool is_compressed_load(TFileCompressType::type compress_type, + TFileFormatType::type format_type); }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp index 724e9f3cfc9093..c8c4cc201de83d 100644 --- a/be/test/util/load_util_test.cpp +++ b/be/test/util/load_util_test.cpp @@ -106,6 +106,22 @@ TEST_F(LoadUtilTest, ParseTest) { EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type); EXPECT_EQ(TFileCompressType::DEFLATE, compress_type); } + { + // CSV, ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("CSV", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } + { + // "", ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } { // JSON, "" TFileFormatType::type format_type; @@ -178,6 +194,14 @@ TEST_F(LoadUtilTest, ParseTest) { EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type); EXPECT_EQ(TFileCompressType::DEFLATE, compress_type); } + { + // JSON, ZSTD + TFileFormatType::type format_type; + TFileCompressType::type compress_type; + LoadUtil::parse_format("JSON", "ZSTD", &format_type, &compress_type); + EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type); + EXPECT_EQ(TFileCompressType::ZSTD, compress_type); + } { // JSON, unkonw TFileFormatType::type format_type; @@ -204,4 +228,21 @@ TEST_F(LoadUtilTest, ParseTest) { } } +TEST_F(LoadUtilTest, IsCompressedLoadTest) { + EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::PLAIN, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::ZSTD, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_TRUE( + LoadUtil::is_compressed_load(TFileCompressType::PLAIN, TFileFormatType::FORMAT_CSV_GZ)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN, + TFileFormatType::FORMAT_CSV_DEFLATE)); + EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::SNAPPYBLOCK, + TFileFormatType::FORMAT_CSV_PLAIN)); + EXPECT_FALSE( + LoadUtil::is_compressed_load(TFileCompressType::PLAIN, TFileFormatType::FORMAT_JSON)); +} + } // namespace doris diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.zst b/regression-test/data/load_p0/http_stream/test_compress.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..34c3da9c54ef04def6abc52997456bd5643eaaca GIT binary patch literal 42 xcmdPcs{fZI;w1xvp-!TXp#hhXPLht10hh5(vW~FJOJ@G3c&yX literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out index 99954d0853c950..d0f44b2055f87a 100644 --- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out @@ -3,12 +3,16 @@ 1 a 10 1 a 10 1 a 10 +1 a 10 +2 b 20 2 b 20 2 b 20 2 b 20 3 c 30 3 c 30 3 c 30 +3 c 30 +4 d \N 4 d \N 4 d \N 4 d \N diff --git a/regression-test/data/load_p0/stream_load/basic_data.csv.zst b/regression-test/data/load_p0/stream_load/basic_data.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..b4a541578bd7bcee92fced6edc28c74b54ff3c11 GIT binary patch literal 4207 zcmV-#5RmUEwJ-f-bQukU0Qwx+GAiICkkUD;{in$YAgECaziw?%l&-zM{=NSJnCzA< zE5fb~6ml423+#!axmru#Zq-`#Gt_T(}>1Cxi>RyzejN`%@XCLfO+HYqz&#D4|Plhx2`WR{3!5%UEp{V}J->aKhS1 z4;^d^jW{6^=~85+twAwhN-7iwhNwciSlk2qA+*n-LcqvMn8t#hCX8#~^FW#h4a8$T zBnoF8M1dhws7z%v5JJcp4h@A*nhcf7Y-$r17$8f5jh-UHrm>N1zA-0r_1l)*1mx=J z;-A!Zhg;2m3u7;yn_v2S(0j2``cBm+^?fgVsxZ3td{IW}%kibuj{ObigHj)}{&e_w zs(h}Z3=rWg_rg$$gn$S|CvgDL`|p`Wa~BYTrF?KYst3aCUmRU1?-0IkzqA; zAtBU$=|A7f7vJLL>qn?vP5+i27q0AE&2^OwRl<$G4|U~Nv*mlwysl9hMg zSCip$t|rfcgbTuyTScv!mFIi1f2S_w$BuIUXWaPE7Yyb0&ezZGizuN!>x=O1IyZdo z=lQ=cA7tM5{-R3RQ{}>b->#B=-fGzXRngeTG&j38}+sKzoU6Jd5+u`Si7Rma`s*5#EhHF`ClT*S8u zBlN}2h7Wi3-^z8lDrKC`QJ=T3eta^@XDA;>_;0HT{lT5gGC%~zg|3E#rRZSUwp|8X zHtJ}jiT3B(E{sMxAh?j;0Rdu&&Yz?yFbMqvCi1$8N3%gJ_K<+_Y+KYthzaA35C|gS zVA%qR0VD8nc#?-9j|S30g9FJY(3l(uFvq9zkRJsSLDmP-re`##}7)VP=BEexu14d^FgTn7ExbL-w6Mi zIkMacxncjl=l$aI_;#*F`R~P_(Kqh&YOBZrA`}~TIe@s(2$R?@O)EhXSm@xxxdx!% zJe~}K!nD!oGU|jbZFJc%)Dq3?!%-r+RDRwbFgxZ;sE; zqboD_W5|nTjj!K2Z{%Ju)YmfZYliI4R<QlG z)mlwBw`6VISiOE<&F#p~7slWBSQic3JFet^7=Pb;``GvW$9~GyeAXyHtTPg63>pfKV!*^QPe8x|LTzy)+6*8XL>G+!#UQhJbfo0KIUZ7)a-O3RfDIr< z%K* zyoGSGJA6>%^MlXfgdUam6-Ipz{h#jSm(cb7+bg|77uRtYiB!@sQCEs2u2d?kD6Xw^ zbsV-S2F%MrPZ@(J6j!V-h6HUe3LFRpk4wZLY=h(U)WK9v8-@Z&5CoXgKZb@ck0eGm zosXCxJ`PI8p?Q$^FlkIU%;-$iQQ#&icwtQwc~T7`$v+exoynY_>1(Yw$7PKkSH@T9 z4mG}pv19LoTUVw3$M_cN&o%NhKi7Wy8Xr%8`;z_r^+k^_C7cTTVdRcGw^^C1=m8@9 zJDI5TStxI%?%FVt2y{{qVq^jhkK&X zK#4JFg-m=%Iu}MmYH-_0BBv*Kh&sbuHZr8iAe}4&F)4TmG)cjnqWRj2>+Ec`TC54? z$WS@%$0(od)djb|l?vrTWAu;8|1Q;EM%cs8#d;5em_W zv%o?m(FK74AiG4^HacnCa7@DvFXvyF!Wqvq6%O`g|B14GO3U$~B0Gi}b)G?+5;6 zV#zcLK|q&-heik5L^uOU0VqPCAicDa2JW+I&`+&}{@JhfNq;ZlOZF$_+c#S^Jv;-x;RAut}bJ^KFS|YhMg1%0U=SQ zT_R9r0W&oo2rL%V2gM8>sQYb3emO|fRf3b4IS@H`6uhBO={^Mcpi-$_&Sfy!EMS@@ zD!}WsTB9+dC*shAJ&6JZKyj4FO{Sw5ix_ zkDibcw&57SG>;A@i)>I*;uHs4M37BlqAYko8-0rwa_A_J4Au|>38zPfsZ4}Pne5hg z+={EU;I==6@TDqmLw<{8*!o(dFF1DG=7v(Pd{Z4he3t!Eu5xS`CzLL$`LAJIkCsUi z*@r7(Mx%@~Y#B#_cEBh_Is>ioXg)kJ`8a485HPcji^7E666sB5kx}=-4*v)Sat@sX z2$F21ljdClkYi}648e2Cl7K+43MYh)B?XZKnfcsem0R(97If=3!7AfQ_%GvBf7Pck zM);ubL+Q)%IYR#Jm+!gz)c4OG{dubwAi{HP4cur`V5d>Yb72?=AQ(tYxTGu--ZUIj zfnyZ13M7;uTG-YB!WcLu2Z|*_!5TD~azZ5YgKKpwSKTfjoVwWk{fd=)s;c@gII5hh z`y@kG2;u96-uutreAzDlA$+?NYIEGStHUHl9xc0Ykp$-1Mp(C90vv-xsB~2{4Vxsi z&!~Vwgo!vT)LmGE;iGBC2Z3%QxkucDg^Uq$*gOIP*-LGDnMMgRl(5-2kb+T1R!#+7 z%3dx+0f9-P6bw+Jg$+sqF+oh0hKZm{aj`=J*qqmVYN=aK6TDT_3ToACJzK1j9j^PT zYd`E6AAI+P{N-I=7~}KBcE4)&I>QHH%m3D?ao_tcR9NYIJN@yq%cr}1ak32)F%qAy zG{PvftkXuJEOl)S14#$UzQfrlD8dE8ut6}tJ)lw?O(0Zy4FZmYg@%ZLMdNe|NS6f@ zuv8c_&gig^NJD}N7?T!~l!tK9e)4d_9t=(=Ly3A|a`1Quq0z_)SOl^XM;jclMn&t8 zG@4LBFJK^L?drACt=N?lkstH-`fO(P=+mFS2p9fOMvrhZRM{%KA4C6U=>Fd_K7|is zLzh1zZ|wfxZH}-s`Xn}Ej>A3z9>su}@Nv8fY1<^g17*10M3E$cF2P5t^vHk`AQ1DQ zJvxU5#*yA7g7R=22CZpiq=QN{en6Bo4k+}PWFODqG9U9`O1Jh|yopt9>!0L`x1R@r zgQ%exL{K4QW<*3pBuSDsafHMmVj#uz0TBQuQ$~|ynFEJKNVP#IhGe@VB^8zZ8v+R6 zBdSQ8KahAW(`B}|>FyY!dlbSo;om@B0~K8-T--&LLeF?0(0W{>*;a-C4A!ttMd6^P z%pOHb`?Lc{r3G4GSlkTk2LO{0V6k|^*cDhfqsTc>45o+QIAIID-cahpmriA&9Ksh0 z1R~1d3*{ux?IMx3%^dYR$o`ELTykdJFjXF;G7@3HUX*B;p2aBm0;1(A;hsFZhN}Hw z1f)V!ARSS{SRn#V0~4$-pmwQEDHJPyblBoGKfqq<$#pAOQCIXlm#y- za=xWTGj*L|b$ai0JPFH)aE|$oP>0@7001DIq=mO?24s!#mkr@Yujh%V%5`^C7^@aT zey%1&BaN*eYG@BLCB_AmTZpz-e(@kEk&9&Pe6XP&7q}VLM9Y~!gzrft2#GW8N-UUI zqJYAYNLv9@XR+qt;6HJ?d1GVv^@dJRsVo?34KtLRMjB>vI#pYQL)f4`kEvJ@=-%)< zw(^#o@IN4oIFN7>Tk#fYa7681#msArHA04s*S687GFm(v6z-BZYIC2U>?(U3{V}G8Owqxj>8J$?=JEkDa!88`rXd(8Ur?+G-f659B=`=edL_I%KoS`+Dh8Rq>VK4xS z2;x>!FlvM#u|bGNA=ONh9)y{0atI1QmPt$wh1$IkX*nAVpCw#?RtyL>k%>XsnB!qE#L_JBQP&<#1~Bq1uYfpC^Q z(kXo(MEXjf44M^*-aD#-w?rNlS$Yc+LG?7CnC6n7VS{@$zQZ~}ep-*c)w$HG`i`#Z zJ|fY_7@*azk4$?~3x5azO%G53;M-Q>10+5YK~iSc^93|fQfTy?X~ZUGjAmml(fADI zcdd{I;SHu_U$zrhM(~RcC>C}U5yk}0JwFlK{)O==89i{@u{;Rc3wPuiLkEQZRslo) Fflk1kw8j7c literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst new file mode 100644 index 0000000000000000000000000000000000000000..bed19ec3346f66fe384f1188c55b4b6cfe0ff135 GIT binary patch literal 3461 zcmV;04SMn@wJ-f-YazXB0Lsv8BqrcQXyta&009?}h6kU~NCc9bC{5=X+LV-(R38F) zM>EVYGsGANF#;z7IRf|1ci*4!k@K<&1x4g)cnztcptV4%U=)&qi-w^N6$5HCk zt6Lz6F4!;IT*MGV#PYJ+%ufIj({0Mfhs}nK{^*B>sfax~duz^n?f<&vsj{~0*4$Xx z-}9O`dVB2dArlD$f;2HzDEw(Xjc^$z4@5|1NhBOAg4hgzR)jfWS`7S{BpFIIGKlta zT_yttMh79p0wYNlCggfu@TK_#h0dnX9P1A>2@OQ!3C8>|Jj7J1yluCx*n8K{r~B<2W4^a``s&)-?KblqJ)4Ilnk0BuiPM8Zae-W^kP@k`W&m#( z+G?Zf$X^Ud44(`qGLw{#8X(u}g~-_yL^KJW66Q0#VPPq;P|Wl)K7mHe53y{RDQ1fT zeu<9n&4=uU`iY9leE8OUp10qN$f@nAp6=*vI}_J=|60>)$i3N$>WtbBaUXyJKIBp* zWKNhx1@k}*i178P28KpV2$)z5crc8PCFIb!8iXZ_q>vg7kd-9iBhs8xN+X7pgmLj` zYXE`82w5h6(43sPW{LujSS}`px|lU*hXSaWreuHt5Zl{bCU)2owQ~=ct+#rQSX(~i ztLDGsp0YY>YPW`Mo!i^9skergul?PP+bt{mz9+WYdijX`VVZ|03P3=B05Ag($O5<% z1wcTl?T~-%aa%TDcds(mi_MC@+?1PpcUwgLbvYPJoCMT^n(1*;2q7#CaWray)2d9T zrL&MICgDQDVoeN2EzUwgX@&YtWkAIsRYHUcAg(`VJpsSX&I9KC>LTN(9 z@^V8@cD?i4?teV)PHk0eho6r8kJzD7P`?lrg7Hd`S_q~A7NSYNiZomnr54J#`m*-K zvK?OA--9sK$U4GPM&%fvGOT=|7dpWU0=Mkw=dCx*i z63R;Cp<206td}+B#Pl%D8nZhAygbc=YJOvj2VC^$yY$!A$-S8Na{Ft}4RiIh8~JxJ zG4^b{zqZZzAYAH`8c1boLQzdzj?f7Nd2kk)g6$ljN)vp9{N)(sg_IFU0>nbePpgw~ ztiI)`KoY68*;pE`66V92xQq32g($N?VJs1s>LWr2B}Cu|K{}L)C4mpbOS4&P_9)O< z)9g@SGm*a?I90#3pWn5$+unBWmzd}c`TB2;F*i9IM zf{~vrj!i{An1&X5P6!qbp^{SHSXnqR5;Y(kPbk5pU{Qb=-1Nx?W5JY=K#7_^Fe`?K zX91cpn!f^pbigfEDK)?wxap7(AJjyXkl+{?SO6hbnIUORnK4+-P!+M)#PTp@#uQ-0 z+F{xH`uuR&se7uJ?%~fHe!FMpXT$H*ZtT@@mvw*n7cto!*BP;|J#P2yBJSel5Rt}( zkW%7tplFmHED_`sf+S2RmR04|IUy()3b{y;0TOPF9v+hG(^MqS`$NHDk)Rl?1BOft z76T*%!2qYUaZJJnKazt^b73HfRp+u>-^2JY^*wf6Z0Fec(AZjH@0i=mt*wmhcZpr% zvqHmVflx9)s2rUE#e6WJ9EgAzFo;%Pa8b!bk|_?=gedI~8M$D0SQ^F9F=!E_!NkJs zM$srl>2-N{z956rL*I+RzV4P5dXV!4iZhyU+wMn|mWAG#%D**w3~QN8Ul^X=)O zd-6AX<|2lR09x$C)Sy9VgHzb}O43XURW;+KVq8QBjkcUhj1)t{gEDQ!;rehE1&T>r z8pA~aME(uKu|kN9NccRuQ9!r^f?#n#z^J_C023(Gu{xFstHB~^Q|n<-`64j6 z`9R*G>9}Ln0xcw$1!fz2iXUCZV5QG;9q_9}XxFRktwN4uo!vk3# z@V&fhHDJ^bRSZsvlt|-16+j3?jz^}k3LOh5kY=Goa1M~7RN%T)$&s=o2tEy)0#+&b z3jz8#5GUI3WYhq05)C#Vr~*%8bIEKsQ$&n2!^<)s`|lfej(Z2+zjnYrMeL6^dDUHS z8?U$amRR$)w)lD)mI5SI15}h!Bx7N47$P1LVxVH8P%a7ztP_I4^I|CY$<-jOLY)Ij zVlhBzgOmXx7zH6zJCp=vl7KcPmuI!PEHp3%lu7I3NEHldD5Y^2O5`AW&2AC59e(lp z`?9qIwm0JWVnc3tt$jY@`EUE?saUto$2uD%B?@_mqsiPsrJ>N!uz17C_2QHiBh@Y_ z76=Io4h5ye=@6JU6pw;3QMfQ7|-gjAecVDKa+rH%=M84+lLM|63Ld1rPh@QLN}ejnJkJ%8&RJe#sBGPAq3-)ryJ z=H`vB>gvdNx!AXY0YONj8BtD{0tCGW^9Uo2MHCv~+<>qdsI*d_#_6DzG#XGLBAU58 z1Jc=8Fs1|wQxFHs9}JR)EEP=~VM2{g=+anaK93TFfW*1^+fsSE9q!-p{V(0Q9dG+K zMs@bqcYoTCy)LSD0jPBPCq+urP>e@}gb-rH(C|?xLi=jC&1wKb8zE>KI1PdUoi zKA`F%XK%MQcRl3`f*Yuk5|dJnAf=4Rh!mrO;zA1&24WBRPME%=8b^_FTfR?y{)?W2wxuWxS}$a{n!9G{;5lYQh^bNgXsHBdN|FAdu=Z9QeiL*YzzhOcNa^FqO6+htKMI zvYKbc5Wb8Qywy(uFs{3c>Dg34LX^LJSExKl$;{28n`dZm>zH-cs#SU0;N}xV1Emqg z(|@{KiZmAUK7zrwS$JQ=%PA-+0t})$t8(t6lLFhS>Ch*3C+oT>{B&PLhU%BiE-91lNrU+T%QhXT?x>pT`2M(idvImEB3Q zz=!ugrZ3QL8d&G_a}7KyS>zvSc523qPM5~hU?IrHb745u%V&UsOfTn|Df+~DL^l#D z>^zou_C;^kpdN6i<}9?bVS>xhqH%!8g9L2QhColp+-1`a*>z zjebGD)v%_63w~rI30#+8`hCV_TEYGvIDP;FE!2y}h8cdPvRF0`0&{*B_~KRq(R?mQ n%7@3s=@87{C!xs)z}~9kgg^}rzApa50^s`|IKL_ZNV*}}w{?ev literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.zst b/regression-test/data/load_p0/stream_load/test_compress.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..34c3da9c54ef04def6abc52997456bd5643eaaca GIT binary patch literal 42 xcmdPcs{fZI;w1xvp-!TXp#hhXPLht10hh5(vW~FJOJ@G3c&yX literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out index 56d195c569edca..92a8f1b82a6c0f 100644 --- a/regression-test/data/load_p0/stream_load/test_compress_type.out +++ b/regression-test/data/load_p0/stream_load/test_compress_type.out @@ -1,4 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -160 +200 diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out index 3e5d73452d08b3..fac63335a7e58d 100644 --- a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out +++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out @@ -3,12 +3,16 @@ 1 a 10 1 a 10 1 a 10 +1 a 10 +2 b 20 2 b 20 2 b 20 2 b 20 3 c 30 3 c 30 3 c 30 +3 c 30 +4 d \N 4 d \N 4 d \N 4 d \N diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index b160b478c8e06c..ad07ee55d74971 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -94,9 +94,11 @@ suite("test_group_commit_http_stream") { """ // stream load with compress file - String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame"} //, "deflate"} + String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame", "zstd"} //, "deflate"} for (final def compressionType in compressionTypes) { - def fileName = "test_compress.csv." + (compressionType.equals("lz4frame") ? "lz4" : compressionType) + def fileSuffix = compressionType.equals("lz4frame") ? "lz4" : compressionType + fileSuffix = fileSuffix.equals("zstd") ? "zst" : fileSuffix + def fileName = "test_compress.csv." + fileSuffix streamLoad { set 'version', '1' set 'sql', """ @@ -249,7 +251,7 @@ suite("test_group_commit_http_stream") { } } - getRowCount(19) + getRowCount(26) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " // group commit http stream (SELECT * FROM http_stream(...)) should not register load jobs diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy index eeab7e809754b1..5e2870fd4c010b 100644 --- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy +++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy @@ -196,6 +196,49 @@ suite("test_stream_load_compress_type", "load_p0") { } } + streamLoad { + table "${tableName}" + set 'column_separator', '|' + set 'trim_double_quotes', 'true' + set 'format', 'csv' + set 'compress_type', 'zstd' + + file "basic_data.csv.zst" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'compress_type', 'zstd' + set 'read_json_by_line', 'true' + + file "basic_data_by_line.json.zst" + check { + result, exception, startTime, endTime -> + assertTrue(exception == null) + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(20, json.NumberTotalRows) + assertEquals(20, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + assertTrue(json.LoadBytes > 0) + } + } + // no compress_type streamLoad { table "${tableName}" diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index 342b87f13960c0..7d7afc71f7ee34 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -98,9 +98,9 @@ suite("test_group_commit_stream_load") { """ // stream load with compress file - String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"} + String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4", "zstd"} //, "deflate"} for (final def compressionType in compressionTypes) { - def fileName = "test_compress.csv." + compressionType + def fileName = "test_compress.csv." + (compressionType.equals("zstd") ? "zst" : compressionType) streamLoad { table "${tableName}" @@ -227,7 +227,7 @@ suite("test_group_commit_stream_load") { } } - getRowCount(21) + getRowCount(25) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " } finally { // try_sql("DROP TABLE ${tableName}")