|
| 1 | +import json |
1 | 2 | import logging |
| 3 | +import re |
2 | 4 |
|
3 | | -from localstack.aws.api.sns import SnsApi |
| 5 | +from botocore.utils import InvalidArnException |
| 6 | + |
| 7 | +from localstack.aws.api import RequestContext |
| 8 | +from localstack.aws.api.sns import ( |
| 9 | + CreateTopicResponse, |
| 10 | + GetTopicAttributesResponse, |
| 11 | + InvalidParameterException, |
| 12 | + ListTopicsResponse, |
| 13 | + NotFoundException, |
| 14 | + SnsApi, |
| 15 | + TagList, |
| 16 | + TopicAttributesMap, |
| 17 | + attributeName, |
| 18 | + attributeValue, |
| 19 | + nextToken, |
| 20 | + topicARN, |
| 21 | + topicName, |
| 22 | +) |
| 23 | +from localstack.services.sns.v2.models import SnsStore, Topic, sns_stores |
| 24 | +from localstack.utils.aws.arns import ArnData, parse_arn, sns_topic_arn |
| 25 | +from localstack.utils.collections import PaginatedList |
4 | 26 |
|
5 | 27 | # set up logger |
6 | 28 | LOG = logging.getLogger(__name__) |
7 | 29 |
|
| 30 | +SNS_TOPIC_NAME_PATTERN_FIFO = r"^[a-zA-Z0-9_-]{1,256}\.fifo$" |
| 31 | +SNS_TOPIC_NAME_PATTERN = r"^[a-zA-Z0-9_-]{1,256}$" |
| 32 | + |
| 33 | + |
| 34 | +class SnsProvider(SnsApi): |
| 35 | + def create_topic( |
| 36 | + self, |
| 37 | + context: RequestContext, |
| 38 | + name: topicName, |
| 39 | + attributes: TopicAttributesMap | None = None, |
| 40 | + tags: TagList | None = None, |
| 41 | + data_protection_policy: attributeValue | None = None, |
| 42 | + **kwargs, |
| 43 | + ) -> CreateTopicResponse: |
| 44 | + store = self.get_store(context.account_id, context.region) |
| 45 | + topic_arn = sns_topic_arn( |
| 46 | + topic_name=name, region_name=context.region, account_id=context.account_id |
| 47 | + ) |
| 48 | + topic: Topic = store.topics.get(topic_arn) |
| 49 | + attributes = attributes or {} |
| 50 | + if topic: |
| 51 | + attrs = topic["attributes"] |
| 52 | + for k, v in attributes.values(): |
| 53 | + if not attrs.get(k) or not attrs.get(k) == v: |
| 54 | + # TODO: |
| 55 | + raise InvalidParameterException("Fix this Exception message and type") |
| 56 | + return CreateTopicResponse(TopicArn=topic_arn) |
| 57 | + |
| 58 | + attributes = attributes or {} |
| 59 | + if attributes.get("FifoTopic") and attributes["FifoTopic"].lower() == "true": |
| 60 | + fifo_match = re.match(SNS_TOPIC_NAME_PATTERN_FIFO, name) |
| 61 | + if not fifo_match: |
| 62 | + # TODO: check this with a separate test |
| 63 | + raise InvalidParameterException( |
| 64 | + "Fifo Topic names must end with .fifo and must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long." |
| 65 | + ) |
| 66 | + else: |
| 67 | + # AWS does not seem to save explicit settings of fifo = false |
| 68 | + attributes.pop("FifoTopic", None) |
| 69 | + name_match = re.match(SNS_TOPIC_NAME_PATTERN, name) |
| 70 | + if not name_match: |
| 71 | + raise InvalidParameterException("Invalid parameter: Topic Name") |
| 72 | + |
| 73 | + topic = _create_topic(name=name, attributes=attributes, context=context) |
| 74 | + store.topics[topic_arn] = topic |
| 75 | + # todo: tags |
| 76 | + |
| 77 | + return CreateTopicResponse(TopicArn=topic_arn) |
| 78 | + |
| 79 | + def get_topic_attributes( |
| 80 | + self, context: RequestContext, topic_arn: topicARN, **kwargs |
| 81 | + ) -> GetTopicAttributesResponse: |
| 82 | + topic: Topic = self._get_topic(arn=topic_arn, context=context) |
| 83 | + if topic: |
| 84 | + attributes = topic["attributes"] |
| 85 | + return GetTopicAttributesResponse(Attributes=attributes) |
| 86 | + else: |
| 87 | + raise NotFoundException("Topic does not exist") |
| 88 | + |
| 89 | + def delete_topic(self, context: RequestContext, topic_arn: topicARN, **kwargs) -> None: |
| 90 | + store = self.get_store(context.account_id, context.region) |
| 91 | + |
| 92 | + store.topics.pop(topic_arn, None) |
| 93 | + |
| 94 | + def list_topics( |
| 95 | + self, context: RequestContext, next_token: nextToken | None = None, **kwargs |
| 96 | + ) -> ListTopicsResponse: |
| 97 | + store = self.get_store(context.account_id, context.region) |
| 98 | + topics = [{"TopicArn": t["arn"]} for t in list(store.topics.values())] |
| 99 | + topics = PaginatedList(topics) |
| 100 | + page, nxt = topics.get_page( |
| 101 | + lambda topic: topic["TopicArn"], next_token=next_token, page_size=100 |
| 102 | + ) |
| 103 | + topics = {"Topics": page, "NextToken": nxt} |
| 104 | + return ListTopicsResponse(**topics) |
| 105 | + |
| 106 | + def set_topic_attributes( |
| 107 | + self, |
| 108 | + context: RequestContext, |
| 109 | + topic_arn: topicARN, |
| 110 | + attribute_name: attributeName, |
| 111 | + attribute_value: attributeValue | None = None, |
| 112 | + **kwargs, |
| 113 | + ) -> None: |
| 114 | + topic: Topic = self._get_topic(arn=topic_arn, context=context) |
| 115 | + if attribute_name == "FifoTopic": |
| 116 | + raise InvalidParameterException("Invalid parameter: AttributeName") |
| 117 | + topic["attributes"][attribute_name] = attribute_value |
| 118 | + |
| 119 | + @staticmethod |
| 120 | + def get_store(account_id: str, region: str) -> SnsStore: |
| 121 | + return sns_stores[account_id][region] |
| 122 | + |
| 123 | + # TODO: reintroduce multi-region parameter (latest before final migration from v1) |
| 124 | + @staticmethod |
| 125 | + def _get_topic(arn: str, context: RequestContext) -> Topic: |
| 126 | + """ |
| 127 | + :param arn: the Topic ARN |
| 128 | + :param context: the RequestContext of the request |
| 129 | + :return: the model Topic |
| 130 | + """ |
| 131 | + arn_data = parse_and_validate_topic_arn(arn) |
| 132 | + if context.region != arn_data["region"]: |
| 133 | + raise InvalidParameterException("Invalid parameter: TopicArn") |
| 134 | + try: |
| 135 | + store = SnsProvider.get_store(context.account_id, context.region) |
| 136 | + return store.topics[arn] |
| 137 | + except KeyError: |
| 138 | + raise NotFoundException("Topic does not exist") |
| 139 | + |
| 140 | + |
| 141 | +def parse_and_validate_topic_arn(topic_arn: str | None) -> ArnData: |
| 142 | + topic_arn = topic_arn or "" |
| 143 | + try: |
| 144 | + return parse_arn(topic_arn) |
| 145 | + except InvalidArnException: |
| 146 | + count = len(topic_arn.split(":")) |
| 147 | + raise InvalidParameterException( |
| 148 | + f"Invalid parameter: TopicArn Reason: An ARN must have at least 6 elements, not {count}" |
| 149 | + ) |
| 150 | + |
| 151 | + |
| 152 | +def _create_topic(name: str, attributes: dict, context: RequestContext) -> Topic: |
| 153 | + topic_arn = sns_topic_arn( |
| 154 | + topic_name=name, region_name=context.region, account_id=context.account_id |
| 155 | + ) |
| 156 | + topic: Topic = { |
| 157 | + "name": name, |
| 158 | + "arn": topic_arn, |
| 159 | + "attributes": {}, |
| 160 | + } |
| 161 | + attrs = _default_attributes(topic, context) |
| 162 | + attrs.update(attributes or {}) |
| 163 | + topic["attributes"] = attrs |
| 164 | + |
| 165 | + return topic |
| 166 | + |
| 167 | + |
| 168 | +def _default_attributes(topic: Topic, context: RequestContext) -> TopicAttributesMap: |
| 169 | + default_attributes = { |
| 170 | + "DisplayName": "", |
| 171 | + "Owner": context.account_id, |
| 172 | + "Policy": _create_default_topic_policy(topic, context), |
| 173 | + "SubscriptionsConfirmed": "0", |
| 174 | + "SubscriptionsDeleted": "0", |
| 175 | + "SubscriptionsPending": "0", |
| 176 | + "TopicArn": topic["arn"], |
| 177 | + } |
| 178 | + if topic["name"].endswith(".fifo"): |
| 179 | + default_attributes.update( |
| 180 | + { |
| 181 | + "ContentBasedDeduplication": "false", |
| 182 | + "FifoTopic": "false", |
| 183 | + "SignatureVersion": "2", |
| 184 | + } |
| 185 | + ) |
| 186 | + return default_attributes |
| 187 | + |
8 | 188 |
|
9 | | -class SnsProvider(SnsApi): ... |
| 189 | +def _create_default_topic_policy(topic: Topic, context: RequestContext) -> str: |
| 190 | + return json.dumps( |
| 191 | + { |
| 192 | + "Version": "2008-10-17", |
| 193 | + "Id": "__default_policy_ID", |
| 194 | + "Statement": [ |
| 195 | + { |
| 196 | + "Effect": "Allow", |
| 197 | + "Sid": "__default_statement_ID", |
| 198 | + "Principal": {"AWS": "*"}, |
| 199 | + "Action": [ |
| 200 | + "SNS:GetTopicAttributes", |
| 201 | + "SNS:SetTopicAttributes", |
| 202 | + "SNS:AddPermission", |
| 203 | + "SNS:RemovePermission", |
| 204 | + "SNS:DeleteTopic", |
| 205 | + "SNS:Subscribe", |
| 206 | + "SNS:ListSubscriptionsByTopic", |
| 207 | + "SNS:Publish", |
| 208 | + ], |
| 209 | + "Resource": topic["arn"], |
| 210 | + "Condition": {"StringEquals": {"AWS:SourceOwner": context.account_id}}, |
| 211 | + } |
| 212 | + ], |
| 213 | + } |
| 214 | + ) |
0 commit comments