forked from Zipstack/unstract
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_deployment_views.py
More file actions
152 lines (135 loc) · 5.67 KB
/
api_deployment_views.py
File metadata and controls
152 lines (135 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import json
import logging
from typing import Any, Optional
from api.constants import ApiExecution
from api.deployment_helper import DeploymentHelper
from api.exceptions import InvalidAPIRequest, NoActiveAPIKeyError
from api.models import APIDeployment
from api.postman_collection.dto import PostmanCollection
from api.serializers import (
APIDeploymentListSerializer,
APIDeploymentSerializer,
DeploymentResponseSerializer,
ExecutionRequestSerializer,
)
from django.db.models import QuerySet
from django.http import HttpResponse
from permissions.permission import IsOwner
from rest_framework import serializers, status, views, viewsets
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import Serializer
from utils.enums import CeleryTaskState
from workflow_manager.workflow.dto import ExecutionResponse
logger = logging.getLogger(__name__)
class DeploymentExecution(views.APIView):
def initialize_request(
self, request: Request, *args: Any, **kwargs: Any
) -> Request:
"""To remove csrf request for public API.
Args:
request (Request): _description_
Returns:
Request: _description_
"""
setattr(request, "csrf_processing_done", True)
return super().initialize_request(request, *args, **kwargs)
@DeploymentHelper.validate_api_key
def post(
self, request: Request, org_name: str, api_name: str, api: APIDeployment
) -> Response:
file_objs = request.FILES.getlist(ApiExecution.FILES_FORM_DATA)
serializer = ExecutionRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
timeout = serializer.get_timeout(serializer.validated_data)
include_metadata = (
request.data.get(ApiExecution.INCLUDE_METADATA, "false").lower() == "true"
)
if not file_objs or len(file_objs) == 0:
raise InvalidAPIRequest("File shouldn't be empty")
response = DeploymentHelper.execute_workflow(
organization_name=org_name,
api=api,
file_objs=file_objs,
timeout=timeout,
include_metadata=include_metadata,
)
if "error" in response and response["error"]:
return Response(
{"message": response},
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
return Response({"message": response}, status=status.HTTP_200_OK)
@DeploymentHelper.validate_api_key
def get(
self, request: Request, org_name: str, api_name: str, api: APIDeployment
) -> Response:
execution_id = request.query_params.get("execution_id")
if not execution_id:
raise InvalidAPIRequest("execution_id shouldn't be empty")
response: ExecutionResponse = DeploymentHelper.get_execution_status(
execution_id=execution_id
)
if response.execution_status != CeleryTaskState.SUCCESS.value:
return Response(
{
"status": response.execution_status,
"message": response.result,
},
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
return Response(
{"status": response.execution_status, "message": response.result},
status=status.HTTP_200_OK,
)
class APIDeploymentViewSet(viewsets.ModelViewSet):
permission_classes = [IsOwner]
def get_queryset(self) -> Optional[QuerySet]:
return APIDeployment.objects.filter(created_by=self.request.user)
def get_serializer_class(self) -> serializers.Serializer:
if self.action in ["list"]:
return APIDeploymentListSerializer
return APIDeploymentSerializer
@action(detail=True, methods=["get"])
def fetch_one(self, request: Request, pk: Optional[str] = None) -> Response:
"""Custom action to fetch a single instance."""
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
def create(
self, request: Request, *args: tuple[Any], **kwargs: dict[str, Any]
) -> Response:
serializer: Serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
api_key = DeploymentHelper.create_api_key(serializer=serializer)
response_serializer = DeploymentResponseSerializer(
{"api_key": api_key.api_key, **serializer.data}
)
headers = self.get_success_headers(serializer.data)
return Response(
response_serializer.data,
status=status.HTTP_201_CREATED,
headers=headers,
)
@action(detail=True, methods=["get"])
def download_postman_collection(
self, request: Request, pk: Optional[str] = None
) -> Response:
"""Downloads a Postman Collection of the API deployment instance."""
instance = self.get_object()
api_key_inst = instance.apikey_set.filter(is_active=True).first()
if not api_key_inst:
logger.error(f"No active API key set for deployment {instance.pk}")
raise NoActiveAPIKeyError(deployment_name=instance.display_name)
postman_collection = PostmanCollection.create(
instance=instance, api_key=api_key_inst.api_key
)
response = HttpResponse(
json.dumps(postman_collection.to_dict()), content_type="application/json"
)
response["Content-Disposition"] = (
f'attachment; filename="{instance.display_name}.json"'
)
return response