Skip to content

Commit d63dd2a

Browse files
authored
feat(firestore): Add error callbacks for listener threads
* Add DocumentListener#last_error * Add DocumentListener#on_error * Add QueryListener#last_error * Add QueryListener#on_error closes: googleapis#7637 pr: googleapis#7811
1 parent 6568255 commit d63dd2a

9 files changed

Lines changed: 343 additions & 30 deletions

File tree

google-cloud-firestore/lib/google/cloud/firestore/document_listener.rb

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414

1515

1616
require "google/cloud/firestore/watch/listener"
17+
require "monitor"
1718

1819
module Google
1920
module Cloud
2021
module Firestore
2122
##
23+
# # DocumentListener
24+
#
2225
# An ongoing listen operation on a document reference. This is returned by
2326
# calling {DocumentReference#listen}.
2427
#
@@ -31,25 +34,28 @@ module Firestore
3134
# nyc_ref = firestore.doc "cities/NYC"
3235
#
3336
# listener = nyc_ref.listen do |snapshot|
34-
# puts "The population of #{snapshot[:name]} "
35-
# puts "is #{snapshot[:population]}."
37+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
3638
# end
3739
#
3840
# # When ready, stop the listen operation and close the stream.
3941
# listener.stop
4042
#
4143
class DocumentListener
44+
include MonitorMixin
4245
##
4346
# @private
4447
# Creates the watch stream and listener object.
4548
def initialize doc_ref, &callback
49+
super() # to init MonitorMixin
50+
4651
@doc_ref = doc_ref
4752
raise ArgumentError if @doc_ref.nil?
4853

4954
@callback = callback
5055
raise ArgumentError if @callback.nil?
56+
@error_callbacks = []
5157

52-
@listener = Watch::Listener.for_doc_ref doc_ref do |query_snp|
58+
@listener = Watch::Listener.for_doc_ref self, doc_ref do |query_snp|
5359
doc_snp = query_snp.docs.find { |doc| doc.path == @doc_ref.path }
5460

5561
if doc_snp.nil?
@@ -80,8 +86,7 @@ def start
8086
# nyc_ref = firestore.doc "cities/NYC"
8187
#
8288
# listener = nyc_ref.listen do |snapshot|
83-
# puts "The population of #{snapshot[:name]} "
84-
# puts "is #{snapshot[:population]}."
89+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
8590
# end
8691
#
8792
# # When ready, stop the listen operation and close the stream.
@@ -103,8 +108,7 @@ def stop
103108
# nyc_ref = firestore.doc "cities/NYC"
104109
#
105110
# listener = nyc_ref.listen do |snapshot|
106-
# puts "The population of #{snapshot[:name]} "
107-
# puts "is #{snapshot[:population]}."
111+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
108112
# end
109113
#
110114
# # Checks if the listener is stopped.
@@ -119,6 +123,81 @@ def stop
119123
def stopped?
120124
@listener.stopped?
121125
end
126+
127+
##
128+
# Register to be notified of errors when raised.
129+
#
130+
# If an unhandled error has occurred the listener will attempt to
131+
# recover from the error and resume listening.
132+
#
133+
# Multiple error handlers can be added.
134+
#
135+
# @yield [callback] The block to be called when an error is raised.
136+
# @yieldparam [Exception] error The error raised.
137+
#
138+
# @example
139+
# require "google/cloud/firestore"
140+
#
141+
# firestore = Google::Cloud::Firestore.new
142+
#
143+
# # Get a document reference
144+
# nyc_ref = firestore.doc "cities/NYC"
145+
#
146+
# listener = nyc_ref.listen do |snapshot|
147+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
148+
# end
149+
#
150+
# # Register to be notified when unhandled errors occur.
151+
# listener.on_error do |error|
152+
# puts error
153+
# end
154+
#
155+
# # When ready, stop the listen operation and close the stream.
156+
# listener.stop
157+
#
158+
def on_error &block
159+
raise ArgumentError, "on_error must be called with a block" unless block_given?
160+
synchronize { @error_callbacks << block }
161+
end
162+
163+
##
164+
# The most recent unhandled error to occur while listening for changes.
165+
#
166+
# If an unhandled error has occurred the listener will attempt to
167+
# recover from the error and resume listening.
168+
#
169+
# @return [Exception, nil] error The most recent error raised.
170+
#
171+
# @example
172+
# require "google/cloud/firestore"
173+
#
174+
# firestore = Google::Cloud::Firestore.new
175+
#
176+
# # Get a document reference
177+
# nyc_ref = firestore.doc "cities/NYC"
178+
#
179+
# listener = nyc_ref.listen do |snapshot|
180+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
181+
# end
182+
#
183+
# # If an error was raised, it can be retrieved here:
184+
# listener.last_error #=> nil
185+
#
186+
# # When ready, stop the listen operation and close the stream.
187+
# listener.stop
188+
#
189+
def last_error
190+
synchronize { @last_error }
191+
end
192+
193+
# @private Pass the error to user-provided error callbacks.
194+
def error! error
195+
error_callbacks = synchronize do
196+
@last_error = error
197+
@error_callbacks.dup
198+
end
199+
error_callbacks.each { |error_callback| error_callback.call error }
200+
end
122201
end
123202
end
124203
end

google-cloud-firestore/lib/google/cloud/firestore/document_reference.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,7 @@ def get
168168
# nyc_ref = firestore.doc "cities/NYC"
169169
#
170170
# listener = nyc_ref.listen do |snapshot|
171-
# puts "The population of #{snapshot[:name]} "
172-
# puts "is #{snapshot[:population]}."
171+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
173172
# end
174173
#
175174
# # When ready, stop the listen operation and close the stream.

google-cloud-firestore/lib/google/cloud/firestore/document_snapshot.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ module Firestore
5353
# nyc_ref = firestore.doc "cities/NYC"
5454
#
5555
# listener = nyc_ref.listen do |snapshot|
56-
# puts "The population of #{snapshot[:name]} "
57-
# puts "is #{snapshot[:population]}."
56+
# puts "The population of #{snapshot[:name]} is #{snapshot[:population]}."
5857
# end
5958
#
6059
# # When ready, stop the listen operation and close the stream.

google-cloud-firestore/lib/google/cloud/firestore/query_listener.rb

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,21 @@ module Firestore
4141
# listener.stop
4242
#
4343
class QueryListener
44+
include MonitorMixin
4445
##
4546
# @private
4647
# Creates the watch stream and listener object.
4748
def initialize query, &callback
49+
super() # to init MonitorMixin
50+
4851
@query = query
4952
raise ArgumentError if @query.nil?
5053

5154
@callback = callback
5255
raise ArgumentError if @callback.nil?
56+
@error_callbacks = []
5357

54-
@listener = Watch::Listener.for_query query, &callback
58+
@listener = Watch::Listener.for_query self, query, &callback
5559
end
5660

5761
##
@@ -112,6 +116,83 @@ def stop
112116
def stopped?
113117
@listener.stopped?
114118
end
119+
120+
##
121+
# Register to be notified of errors when raised.
122+
#
123+
# If an unhandled error has occurred the listener will attempt to
124+
# recover from the error and resume listening.
125+
#
126+
# Multiple error handlers can be added.
127+
#
128+
# @yield [callback] The block to be called when an error is raised.
129+
# @yieldparam [Exception] error The error raised.
130+
#
131+
# @example
132+
# require "google/cloud/firestore"
133+
#
134+
# firestore = Google::Cloud::Firestore.new
135+
#
136+
# # Create a query
137+
# query = firestore.col(:cities).order(:population, :desc)
138+
#
139+
# listener = query.listen do |snapshot|
140+
# puts "The query snapshot has #{snapshot.docs.count} documents "
141+
# puts "and has #{snapshot.changes.count} changes."
142+
# end
143+
#
144+
# # Register to be notified when unhandled errors occur.
145+
# listener.on_error do |error|
146+
# puts error
147+
# end
148+
#
149+
# # When ready, stop the listen operation and close the stream.
150+
# listener.stop
151+
#
152+
def on_error &block
153+
raise ArgumentError, "on_error must be called with a block" unless block_given?
154+
synchronize { @error_callbacks << block }
155+
end
156+
157+
##
158+
# The most recent unhandled error to occur while listening for changes.
159+
#
160+
# If an unhandled error has occurred the listener will attempt to
161+
# recover from the error and resume listening.
162+
#
163+
# @return [Exception, nil] error The most recent error raised.
164+
#
165+
# @example
166+
# require "google/cloud/firestore"
167+
#
168+
# firestore = Google::Cloud::Firestore.new
169+
#
170+
# # Create a query
171+
# query = firestore.col(:cities).order(:population, :desc)
172+
#
173+
# listener = query.listen do |snapshot|
174+
# puts "The query snapshot has #{snapshot.docs.count} documents "
175+
# puts "and has #{snapshot.changes.count} changes."
176+
# end
177+
#
178+
# # If an error was raised, it can be retrieved here:
179+
# listener.last_error #=> nil
180+
#
181+
# # When ready, stop the listen operation and close the stream.
182+
# listener.stop
183+
#
184+
def last_error
185+
synchronize { @last_error }
186+
end
187+
188+
# @private Pass the error to user-provided error callbacks.
189+
def error! error
190+
error_callbacks = synchronize do
191+
@last_error = error
192+
@error_callbacks.dup
193+
end
194+
error_callbacks.each { |error_callback| error_callback.call error }
195+
end
115196
end
116197
end
117198
end

google-cloud-firestore/lib/google/cloud/firestore/watch/listener.rb

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module Watch
3030
class Listener
3131
include MonitorMixin
3232

33-
def self.for_doc_ref doc_ref, &callback
33+
def self.for_doc_ref parent, doc_ref, &callback
3434
raise ArgumentError if doc_ref.nil?
3535
raise ArgumentError if callback.nil?
3636

@@ -44,10 +44,10 @@ def self.for_doc_ref doc_ref, &callback
4444
)
4545
)
4646

47-
new nil, doc_ref, doc_ref.client, init_listen_req, &callback
47+
new parent, nil, doc_ref, doc_ref.client, init_listen_req, &callback
4848
end
4949

50-
def self.for_query query, &callback
50+
def self.for_query parent, query, &callback
5151
raise ArgumentError if query.nil?
5252
raise ArgumentError if callback.nil?
5353

@@ -61,12 +61,13 @@ def self.for_query query, &callback
6161
)
6262
)
6363

64-
new query, nil, query.client, init_listen_req, &callback
64+
new parent, query, nil, query.client, init_listen_req, &callback
6565
end
6666

67-
def initialize query, doc_ref, client, init_listen_req, &callback
67+
def initialize parent, query, doc_ref, client, init_listen_req, &callback
6868
super() # to init MonitorMixin
6969

70+
@parent = parent
7071
@query = query
7172
@doc_ref = doc_ref
7273
@client = client
@@ -119,6 +120,8 @@ def stopped?
119120

120121
def send_callback query_snp
121122
@callback.call query_snp
123+
rescue StandardError => e
124+
@parent.error! e
122125
end
123126

124127
def start_listening!
@@ -270,25 +273,29 @@ def background_run
270273
@request_queue.push self
271274
rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal,
272275
GRPC::ResourceExhausted, GRPC::Unauthenticated,
273-
GRPC::Unavailable, GRPC::Core::CallError
276+
GRPC::Unavailable, GRPC::Core::CallError => e
274277
# Restart the stream with an incremental back for a retriable error.
275278
# Also when GRPC raises the internal CallError.
276279

277-
# Re-raise if retried more than the max
278-
raise err if @backoff[:current] > @backoff[:max]
279-
280-
# Sleep with incremental backoff before restarting
281-
sleep @backoff[:delay]
280+
# Raise if retried more than the max
281+
if @backoff[:current] > @backoff[:max]
282+
@parent.error! e
283+
raise e
284+
else
285+
# Sleep with incremental backoff before restarting
286+
sleep @backoff[:delay]
282287

283-
# Update increment backoff delay and retry counter
284-
@backoff[:delay] *= @backoff[:mod]
285-
@backoff[:current] += 1
288+
# Update increment backoff delay and retry counter
289+
@backoff[:delay] *= @backoff[:mod]
290+
@backoff[:current] += 1
286291

287-
retry
292+
retry
293+
end
288294
rescue RestartStream
289295
retry
290296
rescue StandardError => e
291-
raise Google::Cloud::Error.from_error(e)
297+
@parent.error! e
298+
raise e
292299
end
293300
end
294301
end

google-cloud-firestore/support/doctest_helper.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ def stop
5757
def stopped?
5858
@stopped
5959
end
60+
61+
def on_error &block
62+
end
63+
64+
def last_error
65+
end
6066
end
6167
DocumentListener = StubbedListener
6268
QueryListener = StubbedListener

0 commit comments

Comments
 (0)