Subscribe with the flow control setting max_lease_duration shorter than the time it takes to ack the message, so that the library drops the lease before the message is acked.
Results:
Listening for messages...
Received b'message1'.
Dropping 1 items because they were leased too long.
Acking message with ordering key: key1
AssertionError: A message queue should exist for every ordered message in flight.
It is possible to call activate_ordering_keys with keys which are not present in messages_on_hold._pending_ordered_messages. This can happen if:
The lease of one or more messages exceeds max_lease_duration, so maintain_leases drops them.
drop is called in the dispatcher:
python-pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py, line 295 at commit 9ffb8d6
If no messages remain in messages_on_hold._pending_ordered_messages for the ordering key, the pending queue for that key is removed:
https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py#L154
The message is acked from the user callback, the ack completes and drop is called in the dispatcher:
python-pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py, line 247 at commit f3bf21d
activate_ordering_keys is then called with a key that no longer exists in messages_on_hold._pending_ordered_messages, which trips the assertion:
https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py#L122
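The sequence above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the library's code: `ToyMessagesOnHold`, `mark_in_flight`, and `simulate_double_drop` are hypothetical names, and the class keeps just the per-key queue bookkeeping needed to show why a second activation for the same key fails.

```python
from collections import deque


class ToyMessagesOnHold:
    """Hypothetical, simplified stand-in for the per-key bookkeeping.

    It is NOT the real MessagesOnHold class; it only mimics the idea that
    every ordering key with a message in flight has a pending queue.
    """

    def __init__(self):
        self._pending_ordered_messages = {}

    def mark_in_flight(self, key):
        # A message with this key was handed to the user callback, so an
        # (initially empty) queue is created to track the key.
        self._pending_ordered_messages.setdefault(key, deque())

    def activate_ordering_keys(self, keys):
        for key in keys:
            pending = self._pending_ordered_messages.get(key)
            # Same invariant as the assertion reported in this issue.
            assert (
                pending is not None
            ), "A message queue should exist for every ordered message in flight."
            if not pending:
                # Nothing else is waiting on this key: remove its queue.
                del self._pending_ordered_messages[key]


def simulate_double_drop(key="key1"):
    """Replay the two drops described above; return the assertion text."""
    on_hold = ToyMessagesOnHold()
    on_hold.mark_in_flight(key)

    # First drop: the lease exceeded max_lease_duration. The key is
    # activated and, with no pending messages, its queue is removed.
    on_hold.activate_ordering_keys([key])

    # Second drop: the user callback acks the same message afterwards.
    # The key is activated again, but its queue is already gone.
    try:
        on_hold.activate_ordering_keys([key])
    except AssertionError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(simulate_double_drop())
```

In this toy model the first activation succeeds and removes the queue for `key1`, and the second one fails the invariant, which mirrors the order of events in the log output above.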
Reproduction:
Results: