-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
156 lines (126 loc) · 4.34 KB
/
Copy pathapi.py
File metadata and controls
156 lines (126 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import importlib.metadata
import os
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from .db import session
from .logging import getLogger
from .mail import otp_delivery
from .user import (
AuthenticationError,
EditableUser,
get_current_active,
Token,
UnknownUser,
User,
)
# Application name; reused below as the logger name and as the distribution
# name for the package-metadata lookup that reports the version.
APP_NAME = "mycodeplug"
# The FastAPI application object that the routes in this module register on.
app = FastAPI()
# Module-wide logger obtained from the project's own logging helper.
logger = getLogger(APP_NAME)
class RootData(BaseModel):
    """
    Response body for the top level ``GET /`` request.

    Both fields have defaults, so constructing the model with no arguments
    describes the running application.
    """

    # Application name, taken from the module-level constant.
    application: str = APP_NAME
    # Version string read from the distribution metadata for APP_NAME; the
    # lookup runs once, when this class body is executed.
    version: str = importlib.metadata.metadata(APP_NAME)["version"]
@app.get("/", response_model=RootData)
async def root() -> RootData:
    """
    Describe the running service.

    :return: a ``RootData`` built entirely from its defaults (application
        name and version)
    """
    service_info = RootData()
    return service_info
@app.post("/login")
def login(
    email: str,
    request: Request,
    deliver=Depends(otp_delivery),
    session=Depends(session),
):
    """
    Start a login for the given email address.

    A one time password is generated for the user; it can then be redeemed
    through the magic link or through the standard username/password oauth
    flow on the /token endpoint. An address that is not yet known gets a
    user record created for it first.

    In production mode, the otp would be emailed to the given address.
    In development mode, the otp is printed to the console.

    Either way, the client POSTing /token must be the same client
    that POSTed /login.

    :param email: the user to login
    :param request: the request, used to fetch the login IP.
    :param deliver: injected callable that hands the otp to the user
    :param session: injected database session
    :return: whatever ``deliver`` returns -- the otp itself is emailed (or
        printed, in dev mode).
    """
    try:
        account = User.from_email(email, session=session)
    except UnknownUser:
        # First time this address is seen: create and persist a user for it,
        # then reload it so the object reflects what the database stored.
        account = User(email=email, created_ip=request.client.host)
        session.add(account)
        session.commit()
        session.refresh(account)
        logger.info("Created a new user for {}".format(email))

    # Generate the otp first, then hand it to the delivery mechanism.
    one_time_password = account.login(ip=request.client.host, session=session)
    return deliver(account, one_time_password)
def _token(email: str, otp: str, request: Request) -> Token:
    """
    Exchange a one-time password for a bearer token.

    Shared implementation behind the /token and /magic endpoints.

    :param email: the user to login
    :param otp: the one-time password from a /login request
    :param request: the request; its client IP is passed to ``authenticate``
        and must match the IP that requested /login
    :return: oauth Token whose access token is the JWT form of the
        authenticated token data
    """
    # NOTE(review): nothing raised by from_email / authenticate is caught
    # here, and no session is passed (unlike /login) -- confirm both are
    # intentional and that failures surface as a suitable HTTP error.
    account = User.from_email(email)
    claims = account.authenticate(ip=request.client.host, otp=otp)
    encoded = claims.to_jwt()
    return Token(access_token=encoded, token_type="bearer")
@app.post("/token", response_model=Token)
def token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()) -> Token:
    """
    Standard OAuth2 Password endpoint.

    The form's username carries the email address and its password carries
    the one-time password; both are forwarded to ``_token``.

    :param request: the request IP must match the IP that requested /login
    :param form_data: username/password
    :return: oauth Token
    """
    email, otp = form_data.username, form_data.password
    return _token(email, otp, request)
@app.get("/magic/{email}/{otp}", response_model=Token)
def magic(email: str, otp: str, request: Request) -> Token:
    """
    Magic link login.

    XXX: Needs to be a JS application to save the token client side.

    :param email: the user to login, taken from the URL path
    :param otp: the one-time password, taken from the URL path
    :param request: the request, forwarded to ``_token`` for the IP check
    :return: oauth Token
    """
    return _token(email=email, otp=otp, request=request)
@app.get("/users/me", response_model=User)
async def get_users_me(current_user: User = Depends(get_current_active)) -> User:
    """
    Return the record of the authenticated user.

    All of the work happens in the ``get_current_active`` dependency; this
    handler simply echoes what it resolved.

    :param current_user: active user from oAuth/database
    :return: User information
    """
    return current_user
@app.post("/users/me")
def post_users_me(
    data: EditableUser,
    request: Request,
    otp: Optional[str] = None,
    current_user: User = Depends(get_current_active),
    deliver=Depends(otp_delivery),
    session=Depends(session),
):
    """
    Update the authenticated user's editable settings.

    Only fields that were explicitly set, are not None and differ from their
    defaults are applied.

    Changing the email address takes two requests. Without an ``otp`` a new
    one is generated and delivered, and the caller is told to resubmit. With
    an ``otp`` it is authenticated first, then the changes are saved.

    :param data: the settings to change
    :param request: the request, used to fetch the client IP
    :param otp: one-time password confirming an email change, if any
    :param current_user: active user from oAuth/database
    :param deliver: injected callable that hands the otp to the user
    :param session: injected database session
    :return: a ``detail`` message when a new otp was issued, otherwise None
    """
    changes = data.dict(
        exclude_none=True, exclude_unset=True, exclude_defaults=True
    )
    email_changing = "email" in changes

    if email_changing and otp is None:
        # handle email updates specially, to validate the new address
        # NOTE(review): the new address is assigned before the otp has been
        # verified -- confirm that login() does not persist it prematurely.
        current_user.email = data.email
        challenge = current_user.login(ip=request.client.host, session=session)
        deliver(current_user, challenge)
        return {"detail": "Resubmit request with updated OTP"}

    if email_changing:
        # An otp accompanied the email change: it must authenticate before
        # anything is written.
        current_user.authenticate(ip=request.client.host, otp=otp, session=session)

    for field_name, new_value in changes.items():
        setattr(current_user, field_name, new_value)
    session.add(current_user)
    session.commit()