Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions localstack-core/localstack/services/sns/v2/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import TypedDict

from localstack.aws.api.sns import TopicAttributesMap
from localstack.services.stores import (
AccountRegionBundle,
BaseStore,
CrossRegionAttribute,
LocalAttribute,
)
from localstack.utils.tagging import TaggingService


class Topic(TypedDict, total=True):
arn: str
name: str
attributes: TopicAttributesMap


class SnsStore(BaseStore):
topics: dict[str, Topic] = LocalAttribute(default=dict)

TAGS: TaggingService = CrossRegionAttribute(default=TaggingService)


sns_stores = AccountRegionBundle("sns", SnsStore)
209 changes: 207 additions & 2 deletions localstack-core/localstack/services/sns/v2/provider.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,214 @@
import json
import logging
import re

from localstack.aws.api.sns import SnsApi
from botocore.utils import InvalidArnException

from localstack.aws.api import RequestContext
from localstack.aws.api.sns import (
CreateTopicResponse,
GetTopicAttributesResponse,
InvalidParameterException,
ListTopicsResponse,
NotFoundException,
SnsApi,
TagList,
TopicAttributesMap,
attributeName,
attributeValue,
nextToken,
topicARN,
topicName,
)
from localstack.services.sns.v2.models import SnsStore, Topic, sns_stores
from localstack.utils.aws.arns import ArnData, parse_arn, sns_topic_arn
from localstack.utils.collections import PaginatedList

# set up logger
LOG = logging.getLogger(__name__)

SNS_TOPIC_NAME_PATTERN_FIFO = r"^[a-zA-Z0-9_-]{1,256}\.fifo$"
SNS_TOPIC_NAME_PATTERN = r"^[a-zA-Z0-9_-]{1,256}$"


class SnsProvider(SnsApi):
def create_topic(
self,
context: RequestContext,
name: topicName,
attributes: TopicAttributesMap | None = None,
tags: TagList | None = None,
data_protection_policy: attributeValue | None = None,
**kwargs,
) -> CreateTopicResponse:
store = self.get_store(context.account_id, context.region)
topic_arn = sns_topic_arn(
topic_name=name, region_name=context.region, account_id=context.account_id
)
topic: Topic = store.topics.get(topic_arn)
attributes = attributes or {}
if topic:
attrs = topic["attributes"]
for k, v in attributes.values():
if not attrs.get(k) or not attrs.get(k) == v:
# TODO:
raise InvalidParameterException("Fix this Exception message and type")
Comment on lines +54 to +55
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for later: this is about idempotency when creating, right? This was pretty tricky for subscriptions, might be worth looking into subscribe to see if we can take some things from there 👍

return CreateTopicResponse(TopicArn=topic_arn)

attributes = attributes or {}
Comment thread
bentsku marked this conversation as resolved.
if attributes.get("FifoTopic") and attributes["FifoTopic"].lower() == "true":
fifo_match = re.match(SNS_TOPIC_NAME_PATTERN_FIFO, name)
if not fifo_match:
# TODO: check this with a separate test
raise InvalidParameterException(
"Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long."
)
else:
# AWS does not seem to save explicit settings of fifo = false
attributes.pop("FifoTopic", None)
name_match = re.match(SNS_TOPIC_NAME_PATTERN, name)
if not name_match:
raise InvalidParameterException("Invalid parameter: Topic Name")

topic = _create_topic(name=name, attributes=attributes, context=context)
store.topics[topic_arn] = topic
# todo: tags

return CreateTopicResponse(TopicArn=topic_arn)

def get_topic_attributes(
self, context: RequestContext, topic_arn: topicARN, **kwargs
) -> GetTopicAttributesResponse:
topic: Topic = self._get_topic(arn=topic_arn, context=context)
if topic:
attributes = topic["attributes"]
return GetTopicAttributesResponse(Attributes=attributes)
else:
raise NotFoundException("Topic does not exist")

def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
store = self.get_store(context.account_id, context.region)

store.topics.pop(topic_arn, None)

def list_topics(
self, context: RequestContext, next_token: nextToken | None = None, **kwargs
) -> ListTopicsResponse:
store = self.get_store(context.account_id, context.region)
topics = [{"TopicArn": t["arn"]} for t in list(store.topics.values())]
topics = PaginatedList(topics)
page, nxt = topics.get_page(
lambda topic: topic["TopicArn"], next_token=next_token, page_size=100
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: using the topic ARN as the pagination token might be a bit of an issue (some client might not handle the arn format for that), but this can left as is for now, just a thought

)
topics = {"Topics": page, "NextToken": nxt}
return ListTopicsResponse(**topics)

def set_topic_attributes(
self,
context: RequestContext,
topic_arn: topicARN,
attribute_name: attributeName,
attribute_value: attributeValue | None = None,
**kwargs,
) -> None:
topic: Topic = self._get_topic(arn=topic_arn, context=context)
if attribute_name == "FifoTopic":
raise InvalidParameterException("Invalid parameter: AttributeName")
topic["attributes"][attribute_name] = attribute_value

@staticmethod
def get_store(account_id: str, region: str) -> SnsStore:
return sns_stores[account_id][region]

# TODO: reintroduce multi-region parameter (latest before final migration from v1)
@staticmethod
def _get_topic(arn: str, context: RequestContext) -> Topic:
"""
:param arn: the Topic ARN
:param context: the RequestContext of the request
:return: the model Topic
"""
arn_data = parse_and_validate_topic_arn(arn)
if context.region != arn_data["region"]:
raise InvalidParameterException("Invalid parameter: TopicArn")
try:
store = SnsProvider.get_store(context.account_id, context.region)
return store.topics[arn]
except KeyError:
raise NotFoundException("Topic does not exist")


def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
topic_arn = topic_arn or ""
try:
return parse_arn(topic_arn)
except InvalidArnException:
count = len(topic_arn.split(":"))
raise InvalidParameterException(
f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
)


def _create_topic(name: str, attributes: dict, context: RequestContext) -> Topic:
topic_arn = sns_topic_arn(
topic_name=name, region_name=context.region, account_id=context.account_id
)
topic: Topic = {
"name": name,
"arn": topic_arn,
"attributes": {},
}
attrs = _default_attributes(topic, context)
attrs.update(attributes or {})
topic["attributes"] = attrs

return topic


def _default_attributes(topic: Topic, context: RequestContext) -> TopicAttributesMap:
default_attributes = {
"DisplayName": "",
"Owner": context.account_id,
"Policy": _create_default_topic_policy(topic, context),
"SubscriptionsConfirmed": "0",
"SubscriptionsDeleted": "0",
"SubscriptionsPending": "0",
"TopicArn": topic["arn"],
}
if topic["name"].endswith(".fifo"):
default_attributes.update(
{
"ContentBasedDeduplication": "false",
"FifoTopic": "false",
"SignatureVersion": "2",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think this is an oversight from the test, looking at all our tests, they all set the SignatureVersion to 2 when creating a fifo topic.

But looking at AWS, it doesn't seem like it sets this attribute by default:

$ aws us-east-1 sns create-topic --name test-topic.fifo --attributes FifoTopic=true
{
    "TopicArn": "arn:aws:sns:us-east-1:<account-id>:test-topic.fifo"
}
$ aws sns get-topic-attributes --topic-arn arn:aws:sns:us-east-1:<account-id>:test-topic.fifo
{
    "Attributes": {
        "Policy": "{\"Version\":\"2008-10-17\",\"Id\":\"__default_policy_ID\",\"Statement\":[{\"Sid\":\"__default_statement_ID\",\"Effect\":\"Allow\",\"Principal\":{\"AWS\":\"*\"},\"Action\":[\"SNS:GetTopicAttributes\",\"SNS:SetTopicAttributes\",\"SNS:AddPermission\",\"SNS:RemovePermission\",\"SNS:DeleteTopic\",\"SNS:Subscribe\",\"SNS:ListSubscriptionsByTopic\",\"SNS:Publish\"],\"Resource\":\"arn:aws:sns:us-east-1:<account-id>:test-topic.fifo\",\"Condition\":{\"StringEquals\":{\"AWS:SourceOwner\":\"<account-id>\"}}}]}",
        "Owner": "<account-id>",
        "SubscriptionsPending": "0",
        "TopicArn": "arn:aws:sns:us-east-1:<account-id>:test-topic.fifo",
        "EffectiveDeliveryPolicy": "{\"http\":{\"defaultHealthyRetryPolicy\":{\"minDelayTarget\":20,\"maxDelayTarget\":20,\"numRetries\":3,\"numMaxDelayRetries\":0,\"numNoDelayRetries\":0,\"numMinDelayRetries\":0,\"backoffFunction\":\"linear\"},\"disableSubscriptionOverrides\":false,\"defaultRequestPolicy\":{\"headerContentType\":\"text/plain; charset=UTF-8\"}}}",
        "SubscriptionsConfirmed": "0",
        "FifoTopic": "true",
        "DisplayName": "",
        "ContentBasedDeduplication": "false",
        "SubscriptionsDeleted": "0"
    }
}

But this is a nit, not blocking, something to consider to remove in a follow-up PR 👍 let's get this one merged! Sorry I missed it earlier

}
)
return default_attributes


class SnsProvider(SnsApi): ...
def _create_default_topic_policy(topic: Topic, context: RequestContext) -> str:
return json.dumps(
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Effect": "Allow",
"Sid": "__default_statement_ID",
"Principal": {"AWS": "*"},
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish",
],
"Resource": topic["arn"],
"Condition": {"StringEquals": {"AWS:SourceOwner": context.account_id}},
}
],
}
)
4 changes: 4 additions & 0 deletions tests/aws/services/sns/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def is_sns_v2_provider():
return os.environ.get("PROVIDER_OVERRIDE_SNS") == "v2" and not is_aws_cloud()


def is_sns_v1_provider() -> bool:
return not os.environ.get("PROVIDER_OVERRIDE_SNS") == "v2" and not is_aws_cloud()


skip_if_sns_v2 = pytest.mark.skipif(
is_sns_v2_provider(),
reason="Skipping test for v2 provider as it contains operations not yet supported",
Expand Down
Loading
Loading