Skip to content

Commit ac44173

Browse files
authored
Encryption sample (temporalio#16)
Fixes temporalio#5
1 parent 7f2f9b7 commit ac44173

8 files changed

Lines changed: 1042 additions & 68 deletions

File tree

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ Prerequisites:
1616

1717
With this repository cloned, run the following at the root of the directory:
1818

19-
poetry install --no-root
19+
poetry install
2020

21-
That loads all dependencies. Then to run a sample, usually you just run it in Python. For example:
21+
That loads all required dependencies. Then to run a sample, usually you just run it in Python. For example:
2222

2323
poetry run python hello/hello_activity.py
2424

25-
See each sample's directory for specific instructions.
25+
Some examples require extra dependencies. See each sample's directory for specific instructions.
2626

2727
## Samples
2828

@@ -53,3 +53,4 @@ See each sample's directory for specific instructions.
5353
while running.
5454
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
5555
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
56+
* [encryption](encryption) - Apply end-to-end encryption for all input/output.

encryption/README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Encryption Sample
2+
3+
This sample shows how to make an encryption codec for end-to-end encryption. It is built to work with the encryption
4+
samples [in TypeScript](https://github.com/temporalio/samples-typescript/tree/main/encryption) and
5+
[in Go](https://github.com/temporalio/samples-go/tree/main/encryption).
6+
7+
For this sample, the optional `encryption` dependency group must be included. To include, run:
8+
9+
poetry install --with encryption
10+
11+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
12+
worker:
13+
14+
poetry run python worker.py
15+
16+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
17+
18+
poetry run python starter.py
19+
20+
The workflow should complete with the hello result. To view the workflow, use [tctl](https://docs.temporal.io/tctl/):
21+
22+
tctl workflow show --workflow_id encryption-workflow-id
23+
24+
Note how the input/result look like (with wrapping removed):
25+
26+
```
27+
Input:[encoding binary/encrypted: payload encoding is not supported]
28+
...
29+
Result:[encoding binary/encrypted: payload encoding is not supported]
30+
```
31+
32+
This is because the data is encrypted and not visible. To make data visible to external Temporal tools like `tctl` and
33+
the UI, start a codec server in another terminal:
34+
35+
poetry run python codec_server.py
36+
37+
Now with that running, run `tctl` again with the codec endpoint:
38+
39+
tctl --codec_endpoint http://localhost:8081 workflow show --workflow_id encryption-workflow-id
40+
41+
Notice now the output has the unencrypted values:
42+
43+
```
44+
Input:["Temporal"]
45+
...
46+
Result:["Hello, Temporal"]
47+
```
48+
49+
This decryption did not leave the local machine here.
50+
51+
Same case with the web UI. If you go to the web UI, you'll only see encrypted input/results. But, assuming your web UI
52+
is at `http://localhost:8080`, if you set the "Remote Codec Endpoint" in the web UI to `http://localhost:8081` you can
53+
then see the unencrypted results. This is possible because CORS settings in the codec server allow the browser to access
54+
the codec server directly over localhost. They can be changed to suit Temporal cloud web UI instead if necessary.

encryption/codec.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import base64
2+
import os
3+
from typing import Iterable, List
4+
5+
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
6+
from temporalio.api.common.v1 import Payload
7+
from temporalio.converter import PayloadCodec
8+
9+
default_key = base64.b64decode(b"MkUb3RVdHQuOTedqETZW7ra2GkZqpBRmYWRACUospMc=")
10+
default_key_id = "my-key"
11+
12+
13+
class EncryptionCodec(PayloadCodec):
14+
def __init__(self, key_id: str = default_key_id, key: bytes = default_key) -> None:
15+
super().__init__()
16+
self.key_id = key_id
17+
# We are using direct AESGCM to be compatible with samples from
18+
# TypeScript and Go. Pure Python samples may prefer the higher-level,
19+
# safer APIs.
20+
self.encryptor = AESGCM(key)
21+
22+
async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
23+
# We blindly encode all payloads with the key and set the metadata
24+
# saying which key we used
25+
return [
26+
Payload(
27+
metadata={
28+
"encoding": b"binary/encrypted",
29+
"encryption-key-id": self.key_id.encode(),
30+
},
31+
data=self.encrypt(p.SerializeToString()),
32+
)
33+
for p in payloads
34+
]
35+
36+
async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
37+
ret: List[Payload] = []
38+
for p in payloads:
39+
# Ignore ones w/out our expected encoding
40+
if p.metadata.get("encoding", b"").decode() != "binary/encrypted":
41+
ret.append(p)
42+
continue
43+
# Confirm our key ID is the same
44+
key_id = p.metadata.get("encryption-key-id", b"").decode()
45+
if key_id != self.key_id:
46+
raise ValueError(f"Unrecognized key ID {key_id}")
47+
# Decrypt and append
48+
ret.append(Payload.FromString(self.decrypt(p.data)))
49+
return ret
50+
51+
def encrypt(self, data: bytes) -> bytes:
52+
nonce = os.urandom(12)
53+
return nonce + self.encryptor.encrypt(nonce, data, None)
54+
55+
def decrypt(self, data: bytes) -> bytes:
56+
return self.encryptor.decrypt(data[:12], data[12:], None)

encryption/codec_server.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from functools import partial
2+
from typing import Awaitable, Callable, Iterable, List
3+
4+
from aiohttp import hdrs, web
5+
from google.protobuf import json_format
6+
from temporalio.api.common.v1 import Payload, Payloads
7+
8+
from encryption.codec import EncryptionCodec
9+
10+
11+
def build_codec_server() -> web.Application:
12+
# Cors handler
13+
async def cors_options(req: web.Request) -> web.Response:
14+
resp = web.Response()
15+
if req.headers.get(hdrs.ORIGIN) == "http://localhost:8080":
16+
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "http://localhost:8080"
17+
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_METHODS] = "POST"
18+
resp.headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = "content-type,x-namespace"
19+
return resp
20+
21+
# General purpose payloads-to-payloads
22+
async def apply(
23+
fn: Callable[[Iterable[Payload]], Awaitable[List[Payload]]], req: web.Request
24+
) -> web.Response:
25+
# Read payloads as JSON
26+
assert req.content_type == "application/json"
27+
payloads = json_format.Parse(await req.read(), Payloads())
28+
29+
# Apply
30+
payloads = Payloads(payloads=await fn(payloads.payloads))
31+
32+
# Apply CORS and return JSON
33+
resp = await cors_options(req)
34+
resp.content_type = "application/json"
35+
resp.text = json_format.MessageToJson(payloads)
36+
return resp
37+
38+
# Build app
39+
codec = EncryptionCodec()
40+
app = web.Application()
41+
app.add_routes(
42+
[
43+
web.post("/encode", partial(apply, codec.encode)),
44+
web.post("/decode", partial(apply, codec.decode)),
45+
web.options("/decode", cors_options),
46+
]
47+
)
48+
return app
49+
50+
51+
if __name__ == "__main__":
52+
web.run_app(build_codec_server(), host="127.0.0.1", port=8081)

encryption/starter.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import asyncio
2+
import dataclasses
3+
4+
import temporalio.converter
5+
from temporalio.client import Client
6+
7+
from encryption.codec import EncryptionCodec
8+
from encryption.worker import GreetingWorkflow
9+
10+
11+
async def main():
12+
# Connect client
13+
client = await Client.connect(
14+
"localhost:7233",
15+
# Use the default converter, but change the codec
16+
data_converter=dataclasses.replace(
17+
temporalio.converter.default(), payload_codec=EncryptionCodec()
18+
),
19+
)
20+
21+
# Run workflow
22+
result = await client.execute_workflow(
23+
GreetingWorkflow.run,
24+
"Temporal",
25+
id=f"encryption-workflow-id",
26+
task_queue="encryption-task-queue",
27+
)
28+
print(f"Workflow result: {result}")
29+
30+
31+
if __name__ == "__main__":
32+
asyncio.run(main())

encryption/worker.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import asyncio
2+
import dataclasses
3+
4+
import temporalio.converter
5+
from temporalio import workflow
6+
from temporalio.client import Client
7+
from temporalio.worker import Worker
8+
9+
from encryption.codec import EncryptionCodec
10+
11+
12+
@workflow.defn
13+
class GreetingWorkflow:
14+
@workflow.run
15+
async def run(self, name: str) -> str:
16+
return f"Hello, {name}"
17+
18+
19+
interrupt_event = asyncio.Event()
20+
21+
22+
async def main():
23+
# Connect client
24+
client = await Client.connect(
25+
"localhost:7233",
26+
# Use the default converter, but change the codec
27+
data_converter=dataclasses.replace(
28+
temporalio.converter.default(), payload_codec=EncryptionCodec()
29+
),
30+
)
31+
32+
# Run a worker for the workflow
33+
async with Worker(
34+
client,
35+
task_queue="encryption-task-queue",
36+
workflows=[GreetingWorkflow],
37+
):
38+
# Wait until interrupted
39+
print("Worker started, ctrl+c to exit")
40+
await interrupt_event.wait()
41+
print("Shutting down")
42+
43+
44+
if __name__ == "__main__":
45+
loop = asyncio.new_event_loop()
46+
try:
47+
loop.run_until_complete(main())
48+
except KeyboardInterrupt:
49+
interrupt_event.set()
50+
loop.run_until_complete(loop.shutdown_asyncgens())

0 commit comments

Comments
 (0)