Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit f239baf

Browse files
authored
Sns: v2 topic migration (#13205)
1 parent e0ec7f6 commit f239baf

6 files changed

Lines changed: 935 additions & 16 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from typing import TypedDict
2+
3+
from localstack.aws.api.sns import TopicAttributesMap
4+
from localstack.services.stores import (
5+
AccountRegionBundle,
6+
BaseStore,
7+
CrossRegionAttribute,
8+
LocalAttribute,
9+
)
10+
from localstack.utils.tagging import TaggingService
11+
12+
13+
class Topic(TypedDict, total=True):
14+
arn: str
15+
name: str
16+
attributes: TopicAttributesMap
17+
18+
19+
class SnsStore(BaseStore):
20+
topics: dict[str, Topic] = LocalAttribute(default=dict)
21+
22+
TAGS: TaggingService = CrossRegionAttribute(default=TaggingService)
23+
24+
25+
sns_stores = AccountRegionBundle("sns", SnsStore)
Lines changed: 207 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,214 @@
1+
import json
12
import logging
3+
import re
24

3-
from localstack.aws.api.sns import SnsApi
5+
from botocore.utils import InvalidArnException
6+
7+
from localstack.aws.api import RequestContext
8+
from localstack.aws.api.sns import (
9+
CreateTopicResponse,
10+
GetTopicAttributesResponse,
11+
InvalidParameterException,
12+
ListTopicsResponse,
13+
NotFoundException,
14+
SnsApi,
15+
TagList,
16+
TopicAttributesMap,
17+
attributeName,
18+
attributeValue,
19+
nextToken,
20+
topicARN,
21+
topicName,
22+
)
23+
from localstack.services.sns.v2.models import SnsStore, Topic, sns_stores
24+
from localstack.utils.aws.arns import ArnData, parse_arn, sns_topic_arn
25+
from localstack.utils.collections import PaginatedList
426

527
# set up logger
628
LOG = logging.getLogger(__name__)
729

30+
SNS_TOPIC_NAME_PATTERN_FIFO = r"^[a-zA-Z0-9_-]{1,256}\.fifo$"
31+
SNS_TOPIC_NAME_PATTERN = r"^[a-zA-Z0-9_-]{1,256}$"
32+
33+
34+
class SnsProvider(SnsApi):
35+
def create_topic(
36+
self,
37+
context: RequestContext,
38+
name: topicName,
39+
attributes: TopicAttributesMap | None = None,
40+
tags: TagList | None = None,
41+
data_protection_policy: attributeValue | None = None,
42+
**kwargs,
43+
) -> CreateTopicResponse:
44+
store = self.get_store(context.account_id, context.region)
45+
topic_arn = sns_topic_arn(
46+
topic_name=name, region_name=context.region, account_id=context.account_id
47+
)
48+
topic: Topic = store.topics.get(topic_arn)
49+
attributes = attributes or {}
50+
if topic:
51+
attrs = topic["attributes"]
52+
for k, v in attributes.values():
53+
if not attrs.get(k) or not attrs.get(k) == v:
54+
# TODO:
55+
raise InvalidParameterException("Fix this Exception message and type")
56+
return CreateTopicResponse(TopicArn=topic_arn)
57+
58+
attributes = attributes or {}
59+
if attributes.get("FifoTopic") and attributes["FifoTopic"].lower() == "true":
60+
fifo_match = re.match(SNS_TOPIC_NAME_PATTERN_FIFO, name)
61+
if not fifo_match:
62+
# TODO: check this with a separate test
63+
raise InvalidParameterException(
64+
"Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long."
65+
)
66+
else:
67+
# AWS does not seem to save explicit settings of fifo = false
68+
attributes.pop("FifoTopic", None)
69+
name_match = re.match(SNS_TOPIC_NAME_PATTERN, name)
70+
if not name_match:
71+
raise InvalidParameterException("Invalid parameter: Topic Name")
72+
73+
topic = _create_topic(name=name, attributes=attributes, context=context)
74+
store.topics[topic_arn] = topic
75+
# todo: tags
76+
77+
return CreateTopicResponse(TopicArn=topic_arn)
78+
79+
def get_topic_attributes(
80+
self, context: RequestContext, topic_arn: topicARN, **kwargs
81+
) -> GetTopicAttributesResponse:
82+
topic: Topic = self._get_topic(arn=topic_arn, context=context)
83+
if topic:
84+
attributes = topic["attributes"]
85+
return GetTopicAttributesResponse(Attributes=attributes)
86+
else:
87+
raise NotFoundException("Topic does not exist")
88+
89+
def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None:
90+
store = self.get_store(context.account_id, context.region)
91+
92+
store.topics.pop(topic_arn, None)
93+
94+
def list_topics(
95+
self, context: RequestContext, next_token: nextToken | None = None, **kwargs
96+
) -> ListTopicsResponse:
97+
store = self.get_store(context.account_id, context.region)
98+
topics = [{"TopicArn": t["arn"]} for t in list(store.topics.values())]
99+
topics = PaginatedList(topics)
100+
page, nxt = topics.get_page(
101+
lambda topic: topic["TopicArn"], next_token=next_token, page_size=100
102+
)
103+
topics = {"Topics": page, "NextToken": nxt}
104+
return ListTopicsResponse(**topics)
105+
106+
def set_topic_attributes(
107+
self,
108+
context: RequestContext,
109+
topic_arn: topicARN,
110+
attribute_name: attributeName,
111+
attribute_value: attributeValue | None = None,
112+
**kwargs,
113+
) -> None:
114+
topic: Topic = self._get_topic(arn=topic_arn, context=context)
115+
if attribute_name == "FifoTopic":
116+
raise InvalidParameterException("Invalid parameter: AttributeName")
117+
topic["attributes"][attribute_name] = attribute_value
118+
119+
@staticmethod
120+
def get_store(account_id: str, region: str) -> SnsStore:
121+
return sns_stores[account_id][region]
122+
123+
# TODO: reintroduce multi-region parameter (latest before final migration from v1)
124+
@staticmethod
125+
def _get_topic(arn: str, context: RequestContext) -> Topic:
126+
"""
127+
:param arn: the Topic ARN
128+
:param context: the RequestContext of the request
129+
:return: the model Topic
130+
"""
131+
arn_data = parse_and_validate_topic_arn(arn)
132+
if context.region != arn_data["region"]:
133+
raise InvalidParameterException("Invalid parameter: TopicArn")
134+
try:
135+
store = SnsProvider.get_store(context.account_id, context.region)
136+
return store.topics[arn]
137+
except KeyError:
138+
raise NotFoundException("Topic does not exist")
139+
140+
141+
def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData:
142+
topic_arn = topic_arn or ""
143+
try:
144+
return parse_arn(topic_arn)
145+
except InvalidArnException:
146+
count = len(topic_arn.split(":"))
147+
raise InvalidParameterException(
148+
f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}"
149+
)
150+
151+
152+
def _create_topic(name: str, attributes: dict, context: RequestContext) -> Topic:
153+
topic_arn = sns_topic_arn(
154+
topic_name=name, region_name=context.region, account_id=context.account_id
155+
)
156+
topic: Topic = {
157+
"name": name,
158+
"arn": topic_arn,
159+
"attributes": {},
160+
}
161+
attrs = _default_attributes(topic, context)
162+
attrs.update(attributes or {})
163+
topic["attributes"] = attrs
164+
165+
return topic
166+
167+
168+
def _default_attributes(topic: Topic, context: RequestContext) -> TopicAttributesMap:
169+
default_attributes = {
170+
"DisplayName": "",
171+
"Owner": context.account_id,
172+
"Policy": _create_default_topic_policy(topic, context),
173+
"SubscriptionsConfirmed": "0",
174+
"SubscriptionsDeleted": "0",
175+
"SubscriptionsPending": "0",
176+
"TopicArn": topic["arn"],
177+
}
178+
if topic["name"].endswith(".fifo"):
179+
default_attributes.update(
180+
{
181+
"ContentBasedDeduplication": "false",
182+
"FifoTopic": "false",
183+
"SignatureVersion": "2",
184+
}
185+
)
186+
return default_attributes
187+
8188

9-
class SnsProvider(SnsApi): ...
189+
def _create_default_topic_policy(topic: Topic, context: RequestContext) -> str:
190+
return json.dumps(
191+
{
192+
"Version": "2008-10-17",
193+
"Id": "__default_policy_ID",
194+
"Statement": [
195+
{
196+
"Effect": "Allow",
197+
"Sid": "__default_statement_ID",
198+
"Principal": {"AWS": "*"},
199+
"Action": [
200+
"SNS:GetTopicAttributes",
201+
"SNS:SetTopicAttributes",
202+
"SNS:AddPermission",
203+
"SNS:RemovePermission",
204+
"SNS:DeleteTopic",
205+
"SNS:Subscribe",
206+
"SNS:ListSubscriptionsByTopic",
207+
"SNS:Publish",
208+
],
209+
"Resource": topic["arn"],
210+
"Condition": {"StringEquals": {"AWS:SourceOwner": context.account_id}},
211+
}
212+
],
213+
}
214+
)

tests/aws/services/sns/conftest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def is_sns_v2_provider():
2424
return os.environ.get("PROVIDER_OVERRIDE_SNS") == "v2" and not is_aws_cloud()
2525

2626

27+
def is_sns_v1_provider() -> bool:
28+
return not os.environ.get("PROVIDER_OVERRIDE_SNS") == "v2" and not is_aws_cloud()
29+
30+
2731
skip_if_sns_v2 = pytest.mark.skipif(
2832
is_sns_v2_provider(),
2933
reason="Skipping test for v2 provider as it contains operations not yet supported",

0 commit comments

Comments
 (0)