55#include " common/exception/runtime.h"
66#include " common/system_config.h"
77#include " function/table/bind_input.h"
8+ #include " function/table/simple_table_function.h"
89#include " numpy/numpy_scan.h"
910#include " processor/execution_context.h"
1011#include " py_connection.h"
@@ -43,26 +44,9 @@ std::unique_ptr<TableFuncBindData> bindFunc(ClientContext* /*context*/,
4344 scanConfig);
4445}
4546
46- bool sharedStateNext (const TableFuncBindData* /* bindData*/ , PandasScanLocalState* localState,
47- TableFuncSharedState* sharedState) {
48- auto pandasSharedState = sharedState->ptrCast <PandasScanSharedState>();
49- std::lock_guard<std::mutex> lck{pandasSharedState->mtx };
50- if (pandasSharedState->numRowsRead >= pandasSharedState->numRows ) {
51- return false ;
52- }
53- localState->start = pandasSharedState->startRow + pandasSharedState->numRowsRead ;
54- pandasSharedState->numRowsRead +=
55- std::min (pandasSharedState->numRows - pandasSharedState->numRowsRead ,
56- CopyConfig::PANDAS_PARTITION_COUNT);
57- localState->end = pandasSharedState->startRow + pandasSharedState->numRowsRead ;
58- return true ;
59- }
60-
61- std::unique_ptr<TableFuncLocalState> initLocalState (const TableFunctionInitInput& input,
62- TableFuncSharedState* sharedState, storage::MemoryManager* /* mm*/ ) {
63- auto localState = std::make_unique<PandasScanLocalState>(0 /* start */ , 0 /* end */ );
64- sharedStateNext (input.bindData , localState.get (), sharedState);
65- return localState;
47+ std::unique_ptr<TableFuncLocalState> initLocalState (const TableFunctionInitInput& /* input*/ ,
48+ TableFuncSharedState* /* sharedState*/ , storage::MemoryManager* /* mm*/ ) {
49+ return std::make_unique<PandasScanLocalState>(0 /* start */ , 0 /* end */ );
6650}
6751
6852std::unique_ptr<TableFuncSharedState> initSharedState (const TableFunctionInitInput& input) {
@@ -71,9 +55,9 @@ std::unique_ptr<TableFuncSharedState> initSharedState(const TableFunctionInitInp
7155 throw RuntimeException (" PandasScan called but GIL was already held!" );
7256 }
7357 // LCOV_EXCL_STOP
74- auto scanBindData = ku_dynamic_cast <PandasScanFunctionData*>(input. bindData );
75- return std::make_unique<PandasScanSharedState >(scanBindData->scanConfig . skipNum ,
76- scanBindData-> numRows );
58+ auto scanBindData = input. bindData -> constPtrCast <PandasScanFunctionData>( );
59+ return std::make_unique<SimpleTableFuncSharedState >(scanBindData->numRows ,
60+ CopyConfig::PANDAS_PARTITION_COUNT );
7761}
7862
7963void pandasBackendScanSwitch (PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
@@ -91,10 +75,14 @@ void pandasBackendScanSwitch(PandasColumnBindData* bindData, uint64_t count, uin
9175offset_t tableFunc (const TableFuncInput& input, const TableFuncOutput& output) {
9276 auto pandasScanData = input.bindData ->constPtrCast <PandasScanFunctionData>();
9377 auto pandasLocalState = input.localState ->ptrCast <PandasScanLocalState>();
78+ auto sharedState = input.sharedState ->ptrCast <SimpleTableFuncSharedState>();
9479 if (pandasLocalState->start >= pandasLocalState->end ) {
95- if (!sharedStateNext (input.bindData , pandasLocalState, input.sharedState )) {
80+ auto morsel = sharedState->getMorsel ();
81+ if (!morsel.hasMoreToOutput ()) {
9682 return 0 ;
9783 }
84+ pandasLocalState->start = pandasScanData->scanConfig .skipNum + morsel.startOffset ;
85+ pandasLocalState->end = pandasScanData->scanConfig .skipNum + morsel.endOffset ;
9886 }
9987 auto numValuesToOutput =
10088 std::min (DEFAULT_VECTOR_CAPACITY, pandasLocalState->end - pandasLocalState->start );
@@ -110,22 +98,12 @@ offset_t tableFunc(const TableFuncInput& input, const TableFuncOutput& output) {
11098 return numValuesToOutput;
11199}
112100
113- std::vector<std::unique_ptr<PandasColumnBindData>>
114- PandasScanFunctionData::copyColumnBindData () const {
115- std::vector<std::unique_ptr<PandasColumnBindData>> result;
116- result.reserve (columnBindData.size ());
117- for (auto & bindData : columnBindData) {
118- result.push_back (bindData->copy ());
119- }
120- return result;
121- }
122-
123101static double progressFunc (TableFuncSharedState* sharedState) {
124- auto pandasSharedState = sharedState->ptrCast <PandasScanSharedState >();
102+ auto pandasSharedState = sharedState->ptrCast <SimpleTableFuncSharedState >();
125103 if (pandasSharedState->numRows == 0 ) {
126104 return 0.0 ;
127105 }
128- return static_cast <double >(pandasSharedState->numRowsRead ) / pandasSharedState->numRows ;
106+ return static_cast <double >(pandasSharedState->curRowIdx ) / pandasSharedState->numRows ;
129107}
130108
131109static void finalizeFunc (const processor::ExecutionContext* ctx, TableFuncSharedState*) {
0 commit comments