Skip to content

Commit f967ea0

Browse files
Google APIscopybara-github
authored andcommitted
feat: Add ComputeHeadCursor RPC for Pub/Sub Lite.
PiperOrigin-RevId: 347681363
1 parent e689e62 commit f967ea0

8 files changed

Lines changed: 87 additions & 54 deletions

File tree

google/cloud/pubsublite/v1/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ py_gapic_library(
173173
grpc_service_config = "pubsublite_grpc_service_config.json",
174174
)
175175

176+
# Open Source Packages
176177
py_gapic_assembly_pkg(
177178
name = "pubsublite-v1-py",
178179
deps = [

google/cloud/pubsublite/v1/admin.proto

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
3636
// subscriptions, such creating, listing, and deleting topics and subscriptions.
3737
service AdminService {
3838
option (google.api.default_host) = "pubsublite.googleapis.com";
39-
option (google.api.oauth_scopes) =
40-
"https://www.googleapis.com/auth/cloud-platform";
39+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
4140

4241
// Creates a new topic.
4342
rpc CreateTopic(CreateTopicRequest) returns (Topic) {
@@ -90,8 +89,7 @@ service AdminService {
9089
}
9190

9291
// Lists the subscriptions attached to the specified topic.
93-
rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest)
94-
returns (ListTopicSubscriptionsResponse) {
92+
rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest) returns (ListTopicSubscriptionsResponse) {
9593
option (google.api.http) = {
9694
get: "/v1/admin/{name=projects/*/locations/*/topics/*}/subscriptions"
9795
};
@@ -104,8 +102,7 @@ service AdminService {
104102
post: "/v1/admin/{parent=projects/*/locations/*}/subscriptions"
105103
body: "subscription"
106104
};
107-
option (google.api.method_signature) =
108-
"parent,subscription,subscription_id";
105+
option (google.api.method_signature) = "parent,subscription,subscription_id";
109106
}
110107

111108
// Returns the subscription configuration.
@@ -117,8 +114,7 @@ service AdminService {
117114
}
118115

119116
// Returns the list of subscriptions for the given project.
120-
rpc ListSubscriptions(ListSubscriptionsRequest)
121-
returns (ListSubscriptionsResponse) {
117+
rpc ListSubscriptions(ListSubscriptionsRequest) returns (ListSubscriptionsResponse) {
122118
option (google.api.http) = {
123119
get: "/v1/admin/{parent=projects/*/locations/*}/subscriptions"
124120
};
@@ -135,8 +131,7 @@ service AdminService {
135131
}
136132

137133
// Deletes the specified subscription.
138-
rpc DeleteSubscription(DeleteSubscriptionRequest)
139-
returns (google.protobuf.Empty) {
134+
rpc DeleteSubscription(DeleteSubscriptionRequest) returns (google.protobuf.Empty) {
140135
option (google.api.http) = {
141136
delete: "/v1/admin/{name=projects/*/locations/*/subscriptions/*}"
142137
};
@@ -155,12 +150,11 @@ message CreateTopicRequest {
155150
}
156151
];
157152

158-
// Required. Configuration of the topic to create. Its `name` field is
159-
// ignored.
153+
// Required. Configuration of the topic to create. Its `name` field is ignored.
160154
Topic topic = 2 [(google.api.field_behavior) = REQUIRED];
161155

162-
// Required. The ID to use for the topic, which will become the final
163-
// component of the topic's name.
156+
// Required. The ID to use for the topic, which will become the final component of
157+
// the topic's name.
164158
//
165159
// This value is structured like: `my-topic-name`.
166160
string topic_id = 3 [(google.api.field_behavior) = REQUIRED];
@@ -235,8 +229,7 @@ message UpdateTopicRequest {
235229
Topic topic = 1 [(google.api.field_behavior) = REQUIRED];
236230

237231
// Required. A mask specifying the topic fields to change.
238-
google.protobuf.FieldMask update_mask = 2
239-
[(google.api.field_behavior) = REQUIRED];
232+
google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED];
240233
}
241234

242235
// Request for DeleteTopic.
@@ -295,12 +288,11 @@ message CreateSubscriptionRequest {
295288
}
296289
];
297290

298-
// Required. Configuration of the subscription to create. Its `name` field is
299-
// ignored.
291+
// Required. Configuration of the subscription to create. Its `name` field is ignored.
300292
Subscription subscription = 2 [(google.api.field_behavior) = REQUIRED];
301293

302-
// Required. The ID to use for the subscription, which will become the final
303-
// component of the subscription's name.
294+
// Required. The ID to use for the subscription, which will become the final component
295+
// of the subscription's name.
304296
//
305297
// This value is structured like: `my-sub-name`.
306298
string subscription_id = 3 [(google.api.field_behavior) = REQUIRED];
@@ -359,8 +351,7 @@ message UpdateSubscriptionRequest {
359351
Subscription subscription = 1 [(google.api.field_behavior) = REQUIRED];
360352

361353
// Required. A mask specifying the subscription fields to change.
362-
google.protobuf.FieldMask update_mask = 2
363-
[(google.api.field_behavior) = REQUIRED];
354+
google.protobuf.FieldMask update_mask = 2 [(google.api.field_behavior) = REQUIRED];
364355
}
365356

366357
// Request for DeleteSubscription.

google/cloud/pubsublite/v1/common.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ message Subscription {
180180
// Structured like:
181181
// projects/{project_number}/locations/{location}/topics/{topic_id}
182182
string topic = 2 [(google.api.resource_reference) = {
183-
type: "pubsublite.googleapis.com/Topic"
184-
}];
183+
type: "pubsublite.googleapis.com/Topic"
184+
}];
185185

186186
// The settings for this subscription's message delivery.
187187
DeliveryConfig delivery_config = 3;

google/cloud/pubsublite/v1/cursor.proto

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,18 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
3636
// progress within a topic partition for a given subscription.
3737
service CursorService {
3838
option (google.api.default_host) = "pubsublite.googleapis.com";
39-
option (google.api.oauth_scopes) =
40-
"https://www.googleapis.com/auth/cloud-platform";
39+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
4140

4241
// Establishes a stream with the server for managing committed cursors.
43-
rpc StreamingCommitCursor(stream StreamingCommitCursorRequest)
44-
returns (stream StreamingCommitCursorResponse) {}
42+
rpc StreamingCommitCursor(stream StreamingCommitCursorRequest) returns (stream StreamingCommitCursorResponse) {
43+
}
4544

4645
// Updates the committed cursor.
47-
rpc CommitCursor(CommitCursorRequest) returns (CommitCursorResponse) {}
46+
rpc CommitCursor(CommitCursorRequest) returns (CommitCursorResponse) {
47+
}
4848

4949
// Returns all committed cursor information for a subscription.
50-
rpc ListPartitionCursors(ListPartitionCursorsRequest)
51-
returns (ListPartitionCursorsResponse) {
50+
rpc ListPartitionCursors(ListPartitionCursorsRequest) returns (ListPartitionCursorsResponse) {
5251
option (google.api.http) = {
5352
get: "/v1/cursor/{parent=projects/*/locations/*/subscriptions/*}/cursors"
5453
};
@@ -69,7 +68,9 @@ message InitialCommitCursorRequest {
6968
}
7069

7170
// Response to an InitialCommitCursorRequest.
72-
message InitialCommitCursorResponse {}
71+
message InitialCommitCursorResponse {
72+
73+
}
7374

7475
// Streaming request to update the committed cursor. Subsequent
7576
// SequencedCommitCursorRequests override outstanding ones.
@@ -124,7 +125,9 @@ message CommitCursorRequest {
124125
}
125126

126127
// Response for CommitCursor.
127-
message CommitCursorResponse {}
128+
message CommitCursorResponse {
129+
130+
}
128131

129132
// Request for ListPartitionCursors.
130133
message ListPartitionCursorsRequest {

google/cloud/pubsublite/v1/publisher.proto

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ syntax = "proto3";
1616

1717
package google.cloud.pubsublite.v1;
1818

19-
import "google/api/client.proto";
2019
import "google/cloud/pubsublite/v1/common.proto";
20+
import "google/api/client.proto";
2121

2222
option cc_enable_arenas = true;
2323
option csharp_namespace = "Google.Cloud.PubSubLite.V1";
@@ -34,8 +34,7 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
3434
// to subscriber clients upon request (via the `SubscriberService`).
3535
service PublisherService {
3636
option (google.api.default_host) = "pubsublite.googleapis.com";
37-
option (google.api.oauth_scopes) =
38-
"https://www.googleapis.com/auth/cloud-platform";
37+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
3938

4039
// Establishes a stream with the server for publishing messages. Once the
4140
// stream is initialized, the client publishes messages by sending publish
@@ -44,7 +43,8 @@ service PublisherService {
4443
// were sent. Note that multiple PublishRequests can be in flight
4544
// simultaneously, but they will be processed by the server in the order that
4645
// they are sent by the client on a given stream.
47-
rpc Publish(stream PublishRequest) returns (stream PublishResponse) {}
46+
rpc Publish(stream PublishRequest) returns (stream PublishResponse) {
47+
}
4848
}
4949

5050
// The first request that must be sent on a newly-opened stream.
@@ -59,7 +59,9 @@ message InitialPublishRequest {
5959
}
6060

6161
// Response to an InitialPublishRequest.
62-
message InitialPublishResponse {}
62+
message InitialPublishResponse {
63+
64+
}
6365

6466
// Request to publish messages to the topic.
6567
message MessagePublishRequest {

google/cloud/pubsublite/v1/pubsublite_v1.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ authentication:
3333
oauth:
3434
canonical_scopes: |-
3535
https://www.googleapis.com/auth/cloud-platform
36+
- selector: google.cloud.pubsublite.v1.TopicStatsService.ComputeHeadCursor
37+
oauth:
38+
canonical_scopes: |-
39+
https://www.googleapis.com/auth/cloud-platform
3640
- selector: google.cloud.pubsublite.v1.TopicStatsService.ComputeMessageStats
3741
oauth:
3842
canonical_scopes: |-

google/cloud/pubsublite/v1/subscriber.proto

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ syntax = "proto3";
1616

1717
package google.cloud.pubsublite.v1;
1818

19-
import "google/api/client.proto";
2019
import "google/cloud/pubsublite/v1/common.proto";
20+
import "google/api/client.proto";
2121

2222
option cc_enable_arenas = true;
2323
option csharp_namespace = "Google.Cloud.PubSubLite.V1";
@@ -32,19 +32,18 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
3232
// from subscriptions.
3333
service SubscriberService {
3434
option (google.api.default_host) = "pubsublite.googleapis.com";
35-
option (google.api.oauth_scopes) =
36-
"https://www.googleapis.com/auth/cloud-platform";
35+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
3736

3837
// Establishes a stream with the server for receiving messages.
39-
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {}
38+
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {
39+
}
4040
}
4141

4242
// The service that a subscriber client application uses to determine which
4343
// partitions it should connect to.
4444
service PartitionAssignmentService {
4545
option (google.api.default_host) = "pubsublite.googleapis.com";
46-
option (google.api.oauth_scopes) =
47-
"https://www.googleapis.com/auth/cloud-platform";
46+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
4847

4948
// Assign partitions for this client to handle for the specified subscription.
5049
//
@@ -53,8 +52,8 @@ service PartitionAssignmentService {
5352
// outstanding on the stream at a time.
5453
// The client should send a PartitionAssignmentAck after updating the
5554
// partitions it is connected to to reflect the new assignment.
56-
rpc AssignPartitions(stream PartitionAssignmentRequest)
57-
returns (stream PartitionAssignment) {}
55+
rpc AssignPartitions(stream PartitionAssignmentRequest) returns (stream PartitionAssignment) {
56+
}
5857
}
5958

6059
// The first request that must be sent on a newly-opened stream. The client must
@@ -194,7 +193,9 @@ message PartitionAssignment {
194193
// partitions may remain unassigned for a period of time until the
195194
// client is known to be inactive, after which time the server will break the
196195
// stream.
197-
message PartitionAssignmentAck {}
196+
message PartitionAssignmentAck {
197+
198+
}
198199

199200
// A request on the PartitionAssignment stream.
200201
message PartitionAssignmentRequest {

google/cloud/pubsublite/v1/topic_stats.proto

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ syntax = "proto3";
1717
package google.cloud.pubsublite.v1;
1818

1919
import "google/api/annotations.proto";
20-
import "google/api/client.proto";
2120
import "google/api/field_behavior.proto";
2221
import "google/api/resource.proto";
2322
import "google/cloud/pubsublite/v1/common.proto";
2423
import "google/protobuf/timestamp.proto";
24+
import "google/api/client.proto";
2525

2626
option csharp_namespace = "Google.Cloud.PubSubLite.V1";
2727
option go_package = "google.golang.org/genproto/googleapis/cloud/pubsublite/v1;pubsublite";
@@ -34,18 +34,29 @@ option ruby_package = "Google::Cloud::PubSubLite::V1";
3434
// This service allows users to get stats about messages in their topic.
3535
service TopicStatsService {
3636
option (google.api.default_host) = "pubsublite.googleapis.com";
37-
option (google.api.oauth_scopes) =
38-
"https://www.googleapis.com/auth/cloud-platform";
37+
option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform";
3938

4039
// Compute statistics about a range of messages in a given topic and
4140
// partition.
42-
rpc ComputeMessageStats(ComputeMessageStatsRequest)
43-
returns (ComputeMessageStatsResponse) {
41+
rpc ComputeMessageStats(ComputeMessageStatsRequest) returns (ComputeMessageStatsResponse) {
4442
option (google.api.http) = {
4543
post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeMessageStats"
4644
body: "*"
4745
};
4846
}
47+
48+
// Compute the head cursor for the partition.
49+
// The head cursor’s offset is guaranteed to be before or equal to all
50+
// messages which have not yet been acknowledged to be published, and
51+
// greater than the offset of any message whose publish has already
52+
// been acknowledged. It is 0 if there have never been messages on the
53+
// partition.
54+
rpc ComputeHeadCursor(ComputeHeadCursorRequest) returns (ComputeHeadCursorResponse) {
55+
option (google.api.http) = {
56+
post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeHeadCursor"
57+
body: "*"
58+
};
59+
}
4960
}
5061

5162
// Compute statistics about a range of messages in a given topic and partition.
@@ -79,12 +90,32 @@ message ComputeMessageStatsResponse {
7990
int64 message_bytes = 2;
8091

8192
// The minimum publish timestamp across these messages. Note that publish
82-
// timestamps within a partition are non-decreasing. The timestamp will be
83-
// unset if there are no messages.
93+
// timestamps within a partition are not guaranteed to be non-decreasing. The
94+
// timestamp will be unset if there are no messages.
8495
google.protobuf.Timestamp minimum_publish_time = 3;
8596

8697
// The minimum event timestamp across these messages. For the purposes of this
8798
// computation, if a message does not have an event time, we use the publish
8899
// time. The timestamp will be unset if there are no messages.
89100
google.protobuf.Timestamp minimum_event_time = 4;
90101
}
102+
103+
// Compute the current head cursor for a partition.
104+
message ComputeHeadCursorRequest {
105+
// Required. The topic for which we should compute the head cursor.
106+
string topic = 1 [
107+
(google.api.field_behavior) = REQUIRED,
108+
(google.api.resource_reference) = {
109+
type: "pubsublite.googleapis.com/Topic"
110+
}
111+
];
112+
113+
// Required. The partition for which we should compute the head cursor.
114+
int64 partition = 2 [(google.api.field_behavior) = REQUIRED];
115+
}
116+
117+
// Response containing the head cursor for the requested topic and partition.
118+
message ComputeHeadCursorResponse {
119+
// The head cursor.
120+
Cursor head_cursor = 1;
121+
}

0 commit comments

Comments
 (0)