Skip to content

Commit 2225bdf

Browse files
committed
execute Java Lambda functions in Docker containers
1 parent 89d6ee9 commit 2225bdf

12 files changed

Lines changed: 96 additions & 37 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ test-docker: ## Run automated tests in Docker
8888
ENTRYPOINT="--entrypoint=" CMD="make test" make docker-run
8989

9090
test-docker-mount:
91-
ENTRYPOINT="--entrypoint= -v `pwd`/localstack/utils:/opt/code/localstack/localstack/utils -v `pwd`/localstack/services:/opt/code/localstack/localstack/services -v `pwd`/tests:/opt/code/localstack/tests" CMD="make test" make docker-run
91+
ENTRYPOINT="--entrypoint= -v `pwd`/localstack/constants.py:/opt/code/localstack/localstack/constants.py -v `pwd`/localstack/utils:/opt/code/localstack/localstack/utils -v `pwd`/localstack/services:/opt/code/localstack/localstack/services -v `pwd`/tests:/opt/code/localstack/tests" CMD="make test" make docker-run
9292

9393
reinstall-p2: ## Re-initialize the virtualenv with Python 2.x
9494
rm -rf $(VENV_DIR)

localstack/constants.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
DEFAULT_PORT_DYNAMODB_BACKEND = 4564
3939
DEFAULT_PORT_S3_BACKEND = 4563
4040
DEFAULT_PORT_SNS_BACKEND = 4562
41-
DEFAULT_PORT_ELASTICSEARCH_BACKEND = 4561
42-
DEFAULT_PORT_CLOUDFORMATION_BACKEND = 4560
41+
DEFAULT_PORT_SQS_BACKEND = 4561
42+
DEFAULT_PORT_ELASTICSEARCH_BACKEND = 4560
43+
DEFAULT_PORT_CLOUDFORMATION_BACKEND = 4559
4344

4445
DEFAULT_PORT_WEB_UI = 8080
4546

localstack/plugins.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from localstack.services.dynamodb import dynamodb_listener, dynamodb_starter
55
from localstack.services.kinesis import kinesis_listener, kinesis_starter
66
from localstack.services.sns import sns_listener
7+
from localstack.services.sqs import sqs_listener
78
from localstack.services.s3 import s3_listener, s3_starter
89
from localstack.services.es import es_starter
910

@@ -25,7 +26,8 @@ def register_localstack_plugins():
2526
start=start_sns,
2627
listener=sns_listener.UPDATE_SNS))
2728
register_plugin(Plugin('sqs',
28-
start=start_sqs))
29+
start=start_sqs,
30+
listener=sqs_listener.UPDATE_SQS))
2931
register_plugin(Plugin('ses',
3032
start=start_ses))
3133
register_plugin(Plugin('apigateway',

localstack/services/awslambda/lambda_api.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,22 @@ def run_lambda(func, event, context, func_arn, suppress_output=False):
239239
handler_args = '"%s"' % handler
240240
entrypoint = ''
241241

242+
# prepare event body
243+
if not event:
244+
LOG.warning('Empty event body specified for invocation of Lambda "%s"' % func_arn)
245+
event = {}
246+
event_body = json.dumps(event)
247+
242248
# if running a Java Lambda, set up classpath arguments
243249
if runtime == LAMBDA_RUNTIME_JAVA8:
250+
# copy executor jar into temp directory
251+
cp_r(LAMBDA_EXECUTOR_JAR, lambda_cwd)
244252
# TODO cleanup once we have custom Java Docker image
245-
event_file = 'event_file.json' # TODO
246-
handler_args = ("java -cp .:`ls *.jar | tr '\\n' ':'` '%s' '%s' '%s'" %
247-
(LAMBDA_EXECUTOR_CLASS, handler, event_file))
253+
taskdir = '/var/task'
254+
event_file = 'event_file.json'
255+
save_file(os.path.join(lambda_cwd, event_file), event_body)
256+
handler_args = ("bash -c 'cd %s; java -cp .:`ls *.jar | tr \"\\n\" \":\"` \"%s\" \"%s\" \"%s\"'" %
257+
(taskdir, LAMBDA_EXECUTOR_CLASS, handler, event_file))
248258
entrypoint = ' --entrypoint ""'
249259

250260
if config.LAMBDA_REMOTE_DOCKER:
@@ -268,16 +278,11 @@ def run_lambda(func, event, context, func_arn, suppress_output=False):
268278
) % (entrypoint, lambda_cwd_on_host, runtime, handler_args)
269279

270280
print(cmd)
271-
# prepare event body
272-
if event is None or str(event).strip() == '':
273-
LOG.warning('Empty event body specified for invocation of Lambda "%s"' % func_arn)
274-
event_body = '{}'
275-
else:
276-
event_body = json.dumps(event).replace("'", "\\'")
281+
event_body_escaped = event_body.replace("'", "\\'")
277282
# lambci writes the Lambda result to stdout and logs to stderr, fetch it from there!
278283
env_vars = {
279-
'AWS_LAMBDA_EVENT_BODY': event_body,
280-
'HOSTNAME': DOCKER_BRIDGE_IP,
284+
'AWS_LAMBDA_EVENT_BODY': event_body_escaped,
285+
'HOSTNAME': DOCKER_BRIDGE_IP
281286
}
282287
result, log_output = run_lambda_executor(cmd, env_vars)
283288
LOG.debug('Lambda log output:\n%s' % log_output)
@@ -367,7 +372,7 @@ def set_function_code(code, lambda_name):
367372

368373
def generic_handler(event, context):
369374
raise Exception(('Unable to find executor for Lambda function "%s". ' +
370-
'Note that non-Python Lambdas require LAMBDA_EXECUTOR=docker') % lambda_name)
375+
'Note that Node.js Lambdas currently require LAMBDA_EXECUTOR=docker') % lambda_name)
371376

372377
lambda_handler = generic_handler
373378
lambda_cwd = None

localstack/services/dynamodb/dynamodb_listener.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from localstack import config
77
from localstack.utils.aws import aws_stack
88
from localstack.utils.common import *
9+
from localstack.utils.analytics import event_publisher
910
from localstack.constants import *
1011
from localstack.services.awslambda import lambda_api
1112
from localstack.services.dynamodbstreams import dynamodbstreams_api
@@ -106,6 +107,12 @@ def return_response(self, method, path, data, headers, response):
106107
elif action == '%s.CreateTable' % ACTION_PREFIX:
107108
if 'StreamSpecification' in data:
108109
create_dynamodb_stream(data)
110+
event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
111+
payload={'n': event_publisher.get_hash(data['TableName'])})
112+
return
113+
elif action == '%s.DeleteTable' % ACTION_PREFIX:
114+
event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
115+
payload={'n': event_publisher.get_hash(data['TableName'])})
109116
return
110117
elif action == '%s.UpdateTable' % ACTION_PREFIX:
111118
if 'StreamSpecification' in data:

localstack/services/generic_proxy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def forward_request(self, method, path, data, headers):
4343
of the incoming request, and returns either of the following results:
4444
4545
* True if the request should be forwarded to the backend service as-is (default).
46-
* An integer (e.g., 200) service code to return directly to the client without
46+
* An integer (e.g., 200) status code to return directly to the client without
4747
calling the backend service.
4848
* An instance of requests.models.Response to return directly to the client without
4949
calling the backend service.

localstack/services/infra.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ def start_sns(port=PORT_SNS, async=False, update_listener=None):
124124
backend_port=DEFAULT_PORT_SNS_BACKEND, update_listener=update_listener)
125125

126126

127+
def start_sqs(port=PORT_SQS, async=False, update_listener=None):
128+
return start_moto_server('sqs', port, name='SQS', async=async,
129+
backend_port=DEFAULT_PORT_SQS_BACKEND, update_listener=update_listener)
130+
131+
127132
def start_cloudformation(port=PORT_CLOUDFORMATION, async=False, update_listener=None):
128133
return start_moto_server('cloudformation', port, name='CloudFormation', async=async,
129134
backend_port=DEFAULT_PORT_CLOUDFORMATION_BACKEND, update_listener=update_listener)
@@ -137,10 +142,6 @@ def start_redshift(port=PORT_REDSHIFT, async=False):
137142
return start_moto_server('redshift', port, name='Redshift', async=async)
138143

139144

140-
def start_sqs(port=PORT_SQS, async=False):
141-
return start_moto_server('sqs', port, name='SQS', async=async)
142-
143-
144145
def start_route53(port=PORT_ROUTE53, async=False):
145146
return start_moto_server('route53', port, name='Route53', async=async)
146147

localstack/services/s3/s3_listener.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from localstack.utils import persistence
1313
from localstack.utils.aws import aws_stack
1414
from localstack.utils.common import timestamp, TIMESTAMP_FORMAT_MILLIS, to_str, to_bytes
15+
from localstack.utils.analytics import event_publisher
1516
from localstack.services.generic_proxy import ProxyListener
1617

1718
# mappings for S3 bucket notifications
@@ -199,7 +200,7 @@ def forward_request(self, method, path, data, headers):
199200
modified_data = None
200201

201202
# If this request contains streaming v4 authentication signatures, strip them from the message
202-
# Related isse: https://github.com/atlassian/localstack/issues/98
203+
# Related isse: https://github.com/localstack/localstack/issues/98
203204
# TODO we should evaluate whether to replace moto s3 with scality/S3:
204205
# https://github.com/scality/S3/issues/237
205206
if headers.get('x-amz-content-sha256') == 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD':
@@ -264,16 +265,23 @@ def forward_request(self, method, path, data, headers):
264265

265266
def return_response(self, method, path, data, headers, response):
266267

268+
parsed = urlparse.urlparse(path)
269+
# TODO: consider the case of hostname-based (as opposed to path-based) bucket addressing
270+
bucket_name = parsed.path.split('/')[1]
271+
267272
# get subscribers and send bucket notifications
268273
if method in ('PUT', 'DELETE') and '/' in path[1:]:
269-
parts = path[1:].split('/', 1)
270-
bucket_name = parts[0]
274+
parts = parsed.path[1:].split('/', 1)
271275
object_path = '/%s' % parts[1]
272276
send_notifications(method, bucket_name, object_path)
277+
# for creation/deletion of buckets, publish an event:
278+
if method in ('PUT', 'DELETE') and '/' not in path[1:]:
279+
event_type = (event_publisher.EVENT_S3_CREATE_BUCKET if method == 'PUT'
280+
else event_publisher.EVENT_S3_DELETE_BUCKET)
281+
event_publisher.fire_event(event_type, payload={'n': event_publisher.get_hash(bucket_name)})
282+
273283
# append CORS headers to response
274284
if response:
275-
parsed = urlparse.urlparse(path)
276-
bucket_name = parsed.path.split('/')[0]
277285
append_cors_headers(bucket_name, request_method=method, request_headers=headers, response=response)
278286

279287
# we need to un-pretty-print the XML, otherwise we run into this issue with Spark:

localstack/services/sqs/__init__.py

Whitespace-only changes.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import xmltodict
2+
from six.moves.urllib import parse as urlparse
3+
from localstack.utils.analytics import event_publisher
4+
from localstack.services.generic_proxy import ProxyListener
5+
6+
7+
class ProxyListenerSQS(ProxyListener):
8+
9+
def return_response(self, method, path, data, headers, response):
10+
11+
if method == 'POST' and path == '/':
12+
req_data = urlparse.parse_qs(data)
13+
action = req_data.get('Action', [None])[0]
14+
event_type = None
15+
if action == 'CreateQueue':
16+
event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
17+
response_data = xmltodict.parse(response.content)
18+
queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
19+
elif action == 'DeleteQueue':
20+
event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
21+
queue_url = req_data.get('QueueUrl', [None])[0]
22+
23+
if event_type:
24+
event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})
25+
26+
return True
27+
28+
29+
# instantiate listener
30+
UPDATE_SQS = ProxyListenerSQS()

0 commit comments

Comments
 (0)