Skip to content

Commit 7df467f

Browse files
committed
Added sync and async benchmarking comparison
1 parent b763acb commit 7df467f

File tree

4 files changed

+218
-32
lines changed

4 files changed

+218
-32
lines changed

packages/google-cloud-spanner/benchmark/async_bench/point_reads.py

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import asyncio
33
import random
44
import time
5-
from google.cloud.spanner_v1._async.client import Client
5+
from google.cloud.spanner_v1._async.client import Client as AsyncClient
6+
from google.cloud import spanner as sync_spanner
67
from benchmark.async_bench.metrics_utils import BenchmarkMetrics
8+
import concurrent.futures
79

810
PROJECT_ID = "span-cloud-testing"
911
INSTANCE_ID = "suvham-testing"
@@ -31,24 +33,77 @@ async def read_worker(database, metrics, operation_count, worker_id):
3133
print(f"Worker {worker_id} finished.")
3234

3335

36+
def sync_read_worker(database, metrics, operation_count, worker_id):
37+
"""A worker that performs point reads synchronously."""
38+
print(f"Sync Worker {worker_id} started.")
39+
for _ in range(operation_count):
40+
row_id = f"user-{random.randint(0, 99999)}"
41+
start_op = time.perf_counter()
42+
43+
with database.snapshot() as snapshot:
44+
results = snapshot.execute_sql("SELECT * FROM {} WHERE id = @id".format(TABLE_NAME),
45+
params={"id": row_id})
46+
for _ in results:
47+
pass
48+
49+
end_op = time.perf_counter()
50+
metrics.record_latency((end_op - start_op) * 1000.0)
51+
52+
print(f"Sync Worker {worker_id} finished.")
53+
54+
3455
async def main():
35-
parser = argparse.ArgumentParser(description="Benchmark Point Reads")
56+
parser = argparse.ArgumentParser(description="Benchmark Point Reads (Sync vs Async)")
3657
parser.add_argument("--concurrency", type=int, default=10, help="Number of concurrent workers")
3758
parser.add_argument("--operations", type=int, default=100, help="Number of operations per worker")
3859
args = parser.parse_args()
3960

40-
client = Client(project=PROJECT_ID)
41-
instance = client.instance(INSTANCE_ID)
42-
database = await instance.database(DATABASE_ID)
61+
# --- Sync Part ---
62+
print("\n--- Running Synchronous Benchmark ---")
63+
sync_client = sync_spanner.Client(project=PROJECT_ID)
64+
sync_instance = sync_client.instance(INSTANCE_ID)
65+
sync_database = sync_instance.database(DATABASE_ID)
66+
67+
# Warmup Sync
68+
print("Warming up Synchronous Client (10 operations)...")
69+
sync_read_worker(sync_database, BenchmarkMetrics(), 10, "warmup")
70+
71+
sync_metrics = BenchmarkMetrics()
72+
sync_metrics.start()
73+
74+
with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
75+
futures = [executor.submit(sync_read_worker, sync_database, sync_metrics, args.operations, i) for i in range(args.concurrency)]
76+
concurrent.futures.wait(futures)
4377

44-
metrics = BenchmarkMetrics()
45-
metrics.start()
78+
sync_metrics.stop()
79+
sync_metrics.report("Scenario A (Sync Point Reads)")
4680

47-
tasks = [read_worker(database, metrics, args.operations, i) for i in range(args.concurrency)]
81+
# --- Async Part ---
82+
print("\n--- Running Asynchronous Benchmark ---")
83+
async_client = AsyncClient(project=PROJECT_ID)
84+
async_instance = async_client.instance(INSTANCE_ID)
85+
async_database = await async_instance.database(DATABASE_ID)
86+
87+
# Warmup Async
88+
print("Warming up Asynchronous Client (10 operations)...")
89+
await read_worker(async_database, BenchmarkMetrics(), 10, "warmup")
90+
91+
async_metrics = BenchmarkMetrics()
92+
async_metrics.start()
93+
94+
tasks = [read_worker(async_database, async_metrics, args.operations, i) for i in range(args.concurrency)]
4895
await asyncio.gather(*tasks)
4996

50-
metrics.stop()
51-
metrics.report("Scenario A (Point Reads)")
97+
async_metrics.stop()
98+
async_metrics.report("Scenario A (Async Point Reads)")
99+
100+
# --- Comparison ---
101+
print("\n--- Comparison (Async vs Sync) ---")
102+
sync_qps = float(args.concurrency * args.operations) / (sync_metrics.end_time - sync_metrics.start_time)
103+
async_qps = float(args.concurrency * args.operations) / (async_metrics.end_time - async_metrics.start_time)
104+
print(f"Sync Throughput: {sync_qps:.2f} QPS")
105+
print(f"Async Throughput: {async_qps:.2f} QPS")
106+
print(f"Throughput Ratio (Async/Sync): {async_qps / sync_qps:.2f}x")
52107

53108

54109
if __name__ == "__main__":

packages/google-cloud-spanner/benchmark/async_bench/provision_db.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ async def provision_database():
2828
f"CREATE TABLE {TABLE_NAME} ("
2929
f" id STRING(36) NOT NULL,"
3030
f" field0 STRING(100),"
31-
f" field1 STRING(100)"
31+
f" field1 STRING(100),"
32+
f" field2 STRING(100),"
33+
f" field3 STRING(100),"
34+
f" field4 STRING(100),"
35+
f" field5 STRING(100),"
36+
f" field6 STRING(100),"
37+
f" field7 STRING(100),"
38+
f" field8 STRING(100),"
39+
f" field9 STRING(100)"
3240
f") PRIMARY KEY(id)"
3341
]
3442

packages/google-cloud-spanner/benchmark/async_bench/streaming.py

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import argparse
22
import asyncio
33
import time
4-
from google.cloud.spanner_v1._async.client import Client
4+
from google.cloud.spanner_v1._async.client import Client as AsyncClient
5+
from google.cloud import spanner as sync_spanner
56
from benchmark.async_bench.metrics_utils import BenchmarkMetrics
7+
import concurrent.futures
68

79
PROJECT_ID = "span-cloud-testing"
810
INSTANCE_ID = "suvham-testing"
@@ -30,24 +32,75 @@ async def stream_worker(database, metrics, operation_count, worker_id):
3032
print(f"Worker {worker_id} finished.")
3133

3234

35+
def sync_stream_worker(database, metrics, operation_count, worker_id):
36+
"""A worker that retrieves large result sets via streaming synchronously."""
37+
print(f"Sync Worker {worker_id} started.")
38+
for _ in range(operation_count):
39+
start_op = time.perf_counter()
40+
41+
with database.snapshot() as snapshot:
42+
results = snapshot.execute_sql("SELECT * FROM {} LIMIT 100000".format(TABLE_NAME))
43+
for _ in results:
44+
pass # Consuming the stream quickly
45+
46+
end_op = time.perf_counter()
47+
metrics.record_latency((end_op - start_op) * 1000.0)
48+
49+
print(f"Sync Worker {worker_id} finished.")
50+
51+
3352
async def main():
34-
parser = argparse.ArgumentParser(description="Benchmark Streaming Reads")
53+
parser = argparse.ArgumentParser(description="Benchmark Streaming Reads (Sync vs Async)")
3554
parser.add_argument("--concurrency", type=int, default=1, help="Number of concurrent workers (usually 1 for large streams)")
3655
parser.add_argument("--operations", type=int, default=1, help="Number of times to run per worker")
3756
args = parser.parse_args()
3857

39-
client = Client(project=PROJECT_ID)
40-
instance = client.instance(INSTANCE_ID)
41-
database = await instance.database(DATABASE_ID)
58+
# --- Sync Part ---
59+
print("\n--- Running Synchronous Benchmark ---")
60+
sync_client = sync_spanner.Client(project=PROJECT_ID)
61+
sync_instance = sync_client.instance(INSTANCE_ID)
62+
sync_database = sync_instance.database(DATABASE_ID)
63+
64+
# Warmup Sync
65+
print("Warming up Synchronous Client (10 operations)...")
66+
sync_stream_worker(sync_database, BenchmarkMetrics(), 10, "warmup")
67+
68+
sync_metrics = BenchmarkMetrics()
69+
sync_metrics.start()
70+
71+
with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
72+
futures = [executor.submit(sync_stream_worker, sync_database, sync_metrics, args.operations, i) for i in range(args.concurrency)]
73+
concurrent.futures.wait(futures)
4274

43-
metrics = BenchmarkMetrics()
44-
metrics.start()
75+
sync_metrics.stop()
76+
sync_metrics.report("Scenario C (Sync Streaming)")
4577

46-
tasks = [stream_worker(database, metrics, args.operations, i) for i in range(args.concurrency)]
78+
# --- Async Part ---
79+
print("\n--- Running Asynchronous Benchmark ---")
80+
async_client = AsyncClient(project=PROJECT_ID)
81+
async_instance = async_client.instance(INSTANCE_ID)
82+
async_database = await async_instance.database(DATABASE_ID)
83+
84+
# Warmup Async
85+
print("Warming up Asynchronous Client (10 operations)...")
86+
await stream_worker(async_database, BenchmarkMetrics(), 10, "warmup")
87+
88+
async_metrics = BenchmarkMetrics()
89+
async_metrics.start()
90+
91+
tasks = [stream_worker(async_database, async_metrics, args.operations, i) for i in range(args.concurrency)]
4792
await asyncio.gather(*tasks)
4893

49-
metrics.stop()
50-
metrics.report("Scenario C (Streaming)")
94+
async_metrics.stop()
95+
async_metrics.report("Scenario C (Async Streaming)")
96+
97+
# --- Comparison ---
98+
print("\n--- Comparison (Async vs Sync) ---")
99+
sync_qps = float(args.concurrency * args.operations) / (sync_metrics.end_time - sync_metrics.start_time)
100+
async_qps = float(args.concurrency * args.operations) / (async_metrics.end_time - async_metrics.start_time)
101+
print(f"Sync Throughput: {sync_qps:.2f} QPS")
102+
print(f"Async Throughput: {async_qps:.2f} QPS")
103+
print(f"Throughput Ratio (Async/Sync): {async_qps / sync_qps:.2f}x")
51104

52105

53106
if __name__ == "__main__":

packages/google-cloud-spanner/benchmark/async_bench/transactions.py

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import asyncio
33
import random
44
import time
5-
from google.cloud.spanner_v1._async.client import Client
5+
from google.cloud.spanner_v1._async.client import Client as AsyncClient
6+
from google.cloud import spanner as sync_spanner
67
from benchmark.async_bench.metrics_utils import BenchmarkMetrics
8+
import concurrent.futures
79

810
PROJECT_ID = "span-cloud-testing"
911
INSTANCE_ID = "suvham-testing"
@@ -47,24 +49,92 @@ async def transaction_worker(database, metrics, operation_count, worker_id):
4749
print(f"Worker {worker_id} finished.")
4850

4951

52+
import string
53+
54+
def sync_transaction_worker(database, metrics, operation_count, worker_id):
55+
"""A worker that performs transactions synchronously."""
56+
print(f"Sync Worker {worker_id} started.")
57+
for _ in range(operation_count):
58+
row_id = f"user-{random.randint(0, 99999)}"
59+
start_op = time.perf_counter()
60+
61+
try:
62+
def update_columns(transaction):
63+
results = transaction.execute_sql(
64+
"SELECT field0 FROM {} WHERE id = @id".format(TABLE_NAME),
65+
params={"id": row_id}
66+
)
67+
for _ in results:
68+
pass
69+
70+
transaction.update(
71+
table=TABLE_NAME,
72+
columns=["id", "field0"],
73+
values=[(row_id, "".join(random.choice(string.ascii_letters) for _ in range(10)))]
74+
)
75+
76+
database.run_in_transaction(update_columns)
77+
end_op = time.perf_counter()
78+
metrics.record_latency((end_op - start_op) * 1000.0)
79+
80+
except Exception as e:
81+
print(f"Sync Transaction failed in worker {worker_id}: {e}")
82+
83+
print(f"Sync Worker {worker_id} finished.")
84+
85+
5086
async def main():
51-
parser = argparse.ArgumentParser(description="Benchmark Transactions")
87+
parser = argparse.ArgumentParser(description="Benchmark Read-Write Transactions (Sync vs Async)")
5288
parser.add_argument("--concurrency", type=int, default=5, help="Number of concurrent workers")
53-
parser.add_argument("--operations", type=int, default=50, help="Number of operations per worker")
89+
parser.add_argument("--operations", type=int, default=20, help="Number of operations per worker")
5490
args = parser.parse_args()
5591

56-
client = Client(project=PROJECT_ID)
57-
instance = client.instance(INSTANCE_ID)
58-
database = await instance.database(DATABASE_ID)
92+
# --- Sync Part ---
93+
print("\n--- Running Synchronous Benchmark ---")
94+
sync_client = sync_spanner.Client(project=PROJECT_ID)
95+
sync_instance = sync_client.instance(INSTANCE_ID)
96+
sync_database = sync_instance.database(DATABASE_ID)
97+
98+
# Warmup Sync
99+
print("Warming up Synchronous Client (10 operations)...")
100+
sync_transaction_worker(sync_database, BenchmarkMetrics(), 10, "warmup")
101+
102+
sync_metrics = BenchmarkMetrics()
103+
sync_metrics.start()
104+
105+
with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
106+
futures = [executor.submit(sync_transaction_worker, sync_database, sync_metrics, args.operations, i) for i in range(args.concurrency)]
107+
concurrent.futures.wait(futures)
59108

60-
metrics = BenchmarkMetrics()
61-
metrics.start()
109+
sync_metrics.stop()
110+
sync_metrics.report("Scenario B (Sync Transactions)")
62111

63-
tasks = [transaction_worker(database, metrics, args.operations, i) for i in range(args.concurrency)]
112+
# --- Async Part ---
113+
print("\n--- Running Asynchronous Benchmark ---")
114+
async_client = AsyncClient(project=PROJECT_ID)
115+
async_instance = async_client.instance(INSTANCE_ID)
116+
async_database = await async_instance.database(DATABASE_ID)
117+
118+
# Warmup Async
119+
print("Warming up Asynchronous Client (10 operations)...")
120+
await transaction_worker(async_database, BenchmarkMetrics(), 10, "warmup")
121+
122+
async_metrics = BenchmarkMetrics()
123+
async_metrics.start()
124+
125+
tasks = [transaction_worker(async_database, async_metrics, args.operations, i) for i in range(args.concurrency)]
64126
await asyncio.gather(*tasks)
65127

66-
metrics.stop()
67-
metrics.report("Scenario B (Transactions)")
128+
async_metrics.stop()
129+
async_metrics.report("Scenario B (Async Transactions)")
130+
131+
# --- Comparison ---
132+
print("\n--- Comparison (Async vs Sync) ---")
133+
sync_qps = float(args.concurrency * args.operations) / (sync_metrics.end_time - sync_metrics.start_time)
134+
async_qps = float(args.concurrency * args.operations) / (async_metrics.end_time - async_metrics.start_time)
135+
print(f"Sync Throughput: {sync_qps:.2f} QPS")
136+
print(f"Async Throughput: {async_qps:.2f} QPS")
137+
print(f"Throughput Ratio (Async/Sync): {async_qps / sync_qps:.2f}x")
68138

69139

70140
if __name__ == "__main__":

0 commit comments

Comments
 (0)