Skip to content

Commit 322db0b

Browse files
authored
Output multiple query results for Python and Node.js APIs (#3322)
* Add methods for querying multiple results * Add multiple query results support for Python API * Fix newline * Node.js bindings (not working) * Fix bug * Fix linter errors * Fix * Run clang-format --------- Co-authored-by: CI Bot <mewim@users.noreply.github.com>
1 parent 79d4ce9 commit 322db0b

5 files changed

Lines changed: 139 additions & 16 deletions

File tree

src_cpp/include/node_connection.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class ConnectionExecuteAsyncWorker : public Napi::AsyncWorker {
7070

7171
void Execute() override {
7272
try {
73-
std::shared_ptr<QueryResult> result =
74-
connection->executeWithParams(preparedStatement.get(), std::move(params));
75-
nodeQueryResult->SetQueryResult(result);
73+
auto result =
74+
connection->executeWithParams(preparedStatement.get(), std::move(params)).release();
75+
nodeQueryResult->SetQueryResult(result, true);
7676
if (!result->isSuccess()) {
7777
SetError(result->getErrorMessage());
7878
return;
@@ -104,8 +104,8 @@ class ConnectionQueryAsyncWorker : public Napi::AsyncWorker {
104104

105105
void Execute() override {
106106
try {
107-
std::shared_ptr<QueryResult> result = connection->query(statement);
108-
nodeQueryResult->SetQueryResult(result);
107+
auto result = connection->query(statement).release();
108+
nodeQueryResult->SetQueryResult(result, true);
109109
if (!result->isSuccess()) {
110110
SetError(result->getErrorMessage());
111111
return;

src_cpp/include/node_query_result.h

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,27 @@ using namespace kuzu::main;
1313
class NodeQueryResult : public Napi::ObjectWrap<NodeQueryResult> {
1414
friend class NodeQueryResultGetNextAsyncWorker;
1515
friend class NodeQueryResultGetColumnMetadataAsyncWorker;
16+
friend class NodeQueryResultGetNextQueryResultAsyncWorker;
1617

1718
public:
1819
static Napi::Object Init(Napi::Env env, Napi::Object exports);
1920
explicit NodeQueryResult(const Napi::CallbackInfo& info);
20-
void SetQueryResult(std::shared_ptr<QueryResult>& queryResult);
21-
~NodeQueryResult() override = default;
21+
void SetQueryResult(QueryResult* queryResult, bool isOwned);
22+
~NodeQueryResult() override;
2223

2324
private:
2425
void ResetIterator(const Napi::CallbackInfo& info);
2526
Napi::Value HasNext(const Napi::CallbackInfo& info);
27+
Napi::Value HasNextQueryResult(const Napi::CallbackInfo& info);
28+
Napi::Value GetNextQueryResultAsync(const Napi::CallbackInfo& info);
2629
Napi::Value GetNumTuples(const Napi::CallbackInfo& info);
2730
Napi::Value GetNextAsync(const Napi::CallbackInfo& info);
2831
Napi::Value GetColumnDataTypesAsync(const Napi::CallbackInfo& info);
2932
Napi::Value GetColumnNamesAsync(const Napi::CallbackInfo& info);
3033

3134
private:
32-
std::shared_ptr<QueryResult> queryResult;
35+
QueryResult* queryResult = nullptr;
36+
bool isOwned = false;
3337
};
3438

3539
enum GetColumnMetadataType { DATA_TYPE, NAME };
@@ -119,3 +123,30 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker {
119123
NodeQueryResult* nodeQueryResult;
120124
std::shared_ptr<FlatTuple> cppTuple;
121125
};
126+
127+
class NodeQueryResultGetNextQueryResultAsyncWorker : public Napi::AsyncWorker {
128+
public:
129+
NodeQueryResultGetNextQueryResultAsyncWorker(Napi::Function& callback,
130+
NodeQueryResult* currentQueryResult, NodeQueryResult* nextQueryResult)
131+
: AsyncWorker(callback), currQueryResult(currentQueryResult),
132+
nextQueryResult(nextQueryResult) {}
133+
134+
~NodeQueryResultGetNextQueryResultAsyncWorker() override = default;
135+
136+
void Execute() override {
137+
try {
138+
auto nextResult = currQueryResult->queryResult->getNextQueryResult();
139+
nextQueryResult->SetQueryResult(nextResult, false);
140+
} catch (const std::exception& exc) {
141+
SetError(std::string(exc.what()));
142+
}
143+
}
144+
145+
void OnOK() override { Callback().Call({Env().Null()}); }
146+
147+
void OnError(Napi::Error const& error) override { Callback().Call({error.Value()}); }
148+
149+
private:
150+
NodeQueryResult* currQueryResult;
151+
NodeQueryResult* nextQueryResult;
152+
};

src_cpp/node_query_result.cpp

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) {
1414
{
1515
InstanceMethod("resetIterator", &NodeQueryResult::ResetIterator),
1616
InstanceMethod("hasNext", &NodeQueryResult::HasNext),
17+
InstanceMethod("hasNextQueryResult", &NodeQueryResult::HasNextQueryResult),
18+
InstanceMethod("getNextQueryResultAsync", &NodeQueryResult::GetNextQueryResultAsync),
1719
InstanceMethod("getNumTuples", &NodeQueryResult::GetNumTuples),
1820
InstanceMethod("getNextAsync", &NodeQueryResult::GetNextAsync),
1921
InstanceMethod("getColumnDataTypesAsync", &NodeQueryResult::GetColumnDataTypesAsync),
@@ -27,8 +29,16 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) {
2729
NodeQueryResult::NodeQueryResult(const Napi::CallbackInfo& info)
2830
: Napi::ObjectWrap<NodeQueryResult>(info) {}
2931

30-
void NodeQueryResult::SetQueryResult(std::shared_ptr<kuzu::main::QueryResult>& queryResult) {
32+
NodeQueryResult::~NodeQueryResult() {
33+
if (this->isOwned) {
34+
delete this->queryResult;
35+
this->queryResult = nullptr;
36+
}
37+
}
38+
39+
void NodeQueryResult::SetQueryResult(QueryResult* queryResult, bool isOwned) {
3140
this->queryResult = queryResult;
41+
this->isOwned = isOwned;
3242
}
3343

3444
void NodeQueryResult::ResetIterator(const Napi::CallbackInfo& info) {
@@ -52,6 +62,28 @@ Napi::Value NodeQueryResult::HasNext(const Napi::CallbackInfo& info) {
5262
return info.Env().Undefined();
5363
}
5464

65+
Napi::Value NodeQueryResult::HasNextQueryResult(const Napi::CallbackInfo& info) {
66+
Napi::Env env = info.Env();
67+
Napi::HandleScope scope(env);
68+
try {
69+
return Napi::Boolean::New(env, this->queryResult->hasNextQueryResult());
70+
} catch (const std::exception& exc) {
71+
Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException();
72+
}
73+
return info.Env().Undefined();
74+
}
75+
76+
Napi::Value NodeQueryResult::GetNextQueryResultAsync(const Napi::CallbackInfo& info) {
77+
Napi::Env env = info.Env();
78+
Napi::HandleScope scope(env);
79+
auto newQueryResult = Napi::ObjectWrap<NodeQueryResult>::Unwrap(info[0].As<Napi::Object>());
80+
auto callback = info[1].As<Napi::Function>();
81+
auto* asyncWorker =
82+
new NodeQueryResultGetNextQueryResultAsyncWorker(callback, this, newQueryResult);
83+
asyncWorker->Queue();
84+
return info.Env().Undefined();
85+
}
86+
5587
Napi::Value NodeQueryResult::GetNumTuples(const Napi::CallbackInfo& info) {
5688
Napi::Env env = info.Env();
5789
Napi::HandleScope scope(env);

src_js/connection.js

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@ const KuzuNative = require("./kuzu_native.js");
44
const QueryResult = require("./query_result.js");
55
const PreparedStatement = require("./prepared_statement.js");
66

7-
const PRIMARY_KEY_TEXT = "(PRIMARY KEY)";
8-
const SRC_NODE_TEXT = "src node:";
9-
const DST_NODE_TEXT = "dst node:";
10-
const PROPERTIES_TEXT = "properties:";
11-
127
class Connection {
138
/**
149
* Initialize a new Connection object. Note that the initialization is done
@@ -139,7 +134,13 @@ class Connection {
139134
if (err) {
140135
return reject(err);
141136
}
142-
return resolve(new QueryResult(this, nodeQueryResult));
137+
this._unwrapMultipleQueryResults(nodeQueryResult)
138+
.then((queryResults) => {
139+
return resolve(queryResults);
140+
})
141+
.catch((err) => {
142+
return reject(err);
143+
});
143144
}
144145
);
145146
} catch (e) {
@@ -191,7 +192,13 @@ class Connection {
191192
if (err) {
192193
return reject(err);
193194
}
194-
return resolve(new QueryResult(this, nodeQueryResult));
195+
this._unwrapMultipleQueryResults(nodeQueryResult)
196+
.then((queryResults) => {
197+
return resolve(queryResults);
198+
})
199+
.catch((err) => {
200+
return reject(err);
201+
});
195202
});
196203
} catch (e) {
197204
return reject(e);
@@ -200,6 +207,43 @@ class Connection {
200207
});
201208
}
202209

210+
/**
211+
* Internal function to get the next query result for multiple query results.
212+
* @param {KuzuNative.NodeQueryResult} nodeQueryResult the current node query result.
213+
* @returns {Promise<kuzu.QueryResult>} a promise that resolves to the next query result. The promise is rejected if there is an error.
214+
*/
215+
_getNextQueryResult(nodeQueryResult) {
216+
return new Promise((resolve, reject) => {
217+
const nextNodeQueryResult = new KuzuNative.NodeQueryResult();
218+
nodeQueryResult.getNextQueryResultAsync(nextNodeQueryResult, (err) => {
219+
if (err) {
220+
return reject(err);
221+
}
222+
return resolve(new QueryResult(this, nextNodeQueryResult));
223+
});
224+
});
225+
}
226+
227+
/**
228+
* Internal function to unwrap multiple query results into an array of query results.
229+
* @param {KuzuNative.NodeQueryResult} nodeQueryResult the node query result.
230+
* @returns {Promise<Array<kuzu.QueryResult>> | kuzu.QueryResult} a promise that resolves to an array of query results. The promise is rejected if there is an error.
231+
*/
232+
async _unwrapMultipleQueryResults(nodeQueryResult) {
233+
const wrappedQueryResult = new QueryResult(this, nodeQueryResult);
234+
if (!nodeQueryResult.hasNextQueryResult()) {
235+
return wrappedQueryResult;
236+
}
237+
const queryResults = [wrappedQueryResult];
238+
let currentQueryResult = nodeQueryResult;
239+
while (currentQueryResult.hasNextQueryResult()) {
240+
queryResults.push(await this._getNextQueryResult(currentQueryResult));
241+
currentQueryResult =
242+
queryResults[queryResults.length - 1]._queryResult;
243+
}
244+
return queryResults;
245+
}
246+
203247
/**
204248
* Set the maximum number of threads to use for query execution.
205249
* @param {Number} numThreads the maximum number of threads to use for query execution.

test/test_connection.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,22 @@ describe("Query", function () {
194194
assert.equal(e.message, "statement must be a string.");
195195
}
196196
});
197+
198+
it("should be able to run multiple queries", async function () {
199+
const queryResults = await conn.query(`
200+
RETURN 1;
201+
RETURN 2;
202+
RETURN 3;
203+
`);
204+
assert.exists(queryResults);
205+
assert.equal(queryResults.length, 3);
206+
const results = await Promise.all([
207+
queryResults[0].getAll(),
208+
queryResults[1].getAll(),
209+
queryResults[2].getAll(),
210+
]);
211+
assert.deepEqual(results, [[{ 1: 1 }], [{ 2: 2 }], [{ 3: 3 }]]);
212+
});
197213
});
198214

199215
describe("Timeout", function () {

0 commit comments

Comments
 (0)