Skip to content

Commit 5f0a22b

Browse files
Takashi Matsuogmlewis
authored andcommitted
pubsub: update to pubsub example to v1beta2
Change-Id: I6e960d08527a911c3b1850a6333409c6377c58df Reviewed-on: https://code-review.googlesource.com/2450 Reviewed-by: Glenn Lewis <gmlewis@google.com>
1 parent b1edac7 commit 5f0a22b

1 file changed

Lines changed: 70 additions & 80 deletions

File tree

examples/pubsub.go

Lines changed: 70 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ import (
1212
"os"
1313
"strings"
1414

15-
pubsub "google.golang.org/api/pubsub/v1beta1"
15+
pubsub "google.golang.org/api/pubsub/v1beta2"
1616
)
1717

1818
const USAGE = `Available arguments are:
19-
PROJ list_topics
20-
PROJ create_topic TOPIC
21-
PROJ delete_topic TOPIC
22-
PROJ list_subscriptions
23-
PROJ create_subscription SUBSCRIPTION LINKED_TOPIC
24-
PROJ delete_subscription SUBSCRIPTION
25-
PROJ connect_irc TOPIC SERVER CHANNEL
26-
PROJ pull_messages SUBSCRIPTION
19+
<project_id> list_topics
20+
<project_id> create_topic <topic>
21+
<project_id> delete_topic <topic>
22+
<project_id> list_subscriptions
23+
<project_id> create_subscription <subscription> <linked topic>
24+
<project_id> delete_subscription <subscription>
25+
<project_id> connect_irc <topic> <server> <channel>
26+
<project_id> pull_messages <subscription>
2727
`
2828

2929
type IRCBot struct {
@@ -67,13 +67,15 @@ func (bot *IRCBot) CheckConnection() {
6767
if err != nil {
6868
log.Fatal("Unable to read a line during checking the connection.")
6969
}
70-
if strings.Contains(line, "004") {
71-
log.Println("The nick accepted.")
72-
} else if strings.Contains(line, "433") {
73-
log.Fatal("The nick is already in use.")
74-
} else if strings.Contains(line, "366") {
75-
log.Println("Starting to publish messages.")
76-
return
70+
if parts := strings.Split(line, " "); len(parts) > 1 {
71+
if parts[1] == "004" {
72+
log.Println("The nick accepted.")
73+
} else if parts[1] == "433" {
74+
log.Fatalf("The nick is already in use: %s", line)
75+
} else if parts[1] == "366" {
76+
log.Println("Starting to publish messages.")
77+
return
78+
}
7779
}
7880
}
7981
}
@@ -100,14 +102,14 @@ func pubsubUsage() {
100102

101103
// Returns a fully qualified resource name for Cloud Pub/Sub.
102104
func fqrn(res, proj, name string) string {
103-
return fmt.Sprintf("/%s/%s/%s", res, proj, name)
105+
return fmt.Sprintf("projects/%s/%s/%s", proj, res, name)
104106
}
105107

106108
func fullTopicName(proj, topic string) string {
107109
return fqrn("topics", proj, topic)
108110
}
109111

110-
func fullSubscriptionName(proj, topic string) string {
112+
func fullSubName(proj, topic string) string {
111113
return fqrn("subscriptions", proj, topic)
112114
}
113115

@@ -120,89 +122,76 @@ func checkArgs(argv []string, min int) {
120122
}
121123

122124
func listTopics(service *pubsub.Service, argv []string) {
123-
var nextPageToken string = ""
125+
next := ""
124126
for {
125-
query := service.Topics.List().Query(
126-
fmt.Sprintf(
127-
"cloud.googleapis.com/project in (/projects/%s)",
128-
argv[0])).PageToken(nextPageToken)
129-
topicsList, err := query.Do()
127+
topicsList, err := service.Projects.Topics.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
130128
if err != nil {
131-
log.Fatal("Got an error: %v", err)
129+
log.Fatalf("listTopics query.Do() failed: %v", err)
132130
}
133-
for _, topic := range topicsList.Topic {
131+
for _, topic := range topicsList.Topics {
134132
fmt.Println(topic.Name)
135133
}
136-
nextPageToken = topicsList.NextPageToken
137-
if nextPageToken == "" {
134+
next = topicsList.NextPageToken
135+
if next == "" {
138136
break
139137
}
140138
}
141139
}
142140

143141
func createTopic(service *pubsub.Service, argv []string) {
144142
checkArgs(argv, 3)
145-
topic := &pubsub.Topic{Name: fullTopicName(argv[0], argv[2])}
146-
topic, err := service.Topics.Create(topic).Do()
143+
topic, err := service.Projects.Topics.Create(fullTopicName(argv[0], argv[2]), &pubsub.Topic{}).Do()
147144
if err != nil {
148-
log.Fatal("Got an error: %v", err)
145+
log.Fatalf("createTopic Create().Do() failed: %v", err)
149146
}
150147
fmt.Printf("Topic %s was created.\n", topic.Name)
151148
}
152149

153150
func deleteTopic(service *pubsub.Service, argv []string) {
154151
checkArgs(argv, 3)
155152
topicName := fullTopicName(argv[0], argv[2])
156-
err := service.Topics.Delete(topicName).Do()
157-
if err != nil {
158-
log.Fatal("Got an error: %v", err)
153+
if _, err := service.Projects.Topics.Delete(topicName).Do(); err != nil {
154+
log.Fatalf("deleteTopic Delete().Do() failed: %v", err)
159155
}
160156
fmt.Printf("Topic %s was deleted.\n", topicName)
161157
}
162158

163159
func listSubscriptions(service *pubsub.Service, argv []string) {
164-
var nextPageToken string = ""
160+
next := ""
165161
for {
166-
query := service.Subscriptions.List().Query(
167-
fmt.Sprintf(
168-
"cloud.googleapis.com/project in (/projects/%s)",
169-
argv[0])).PageToken(nextPageToken)
170-
subscriptionsList, err := query.Do()
162+
subscriptionsList, err := service.Projects.Subscriptions.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
171163
if err != nil {
172-
log.Fatal("Got an error: %v", err)
164+
log.Fatalf("listSubscriptions query.Do() failed: %v", err)
173165
}
174-
for _, subscription := range subscriptionsList.Subscription {
166+
for _, subscription := range subscriptionsList.Subscriptions {
175167
sub_text, _ := json.MarshalIndent(subscription, "", " ")
176168
fmt.Printf("%s\n", sub_text)
177169
}
178-
nextPageToken = subscriptionsList.NextPageToken
179-
if nextPageToken == "" {
170+
next = subscriptionsList.NextPageToken
171+
if next == "" {
180172
break
181173
}
182174
}
183175
}
184176

185177
func createSubscription(service *pubsub.Service, argv []string) {
186178
checkArgs(argv, 4)
187-
subscription := &pubsub.Subscription{
188-
Name: fullSubscriptionName(argv[0], argv[2]),
189-
Topic: fullTopicName(argv[0], argv[3]),
190-
}
191-
subscription, err := service.Subscriptions.Create(subscription).Do()
179+
name := fullSubName(argv[0], argv[2])
180+
sub := &pubsub.Subscription{Topic: fullTopicName(argv[0], argv[3])}
181+
subscription, err := service.Projects.Subscriptions.Create(name, sub).Do()
192182
if err != nil {
193-
log.Fatal("Got an error: %v", err)
183+
log.Fatalf("createSubscription Create().Do() failed: %v", err)
194184
}
195185
fmt.Printf("Subscription %s was created.\n", subscription.Name)
196186
}
197187

198188
func deleteSubscription(service *pubsub.Service, argv []string) {
199189
checkArgs(argv, 3)
200-
subscriptionName := fullSubscriptionName(argv[0], argv[2])
201-
err := service.Subscriptions.Delete(subscriptionName).Do()
202-
if err != nil {
203-
log.Fatal("Got an error: %v", err)
190+
name := fullSubName(argv[0], argv[2])
191+
if _, err := service.Projects.Subscriptions.Delete(name).Do(); err != nil {
192+
log.Fatalf("deleteSubscription Delete().Do() failed: %v", err)
204193
}
205-
fmt.Printf("Subscription %s was deleted.\n", subscriptionName)
194+
fmt.Printf("Subscription %s was deleted.\n", name)
206195
}
207196

208197
func connectIRC(service *pubsub.Service, argv []string) {
@@ -234,39 +223,40 @@ func connectIRC(service *pubsub.Service, argv []string) {
234223
Data: base64.StdEncoding.EncodeToString([]byte(privMsg)),
235224
}
236225
publishRequest := &pubsub.PublishRequest{
237-
Message: pubsubMessage,
238-
Topic: topicName,
226+
Messages: []*pubsub.PubsubMessage{pubsubMessage},
227+
}
228+
if _, err := service.Projects.Topics.Publish(topicName, publishRequest).Do(); err != nil {
229+
log.Fatalf("connectIRC Publish().Do() failed: %v", err)
239230
}
240-
service.Topics.Publish(publishRequest).Do()
241231
log.Println("Published a message to the topic.")
242232
}
243233
}
244234
}
245235

246236
func pullMessages(service *pubsub.Service, argv []string) {
247237
checkArgs(argv, 3)
248-
subscriptionName := fullSubscriptionName(argv[0], argv[2])
238+
subName := fullSubName(argv[0], argv[2])
249239
pullRequest := &pubsub.PullRequest{
250240
ReturnImmediately: false,
251-
Subscription: subscriptionName,
241+
MaxMessages: 1,
252242
}
253243
for {
254-
pullResponse, err := service.Subscriptions.Pull(pullRequest).Do()
244+
pullResponse, err := service.Projects.Subscriptions.Pull(subName, pullRequest).Do()
255245
if err != nil {
256-
log.Fatal("Got an error while pull a message: %v", err)
246+
log.Fatalf("pullMessages Pull().Do() failed: %v", err)
257247
}
258-
if pullResponse.PubsubEvent.Message != nil {
259-
data, err := base64.StdEncoding.DecodeString(
260-
pullResponse.PubsubEvent.Message.Data)
248+
for _, receivedMessage := range pullResponse.ReceivedMessages {
249+
data, err := base64.StdEncoding.DecodeString(receivedMessage.Message.Data)
261250
if err != nil {
262-
log.Fatal("Got an error while decoding the message: %v", err)
251+
log.Fatalf("pullMessages DecodeString() failed: %v", err)
263252
}
264253
fmt.Printf("%s\n", data)
265254
ackRequest := &pubsub.AcknowledgeRequest{
266-
AckId: []string{pullResponse.AckId},
267-
Subscription: subscriptionName,
255+
AckIds: []string{receivedMessage.AckId},
256+
}
257+
if _, err = service.Projects.Subscriptions.Acknowledge(subName, ackRequest).Do(); err != nil {
258+
log.Printf("pullMessages Acknowledge().Do() failed: %v", err)
268259
}
269-
service.Subscriptions.Acknowledge(ackRequest).Do()
270260
}
271261
}
272262
}
@@ -283,18 +273,18 @@ func pullMessages(service *pubsub.Service, argv []string) {
283273
//
284274
// It has 8 subcommands as follows:
285275
//
286-
// PROJ list_topics
287-
// PROJ create_topic TOPIC
288-
// PROJ delete_topic TOPIC
289-
// PROJ list_subscriptions
290-
// PROJ create_subscription SUBSCRIPTION LINKED_TOPIC
291-
// PROJ delete_subscription SUBSCRIPTION
292-
// PROJ connect_irc TOPIC SERVER CHANNEL
293-
// PROJ pull_messages SUBSCRIPTION
276+
// <project_id> list_topics
277+
// <project_id> create_topic <topic>
278+
// <project_id> delete_topic <topic>
279+
// <project_id> list_subscriptions
280+
// <project_id> create_subscription <subscription> <linked topic>
281+
// <project_id> delete_subscription <subscription>
282+
// <project_id> connect_irc <topic> <server> <channel>
283+
// <project_id> pull_messages <subscription>
294284
//
295285
// You can use either of your alphanumerical or numerial Cloud Project
296-
// ID for PROJ. You can choose any names for TOPIC and SUBSCRIPTION as
297-
// long as they follow the naming rule described at:
286+
// ID for project_id. You can choose any names for topic and
287+
// subscription as long as they follow the naming rule described at:
298288
// https://developers.google.com/pubsub/overview#names
299289
//
300290
// You can list/create/delete topics/subscriptions by self-explanatory
@@ -312,9 +302,9 @@ func pubsubMain(client *http.Client, argv []string) {
312302

313303
m := map[string]func(service *pubsub.Service, argv []string){
314304
"list_topics": listTopics,
315-
"list_subscriptions": listSubscriptions,
316305
"create_topic": createTopic,
317306
"delete_topic": deleteTopic,
307+
"list_subscriptions": listSubscriptions,
318308
"create_subscription": createSubscription,
319309
"delete_subscription": deleteSubscription,
320310
"connect_irc": connectIRC,

0 commit comments

Comments
 (0)