Skip to content

Commit 79d61ef

Browse files
authored
Refactor table function (#5046)
1 parent c95b66a commit 79d61ef

File tree

2 files changed

+14
-44
lines changed

2 files changed

+14
-44
lines changed

src_cpp/include/pandas/pandas_scan.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@ struct PandasScanLocalState final : public function::TableFuncLocalState {
1616
uint64_t end;
1717
};
1818

19-
struct PandasScanSharedState final : public function::TableFuncSharedState {
20-
PandasScanSharedState(uint64_t startRow, uint64_t numRows)
21-
: function::TableFuncSharedState{numRows}, startRow(startRow), numRowsRead{0} {}
22-
23-
uint64_t startRow;
24-
uint64_t numRowsRead;
25-
};
26-
2719
struct PandasScanFunction {
2820
static constexpr const char* name = "READ_PANDAS";
2921

src_cpp/pandas/pandas_scan.cpp

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "common/exception/runtime.h"
66
#include "common/system_config.h"
77
#include "function/table/bind_input.h"
8+
#include "function/table/simple_table_function.h"
89
#include "numpy/numpy_scan.h"
910
#include "processor/execution_context.h"
1011
#include "py_connection.h"
@@ -43,26 +44,9 @@ std::unique_ptr<TableFuncBindData> bindFunc(ClientContext* /*context*/,
4344
scanConfig);
4445
}
4546

46-
bool sharedStateNext(const TableFuncBindData* /*bindData*/, PandasScanLocalState* localState,
47-
TableFuncSharedState* sharedState) {
48-
auto pandasSharedState = sharedState->ptrCast<PandasScanSharedState>();
49-
std::lock_guard<std::mutex> lck{pandasSharedState->mtx};
50-
if (pandasSharedState->numRowsRead >= pandasSharedState->numRows) {
51-
return false;
52-
}
53-
localState->start = pandasSharedState->startRow + pandasSharedState->numRowsRead;
54-
pandasSharedState->numRowsRead +=
55-
std::min(pandasSharedState->numRows - pandasSharedState->numRowsRead,
56-
CopyConfig::PANDAS_PARTITION_COUNT);
57-
localState->end = pandasSharedState->startRow + pandasSharedState->numRowsRead;
58-
return true;
59-
}
60-
61-
std::unique_ptr<TableFuncLocalState> initLocalState(const TableFunctionInitInput& input,
62-
TableFuncSharedState* sharedState, storage::MemoryManager* /*mm*/) {
63-
auto localState = std::make_unique<PandasScanLocalState>(0 /* start */, 0 /* end */);
64-
sharedStateNext(input.bindData, localState.get(), sharedState);
65-
return localState;
47+
std::unique_ptr<TableFuncLocalState> initLocalState(const TableFunctionInitInput& /*input*/,
48+
TableFuncSharedState* /*sharedState*/, storage::MemoryManager* /*mm*/) {
49+
return std::make_unique<PandasScanLocalState>(0 /* start */, 0 /* end */);
6650
}
6751

6852
std::unique_ptr<TableFuncSharedState> initSharedState(const TableFunctionInitInput& input) {
@@ -71,9 +55,9 @@ std::unique_ptr<TableFuncSharedState> initSharedState(const TableFunctionInitInp
7155
throw RuntimeException("PandasScan called but GIL was already held!");
7256
}
7357
// LCOV_EXCL_STOP
74-
auto scanBindData = ku_dynamic_cast<PandasScanFunctionData*>(input.bindData);
75-
return std::make_unique<PandasScanSharedState>(scanBindData->scanConfig.skipNum,
76-
scanBindData->numRows);
58+
auto scanBindData = input.bindData->constPtrCast<PandasScanFunctionData>();
59+
return std::make_unique<SimpleTableFuncSharedState>(scanBindData->numRows,
60+
CopyConfig::PANDAS_PARTITION_COUNT);
7761
}
7862

7963
void pandasBackendScanSwitch(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
@@ -91,10 +75,14 @@ void pandasBackendScanSwitch(PandasColumnBindData* bindData, uint64_t count, uin
9175
offset_t tableFunc(const TableFuncInput& input, const TableFuncOutput& output) {
9276
auto pandasScanData = input.bindData->constPtrCast<PandasScanFunctionData>();
9377
auto pandasLocalState = input.localState->ptrCast<PandasScanLocalState>();
78+
auto sharedState = input.sharedState->ptrCast<SimpleTableFuncSharedState>();
9479
if (pandasLocalState->start >= pandasLocalState->end) {
95-
if (!sharedStateNext(input.bindData, pandasLocalState, input.sharedState)) {
80+
auto morsel = sharedState->getMorsel();
81+
if (!morsel.hasMoreToOutput()) {
9682
return 0;
9783
}
84+
pandasLocalState->start = pandasScanData->scanConfig.skipNum + morsel.startOffset;
85+
pandasLocalState->end = pandasScanData->scanConfig.skipNum + morsel.endOffset;
9886
}
9987
auto numValuesToOutput =
10088
std::min(DEFAULT_VECTOR_CAPACITY, pandasLocalState->end - pandasLocalState->start);
@@ -110,22 +98,12 @@ offset_t tableFunc(const TableFuncInput& input, const TableFuncOutput& output) {
11098
return numValuesToOutput;
11199
}
112100

113-
std::vector<std::unique_ptr<PandasColumnBindData>>
114-
PandasScanFunctionData::copyColumnBindData() const {
115-
std::vector<std::unique_ptr<PandasColumnBindData>> result;
116-
result.reserve(columnBindData.size());
117-
for (auto& bindData : columnBindData) {
118-
result.push_back(bindData->copy());
119-
}
120-
return result;
121-
}
122-
123101
static double progressFunc(TableFuncSharedState* sharedState) {
124-
auto pandasSharedState = sharedState->ptrCast<PandasScanSharedState>();
102+
auto pandasSharedState = sharedState->ptrCast<SimpleTableFuncSharedState>();
125103
if (pandasSharedState->numRows == 0) {
126104
return 0.0;
127105
}
128-
return static_cast<double>(pandasSharedState->numRowsRead) / pandasSharedState->numRows;
106+
return static_cast<double>(pandasSharedState->curRowIdx) / pandasSharedState->numRows;
129107
}
130108

131109
static void finalizeFunc(const processor::ExecutionContext* ctx, TableFuncSharedState*) {

0 commit comments

Comments
 (0)