Skip to content

Commit fc320ca

Browse files
committed
adapters: postgres_output: multiple worker threads
Adds support for multiple worker threads in the postgres output connector. The postgres output endpoint spawns n (default: 4) PostgresWorkers (a postgres worker is the same as the old PostgresOutputEndpoint), and then each worker is passed a SplitCursor to process. Each worker has its own retry mechanism, and each worker performs its own transaction on the table, so, if a worker fails only this fraction of the batch will not be committed to Postgres. Workers are created at the start, and then sent BatchStart, BatchEnd, Encode and Shutdown messages. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent fa06b6f commit fc320ca

File tree

5 files changed

+933
-214
lines changed

5 files changed

+933
-214
lines changed

crates/adapters/benches/postgres_output.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,15 @@ fn bench_postgres_encode(c: &mut Criterion) {
135135
drop_bench_table(&mut pg_client);
136136
}
137137

138-
/// Benchmark Postgres output scaling: 1k/10k/100k records x 1/2/4/8 workers.
138+
/// Benchmark Postgres output scaling: 100k/1M/2M records x 1/2/4/8 workers.
139139
fn bench_postgres_encode_scaling(c: &mut Criterion) {
140140
let mut pg_client = postgres_client();
141141
create_bench_table(&mut pg_client);
142142

143143
let mut group = c.benchmark_group("postgres_output_encode_scaling");
144144
group.sample_size(10);
145145

146-
for num_records in [1_000, 10_000, 100_000] {
146+
for num_records in [100_000, 1_000_000, 2_000_000] {
147147
let data = generate_test_data(num_records);
148148
let batch = build_indexed_batch(&data);
149149

0 commit comments

Comments
 (0)