forked from GoogleCloudPlatform/python-docs-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
140 lines (115 loc) · 5.05 KB
/
main.py
File metadata and controls
140 lines (115 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright 2019 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hmac
import json
import logging
import os

from flask import current_app, Flask, render_template, request
from google.auth.transport import requests
from google.cloud import pubsub_v1
from google.oauth2 import id_token
# Flask application object; the route and error handlers below register on it.
app = Flask(__name__)

# Configure the following environment variables via app.yaml.
# Each value is read with os.environ[...], so a missing variable raises
# KeyError while this module is being imported.
#
# PUBSUB_VERIFICATION_TOKEN is used in the push request handlers to verify
# that the request came from pubsub and originated from a trusted source.
app.config["PUBSUB_VERIFICATION_TOKEN"] = os.environ["PUBSUB_VERIFICATION_TOKEN"]
# PUBSUB_TOPIC and GOOGLE_CLOUD_PROJECT together identify the topic that
# index() publishes to.
app.config["PUBSUB_TOPIC"] = os.environ["PUBSUB_TOPIC"]
app.config["GOOGLE_CLOUD_PROJECT"] = os.environ["GOOGLE_CLOUD_PROJECT"]

# Global lists to store messages, tokens, etc. received by this instance.
# The handlers in this file only ever append to them, and index() passes
# them to the template for display.
MESSAGES = []
TOKENS = []
CLAIMS = []
# [START gae_standard_pubsub_index]
@app.route("/", methods=["GET", "POST"])
def index():
    """Serve the demo page on GET; publish the submitted payload on POST.

    GET renders ``index.html`` with the messages, tokens and claims collected
    so far. POST publishes the form's ``payload`` field (or the text
    "Example payload" when the field is absent) to the configured topic,
    waits for the publish call to finish, and returns ``("OK", 200)``.
    """
    if request.method != "GET":
        message_bytes = request.form.get("payload", "Example payload").encode("utf-8")

        # The client is created per request here; consider creating it once
        # outside this function for better latency performance.
        client = pubsub_v1.PublisherClient()
        project_id = app.config["GOOGLE_CLOUD_PROJECT"]
        topic_name = app.config["PUBSUB_TOPIC"]
        publish_future = client.publish(
            client.topic_path(project_id, topic_name), message_bytes
        )
        # Wait for the publish to finish before responding.
        publish_future.result()
        return "OK", 200

    return render_template(
        "index.html", messages=MESSAGES, tokens=TOKENS, claims=CLAIMS
    )
# [END gae_standard_pubsub_index]
# [START gae_standard_pubsub_auth_push]
@app.route("/push-handlers/receive_messages", methods=["POST"])
def receive_messages_handler():
    """Handle an authenticated Cloud Pub/Sub push delivery.

    The request must carry the shared verification token in the ``token``
    query parameter and a Pub/Sub-generated JWT in the ``Authorization``
    header. The received token, the decoded claims and the decoded message
    payload are appended to ``TOKENS``, ``CLAIMS`` and ``MESSAGES``.

    Returns:
        ``("OK", 200)`` when the message is accepted. A 400 response is
        returned when the verification token does not match, when the
        ``Authorization`` header is missing, malformed or fails JWT
        verification, or when the push body cannot be decoded.
    """
    # Verify that the request originates from the application. The values
    # are compared as bytes with hmac.compare_digest so that the comparison
    # does not leak, through timing, how much of the secret matched, and so
    # that non-ASCII query values are rejected instead of raising.
    supplied_token = request.args.get("token", "")
    expected_token = current_app.config["PUBSUB_VERIFICATION_TOKEN"]
    if not hmac.compare_digest(
        supplied_token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return "Invalid request", 400

    # Verify that the push request originates from Cloud Pub/Sub.
    # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header,
    # which has the form "<scheme> <jwt>".
    bearer_token = request.headers.get("Authorization")
    if not bearer_token:
        return "Invalid token: missing Authorization header\n", 400
    header_parts = bearer_token.split(" ")
    if len(header_parts) < 2:
        return "Invalid token: malformed Authorization header\n", 400
    token = header_parts[1]
    # The token is recorded as received, before it has been verified.
    TOKENS.append(token)

    try:
        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(
            token, requests.Request(), audience="example.com"
        )
    except Exception as e:
        # Any verification failure (bad signature, wrong audience, expired
        # token, certificate fetch problems) rejects the request.
        return f"Invalid token: {e}\n", 400

    # IMPORTANT: you should validate claim details not covered by signature
    # and audience verification above, including:
    #   - Ensure that `claim["email"]` is equal to the expected service
    #     account set up in the push subscription settings.
    #   - Ensure that `claim["email_verified"]` is set to true.
    CLAIMS.append(claim)

    # Decode the push envelope. A body that is not valid UTF-8 JSON, lacks
    # message.data, or carries invalid base64 is rejected with 400 rather
    # than surfacing as an unhandled exception.
    try:
        envelope = json.loads(request.data.decode("utf-8"))
        payload = base64.b64decode(envelope["message"]["data"])
    except (ValueError, KeyError, TypeError) as e:
        return f"Invalid Pub/Sub message: {e}\n", 400
    MESSAGES.append(payload)

    # Returning any 2xx status indicates successful receipt of the message.
    return "OK", 200
# [END gae_standard_pubsub_auth_push]
# [START gae_standard_pubsub_push]
@app.route("/pubsub/push", methods=["POST"])
def receive_pubsub_messages_handler():
    """Handle a Cloud Pub/Sub push delivery guarded only by the shared token.

    The request must carry the shared verification token in the ``token``
    query parameter. The base64-decoded message payload is appended to
    ``MESSAGES``.

    Returns:
        ``("OK", 200)`` when the message is accepted. A 400 response is
        returned when the verification token does not match or when the
        push body cannot be decoded.
    """
    # Verify that the request originates from the application. The values
    # are compared as bytes with hmac.compare_digest so that the comparison
    # does not leak, through timing, how much of the secret matched, and so
    # that non-ASCII query values are rejected instead of raising.
    supplied_token = request.args.get("token", "")
    expected_token = current_app.config["PUBSUB_VERIFICATION_TOKEN"]
    if not hmac.compare_digest(
        supplied_token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return "Invalid request", 400

    # Decode the push envelope. A body that is not valid UTF-8 JSON, lacks
    # message.data, or carries invalid base64 is rejected with 400 rather
    # than surfacing as an unhandled exception.
    try:
        envelope = json.loads(request.data.decode("utf-8"))
        payload = base64.b64decode(envelope["message"]["data"])
    except (ValueError, KeyError, TypeError) as e:
        return f"Invalid Pub/Sub message: {e}\n", 400
    MESSAGES.append(payload)

    # Returning any 2xx status indicates successful receipt of the message.
    return "OK", 200
# [END gae_standard_pubsub_push]
@app.errorhandler(500)
def server_error(e):
    """Log the failure and return a short HTML error page with status 500.

    Args:
        e: The error object passed to this 500 handler; its string form is
            embedded in the response body.

    Returns:
        A ``(body, 500)`` tuple.
    """
    logging.exception("An error occurred during a request.")
    template = """
An internal error occurred: <pre>{}</pre>
See logs for full stacktrace.
"""
    return template.format(e), 500
if __name__ == "__main__":
    # This branch runs only when the file is executed directly, i.e. when
    # running locally. Gunicorn is used to run the application on Google
    # App Engine. See entrypoint in app.yaml.
    # The server binds to 127.0.0.1:8080 with debug=True; keep debug mode
    # to local runs only.
    app.run(host="127.0.0.1", port=8080, debug=True)