Skip to content

Commit c126d1d

Browse files
authored
feat(pubsublite): alarm registry implementation (googleapis#8575)
1 parent 89e9bed commit c126d1d

7 files changed

Lines changed: 324 additions & 1 deletion

File tree

google/cloud/pubsublite/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ add_library(
8686
internal/admin_stub_factory.cc
8787
internal/admin_stub_factory.h
8888
internal/alarm_registry.h
89+
internal/alarm_registry_impl.cc
90+
internal/alarm_registry_impl.h
8991
internal/batching_options.h
9092
internal/cursor_auth_decorator.cc
9193
internal/cursor_auth_decorator.h
@@ -245,6 +247,7 @@ function (google_cloud_cpp_pubsublite_client_define_tests)
245247
set(pubsublite_unit_tests
246248
# cmake-format: sort
247249
endpoint_test.cc
250+
internal/alarm_registry_impl_test.cc
248251
internal/default_routing_policy_test.cc
249252
internal/partition_publisher_test.cc
250253
internal/resumable_async_streaming_read_write_rpc_test.cc

google/cloud/pubsublite/google_cloud_cpp_pubsublite.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ google_cloud_cpp_pubsublite_hdrs = [
3131
"internal/admin_stub.h",
3232
"internal/admin_stub_factory.h",
3333
"internal/alarm_registry.h",
34+
"internal/alarm_registry_impl.h",
3435
"internal/batching_options.h",
3536
"internal/cursor_auth_decorator.h",
3637
"internal/cursor_logging_decorator.h",
@@ -87,6 +88,7 @@ google_cloud_cpp_pubsublite_srcs = [
8788
"internal/admin_option_defaults.cc",
8889
"internal/admin_stub.cc",
8990
"internal/admin_stub_factory.cc",
91+
"internal/alarm_registry_impl.cc",
9092
"internal/cursor_auth_decorator.cc",
9193
"internal/cursor_logging_decorator.cc",
9294
"internal/cursor_metadata_decorator.cc",

google/cloud/pubsublite/internal/alarm_registry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class AlarmRegistry {
4949
virtual ~CancelToken() = default;
5050
CancelToken(CancelToken const&) = delete;
5151
CancelToken& operator=(CancelToken const&) = delete;
52-
CancelToken(AlarmRegistry&&) = delete;
52+
CancelToken(CancelToken&&) = delete;
5353
CancelToken& operator=(CancelToken&&) = delete;
5454
};
5555

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsublite/internal/alarm_registry_impl.h"
16+
#include "google/cloud/log.h"
17+
#include "google/cloud/version.h"
18+
#include "absl/memory/memory.h"
19+
#include <utility>
20+
21+
namespace google {
22+
namespace cloud {
23+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
24+
namespace pubsublite_internal {
25+
26+
AlarmRegistryImpl::AlarmRegistryImpl(google::cloud::CompletionQueue cq)
27+
: cq_{std::move(cq)} {}
28+
29+
void AlarmRegistryImpl::ScheduleAlarm(
30+
std::shared_ptr<AlarmState> const& state) {
31+
auto temp =
32+
state->cq.MakeRelativeTimer(state->period)
33+
.then([state](
34+
future<StatusOr<std::chrono::system_clock::time_point>> f) {
35+
auto status = f.get();
36+
if (!status.ok()) {
37+
GCP_LOG(INFO) << "`MakeRelativeTimer` returned a non-ok status: "
38+
<< status.status();
39+
return;
40+
}
41+
{
42+
std::lock_guard<std::mutex> g{state->mu};
43+
if (state->shutdown) return;
44+
state->on_alarm();
45+
}
46+
ScheduleAlarm(state);
47+
});
48+
std::lock_guard<std::mutex> g{state->mu};
49+
if (!state->shutdown) {
50+
state->timer = std::move(temp);
51+
} else {
52+
temp.cancel();
53+
}
54+
}
55+
56+
AlarmRegistryImpl::CancelTokenImpl::CancelTokenImpl(
57+
std::shared_ptr<AlarmState> state)
58+
: state_{std::move(state)} {}
59+
60+
AlarmRegistryImpl::CancelTokenImpl::~CancelTokenImpl() {
61+
// the alarm function is guarded by mu and is only invoked after checking
62+
// shutdown all while guarded by mu, so this guarantees that the alarm
63+
// function isn't running when the destructor is run and the function won't
64+
// run after the destructor finishes
65+
std::lock_guard<std::mutex> g{state_->mu};
66+
state_->timer.cancel();
67+
state_->shutdown = true;
68+
}
69+
70+
std::unique_ptr<AlarmRegistry::CancelToken> AlarmRegistryImpl::RegisterAlarm(
71+
std::chrono::milliseconds period, std::function<void()> on_alarm) {
72+
auto state = std::make_shared<AlarmState>(cq_, period, std::move(on_alarm));
73+
ScheduleAlarm(state);
74+
return absl::make_unique<CancelTokenImpl>(state);
75+
}
76+
77+
} // namespace pubsublite_internal
78+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
79+
} // namespace cloud
80+
} // namespace google
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ALARM_REGISTRY_IMPL_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ALARM_REGISTRY_IMPL_H
17+
18+
#include "google/cloud/pubsublite/internal/alarm_registry.h"
19+
#include "google/cloud/completion_queue.h"
20+
#include "google/cloud/version.h"
21+
#include <deque>
22+
#include <mutex>
23+
24+
namespace google {
25+
namespace cloud {
26+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
27+
namespace pubsublite_internal {
28+
29+
/**
30+
* Default implementation of `AlarmRegistry`.
31+
*
32+
* @note Utilizes `CompletionQueue::MakeRelativeTimer`.
33+
*/
34+
class AlarmRegistryImpl : public AlarmRegistry {
35+
public:
36+
explicit AlarmRegistryImpl(google::cloud::CompletionQueue cq);
37+
38+
/**
39+
* @note The returned `CancelToken` should not be destructed in `on_alarm()`'s
40+
* downcall.
41+
*/
42+
std::unique_ptr<AlarmRegistry::CancelToken> RegisterAlarm(
43+
std::chrono::milliseconds period,
44+
std::function<void()> on_alarm) override;
45+
46+
private:
47+
struct AlarmState {
48+
AlarmState(CompletionQueue completion_queue,
49+
std::chrono::milliseconds period, std::function<void()> on_alarm)
50+
: cq{std::move(completion_queue)},
51+
period{period},
52+
on_alarm{std::move(on_alarm)} {}
53+
CompletionQueue cq;
54+
std::chrono::milliseconds const period;
55+
std::mutex mu;
56+
future<void> timer; // ABSL_GUARDED_BY(mu)
57+
std::function<void()> on_alarm; // ABSL_GUARDED_BY(mu)
58+
bool shutdown = false; // ABSL_GUARDED_BY(mu)
59+
};
60+
61+
// When CancelToken is destroyed, the alarm will not be running and will never
62+
// run again.
63+
class CancelTokenImpl : public AlarmRegistry::CancelToken {
64+
public:
65+
explicit CancelTokenImpl(std::shared_ptr<AlarmState>);
66+
67+
~CancelTokenImpl() override;
68+
69+
private:
70+
std::shared_ptr<AlarmState> state_;
71+
};
72+
73+
// static with arguments rather than member variables, so parameters aren't
74+
// bound to object lifetime
75+
static void ScheduleAlarm(std::shared_ptr<AlarmState> const& state);
76+
77+
google::cloud::CompletionQueue const cq_;
78+
};
79+
80+
} // namespace pubsublite_internal
81+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
82+
} // namespace cloud
83+
} // namespace google
84+
85+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_ALARM_REGISTRY_IMPL_H
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsublite/internal/alarm_registry_impl.h"
16+
#include "google/cloud/future.h"
17+
#include "google/cloud/status_or.h"
18+
#include "google/cloud/testing_util/async_sequencer.h"
19+
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
20+
#include <gmock/gmock.h>
21+
#include <chrono>
22+
#include <deque>
23+
#include <memory>
24+
#include <thread>
25+
26+
namespace google {
27+
namespace cloud {
28+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
29+
namespace pubsublite_internal {
30+
namespace {
31+
32+
using google::cloud::CompletionQueue;
33+
using google::cloud::testing_util::AsyncSequencer;
34+
using google::cloud::testing_util::MockCompletionQueueImpl;
35+
using ::testing::ByMove;
36+
using ::testing::InSequence;
37+
using ::testing::MockFunction;
38+
using ::testing::Return;
39+
using ::testing::StrictMock;
40+
41+
// 10,000 seconds to ensure this test has no time dependence
42+
auto constexpr kAlarmPeriod =
43+
std::chrono::milliseconds{10000 * std::chrono::seconds(1)};
44+
45+
class AlarmRegistryImplTest : public ::testing::Test {
46+
protected:
47+
AlarmRegistryImplTest()
48+
: cq_{std::make_shared<StrictMock<MockCompletionQueueImpl>>()},
49+
alarm_{CompletionQueue{cq_}} {}
50+
std::shared_ptr<StrictMock<MockCompletionQueueImpl>> cq_;
51+
AlarmRegistryImpl alarm_;
52+
StrictMock<MockFunction<void()>> fun_;
53+
AsyncSequencer<StatusOr<std::chrono::system_clock::time_point>> sequencer_;
54+
};
55+
56+
TEST_F(AlarmRegistryImplTest, TokenDestroyedBeforeRun) {
57+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
58+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
59+
auto token = alarm_.RegisterAlarm(kAlarmPeriod, fun_.AsStdFunction());
60+
token = nullptr;
61+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
62+
}
63+
64+
TEST_F(AlarmRegistryImplTest, TimerErrorBeforeRun) {
65+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
66+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
67+
alarm_.RegisterAlarm(kAlarmPeriod, fun_.AsStdFunction());
68+
sequencer_.PopFront().set_value(Status{StatusCode::kCancelled, "cancelled"});
69+
}
70+
71+
TEST_F(AlarmRegistryImplTest, TokenDestroyedAfterSingleRun) {
72+
InSequence seq;
73+
74+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
75+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
76+
auto token = alarm_.RegisterAlarm(kAlarmPeriod, fun_.AsStdFunction());
77+
78+
EXPECT_CALL(fun_, Call);
79+
80+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
81+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
82+
83+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
84+
85+
token = nullptr;
86+
87+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
88+
}
89+
90+
TEST_F(AlarmRegistryImplTest, TokenDestroyedAfterFiveRuns) {
91+
InSequence seq;
92+
93+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
94+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
95+
auto token = alarm_.RegisterAlarm(kAlarmPeriod, fun_.AsStdFunction());
96+
97+
for (unsigned int i = 0; i < 5; ++i) {
98+
EXPECT_CALL(fun_, Call);
99+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
100+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
101+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
102+
}
103+
104+
token = nullptr;
105+
106+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
107+
}
108+
109+
TEST_F(AlarmRegistryImplTest, TokenDestroyedDuringSecondRun) {
110+
InSequence seq;
111+
112+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
113+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
114+
auto token = alarm_.RegisterAlarm(kAlarmPeriod, fun_.AsStdFunction());
115+
116+
EXPECT_CALL(fun_, Call);
117+
118+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
119+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
120+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
121+
122+
promise<void> in_alarm_function;
123+
promise<void> destroy_finished;
124+
125+
EXPECT_CALL(fun_, Call).WillOnce([&]() {
126+
in_alarm_function.set_value();
127+
// Unable to destroy token while the alarm is outstanding
128+
EXPECT_EQ(destroy_finished.get_future().wait_for(std::chrono::seconds(2)),
129+
std::future_status::timeout);
130+
});
131+
132+
std::thread first{[&]() {
133+
// guarantee that alarm function is being invoked
134+
in_alarm_function.get_future().get();
135+
token = nullptr;
136+
destroy_finished.set_value();
137+
}};
138+
139+
EXPECT_CALL(*cq_, MakeRelativeTimer(std::chrono::nanoseconds(kAlarmPeriod)))
140+
.WillOnce(Return(ByMove(sequencer_.PushBack())));
141+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
142+
143+
first.join();
144+
145+
sequencer_.PopFront().set_value(std::chrono::system_clock::time_point{});
146+
}
147+
148+
} // namespace
149+
} // namespace pubsublite_internal
150+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
151+
} // namespace cloud
152+
} // namespace google

google/cloud/pubsublite/pubsublite_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
pubsublite_unit_tests = [
2020
"endpoint_test.cc",
21+
"internal/alarm_registry_impl_test.cc",
2122
"internal/default_routing_policy_test.cc",
2223
"internal/partition_publisher_test.cc",
2324
"internal/resumable_async_streaming_read_write_rpc_test.cc",

0 commit comments

Comments
 (0)