Skip to content

Commit 9b5a464

Browse files
authored
[Feature][external catalog/lakesoul] support lakesoul catalog (#32164)
Issue Number: close #32163
1 parent 074b87a commit 9b5a464

41 files changed

Lines changed: 2474 additions & 11 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,5 @@ lru_cache_test
133133

134134
# other
135135
compile_commands.json
136+
.github
137+
docker/runtime/be/resource/apache-doris/

be/src/common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ pch_reuse(Common)
2525

2626
# Generate env_config.h according to env_config.h.in
2727
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in ${GENSRC_DIR}/common/env_config.h)
28+
target_include_directories(Common PUBLIC ${GENSRC_DIR}/common/)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "lakesoul_jni_reader.h"
19+
20+
#include <map>
21+
#include <ostream>
22+
23+
#include "common/logging.h"
24+
#include "runtime/descriptors.h"
25+
#include "runtime/runtime_state.h"
26+
#include "runtime/types.h"
27+
#include "vec/core/types.h"
28+
29+
namespace doris {
30+
class RuntimeProfile;
31+
class RuntimeState;
32+
33+
namespace vectorized {
34+
class Block;
35+
} // namespace vectorized
36+
} // namespace doris
37+
38+
namespace doris::vectorized {
39+
LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
40+
const std::vector<SlotDescriptor*>& file_slot_descs,
41+
RuntimeState* state, RuntimeProfile* profile)
42+
: _lakesoul_params(lakesoul_params),
43+
_file_slot_descs(file_slot_descs),
44+
_state(state),
45+
_profile(profile) {
46+
std::vector<std::string> required_fields;
47+
for (auto& desc : _file_slot_descs) {
48+
required_fields.emplace_back(desc->col_name());
49+
}
50+
51+
std::map<String, String> params = {
52+
{"query_id", print_id(_state->query_id())},
53+
{"file_paths", join(_lakesoul_params.file_paths, ";")},
54+
{"primary_keys", join(_lakesoul_params.primary_keys, ";")},
55+
{"partition_descs", join(_lakesoul_params.partition_descs, ";")},
56+
{"required_fields", join(required_fields, ";")},
57+
{"options", _lakesoul_params.options},
58+
{"table_schema", _lakesoul_params.table_schema},
59+
};
60+
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/lakesoul/LakeSoulJniScanner",
61+
params, required_fields);
62+
}
63+
64+
Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
65+
RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
66+
if (*eof) {
67+
RETURN_IF_ERROR(_jni_connector->close());
68+
}
69+
return Status::OK();
70+
}
71+
72+
Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
73+
std::unordered_set<std::string>* missing_cols) {
74+
for (auto& desc : _file_slot_descs) {
75+
name_to_type->emplace(desc->col_name(), desc->type());
76+
}
77+
return Status::OK();
78+
}
79+
80+
Status LakeSoulJniReader::init_reader(
81+
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
82+
_colname_to_value_range = colname_to_value_range;
83+
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
84+
return _jni_connector->open(_state, _profile);
85+
}
86+
} // namespace doris::vectorized
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstddef>
21+
#include <memory>
22+
#include <string>
23+
#include <unordered_map>
24+
#include <unordered_set>
25+
#include <vector>
26+
27+
#include "common/status.h"
28+
#include "exec/olap_common.h"
29+
#include "vec/exec/format/generic_reader.h"
30+
#include "vec/exec/jni_connector.h"
31+
32+
namespace doris {
33+
class RuntimeProfile;
34+
class RuntimeState;
35+
class SlotDescriptor;
36+
37+
namespace vectorized {
38+
class Block;
39+
} // namespace vectorized
40+
struct TypeDescriptor;
41+
} // namespace doris
42+
43+
namespace doris::vectorized {
44+
class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
45+
ENABLE_FACTORY_CREATOR(LakeSoulJniReader);
46+
47+
public:
48+
LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
49+
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
50+
RuntimeProfile* profile);
51+
52+
~LakeSoulJniReader() override = default;
53+
54+
Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override;
55+
56+
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
57+
std::unordered_set<std::string>* missing_cols) override;
58+
59+
Status init_reader(
60+
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
61+
62+
private:
63+
const TLakeSoulFileDesc& _lakesoul_params;
64+
const std::vector<SlotDescriptor*>& _file_slot_descs;
65+
RuntimeState* _state;
66+
RuntimeProfile* _profile;
67+
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
68+
std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
69+
};
70+
} // namespace doris::vectorized

be/src/vec/exec/scan/vfile_scanner.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "vec/exec/format/parquet/vparquet_reader.h"
6262
#include "vec/exec/format/table/hudi_jni_reader.h"
6363
#include "vec/exec/format/table/iceberg_reader.h"
64+
#include "vec/exec/format/table/lakesoul_jni_reader.h"
6465
#include "vec/exec/format/table/max_compute_jni_reader.h"
6566
#include "vec/exec/format/table/paimon_jni_reader.h"
6667
#include "vec/exec/format/table/paimon_reader.h"
@@ -806,6 +807,13 @@ Status VFileScanner::_get_next_reader() {
806807
_file_slot_descs, _state, _profile);
807808
init_status =
808809
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
810+
} else if (range.__isset.table_format_params &&
811+
range.table_format_params.table_format_type == "lakesoul") {
812+
_cur_reader =
813+
LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
814+
_file_slot_descs, _state, _profile);
815+
init_status = ((LakeSoulJniReader*)_cur_reader.get())
816+
->init_reader(_colname_to_value_range);
809817
} else if (range.__isset.table_format_params &&
810818
range.table_format_params.table_format_type == "trino_connector") {
811819
_cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,

build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
547547
modules+=("be-java-extensions/trino-connector-scanner")
548548
modules+=("be-java-extensions/max-compute-scanner")
549549
modules+=("be-java-extensions/avro-scanner")
550+
modules+=("be-java-extensions/lakesoul-scanner")
550551
modules+=("be-java-extensions/preload-extensions")
551552

552553
# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
@@ -834,6 +835,7 @@ EOF
834835
extensions_modules+=("trino-connector-scanner")
835836
extensions_modules+=("max-compute-scanner")
836837
extensions_modules+=("avro-scanner")
838+
extensions_modules+=("lakesoul-scanner")
837839
extensions_modules+=("preload-extensions")
838840

839841
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
version: '3'
19+
20+
services:
21+
lakesoul-meta-db:
22+
image: postgres:14.5
23+
container_name: lakesoul-test-pg
24+
hostname: lakesoul-docker-compose-env-lakesoul-meta-db-1
25+
ports:
26+
- "5432:5432"
27+
restart: always
28+
environment:
29+
POSTGRES_PASSWORD: lakesoul_test
30+
POSTGRES_USER: lakesoul_test
31+
POSTGRES_DB: lakesoul_test
32+
command:
33+
- "postgres"
34+
- "-c"
35+
- "max_connections=4096"
36+
- "-c"
37+
- "default_transaction_isolation=serializable"
38+
volumes:
39+
- ./meta_init.sql:/docker-entrypoint-initdb.d/meta_init.sql
40+
- ./meta_cleanup.sql:/meta_cleanup.sql
41+
42+
# minio:
43+
# image: bitnami/minio:latest
44+
# ports:
45+
# - "9000:9000"
46+
# - "9001:9001"
47+
# environment:
48+
# MINIO_DEFAULT_BUCKETS: lakesoul-test-bucket:public
49+
# MINIO_ROOT_USER: minioadmin1
50+
# MINIO_ROOT_PASSWORD: minioadmin1
51+
# healthcheck:
52+
# test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
53+
# interval: 3s
54+
# timeout: 5s
55+
# retries: 3
56+
# hostname: minio
57+
# profiles: ["s3"]
58+
59+
60+
networks:
61+
default:
62+
driver: bridge
63+
ipam:
64+
driver: default
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
2+
--
3+
-- SPDX-License-Identifier: Apache-2.0
4+
5+
delete from namespace;
6+
insert into namespace(namespace, properties, comment) values ('default', '{}', '');
7+
delete from data_commit_info;
8+
delete from table_info;
9+
delete from table_path_id;
10+
delete from table_name_id;
11+
delete from partition_info;

0 commit comments

Comments
 (0)