Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions be/src/service/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "service/http/action/http_stream.h"

#include <algorithm>
#include <cstddef>
#include <future>
#include <sstream>
Expand Down Expand Up @@ -55,6 +56,7 @@
#include "storage/storage_engine.h"
#include "util/byte_buffer.h"
#include "util/client_cache.h"
#include "util/load_util.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
Expand All @@ -63,6 +65,26 @@
namespace doris {
using namespace ErrorCode;

namespace {

// Returns true when at least one entry of `params.file_scan_params` is
// classified as a compressed load by LoadUtil::is_compressed_load.
// A missing `file_scan_params` field yields false. For each entry, an unset
// compress_type is treated as TFileCompressType::UNKNOWN and an unset
// format_type as TFileFormatType::FORMAT_UNKNOWN.
bool is_compressed_file_scan(const TPipelineFragmentParams& params) {
    if (!params.__isset.file_scan_params) {
        return false;
    }

    for (const auto& entry : params.file_scan_params) {
        const auto& scan_params = entry.second;

        TFileCompressType::type compress_type = TFileCompressType::UNKNOWN;
        if (scan_params.__isset.compress_type) {
            compress_type = scan_params.compress_type;
        }

        TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
        if (scan_params.__isset.format_type) {
            format_type = scan_params.format_type;
        }

        // Stop at the first compressed entry; the remaining ones cannot change the answer.
        if (LoadUtil::is_compressed_load(compress_type, format_type)) {
            return true;
        }
    }
    return false;
}

} // namespace

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS);
Expand Down Expand Up @@ -387,13 +409,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
http_req->header(HttpHeaders::CONTENT_LENGTH),
e.what());
}
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
if (is_compressed_file_scan(ctx->put_result.pipeline_params)) {
content_length *= 3;
}
}
Expand Down
8 changes: 1 addition & 7 deletions be/src/service/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -812,13 +812,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
http_req->header(HttpHeaders::CONTENT_LENGTH),
e.what());
}
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
if (LoadUtil::is_compressed_load(ctx->compress_type, ctx->format)) {
content_length *= 3;
}
}
Expand Down
25 changes: 25 additions & 0 deletions be/src/util/load_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co
*compress_type = TFileCompressType::LZO;
} else if (iequal(compress_type_str, "BZ2")) {
*compress_type = TFileCompressType::BZ2;
} else if (iequal(compress_type_str, "ZSTD")) {
*compress_type = TFileCompressType::ZSTD;
} else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) {
*compress_type = TFileCompressType::LZ4FRAME;
} else if (iequal(compress_type_str, "LZ4_BLOCK")) {
Expand All @@ -62,6 +64,8 @@ void LoadUtil::parse_format(const std::string& format_str, const std::string& co
*format_type = TFileFormatType::FORMAT_CSV_LZO;
} else if (iequal(compress_type_str, "BZ2")) {
*format_type = TFileFormatType::FORMAT_CSV_BZ2;
} else if (iequal(compress_type_str, "ZSTD")) {
*format_type = TFileFormatType::FORMAT_CSV_PLAIN;
} else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, "LZ4FRAME")) {
*format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
} else if (iequal(compress_type_str, "LZ4_BLOCK")) {
Expand Down Expand Up @@ -108,4 +112,25 @@ bool LoadUtil::is_format_support_streaming(TFileFormatType::type format) {
}
return false;
}

// Tells whether a load should be treated as compressed.
//
// An explicit compress_type other than UNKNOWN or PLAIN always means
// compressed. Otherwise the decision falls back to format_type: only the
// compression-specific CSV formats listed below count as compressed.
bool LoadUtil::is_compressed_load(TFileCompressType::type compress_type,
                                  TFileFormatType::type format_type) {
    const bool no_explicit_compression = compress_type == TFileCompressType::UNKNOWN ||
                                         compress_type == TFileCompressType::PLAIN;
    if (!no_explicit_compression) {
        return true;
    }

    // No explicit compression given: the format type may still encode one.
    return format_type == TFileFormatType::FORMAT_CSV_BZ2 ||
           format_type == TFileFormatType::FORMAT_CSV_DEFLATE ||
           format_type == TFileFormatType::FORMAT_CSV_GZ ||
           format_type == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
           format_type == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
           format_type == TFileFormatType::FORMAT_CSV_LZO ||
           format_type == TFileFormatType::FORMAT_CSV_LZOP ||
           format_type == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK;
}
} // namespace doris
5 changes: 4 additions & 1 deletion be/src/util/load_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ class LoadUtil {
TFileCompressType::type* compress_type);

static bool is_format_support_streaming(TFileFormatType::type format);

static bool is_compressed_load(TFileCompressType::type compress_type,
TFileFormatType::type format_type);
};
} // namespace doris
} // namespace doris
41 changes: 41 additions & 0 deletions be/test/util/load_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ TEST_F(LoadUtilTest, ParseTest) {
EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type);
EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
}
{
// CSV, ZSTD
TFileFormatType::type format_type;
TFileCompressType::type compress_type;
LoadUtil::parse_format("CSV", "ZSTD", &format_type, &compress_type);
EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
}
{
// "", ZSTD
TFileFormatType::type format_type;
TFileCompressType::type compress_type;
LoadUtil::parse_format("", "ZSTD", &format_type, &compress_type);
EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
}
{
// JSON, ""
TFileFormatType::type format_type;
Expand Down Expand Up @@ -178,6 +194,14 @@ TEST_F(LoadUtilTest, ParseTest) {
EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
}
{
// JSON, ZSTD
TFileFormatType::type format_type;
TFileCompressType::type compress_type;
LoadUtil::parse_format("JSON", "ZSTD", &format_type, &compress_type);
EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
}
{
// JSON, unknown
TFileFormatType::type format_type;
Expand All @@ -204,4 +228,21 @@ TEST_F(LoadUtilTest, ParseTest) {
}
}

// Checks LoadUtil::is_compressed_load for combinations of compress type and
// format type: plain inputs, an explicit compress type on a plain format, and
// a compression-specific CSV format with no explicit compress type.
TEST_F(LoadUtilTest, IsCompressedLoadTest) {
    using Compress = TFileCompressType;
    using Format = TFileFormatType;
    const auto compressed = [](Compress::type compress_type, Format::type format_type) {
        return LoadUtil::is_compressed_load(compress_type, format_type);
    };

    // Plain CSV with no (or PLAIN) compress type is not compressed.
    EXPECT_FALSE(compressed(Compress::PLAIN, Format::FORMAT_CSV_PLAIN));
    EXPECT_FALSE(compressed(Compress::UNKNOWN, Format::FORMAT_CSV_PLAIN));

    // An explicit compress type wins even when the format is plain CSV.
    EXPECT_TRUE(compressed(Compress::ZSTD, Format::FORMAT_CSV_PLAIN));

    // A compression-specific CSV format counts on its own.
    EXPECT_TRUE(compressed(Compress::PLAIN, Format::FORMAT_CSV_GZ));
    EXPECT_TRUE(compressed(Compress::UNKNOWN, Format::FORMAT_CSV_DEFLATE));

    EXPECT_TRUE(compressed(Compress::SNAPPYBLOCK, Format::FORMAT_CSV_PLAIN));

    // JSON with PLAIN compress type is not compressed.
    EXPECT_FALSE(compressed(Compress::PLAIN, Format::FORMAT_JSON));
}

} // namespace doris
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
1 a 10
1 a 10
1 a 10
1 a 10
2 b 20
2 b 20
2 b 20
2 b 20
3 c 30
3 c 30
3 c 30
3 c 30
4 d \N
4 d \N
4 d \N
4 d \N
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
160
200

Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
1 a 10
1 a 10
1 a 10
1 a 10
2 b 20
2 b 20
2 b 20
2 b 20
3 c 30
3 c 30
3 c 30
3 c 30
4 d \N
4 d \N
4 d \N
4 d \N
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ suite("test_group_commit_http_stream") {
"""

// stream load with compress file
String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame"} //, "deflate"}
String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4frame", "zstd"} //, "deflate"}
for (final def compressionType in compressionTypes) {
def fileName = "test_compress.csv." + (compressionType.equals("lz4frame") ? "lz4" : compressionType)
def fileSuffix = compressionType.equals("lz4frame") ? "lz4" : compressionType
fileSuffix = fileSuffix.equals("zstd") ? "zst" : fileSuffix
def fileName = "test_compress.csv." + fileSuffix
streamLoad {
set 'version', '1'
set 'sql', """
Expand Down Expand Up @@ -249,7 +251,7 @@ suite("test_group_commit_http_stream") {
}
}

getRowCount(19)
getRowCount(26)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "

// group commit http stream (SELECT * FROM http_stream(...)) should not register load jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,49 @@ suite("test_stream_load_compress_type", "load_p0") {
}
}

// Stream load of a zstd-compressed CSV file ('format' = csv, 'compress_type' = zstd).
// The check expects a successful load with all 20 rows loaded and none filtered or unselected.
streamLoad {
table "${tableName}"
set 'column_separator', '|'
set 'trim_double_quotes', 'true'
set 'format', 'csv'
set 'compress_type', 'zstd'

file "basic_data.csv.zst"
check {
result, exception, startTime, endTime ->
assertTrue(exception == null)
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("Success", json.Status)
assertEquals(20, json.NumberTotalRows)
assertEquals(20, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
assertTrue(json.LoadBytes > 0)
}
}

// Stream load of a zstd-compressed JSON file with 'read_json_by_line' enabled
// ('format' = json, 'compress_type' = zstd).
// The check expects a successful load with all 20 rows loaded and none filtered or unselected.
streamLoad {
table "${tableName}"
set 'format', 'json'
set 'compress_type', 'zstd'
set 'read_json_by_line', 'true'

file "basic_data_by_line.json.zst"
check {
result, exception, startTime, endTime ->
assertTrue(exception == null)
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("Success", json.Status)
assertEquals(20, json.NumberTotalRows)
assertEquals(20, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
assertTrue(json.LoadBytes > 0)
}
}

// no compress_type
streamLoad {
table "${tableName}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ suite("test_group_commit_stream_load") {
"""

// stream load with compress file
String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4"} //, "deflate"}
String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ "lz4", "zstd"} //, "deflate"}
for (final def compressionType in compressionTypes) {
def fileName = "test_compress.csv." + compressionType
def fileName = "test_compress.csv." + (compressionType.equals("zstd") ? "zst" : compressionType)
streamLoad {
table "${tableName}"

Expand Down Expand Up @@ -227,7 +227,7 @@ suite("test_group_commit_stream_load") {
}
}

getRowCount(21)
getRowCount(25)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
} finally {
// try_sql("DROP TABLE ${tableName}")
Expand Down
Loading