Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/paimon/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ class PAIMON_EXPORT Catalog {
/// @note This does not check whether the table actually exists.
///
/// @param identifier The table identifier containing database and table name.
/// @return A string representing the expected location of the table.
virtual std::string GetTableLocation(const Identifier& identifier) const = 0;
/// @return A result containing the expected location of the table, or an error status on
/// failure.
virtual Result<std::string> GetTableLocation(const Identifier& identifier) const = 0;

/// Returns the root path of the catalog.
///
Expand Down
6 changes: 6 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,13 @@ struct PAIMON_EXPORT Options {
/// "blob-external-storage-path" - The external storage path where raw BLOB data from fields
/// configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is
/// not applied to this path. No default value.
/// @note: this option differs from the Java paimon and will be deprecated once
/// RestCatalog is supported.
static const char BLOB_EXTERNAL_STORAGE_PATH[];
/// "blob-view-upstream-warehouse" - Since the catalog capabilities are partially missing, when
/// Blob View is enabled, cpp paimon cannot automatically obtain the upstream table warehouse
/// path and requires manual configuration by the user. No default value.
static const char BLOB_VIEW_UPSTREAM_WAREHOUSE[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
Comment thread
lszskye marked this conversation as resolved.
/// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No
Expand Down
3 changes: 3 additions & 0 deletions include/paimon/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class PAIMON_EXPORT Executor {

/// Shutdown the executor immediately, discarding all pending tasks.
virtual void ShutdownNow() = 0;

/// Get thread number.
virtual uint32_t GetThreadNum() const = 0;
};

} // namespace paimon
26 changes: 15 additions & 11 deletions src/paimon/common/data/blob_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "fmt/format.h"
#include "paimon/common/data/blob_defs.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_view_struct.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/string_utils.h"
Expand Down Expand Up @@ -127,16 +128,18 @@ std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
std::make_shared<arrow::KeyValueMetadata>(metadata));
}

Status BlobUtils::ValidateInlineBlobDescriptors(
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_descriptor_fields) {
if (inline_descriptor_fields.empty()) {
Status BlobUtils::ValidateBlobInlineFields(const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& field_names,
const std::string& config_label) {
if (field_names.empty()) {
return Status::OK();
}
if (!struct_array) {
return Status::Invalid("array in ValidateInlineBlobDescriptors must be a struct_array");
return Status::Invalid("array in ValidateBlobInlineFields must be a struct_array");
}
for (const auto& field_name : inline_descriptor_fields) {

bool is_descriptor = (config_label == "blob-descriptor-field");
for (const auto& field_name : field_names) {
auto field_array = struct_array->GetFieldByName(field_name);
if (!field_array) {
continue;
Expand All @@ -152,12 +155,13 @@ Status BlobUtils::ValidateInlineBlobDescriptors(
continue;
}
auto value = binary_array->GetView(row);
PAIMON_ASSIGN_OR_RAISE(bool is_descriptor,
BlobDescriptor::IsBlobDescriptor(value.data(), value.size()));
if (!is_descriptor) {
Result<bool> valid = is_descriptor
? BlobDescriptor::IsBlobDescriptor(value.data(), value.size())
: BlobViewStruct::IsBlobViewStruct(value.data(), value.size());
PAIMON_ASSIGN_OR_RAISE(bool is_valid, std::move(valid));
if (!is_valid) {
return Status::Invalid(fmt::format(
"BLOB inline field {} configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.",
"BLOB inline field {} require values to be set as corresponding type.",
field_name));
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/data/blob_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class PAIMON_EXPORT BlobUtils {
const std::string& field_name, bool nullable = false,
std::unordered_map<std::string, std::string> metadata = {});

static Status ValidateInlineBlobDescriptors(
const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& inline_descriptor_fields);
static Status ValidateBlobInlineFields(const std::shared_ptr<arrow::StructArray>& struct_array,
const std::set<std::string>& field_names,
const std::string& config_label);

/// Converts inline blob DataFields from large_binary to binary type.
/// Inline blob fields use large_binary in the table schema (because they are BLOB type),
Expand Down
133 changes: 102 additions & 31 deletions src/paimon/common/data/blob_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#include "arrow/api.h"
#include "arrow/c/bridge.h"
#include "gtest/gtest.h"
#include "paimon/catalog/identifier.h"
#include "paimon/common/data/blob_defs.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_view_struct.h"
#include "paimon/common/types/data_field.h"
#include "paimon/data/blob.h"
#include "paimon/memory/memory_pool.h"
Expand All @@ -29,44 +31,47 @@
namespace paimon::test {

class BlobUtilsTest : public ::testing::Test {
private:
public:
std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
std::unordered_map<std::string, std::string> blob_metadata_map = {
{BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
}

private:
std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
};

TEST_F(BlobUtilsTest, IsBlobMetadata) {
auto correct_metadata = CreateBlobMetadata();
EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
ASSERT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
std::unordered_map<std::string, std::string> wrong_metadata_map = {
{BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
auto wrong_metadata = std::make_shared<arrow::KeyValueMetadata>(wrong_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
std::unordered_map<std::string, std::string> no_extension_metadata_map = {
{"other_key", BlobDefs::kExtensionTypeValue}};
auto no_extension_metadata =
std::make_shared<arrow::KeyValueMetadata>(no_extension_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
ASSERT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
}

TEST_F(BlobUtilsTest, IsBlobField) {
std::shared_ptr<arrow::Field> blob_field = BlobUtils::ToArrowField("f1", true);
EXPECT_TRUE(BlobUtils::IsBlobField(blob_field));
ASSERT_TRUE(BlobUtils::IsBlobField(blob_field));

auto int_field = arrow::field("i_int", arrow::int32());
EXPECT_FALSE(BlobUtils::IsBlobField(int_field));
ASSERT_FALSE(BlobUtils::IsBlobField(int_field));

auto binary_field_no_meta = arrow::field("b_no_meta", arrow::large_binary());
EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta));
ASSERT_FALSE(BlobUtils::IsBlobField(binary_field_no_meta));

auto wrong_meta = std::make_shared<arrow::KeyValueMetadata>(
std::unordered_map<std::string, std::string>{{"other_key", "value"}});
auto binary_field_wrong_meta =
arrow::field("b_wrong_meta", arrow::large_binary(), false, wrong_meta);
EXPECT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta));
ASSERT_FALSE(BlobUtils::IsBlobField(binary_field_wrong_meta));
}

TEST_F(BlobUtilsTest, SeparateBlobSchema) {
Expand Down Expand Up @@ -233,7 +238,7 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsEmptyFields) {
auto struct_array =
arrow::StructArray::Make({array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateInlineBlobDescriptors(sa, {}));
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {}, "blob-descriptor-field"));
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsFieldNotPresent) {
Expand All @@ -245,22 +250,21 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsFieldNotPresent) {
arrow::StructArray::Make({int_array}, {arrow::field("f0", arrow::int32())}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
// "b0" does not exist in the struct -> should pass
ASSERT_OK(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}));
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"b0"}, "blob-descriptor-field"));
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithValidDescriptor) {
// Valid BlobDescriptor bytes -> OK
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);
auto serialized = descriptor->Serialize(pool_);

arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}));
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"b0"}, "blob-descriptor-field"));
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithNullValue) {
Expand All @@ -271,7 +275,7 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithNullValue) {
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}));
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"b0"}, "blob-descriptor-field"));
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithRawBytes) {
Expand All @@ -282,36 +286,29 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsWithRawBytes) {
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobInlineFields(sa, {"b0"}, "blob-descriptor-field"),
"BLOB inline field b0 require values to be set as corresponding type.");
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMixedValidAndInvalid) {
// First row is valid descriptor, second row is raw bytes -> error on row 1
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);

auto serialized = descriptor->Serialize(pool_);
arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
ASSERT_TRUE(builder.Append("raw_bytes_not_descriptor").ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("b0")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0"}),
"BLOB inline field b0 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobInlineFields(sa, {"b0"}, "blob-descriptor-field"),
"BLOB inline field b0 require values to be set as corresponding type.");
}

TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMultipleFields) {
// Two inline fields: b0 is valid, b1 has raw bytes -> error on b1
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);
auto serialized = descriptor->Serialize(pool_);

arrow::LargeBinaryBuilder b0_builder;
ASSERT_TRUE(b0_builder.Append(serialized->data(), serialized->size()).ok());
Expand All @@ -327,9 +324,83 @@ TEST_F(BlobUtilsTest, ValidateInlineBlobDescriptorsMultipleFields) {
.ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(
BlobUtils::ValidateInlineBlobDescriptors(sa, {"b0", "b1"}),
"BLOB inline field b1 configured by blob-descriptor-field or blob-view-field "
"require values to be a BlobDescriptor or BlobViewStruct.");
BlobUtils::ValidateBlobInlineFields(sa, {"b0", "b1"}, "blob-descriptor-field"),
"BLOB inline field b1 require values to be set as corresponding type.");
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsEmptyFields) {
// Empty view_fields -> always OK
arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append("random_data").ok());
auto array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {}, "blob-view-field"));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsFieldNotPresent) {
// Field not in struct_array -> skip, OK
arrow::Int32Builder int_builder;
ASSERT_TRUE(int_builder.Append(42).ok());
auto int_array = int_builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({int_array}, {arrow::field("f0", arrow::int32())}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"view"}, "blob-view-field"));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithValidViewStruct) {
// A BlobViewStruct value is accepted for a view field.
BlobViewStruct view_struct(Identifier("db", "tbl"), /*field_id=*/2, /*row_id=*/5);
auto serialized = view_struct.Serialize(pool_);

arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"view"}, "blob-view-field"));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithNullValue) {
// Null values in view column -> skip, OK
arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.AppendNull().ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_OK(BlobUtils::ValidateBlobInlineFields(sa, {"view"}, "blob-view-field"));
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsWithRawBytes) {
// Raw bytes -> error
arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append("raw_bytes_not_view").ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobInlineFields(sa, {"view"}, "blob-view-field"),
"BLOB inline field view require values to be set as corresponding type.");
}

TEST_F(BlobUtilsTest, ValidateBlobViewFieldsRejectsBlobDescriptor) {
// A BlobDescriptor value is NOT accepted for a view field.
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto descriptor, BlobDescriptor::Create("file:///tmp/test.bin", 0, 100));
auto serialized = descriptor->Serialize(pool);

arrow::LargeBinaryBuilder builder;
ASSERT_TRUE(builder.Append(serialized->data(), serialized->size()).ok());
auto blob_array = builder.Finish().ValueOrDie();
auto struct_array =
arrow::StructArray::Make({blob_array}, {BlobUtils::ToArrowField("view")}).ValueOrDie();
auto sa = std::dynamic_pointer_cast<arrow::StructArray>(struct_array);
ASSERT_NOK_WITH_MSG(BlobUtils::ValidateBlobInlineFields(sa, {"view"}, "blob-view-field"),
"BLOB inline field view require values to be set as corresponding type.");
}

TEST_F(BlobUtilsTest, TestConvertBlobInlineDataFields) {
Expand Down
46 changes: 46 additions & 0 deletions src/paimon/common/data/blob_view_struct_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,50 @@ TEST_F(BlobViewStructTest, TestEqual) {
}
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructValid) {
auto serialized = view_struct_.Serialize(pool_);
ASSERT_OK_AND_ASSIGN(bool result,
BlobViewStruct::IsBlobViewStruct(serialized->data(), serialized->size()));
ASSERT_TRUE(result);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithTooShortBuffer) {
// Buffer shorter than 9 bytes should return false
std::vector<char> short_buffer = {0x02, 0x43, 0x53, 0x45, 0x44, 0x42, 0x4F, 0x4C};
ASSERT_OK_AND_ASSIGN(
bool result, BlobViewStruct::IsBlobViewStruct(short_buffer.data(), short_buffer.size()));
ASSERT_FALSE(result);

// Empty buffer
ASSERT_OK_AND_ASSIGN(bool empty_result, BlobViewStruct::IsBlobViewStruct(nullptr, 0));
ASSERT_FALSE(empty_result);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithFutureVersion) {
// Version > CURRENT_VERSION should return false (not an error)
auto serialized = view_struct_.Serialize(pool_);
(*serialized)[0] = '\x02'; // set version to 2 (> CURRENT_VERSION)
ASSERT_OK_AND_ASSIGN(bool result,
BlobViewStruct::IsBlobViewStruct(serialized->data(), serialized->size()));
ASSERT_FALSE(result);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithWrongMagic) {
// Wrong magic number should return false
auto serialized = view_struct_.Serialize(pool_);
// Corrupt the magic bytes (bytes 1-8)
(*serialized)[1] = '\x00';
(*serialized)[2] = '\x00';
ASSERT_OK_AND_ASSIGN(bool result,
BlobViewStruct::IsBlobViewStruct(serialized->data(), serialized->size()));
ASSERT_FALSE(result);
}

TEST_F(BlobViewStructTest, TestIsBlobViewStructWithRandomData) {
// Random data that doesn't match format
std::vector<char> random_data = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
ASSERT_OK_AND_ASSIGN(bool result,
BlobViewStruct::IsBlobViewStruct(random_data.data(), random_data.size()));
ASSERT_FALSE(result);
}
} // namespace paimon::test
Loading
Loading