forked from oceanbase/oceanbase
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathob_job_parser.cpp
More file actions
181 lines (172 loc) · 6.59 KB
/
Copy pathob_job_parser.cpp
File metadata and controls
181 lines (172 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_EXE
#include "sql/ob_sql_define.h"
#include "sql/executor/ob_job_parser.h"
#include "sql/engine/px/exchange/ob_px_transmit_op.h"
#include "sql/engine/ob_exec_context.h"
#include "lib/profile/ob_perf_event.h"
#include "lib/json/ob_json_print_utils.h"
using namespace oceanbase::common;
namespace oceanbase
{
namespace sql
{
ObJobParser::ObJobParser()
{
}
ObJobParser::~ObJobParser()
{
}
/* 本文件的入口函数,将plan里面的op tree切分成多个Job
* 切割点为每一对ObReceive和ObTransmit之间
* @input ObPhysicalPlan 内包含operator tree
* @input ob_execution_id 当前execution的id,用以区分不同execution
* @input ObTaskSpliterFactory 负责构造TaskSpliter,构造的结果保存在ObJob中.
* 创建出的每一个Spliter都已经被初始化好, 拥有一个指向ObJob的引用
* @output ObJobControl 切分好的job都填充到ObJobControl内管理起来
*/
int ObJobParser::parse_job(ObExecContext &exec_ctx,
ObPhysicalPlan *phy_plan,
const ObExecutionID &exec_id,
ObTaskSpliterFactory &spfactory,
ObJobControl &job_ctrl) const
{
NG_TRACE(parse_job_begin);
int ret = OB_SUCCESS;
ObOpSpec *root_spec = NULL;
if (OB_ISNULL(phy_plan)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("plan is NULL", K(ret));
} else if (OB_ISNULL(root_spec =
const_cast<ObOpSpec *>(phy_plan->get_root_op_spec()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("root op spec of plan is NULL", K(ret));
} else {
int task_split_type = ObTaskSpliter::LOCAL_IDENTITY_SPLIT;
ObJob *root_job = NULL;
if (OB_FAIL(create_job(exec_ctx, phy_plan, root_spec, exec_id, job_ctrl,
task_split_type, spfactory, root_job))) {
LOG_WARN("fail to create job", K(ret));
} else if (OB_ISNULL(root_job)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root_job is NULL", K(ret));
} else if (FALSE_IT(root_job->set_root_job())) {
} else if (OB_FAIL(split_jobs(exec_ctx, phy_plan, root_spec, exec_id,
job_ctrl, spfactory, *root_job))) {
LOG_WARN("fail to split jobs", K(ret));
} else {}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(root_spec->create_op_input(exec_ctx))) {
LOG_WARN("fail create root_spec's input", K(ret));
} else if (OB_FAIL(job_ctrl.sort_job_scan_part_locs(exec_ctx))) {
LOG_WARN("fail to sort job scan partition locations", K(ret));
} else if (OB_FAIL(job_ctrl.init_job_finish_queue(exec_ctx))) {
LOG_WARN("fail init job", K(ret));
} else {
// sanity check for early stage debug, can be removed after code stabilized
if (OB_UNLIKELY(job_ctrl.get_job_count() <= 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("job count should > 0", K(ret), K(job_ctrl.get_job_count()));
}
}
}
NG_TRACE(parse_job_end);
return ret;
}
int ObJobParser::split_jobs(ObExecContext &exec_ctx,
ObPhysicalPlan *phy_plan,
ObOpSpec *op_spec,
const ObExecutionID &exec_id,
ObJobControl &job_ctrl,
ObTaskSpliterFactory &spfactory,
ObJob &cur_job) const
{
int ret = OB_SUCCESS;
ObTransmitSpec *transmit_op_spec = NULL;
ObJob *job = NULL;
if (OB_ISNULL(op_spec)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy_op and op_spec are NULL ptrs", K(ret));
} else if (OB_ISNULL(phy_plan)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("phy_plan is NULL", K(ret));
} else {
if (!IS_TRANSMIT(op_spec->get_type())) {
if (0 == op_spec->get_child_num()) {
cur_job.set_scan(true);
}
} else if (OB_ISNULL(transmit_op_spec = static_cast<ObTransmitSpec *>(op_spec))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transmit op is NULL", K(ret), K(op_spec));
} else if (OB_FAIL(create_job(exec_ctx, phy_plan, op_spec, exec_id, job_ctrl,
ObTaskSpliter::REMOTE_IDENTITY_SPLIT, spfactory, job))) {
LOG_WARN("fail to create job", K(ret), K(exec_id));
} else if (OB_FAIL(cur_job.append_child_job(job))) {
LOG_WARN("fail to add child job", K(ret), K(exec_id));
}
for (int32_t i = 0; OB_SUCC(ret) && i < op_spec->get_child_num(); ++i) {
if (OB_FAIL(split_jobs(exec_ctx, phy_plan, op_spec->get_child(i), exec_id,
job_ctrl, spfactory, NULL != job ? *job : cur_job))) {
LOG_WARN("fail to split jobs for child op", K(ret), K(exec_id), K(i));
} else {}
}
}
return ret;
}
int ObJobParser::create_job(
ObExecContext &exec_ctx,
ObPhysicalPlan *phy_plan,
ObOpSpec *op_spec,
const ObExecutionID &exec_id,
ObJobControl &job_ctrl,
const int task_split_type,
ObTaskSpliterFactory &spfactory,
ObJob *&job) const
{
int ret = OB_SUCCESS;
job = NULL;
if (OB_ISNULL(phy_plan) || OB_ISNULL(op_spec)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("NULL ptr is unexpected", K(ret), K(phy_plan), K(op_spec));
} else if (OB_FAIL(job_ctrl.create_job(exec_ctx.get_allocator(), exec_id,
op_spec->get_id(), job))) {
LOG_WARN("fail to create job", K(ret));
} else if (OB_ISNULL(job)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("job is NULL", K(ret), K(exec_id));
} else {
job->set_phy_plan(phy_plan);
job->set_state(OB_JOB_STATE_INITED);
job->set_root_spec(op_spec);
if (OB_FAIL(job_ctrl.add_job(job))) {
job->~ObJob();
job = NULL;
LOG_WARN("fail add job", K(ret));
} else {
// 设置Spliter和ServersProvider到Job中
ObTaskSpliter *task_spliter = NULL;
if (OB_FAIL(spfactory.create(exec_ctx, *job, task_split_type, task_spliter))) {
LOG_WARN("fail create task spliter", "type", task_split_type, K(ret));
} else if (OB_ISNULL(task_spliter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task_spliter is NULL", K(ret));
} else {
job->set_task_spliter(task_spliter);
}
}
}
return ret;
}
}/* ns sql*/
}/* ns oceanbase */