forked from anomalyco/sst
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQueue.ts
More file actions
123 lines (108 loc) · 3.63 KB
/
Copy pathQueue.ts
File metadata and controls
123 lines (108 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import * as cdk from "@aws-cdk/core";
import * as sqs from "@aws-cdk/aws-sqs";
import * as lambdaEventSources from "@aws-cdk/aws-lambda-event-sources";
import { App } from "./App";
import { Function as Fn, FunctionDefinition } from "./Function";
import { Permissions } from "./util/permission";
export interface QueueProps {
readonly sqsQueue?: sqs.IQueue | sqs.QueueProps;
readonly consumer?: FunctionDefinition | QueueConsumerProps;
}
export interface QueueConsumerProps {
readonly function: FunctionDefinition;
readonly consumerProps?: lambdaEventSources.SqsEventSourceProps;
}
export class Queue extends cdk.Construct {
public readonly sqsQueue: sqs.Queue;
public consumerFunction?: Fn;
private readonly permissionsAttachedForAllConsumers: Permissions[];
constructor(scope: cdk.Construct, id: string, props?: QueueProps) {
super(scope, id);
const root = scope.node.root as App;
const {
// Queue props
sqsQueue,
// Function props
consumer,
} = props || {};
this.permissionsAttachedForAllConsumers = [];
////////////////////
// Create Queue
////////////////////
if (cdk.Construct.isConstruct(sqsQueue)) {
this.sqsQueue = sqsQueue as sqs.Queue;
} else {
const sqsQueueProps: sqs.QueueProps = sqsQueue || {};
// If debugIncreaseTimeout is enabled (ie. sst start):
// - Set visibilityTimeout to > 900s. This is because Lambda timeout is
// set to 900s, and visibilityTimeout has to be greater or equal to it.
// This will give people more time to debug the function without timing
// out the request.
let debugOverrideProps;
if (root.debugIncreaseTimeout) {
if (
!sqsQueueProps.visibilityTimeout ||
sqsQueueProps.visibilityTimeout.toSeconds() < 900
) {
debugOverrideProps = {
visibilityTimeout: cdk.Duration.seconds(900),
};
}
}
const name =
root.logicalPrefixedName(id) + (sqsQueueProps.fifo ? ".fifo" : "");
this.sqsQueue = new sqs.Queue(this, "Queue", {
queueName: name,
...sqsQueueProps,
...(debugOverrideProps || {}),
});
}
///////////////////////////
// Create Consumer
///////////////////////////
if (consumer) {
this.addConsumer(this, consumer);
}
}
public addConsumer(
scope: cdk.Construct,
consumer: FunctionDefinition | QueueConsumerProps
): void {
if (this.consumerFunction) {
throw new Error("Cannot configure more than 1 consumer for a Queue");
}
// create consumer
if ((consumer as QueueConsumerProps).function) {
consumer = consumer as QueueConsumerProps;
this.consumerFunction = Fn.fromDefinition(
scope,
"Consumer",
consumer.function
);
this.consumerFunction.addEventSource(
new lambdaEventSources.SqsEventSource(
this.sqsQueue,
consumer.consumerProps
)
);
} else {
consumer = consumer as FunctionDefinition;
this.consumerFunction = Fn.fromDefinition(scope, `Consumer`, consumer);
this.consumerFunction.addEventSource(
new lambdaEventSources.SqsEventSource(this.sqsQueue)
);
}
// attach permissions
this.permissionsAttachedForAllConsumers.forEach((permissions) => {
if (this.consumerFunction) {
this.consumerFunction.attachPermissions(permissions);
}
});
}
public attachPermissions(permissions: Permissions): void {
if (this.consumerFunction) {
this.consumerFunction.attachPermissions(permissions);
}
this.permissionsAttachedForAllConsumers.push(permissions);
}
}