@@ -164,53 +164,77 @@ InferResponse::Error()
164164}
165165
166166#ifndef TRITON_PB_STUB
167- TRITONSERVER_Error*
167+ std::shared_ptr< TRITONSERVER_Error*>
168168InferResponse::Send (
169169 TRITONBACKEND_ResponseFactory* response_factory, void * cuda_stream,
170- const uint32_t flags)
170+ bool & requires_deferred_callback, const uint32_t flags,
171+ std::unique_ptr<SharedMemoryManager>& shm_pool,
172+ std::vector<std::pair<std::unique_ptr<PbMemory>, void *>>& output_buffers,
173+ const std::set<std::string>& requested_output_names,
174+ TRITONBACKEND_Response* response)
171175{
172- // [FIXME] Use this code to send responses in non-decoupled mode.
173- TRITONBACKEND_Response* response = nullptr ;
174- TRITONSERVER_Error* response_error = nullptr ;
175- ScopedDefer response_error_handling ([&response, &response_error, flags,
176- response_factory] {
177- if (response != nullptr ) {
178- LOG_IF_ERROR (
179- TRITONBACKEND_ResponseSend (response, flags, response_error),
180- " failed to send the response." );
181- if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL ) {
182- std::unique_ptr<
183- TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
184- response_factory_ptr (
185- reinterpret_cast <TRITONBACKEND_ResponseFactory*>(response_factory));
186- }
187- }
188- });
176+ std::shared_ptr<TRITONSERVER_Error*> response_error =
177+ WrapTritonErrorInSharedPtr (nullptr );
178+ std::unique_ptr<ScopedDefer> response_error_handling;
179+ requires_deferred_callback = false ;
180+
181+ // Should only destruct the response factory whenever a response factory is
182+ // being created.
183+ bool destruct_response_factor = (response == nullptr );
184+
185+ if (response == nullptr ) {
186+ SET_ERROR_AND_RETURN (
187+ response_error,
188+ TRITONBACKEND_ResponseNewFromFactory (&response, response_factory));
189+ }
189190
190- SET_ERROR_AND_RETURN (
191- response_error,
192- TRITONBACKEND_ResponseNewFromFactory (&response, response_factory));
191+ // This lambda expression will be called when this function exits, if the
192+ // inference response doesn't have any GPU tensors. Otherwise, it will be
193+ // called when the object is destructed or DeferredSendCallback is called.
194+ response_error_handling = std::make_unique<ScopedDefer>(
195+ [response, response_error, flags, response_factory,
196+ destruct_response_factor] {
197+ if (response != nullptr ) {
198+ LOG_IF_ERROR (
199+ TRITONBACKEND_ResponseSend (response, flags, *response_error),
200+ " failed to send the response." );
201+ if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
202+ destruct_response_factor) {
203+ std::unique_ptr<
204+ TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
205+ response_factory_ptr (
206+ reinterpret_cast <TRITONBACKEND_ResponseFactory*>(
207+ response_factory));
208+ }
209+ }
210+ });
211+
212+ // Moves the response sending callback so that it is not called until the stub
213+ // process fills in the GPU buffers.
214+ ScopedDefer deferred_task (
215+ [this , &requires_deferred_callback, &response_error_handling] {
216+ if (requires_deferred_callback) {
217+ deferred_send_callback_ = std::move (response_error_handling);
218+ }
219+ });
193220
194221 if (HasError ()) {
195- response_error = TRITONSERVER_ErrorNew (
222+ * response_error = TRITONSERVER_ErrorNew (
196223 TRITONSERVER_ERROR_INTERNAL , Error ()->Message ().c_str ());
197224 return nullptr ;
198225 }
199226
200227 bool cuda_copy = false ;
228+
201229 for (auto & output_tensor : OutputTensors ()) {
202230 TRITONSERVER_MemoryType src_memory_type = output_tensor->MemoryType ();
203231 int64_t src_memory_type_id = output_tensor->MemoryTypeId ();
204232
205233 TRITONSERVER_MemoryType actual_memory_type = src_memory_type;
206234 int64_t actual_memory_type_id = src_memory_type_id;
207235
208- // [FIXME] GPU tensors are not supported in the decoupled API mode.
209236 if (actual_memory_type == TRITONSERVER_MEMORY_GPU ) {
210- response_error = TRITONSERVER_ErrorNew (
211- TRITONSERVER_ERROR_INTERNAL ,
212- " GPU tensors are not supported in decoupled API." );
213- return response_error;
237+ requires_deferred_callback = true ;
214238 }
215239
216240 TRITONBACKEND_Output* response_output;
@@ -222,12 +246,61 @@ InferResponse::Send(
222246 output_tensor->Dims ().data (), output_tensor->Dims ().size ()));
223247
224248 void * buffer;
225- bool cuda_used = false ;
226249 SET_ERROR_AND_RETURN (
227250 response_error, TRITONBACKEND_OutputBuffer (
228251 response_output, &buffer, output_tensor->ByteSize (),
229252 &actual_memory_type, &actual_memory_type_id));
230253
254+ bool cuda_used = false ;
255+ TRITONSERVER_BufferAttributes* output_buffer_attributes;
256+ SET_ERROR_AND_RETURN (
257+ response_error, TRITONBACKEND_OutputBufferAttributes (
258+ response_output, &output_buffer_attributes));
259+
260+ std::unique_ptr<PbMemory> output_buffer;
261+ if (src_memory_type == TRITONSERVER_MEMORY_GPU &&
262+ actual_memory_type == TRITONSERVER_MEMORY_GPU ) {
263+ #ifdef TRITON_ENABLE_GPU
264+ cudaIpcMemHandle_t* cuda_ipc_mem_handle_p;
265+ SET_ERROR_AND_RETURN (
266+ response_error,
267+ TRITONSERVER_BufferAttributesCudaIpcHandle (
268+ output_buffer_attributes,
269+ reinterpret_cast <void **>(&cuda_ipc_mem_handle_p)));
270+
271+ if (cuda_ipc_mem_handle_p != nullptr ) {
272+ SET_ERROR_AND_RETURN_IF_EXCEPTION (
273+ response_error,
274+ output_buffer = PbMemory::Create (
275+ shm_pool, actual_memory_type, actual_memory_type_id,
276+ output_tensor->ByteSize (), reinterpret_cast <char *>(buffer),
277+ false /* copy_gpu */ ));
278+ output_buffer->SetCudaIpcHandle (cuda_ipc_mem_handle_p);
279+ } else {
280+ SET_ERROR_AND_RETURN_IF_EXCEPTION (
281+ response_error,
282+ output_buffer = PbMemory::Create (
283+ shm_pool, actual_memory_type, actual_memory_type_id,
284+ output_tensor->ByteSize (), reinterpret_cast <char *>(buffer),
285+ true /* copy_gpu */ ));
286+ }
287+ output_buffers.push_back ({std::move (output_buffer), buffer});
288+ #endif
289+ }
290+
291+ // When we requested a GPU buffer but received a CPU buffer.
292+ if (src_memory_type == TRITONSERVER_MEMORY_GPU &&
293+ (actual_memory_type == TRITONSERVER_MEMORY_CPU ||
294+ actual_memory_type == TRITONSERVER_MEMORY_CPU_PINNED )) {
295+ SET_ERROR_AND_RETURN_IF_EXCEPTION (
296+ response_error,
297+ output_buffer = PbMemory::Create (
298+ shm_pool, actual_memory_type, actual_memory_type_id,
299+ output_tensor->ByteSize (), nullptr /* data ptr */ ));
300+
301+ output_buffers.push_back ({std::move (output_buffer), buffer});
302+ }
303+
231304 if (src_memory_type != TRITONSERVER_MEMORY_GPU ) {
232305 SET_ERROR_AND_RETURN (
233306 response_error,
@@ -251,4 +324,12 @@ InferResponse::Send(
251324}
252325#endif
253326
327+ #ifndef TRITON_PB_STUB
328+ void
329+ InferResponse::DeferredSendCallback ()
330+ {
331+ deferred_send_callback_.reset ();
332+ }
333+ #endif
334+
254335}}} // namespace triton::backend::python
0 commit comments