Skip to content

Commit f11f331

Browse files
committed
Implement scan pandas
1 parent 71a4418 commit f11f331

17 files changed

Lines changed: 801 additions & 8 deletions

CMakeLists.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ pybind11_add_module(_kuzu
1313
src_cpp/py_database.cpp
1414
src_cpp/py_prepared_statement.cpp
1515
src_cpp/py_query_result.cpp
16-
src_cpp/py_query_result_converter.cpp)
16+
src_cpp/py_query_result_converter.cpp
17+
src_cpp/pandas/pandas_bind.cpp
18+
src_cpp/pandas/pandas_scan.cpp
19+
src_cpp/numpy/numpy_type.cpp
20+
src_cpp/numpy/numpy_scan.cpp)
1721

1822
set_target_properties(_kuzu
1923
PROPERTIES
@@ -28,7 +32,8 @@ target_link_libraries(_kuzu
2832
target_include_directories(
2933
_kuzu
3034
PUBLIC
31-
../../src/include)
35+
../../src/include
36+
src_cpp/include)
3237

3338
get_target_property(PYTHON_DEST _kuzu LIBRARY_OUTPUT_DIRECTORY)
3439

src_cpp/include/numpy/numpy_scan.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#pragma once
2+
3+
#include "common/vector/value_vector.h"
4+
#include "pybind_include.h"
5+
6+
namespace kuzu {
7+
8+
struct PandasColumnBindData;
9+
10+
struct NumpyScan {
11+
static void scan(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
12+
common::ValueVector* outputVector);
13+
};
14+
15+
} // namespace kuzu

src_cpp/include/numpy/numpy_type.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#pragma once
2+
3+
#include "common/types/types.h"
4+
#include "pybind_include.h"
5+
6+
namespace kuzu {
7+
8+
// Pandas has two different sets of types
9+
// NumPy dtypes (e.g., bool, int8,...)
10+
// Pandas Specific Types (e.g., categorical, datetime_tz,...)
11+
// TODO(Ziyi): Support more timestamp types, object and category(enum) type.
12+
enum class NumpyNullableType : uint8_t {
13+
//! NumPy dtypes
14+
BOOL, //! bool_, bool8
15+
INT_8, //! byte, int8
16+
UINT_8, //! ubyte, uint8
17+
INT_16, //! int16, short
18+
UINT_16, //! uint16, ushort
19+
INT_32, //! int32, intc
20+
UINT_32, //! uint32, uintc,
21+
INT_64, //! int64, int0, int_, intp, matrix
22+
UINT_64, //! uint64, uint, uint0, uintp
23+
FLOAT_16, //! float16, half
24+
FLOAT_32, //! float32, single
25+
FLOAT_64, //! float64, float_, double
26+
DATETIME_US, //! datetime64[us], <M8[us]
27+
DATETIME_NS, //! datetime64[ns], <M8[ns]
28+
TIMEDELTA, //! timedelta64[D], timedelta64
29+
};
30+
31+
struct NumpyType {
32+
NumpyNullableType type;
33+
};
34+
35+
struct NumpyTypeUtils {
36+
static NumpyType convertNumpyType(const py::handle& colType);
37+
static std::unique_ptr<common::LogicalType> numpyToLogicalType(const NumpyType& npType);
38+
};
39+
40+
} // namespace kuzu
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
3+
#include "numpy/numpy_type.h"
4+
#include "pandas_column.h"
5+
#include "pybind_include.h"
6+
7+
namespace kuzu {
8+
9+
namespace main {
10+
class ClientContext;
11+
}
12+
13+
struct RegisteredArray {
14+
explicit RegisteredArray(py::array npArray) : npArray(std::move(npArray)) {}
15+
py::array npArray;
16+
};
17+
18+
struct PandasColumnBindData {
19+
NumpyType npType;
20+
std::unique_ptr<PandasColumn> pandasCol;
21+
std::unique_ptr<RegisteredArray> mask;
22+
23+
PandasColumnBindData() = default;
24+
25+
PandasColumnBindData(NumpyType npType, std::unique_ptr<PandasColumn> pandasCol,
26+
std::unique_ptr<RegisteredArray> mask)
27+
: npType{npType}, pandasCol{std::move(pandasCol)}, mask{std::move(mask)} {}
28+
29+
std::unique_ptr<PandasColumnBindData> copy() {
30+
return std::make_unique<PandasColumnBindData>(npType, pandasCol->copy(),
31+
mask == nullptr ? nullptr : std::make_unique<RegisteredArray>(mask->npArray));
32+
}
33+
};
34+
35+
struct Pandas {
36+
static void bind(py::handle dfToBind,
37+
std::vector<std::unique_ptr<PandasColumnBindData>>& columnBindData,
38+
std::vector<std::unique_ptr<common::LogicalType>>& returnTypes,
39+
std::vector<std::string>& names);
40+
};
41+
42+
} // namespace kuzu
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
5+
namespace kuzu {
6+
7+
// We currently only support NUMPY as backend.
8+
enum class PandasColumnBackend : uint8_t { NUMPY = 0 };
9+
10+
class PandasColumn {
11+
public:
12+
PandasColumn(PandasColumnBackend backend) : backend(backend) {}
13+
virtual ~PandasColumn() = default;
14+
15+
public:
16+
PandasColumnBackend getBackEnd() const { return backend; }
17+
18+
virtual std::unique_ptr<PandasColumn> copy() const = 0;
19+
20+
protected:
21+
PandasColumnBackend backend;
22+
};
23+
24+
class PandasNumpyColumn : public PandasColumn {
25+
public:
26+
PandasNumpyColumn(py::array array)
27+
: PandasColumn{PandasColumnBackend::NUMPY}, array{std::move(array)} {}
28+
29+
std::unique_ptr<PandasColumn> copy() const override {
30+
return std::make_unique<PandasNumpyColumn>(array);
31+
}
32+
33+
public:
34+
py::array array;
35+
};
36+
37+
} // namespace kuzu
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#pragma once
2+
3+
#include "function/scalar_function.h"
4+
#include "function/table_functions.h"
5+
#include "function/table_functions/bind_data.h"
6+
#include "function/table_functions/scan_functions.h"
7+
#include "pandas_bind.h"
8+
#include "pybind_include.h"
9+
10+
namespace kuzu {
11+
12+
struct PandasScanLocalState : public function::TableFuncLocalState {
13+
PandasScanLocalState(uint64_t start, uint64_t end) : start{start}, end{end} {}
14+
15+
uint64_t start;
16+
uint64_t end;
17+
};
18+
19+
struct PandasScanSharedState : public function::BaseScanSharedState {
20+
explicit PandasScanSharedState(uint64_t numRows) : BaseScanSharedState{numRows}, position{0} {}
21+
22+
std::mutex lock;
23+
uint64_t position;
24+
};
25+
26+
struct PandasScanFunction {
27+
static function::function_set getFunctionSet();
28+
29+
static void tableFunc(function::TableFunctionInput& input, common::DataChunk& outputChunk);
30+
31+
static std::unique_ptr<function::TableFuncBindData> bindFunc(main::ClientContext* /*context*/,
32+
function::TableFuncBindInput* input, catalog::CatalogContent* catalog);
33+
34+
static std::unique_ptr<function::TableFuncSharedState> initSharedState(
35+
function::TableFunctionInitInput& input);
36+
37+
static std::unique_ptr<function::TableFuncLocalState> initLocalState(
38+
function::TableFunctionInitInput& input, function::TableFuncSharedState* state);
39+
40+
static bool sharedStateNext(const function::TableFuncBindData* bindData,
41+
PandasScanLocalState* localState, function::TableFuncSharedState* sharedState);
42+
43+
static void pandasBackendScanSwitch(PandasColumnBindData* bindData, uint64_t count,
44+
uint64_t offset, common::ValueVector* outputVector);
45+
};
46+
47+
struct PandasScanFunctionData : public function::TableFuncBindData {
48+
py::handle df;
49+
uint64_t numRows;
50+
std::vector<std::unique_ptr<PandasColumnBindData>> columnBindData;
51+
52+
PandasScanFunctionData(std::vector<std::unique_ptr<common::LogicalType>> columnTypes,
53+
std::vector<std::string> columnNames, py::handle df, uint64_t numRows,
54+
std::vector<std::unique_ptr<PandasColumnBindData>> columnBindData)
55+
: TableFuncBindData{std::move(columnTypes), std::move(columnNames)}, df{df},
56+
numRows{numRows}, columnBindData{std::move(columnBindData)} {}
57+
58+
~PandasScanFunctionData() override {
59+
py::gil_scoped_acquire acquire;
60+
columnBindData.clear();
61+
}
62+
63+
std::vector<std::unique_ptr<PandasColumnBindData>> copyColumnBindData();
64+
65+
std::unique_ptr<function::TableFuncBindData> copy() override {
66+
return std::make_unique<PandasScanFunctionData>(
67+
common::LogicalType::copy(columnTypes), columnNames, df, numRows, copyColumnBindData());
68+
}
69+
};
70+
71+
std::unique_ptr<common::Value> replacePD(common::Value* value);
72+
73+
} // namespace kuzu

src_cpp/include/py_connection.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class PyConnection {
3030
const std::string& srcTableName, const std::string& relName,
3131
const std::string& dstTableName, size_t queryBatchSize);
3232

33+
static bool isPandasDataframe(const py::object& object);
34+
3335
private:
3436
std::unordered_map<std::string, std::shared_ptr<kuzu::common::Value>> transformPythonParameters(
3537
py::dict params);

src_cpp/numpy/numpy_scan.cpp

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#include "numpy/numpy_scan.h"
2+
3+
#include "common/types/timestamp_t.h"
4+
#include "pandas/pandas_bind.h"
5+
6+
namespace kuzu {
7+
8+
using namespace kuzu::common;
9+
10+
template<class T>
11+
void ScanNumpyColumn(
12+
py::array& npArray, uint64_t offset, ValueVector* outputVector, uint64_t count) {
13+
auto srcData = (T*)npArray.data();
14+
memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
15+
}
16+
17+
template<class T>
18+
void scanNumpyMasked(
19+
PandasColumnBindData* bindData, uint64_t count, uint64_t offset, ValueVector* outputVector) {
20+
KU_ASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
21+
auto& npColumn = reinterpret_cast<PandasNumpyColumn&>(*bindData->pandasCol);
22+
ScanNumpyColumn<T>(npColumn.array, offset, outputVector, count);
23+
if (bindData->mask != nullptr) {
24+
KU_UNREACHABLE;
25+
}
26+
}
27+
28+
template<typename T>
29+
void setNullIfNan(T value, uint64_t pos, ValueVector* outputVector) {
30+
if (std::isnan(value)) {
31+
outputVector->setNull(pos, true /* isNull */);
32+
}
33+
}
34+
35+
template<class T>
36+
void ScanNumpyFpColumn(
37+
const T* srcData, uint64_t count, uint64_t offset, ValueVector* outputVector) {
38+
memcpy(outputVector->getData(), srcData + offset, count * sizeof(T));
39+
for (auto i = 0u; i < count; i++) {
40+
setNullIfNan(outputVector->getValue<T>(i), i, outputVector);
41+
}
42+
}
43+
44+
void NumpyScan::scan(PandasColumnBindData* bindData, uint64_t count, uint64_t offset,
45+
common::ValueVector* outputVector) {
46+
KU_ASSERT(bindData->pandasCol->getBackEnd() == PandasColumnBackend::NUMPY);
47+
auto& npCol = reinterpret_cast<PandasNumpyColumn&>(*bindData->pandasCol);
48+
auto& array = npCol.array;
49+
50+
switch (bindData->npType.type) {
51+
case NumpyNullableType::BOOL:
52+
scanNumpyMasked<bool>(bindData, count, offset, outputVector);
53+
break;
54+
case NumpyNullableType::UINT_8:
55+
scanNumpyMasked<uint8_t>(bindData, count, offset, outputVector);
56+
break;
57+
case NumpyNullableType::UINT_16:
58+
scanNumpyMasked<uint16_t>(bindData, count, offset, outputVector);
59+
break;
60+
case NumpyNullableType::UINT_32:
61+
scanNumpyMasked<uint32_t>(bindData, count, offset, outputVector);
62+
break;
63+
case NumpyNullableType::UINT_64:
64+
scanNumpyMasked<uint64_t>(bindData, count, offset, outputVector);
65+
break;
66+
case NumpyNullableType::INT_8:
67+
scanNumpyMasked<int8_t>(bindData, count, offset, outputVector);
68+
break;
69+
case NumpyNullableType::INT_16:
70+
scanNumpyMasked<int16_t>(bindData, count, offset, outputVector);
71+
break;
72+
case NumpyNullableType::INT_32:
73+
scanNumpyMasked<int32_t>(bindData, count, offset, outputVector);
74+
break;
75+
case NumpyNullableType::INT_64:
76+
scanNumpyMasked<int64_t>(bindData, count, offset, outputVector);
77+
break;
78+
case NumpyNullableType::FLOAT_32:
79+
ScanNumpyFpColumn<float>(
80+
reinterpret_cast<const float*>(array.data()), count, offset, outputVector);
81+
break;
82+
case NumpyNullableType::FLOAT_64:
83+
ScanNumpyFpColumn<double>(
84+
reinterpret_cast<const double*>(array.data()), count, offset, outputVector);
85+
break;
86+
case NumpyNullableType::DATETIME_NS:
87+
case NumpyNullableType::DATETIME_US: {
88+
auto sourceData = reinterpret_cast<const int64_t*>(array.data());
89+
auto dstData = reinterpret_cast<timestamp_t*>(outputVector->getData());
90+
auto timestampCastFunc = bindData->npType.type == NumpyNullableType::DATETIME_NS ?
91+
Timestamp::fromEpochNanoSeconds :
92+
Timestamp::fromEpochMicroSeconds;
93+
for (auto i = 0u; i < count; i++) {
94+
auto pos = offset + i;
95+
dstData[i] = timestampCastFunc(sourceData[pos]);
96+
outputVector->setNull(i, false /* isNull */);
97+
}
98+
break;
99+
}
100+
case NumpyNullableType::TIMEDELTA: {
101+
auto sourceData = reinterpret_cast<const int64_t*>(array.data());
102+
auto dstData = reinterpret_cast<interval_t*>(outputVector->getData());
103+
for (auto i = 0u; i < count; i++) {
104+
auto pos = offset + i;
105+
auto micro = sourceData[pos] / 1000;
106+
auto days = micro / Interval::MICROS_PER_DAY;
107+
micro = micro % Interval::MICROS_PER_DAY;
108+
auto months = days / Interval::DAYS_PER_MONTH;
109+
days = days % Interval::DAYS_PER_MONTH;
110+
interval_t interval;
111+
interval.months = months;
112+
interval.days = days;
113+
interval.micros = micro;
114+
dstData[i] = interval;
115+
outputVector->setNull(i, false /* isNull */);
116+
}
117+
break;
118+
}
119+
default:
120+
KU_UNREACHABLE;
121+
}
122+
}
123+
124+
} // namespace kuzu

0 commit comments

Comments
 (0)