Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions localstack-core/localstack/services/sqs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
class SqsMessage:
message: Message
created: float
visibility_timeout: int
visibility_timeout: int | None
receive_count: int
delay_seconds: int | None
receipt_handles: set[str]
Expand All @@ -65,9 +65,7 @@ class SqsMessage:
visibility_deadline: float | None
deleted: bool
priority: float
message_deduplication_id: str
message_group_id: str
Comment on lines -68 to -69
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where was the problem with message_deduplocation_id and message_group_id? 🙂

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the constructor, they are provided as parameter but an instance variable is never assigned. They are used to populate the attributes dict.

sequence_number: str
sequence_number: str | None

def __init__(
self,
Expand All @@ -85,6 +83,7 @@ def __init__(
self.delay_seconds = None
self.last_received = None
self.first_received = None
self.visibility_timeout = None
self.visibility_deadline = None
self.deleted = False
self.priority = priority
Expand Down Expand Up @@ -270,28 +269,36 @@ class MessageMoveTask:
# configurable fields
source_arn: str
"""The arn of the DLQ the messages are currently in."""
destination_arn: str | None = None
destination_arn: str
"""If the DestinationArn is not specified, the original source arn will be used as target."""
max_number_of_messages_per_second: int | None = None
max_number_of_messages_per_second: int | None

# dynamic fields
task_id: str
status: str = MessageMoveTaskStatus.CREATED
started_timestamp: datetime | None = None
approximate_number_of_messages_moved: int | None = None
approximate_number_of_messages_to_move: int | None = None
failure_reason: str | None = None
status: str
started_timestamp: datetime | None
approximate_number_of_messages_moved: int | None
approximate_number_of_messages_to_move: int | None
failure_reason: str | None

cancel_event: threading.Event

def __init__(
self, source_arn: str, destination_arn: str, max_number_of_messages_per_second: int = None
self,
source_arn: str,
destination_arn: str,
max_number_of_messages_per_second: int | None = None,
):
self.task_id = long_uid()
self.source_arn = source_arn
self.destination_arn = destination_arn
self.max_number_of_messages_per_second = max_number_of_messages_per_second
self.cancel_event = threading.Event()
self.status = MessageMoveTaskStatus.CREATED
self.started_timestamp = None
self.approximate_number_of_messages_moved = None
self.approximate_number_of_messages_to_move = None
self.failure_reason = None

def mark_started(self):
self.started_timestamp = datetime.utcnow()
Expand All @@ -318,6 +325,7 @@ class SqsQueue:
# Simulating an ordered set in python. Only the keys are used and of interest.
inflight: dict[SqsMessage, None]
receipts: dict[str, SqsMessage]
mutex: threading.RLock

def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
self.name = name
Expand Down Expand Up @@ -1374,7 +1382,7 @@ def clear(self):


class SqsStore(BaseStore):
queues: dict[str, SqsQueue] = LocalAttribute(default=dict)
queues: dict[str, FifoQueue | StandardQueue] = LocalAttribute(default=dict)

deleted: dict[str, float] = LocalAttribute(default=dict)

Expand Down
13 changes: 8 additions & 5 deletions localstack-core/localstack/services/sqs/queue.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import time
from queue import Empty, PriorityQueue, Queue
from typing import Generic, TypeVar

T = TypeVar("T")

class InterruptibleQueue(Queue):

class InterruptibleQueue(Queue, Generic[T]):
# is_shutdown is used to check whether we have triggered a shutdown of the Queue
is_shutdown: bool

def __init__(self, maxsize=0):
def __init__(self, maxsize: int = 0):
super().__init__(maxsize)
self.is_shutdown = False

def get(self, block=True, timeout=None):
def get(self, block: bool = True, timeout: float | None = None) -> T:
with self.not_empty:
if self.is_shutdown:
raise Empty
Expand All @@ -35,7 +38,7 @@ def get(self, block=True, timeout=None):
self.not_full.notify()
return item

def shutdown(self):
def shutdown(self) -> None:
"""
`shutdown` signals to stop all current and future `Queue.get` calls from executing.

Expand All @@ -46,5 +49,5 @@ def shutdown(self):
self.not_empty.notify_all()


class InterruptiblePriorityQueue(PriorityQueue, InterruptibleQueue):
class InterruptiblePriorityQueue(PriorityQueue, InterruptibleQueue[T], Generic[T]):
pass
2 changes: 1 addition & 1 deletion localstack-core/localstack/testing/pytest/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
LOG = logging.getLogger(__name__)

ENV_TEST_CONTAINER_MOUNT_SOURCES = "TEST_CONTAINER_MOUNT_SOURCES"
"""Environment variable used to indicate that we should mount LocalStack source files into the container."""
"""Environment variable used to indicate that we should mount LocalStack source files into the container."""

ENV_TEST_CONTAINER_MOUNT_DEPENDENCIES = "TEST_CONTAINER_MOUNT_DEPENDENCIES"
"""Environment variable used to indicate that we should mount dependencies into the container."""
Expand Down
Loading