Skip to content

Commit 89e9bed

Browse files
authored
feat(pubsublite): default routing policy (#8553)
1 parent f06db13 commit 89e9bed

8 files changed

Lines changed: 307 additions & 1 deletion

.typos.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
extend-exclude = [
33
# Testing input file
44
"google/cloud/testing_util/rest_testing_large_json.json",
5+
# Test files
6+
"google/cloud/pubsublite/internal/default_routing_policy_test.cc",
57
# Autogenerated
68
"google/cloud/bigtable/internal/readrowsparser_acceptance_tests.inc",
79
# The source proto files have one or more typos in their comments

google/cloud/pubsublite/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ add_library(
9797
internal/cursor_stub.h
9898
internal/cursor_stub_factory.cc
9999
internal/cursor_stub_factory.h
100+
internal/default_routing_policy.cc
101+
internal/default_routing_policy.h
100102
internal/futures.h
101103
internal/partition_assignment_auth_decorator.cc
102104
internal/partition_assignment_auth_decorator.h
@@ -122,6 +124,7 @@ add_library(
122124
internal/publisher_stub_factory.cc
123125
internal/publisher_stub_factory.h
124126
internal/resumable_async_streaming_read_write_rpc.h
127+
internal/routing_policy.h
125128
internal/service.h
126129
internal/service_composite.h
127130
internal/stream_factory.h
@@ -215,7 +218,6 @@ function (google_cloud_cpp_pubsublite_client_define_tests)
215218
# googletest itself), because the generic `FindGTest` module does not define
216219
# the GTest::gmock target, and the target names are also weird.
217220
find_package(GTest CONFIG REQUIRED)
218-
219221
add_library(pubsublite_testing INTERFACE)
220222
target_sources(
221223
pubsublite_testing
@@ -243,6 +245,7 @@ function (google_cloud_cpp_pubsublite_client_define_tests)
243245
set(pubsublite_unit_tests
244246
# cmake-format: sort
245247
endpoint_test.cc
248+
internal/default_routing_policy_test.cc
246249
internal/partition_publisher_test.cc
247250
internal/resumable_async_streaming_read_write_rpc_test.cc
248251
internal/service_composite_test.cc

google/cloud/pubsublite/google_cloud_cpp_pubsublite.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ google_cloud_cpp_pubsublite_hdrs = [
3737
"internal/cursor_metadata_decorator.h",
3838
"internal/cursor_stub.h",
3939
"internal/cursor_stub_factory.h",
40+
"internal/default_routing_policy.h",
4041
"internal/futures.h",
4142
"internal/partition_assignment_auth_decorator.h",
4243
"internal/partition_assignment_logging_decorator.h",
@@ -51,6 +52,7 @@ google_cloud_cpp_pubsublite_hdrs = [
5152
"internal/publisher_stub.h",
5253
"internal/publisher_stub_factory.h",
5354
"internal/resumable_async_streaming_read_write_rpc.h",
55+
"internal/routing_policy.h",
5456
"internal/service.h",
5557
"internal/service_composite.h",
5658
"internal/stream_factory.h",
@@ -90,6 +92,7 @@ google_cloud_cpp_pubsublite_srcs = [
9092
"internal/cursor_metadata_decorator.cc",
9193
"internal/cursor_stub.cc",
9294
"internal/cursor_stub_factory.cc",
95+
"internal/default_routing_policy.cc",
9396
"internal/partition_assignment_auth_decorator.cc",
9497
"internal/partition_assignment_logging_decorator.cc",
9598
"internal/partition_assignment_metadata_decorator.cc",
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsublite/internal/default_routing_policy.h"
16+
#include "google/cloud/version.h"
17+
#include <unordered_map>
18+
19+
namespace google {
20+
namespace cloud {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
namespace pubsublite_internal {
23+
24+
// Uses the identity that `(a*b) % m == ((a % m) * (b % m)) % m`
25+
std::uint64_t ModPow(std::uint64_t val, std::uint64_t pow, std::uint32_t mod) {
26+
std::uint64_t result = 1;
27+
for (std::uint32_t i = 0; i < pow; ++i) {
28+
result *= (val % mod);
29+
result %= mod;
30+
}
31+
return result;
32+
}
33+
34+
// Uses the identity that `(a*b) % m == ((a % m) * (b % m)) % m`
35+
// Uses the identity that `(a+b) % m == ((a % m) + (b % m)) % m`
36+
std::uint64_t GetMod(std::array<uint8_t, 32> big_endian, std::uint32_t mod) {
37+
std::uint64_t result = 0;
38+
for (std::uint64_t i = 0; i < big_endian.size(); ++i) {
39+
std::uint64_t val_mod = big_endian[i] % mod;
40+
41+
std::uint64_t pow = big_endian.size() - (i + 1);
42+
std::uint64_t pow_mod = ModPow(
43+
// 2^8
44+
static_cast<std::uint64_t>(1 << 8), pow, mod);
45+
46+
result += (val_mod * pow_mod) % mod;
47+
result %= mod;
48+
}
49+
return result;
50+
}
51+
52+
std::uint64_t DefaultRoutingPolicy::Route(std::uint32_t num_partitions) {
53+
// atomic operation
54+
return counter_++ % num_partitions;
55+
}
56+
57+
std::uint64_t DefaultRoutingPolicy::Route(std::string const& message_key,
58+
std::uint32_t num_partitions) {
59+
return GetMod(google::cloud::internal::Sha256Hash(message_key),
60+
num_partitions);
61+
}
62+
63+
} // namespace pubsublite_internal
64+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
65+
} // namespace cloud
66+
} // namespace google
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_DEFAULT_ROUTING_POLICY_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_DEFAULT_ROUTING_POLICY_H
17+
18+
#include "google/cloud/pubsublite/internal/routing_policy.h"
19+
#include "google/cloud/internal/sha256_hash.h"
20+
#include "google/cloud/version.h"
21+
#include <atomic>
22+
#include <cstddef>
23+
#include <unordered_map>
24+
25+
namespace google {
26+
namespace cloud {
27+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
28+
namespace pubsublite_internal {
29+
30+
// Calculates the val^pow % mod while accounting for overflow.
31+
// Needed because after calculating `big_endian[i]` % `mod` in `GetMod`, we need
32+
// to account for its position in the array by multiplying it by an offset.
33+
std::uint64_t ModPow(std::uint64_t val, std::uint64_t pow, std::uint32_t mod);
34+
35+
// returns <integer value of `big_endian`> % `mod` while accounting for overflow
36+
std::uint64_t GetMod(std::array<uint8_t, 32> big_endian, std::uint32_t mod);
37+
38+
/**
39+
* Implements the same routing policy as all the other Pub/Sub Lite client
40+
* libraries.
41+
*
42+
* All the client libraries provided by Google use the same algorithm to route
43+
* messages.
44+
*
45+
* @note The algorithm for routing with a message key is <big-endian integer
46+
* representation of SHA256(message key)> % <number of partitions>. It uses
47+
* SHA-256 as it is available in most programming languages, enabling consistent
48+
* hashing across languages.
49+
*/
50+
class DefaultRoutingPolicy : public RoutingPolicy {
51+
public:
52+
std::uint64_t Route(std::uint32_t num_partitions) override;
53+
std::uint64_t Route(std::string const& message_key,
54+
std::uint32_t num_partitions) override;
55+
56+
private:
57+
std::atomic<std::int64_t> counter_{0};
58+
};
59+
60+
} // namespace pubsublite_internal
61+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
62+
} // namespace cloud
63+
} // namespace google
64+
65+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_DEFAULT_ROUTING_POLICY_H
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsublite/internal/default_routing_policy.h"
16+
#include <gmock/gmock.h>
17+
#include <unordered_map>
18+
19+
namespace google {
20+
namespace cloud {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
namespace pubsublite_internal {
23+
24+
using google::cloud::pubsublite_internal::DefaultRoutingPolicy;
25+
26+
TEST(DefaultRoutingPolicyTest, RouteWithKey) {
27+
// same list of test values as in other client libraries
28+
DefaultRoutingPolicy rp;
29+
std::unordered_map<std::string, std::uint64_t> mods = {
30+
{"oaisdhfoiahsd", 18},
31+
{"P(#*YNPOIUDF", 9},
32+
{"LCIUNDFPOASIUN", 8},
33+
{";odsfiupoius", 9},
34+
{"OPISUDfpoiu", 2},
35+
{"dokjwO:IDf", 21},
36+
{"%^&*", 19},
37+
{"XXXXXXXXX", 15},
38+
{"dpcollins", 28},
39+
{"#()&$IJHLOIURF", 2},
40+
{"dfasiduyf", 6},
41+
{"983u2poer", 3},
42+
{"8888888", 6},
43+
{"OPUIPOUYPOIOPUIOIPUOUIPJOP", 2},
44+
{"x", 16}};
45+
for (auto const& kv : mods) {
46+
EXPECT_EQ(rp.Route(kv.first, 29), kv.second);
47+
}
48+
}
49+
50+
TEST(DefaultRoutingPolicyTest, RouteWithoutKey) {
51+
unsigned int num_partitions = 29;
52+
DefaultRoutingPolicy rp;
53+
uint64_t initial_partition = rp.Route(num_partitions);
54+
for (unsigned int i = 0; i < num_partitions; ++i) {
55+
uint64_t next_partition = rp.Route(num_partitions);
56+
EXPECT_EQ((initial_partition + 1) % num_partitions,
57+
next_partition % num_partitions);
58+
initial_partition = next_partition;
59+
}
60+
}
61+
62+
// expected values obtained from Python3 REPL
63+
TEST(TestGetMod, MaxValue) {
64+
std::array<std::uint8_t, 32> arr{255, 255, 255, 255, 255, 255, 255, 255,
65+
255, 255, 255, 255, 255, 255, 255, 255,
66+
255, 255, 255, 255, 255, 255, 255, 255,
67+
255, 255, 255, 255, 255, 255, 255, 255};
68+
EXPECT_EQ(GetMod(arr, 2), 1);
69+
EXPECT_EQ(GetMod(arr, 18), 15);
70+
EXPECT_EQ(GetMod(arr, 100), 35);
71+
EXPECT_EQ(GetMod(arr, 10023), 5397);
72+
EXPECT_EQ(GetMod(arr, UINT8_MAX), 0);
73+
}
74+
75+
TEST(TestGetMod, OneLessThanMaxValue) {
76+
std::array<std::uint8_t, 32> arr{255, 255, 255, 255, 255, 255, 255, 255,
77+
255, 255, 255, 255, 255, 255, 255, 255,
78+
255, 255, 255, 255, 255, 255, 255, 255,
79+
255, 255, 255, 255, 255, 255, 255, 254};
80+
EXPECT_EQ(GetMod(arr, 2), 0);
81+
EXPECT_EQ(GetMod(arr, 18), 14);
82+
EXPECT_EQ(GetMod(arr, 100), 34);
83+
EXPECT_EQ(GetMod(arr, 10023), 5396);
84+
EXPECT_EQ(GetMod(arr, UINT8_MAX), 254);
85+
}
86+
87+
TEST(TestGetMod, Zeros) {
88+
std::array<std::uint8_t, 32> arr{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
89+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
90+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
91+
EXPECT_EQ(GetMod(arr, 2), 0);
92+
EXPECT_EQ(GetMod(arr, 18), 0);
93+
EXPECT_EQ(GetMod(arr, 100), 0);
94+
EXPECT_EQ(GetMod(arr, 10023), 0);
95+
EXPECT_EQ(GetMod(arr, UINT8_MAX), 0);
96+
}
97+
98+
TEST(TestGetMod, ArbitraryValue) {
99+
std::array<std::uint8_t, 32> arr{255, 255, 255, 255, 255, 255, 2, 255,
100+
5, 79, 255, 255, 255, 255, 80, 255,
101+
255, 255, 8, 255, 255, 4, 255, 255,
102+
78, 255, 255, 100, 255, 255, 255, 254};
103+
EXPECT_EQ(GetMod(arr, 10), 0);
104+
EXPECT_EQ(GetMod(arr, 109), 4);
105+
EXPECT_EQ(GetMod(arr, 10023), 3346);
106+
EXPECT_EQ(GetMod(arr, 109000), 60390);
107+
EXPECT_EQ(GetMod(arr, UINT8_MAX), 100);
108+
}
109+
110+
TEST(TestGetMod, ArbitraryValue1) {
111+
std::array<std::uint8_t, 32> arr{0, 48, 0, 0, 60, 0, 0, 56, 0, 99, 0,
112+
0, 0, 0, 0, 90, 231, 0, 89, 0, 27, 80,
113+
0, 0, 0, 254, 0, 0, 0, 0, 23, 0};
114+
EXPECT_EQ(GetMod(arr, 109001), 68945);
115+
EXPECT_EQ(GetMod(arr, 102301), 93535);
116+
EXPECT_EQ(GetMod(arr, 23), 13);
117+
EXPECT_EQ(GetMod(arr, UINT8_MAX), 37);
118+
}
119+
120+
} // namespace pubsublite_internal
121+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
122+
} // namespace cloud
123+
} // namespace google
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ROUTING_POLICY_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ROUTING_POLICY_H
17+
18+
#include "google/cloud/version.h"
19+
20+
namespace google {
21+
namespace cloud {
22+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
23+
namespace pubsublite_internal {
24+
25+
/**
26+
* Interface for a Pub/Sub Lite routing policy that determines the partition
27+
* that a message should be sent to, either depending on the message's key or
28+
* not.
29+
*/
30+
class RoutingPolicy {
31+
public:
32+
virtual ~RoutingPolicy() = default;
33+
virtual std::uint64_t Route(std::uint32_t num_partitions) = 0;
34+
virtual std::uint64_t Route(std::string const& message_key,
35+
std::uint32_t num_partitions) = 0;
36+
};
37+
38+
} // namespace pubsublite_internal
39+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
40+
} // namespace cloud
41+
} // namespace google
42+
43+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ROUTING_POLICY_H

google/cloud/pubsublite/pubsublite_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
pubsublite_unit_tests = [
2020
"endpoint_test.cc",
21+
"internal/default_routing_policy_test.cc",
2122
"internal/partition_publisher_test.cc",
2223
"internal/resumable_async_streaming_read_write_rpc_test.cc",
2324
"internal/service_composite_test.cc",

0 commit comments

Comments
 (0)