Add parameters support to InferResponse
* Infer response to track parameters

* Add parameters to binding infer response

* Move the parameters argument up among the InferResponse constructor arguments

* Add setting parameters to Triton response

* Send response parameters only on non-error
kthui committed Jan 24, 2025
1 parent b771f4f commit 7b63cc9
Showing 4 changed files with 122 additions and 25 deletions.
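At the Python level, the new binding in pb_stub.cc accepts a parameters dict with str keys and bool/int/str values and serializes it to JSON before constructing the C++ InferResponse. Below is a minimal, hypothetical model.py sketch of how a model might use the new argument; the tensor name, shape, and parameter values are illustrative and not part of this commit.

```python
# Hypothetical usage sketch (not part of this commit): a Python backend model
# attaching response parameters via the new "parameters" argument.
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for _ in requests:
            output = pb_utils.Tensor("OUTPUT0", np.zeros(1, dtype=np.float32))
            responses.append(
                pb_utils.InferenceResponse(
                    output_tensors=[output],
                    # Keys must be str; values must be bool, int, or str,
                    # mirroring the checks added in pb_stub.cc.
                    parameters={"cached": False, "attempts": 1, "source": "model"},
                )
            )
        return responses
```

On the backend side, InferResponse::Send() parses the serialized JSON and forwards each entry through TRITONBACKEND_ResponseSetStringParameter, TRITONBACKEND_ResponseSetIntParameter, or TRITONBACKEND_ResponseSetBoolParameter, and per the commit message the parameters are sent only on non-error responses.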
77 changes: 66 additions & 11 deletions src/infer_response.cc
@@ -1,4 +1,4 @@
- // Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ // Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -39,8 +39,10 @@ namespace triton { namespace backend { namespace python {

InferResponse::InferResponse(
const std::vector<std::shared_ptr<PbTensor>>& output_tensors,
- std::shared_ptr<PbError> error, const bool is_last_response, void* id)
- : error_(error), is_last_response_(is_last_response), id_(id)
+ std::shared_ptr<PbError> error, std::string parameters,
+ const bool is_last_response, void* id)
+ : error_(error), is_last_response_(is_last_response), id_(id),
+ parameters_(std::move(parameters))
{
for (auto& output : output_tensors) {
if (!output) {
@@ -58,6 +60,12 @@ InferResponse::OutputTensors()
return output_tensors_;
}

+ std::string&
+ InferResponse::Parameters()
+ {
+ return parameters_;
+ }

bool
InferResponse::HasError()
{
@@ -106,6 +114,9 @@ InferResponse::SaveToSharedMemory(
j++;
}
response_shm_ptr->id = id_;

+ parameters_shm_ = PbString::Create(shm_pool, parameters_);
+ response_shm_ptr->parameters = parameters_shm_->ShmHandle();
}
}

@@ -143,6 +154,8 @@ InferResponse::LoadFromSharedMemory(

std::shared_ptr<PbError> pb_error;
std::vector<std::shared_ptr<PbTensor>> output_tensors;
+ std::shared_ptr<PbString> parameters_shm;
+ std::string parameters;

// If the error field is set, do not load output tensors from shared memory.
if (response_shm_ptr->has_error && response_shm_ptr->is_error_set) {
@@ -154,33 +167,43 @@
bi::managed_external_buffer::handle_t* tensor_handle_shm =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_shm.data_.get() + sizeof(ResponseShm));
+ {
#ifdef TRITON_PB_STUB
- // Need to acquire the GIL to avoid hangs.
- py::gil_scoped_acquire acquire;
+ // Need to acquire the GIL to avoid hangs.
+ py::gil_scoped_acquire acquire;
#endif
- for (size_t idx = 0; idx < requested_output_count; ++idx) {
- std::shared_ptr<PbTensor> pb_tensor = PbTensor::LoadFromSharedMemory(
- shm_pool, tensor_handle_shm[idx], open_cuda_handle);
- output_tensors.emplace_back(std::move(pb_tensor));
+ for (size_t idx = 0; idx < requested_output_count; ++idx) {
+ std::shared_ptr<PbTensor> pb_tensor = PbTensor::LoadFromSharedMemory(
+ shm_pool, tensor_handle_shm[idx], open_cuda_handle);
+ output_tensors.emplace_back(std::move(pb_tensor));
+ }
}

+ parameters_shm = std::move(
+ PbString::LoadFromSharedMemory(shm_pool, response_shm_ptr->parameters));
+ parameters = parameters_shm->String();
}

return std::unique_ptr<InferResponse>(new InferResponse(
response_shm, output_tensors, pb_error,
- response_shm_ptr->is_last_response, response_shm_ptr->id));
+ response_shm_ptr->is_last_response, response_shm_ptr->id, parameters_shm,
+ parameters));
}

InferResponse::InferResponse(
AllocatedSharedMemory<char>& response_shm,
std::vector<std::shared_ptr<PbTensor>>& output_tensors,
- std::shared_ptr<PbError>& pb_error, const bool is_last_response, void* id)
+ std::shared_ptr<PbError>& pb_error, const bool is_last_response, void* id,
+ std::shared_ptr<PbString>& parameters_shm, std::string& parameters)
{
response_shm_ = std::move(response_shm);
output_tensors_ = std::move(output_tensors);
error_ = std::move(pb_error);
shm_handle_ = response_shm_.handle_;
id_ = id;
is_last_response_ = is_last_response;
+ parameters_shm_ = std::move(parameters_shm);
+ parameters_ = std::move(parameters);
}

std::shared_ptr<PbError>&
@@ -387,6 +410,38 @@ InferResponse::Send(
cuda_copy |= cuda_used;
}

+ if (!parameters_.empty()) {
+ triton::common::TritonJson::Value param;
+ THROW_IF_TRITON_ERROR(
+ param.Parse(parameters_.c_str(), parameters_.length()));
+ std::vector<std::string> param_keys;
+ THROW_IF_TRITON_ERROR(param.Members(&param_keys));
+ for (const auto& key : param_keys) {
+ triton::common::TritonJson::Value value;
+ if (!param.Find(key.c_str(), &value)) {
+ throw PythonBackendException("Unexpected missing key on parameters");
+ }
+ if (value.IsString()) {
+ std::string string_value;
+ THROW_IF_TRITON_ERROR(value.AsString(&string_value));
+ THROW_IF_TRITON_ERROR(TRITONBACKEND_ResponseSetStringParameter(
+ response, key.c_str(), string_value.c_str()));
+ } else if (value.IsInt()) {
+ int64_t int_value = 0;
+ THROW_IF_TRITON_ERROR(value.AsInt(&int_value));
+ THROW_IF_TRITON_ERROR(TRITONBACKEND_ResponseSetIntParameter(
+ response, key.c_str(), int_value));
+ } else if (value.IsBool()) {
+ bool bool_value = false;
+ THROW_IF_TRITON_ERROR(value.AsBool(&bool_value));
+ THROW_IF_TRITON_ERROR(TRITONBACKEND_ResponseSetBoolParameter(
+ response, key.c_str(), bool_value));
+ } else {
+ throw PythonBackendException("Unsupported value type on parameters");
+ }
+ }
+ }

#ifdef TRITON_ENABLE_GPU
if (cuda_copy) {
cudaStreamSynchronize(reinterpret_cast<cudaStream_t>(cuda_stream));
13 changes: 9 additions & 4 deletions src/infer_response.h
@@ -1,4 +1,4 @@
- // Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ // Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -38,6 +39,7 @@ namespace triton { namespace backend { namespace python {

struct ResponseShm {
uint32_t outputs_size;
+ bi::managed_external_buffer::handle_t parameters;
bi::managed_external_buffer::handle_t error;
bool has_error;
// Indicates whether this error has a message or not.
@@ -72,9 +73,10 @@ class InferResponse {
public:
InferResponse(
const std::vector<std::shared_ptr<PbTensor>>& output_tensors,
- std::shared_ptr<PbError> error = nullptr,
+ std::shared_ptr<PbError> error = nullptr, std::string parameters = "",
const bool is_last_response = true, void* id = nullptr);
std::vector<std::shared_ptr<PbTensor>>& OutputTensors();
+ std::string& Parameters();
void SaveToSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool, bool copy_gpu = true);
static std::unique_ptr<InferResponse> LoadFromSharedMemory(
@@ -116,8 +118,8 @@ class InferResponse {
InferResponse(
AllocatedSharedMemory<char>& response_shm,
std::vector<std::shared_ptr<PbTensor>>& output_tensors,
- std::shared_ptr<PbError>& pb_error, const bool is_last_response,
- void* id);
+ std::shared_ptr<PbError>& pb_error, const bool is_last_response, void* id,
+ std::shared_ptr<PbString>& parameters_shm, std::string& parameters);
std::vector<std::shared_ptr<PbTensor>> output_tensors_;

std::shared_ptr<PbError> error_;
@@ -128,6 +130,9 @@
bool is_last_response_;
// Representing the request id that the response was created from.
void* id_;

+ std::shared_ptr<PbString> parameters_shm_;
+ std::string parameters_;
};

}}} // namespace triton::backend::python
37 changes: 35 additions & 2 deletions src/pb_stub.cc
@@ -1,4 +1,4 @@
- // Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ // Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -1874,11 +1874,44 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
std::shared_ptr<PbError>>(),
py::arg("output_tensors") = py::list(),
py::arg("error") = static_cast<std::shared_ptr<PbError>>(nullptr))
+ .def(
+ py::init([](const std::vector<std::shared_ptr<PbTensor>>&
+ output_tensors,
+ std::shared_ptr<PbError> error,
+ const py::object& parameters_) {
+ py::dict parameters =
+ PyDefaultArgumentToMutableType<py::dict>(parameters_);
+ for (const auto& pair : parameters) {
+ if (!py::isinstance<py::str>(pair.first)) {
+ throw PythonBackendException(
+ "Expect parameters keys to have type str, found type " +
+ std::string(py::str(pair.first.get_type())));
+ }
+ if (!py::isinstance<py::bool_>(pair.second) &&
+ !py::isinstance<py::int_>(pair.second) &&
+ !py::isinstance<py::str>(pair.second)) {
+ throw PythonBackendException(
+ "Expect parameters values to have type bool/int/str, found "
+ "type " +
+ std::string(py::str(pair.second.get_type())));
+ }
+ }
+ py::module_ py_json = py::module_::import("json");
+ std::string parameters_str =
+ py::str(py_json.attr("dumps")(parameters));
+
+ return std::make_shared<InferResponse>(
+ output_tensors, error, parameters_str /* parameters */);
+ }),
+ py::arg("output_tensors") = py::list(),
+ py::arg("error") = static_cast<std::shared_ptr<PbError>>(nullptr),
+ py::arg("parameters") = py::str())
.def(
"output_tensors", &InferResponse::OutputTensors,
py::return_value_policy::reference)
.def("has_error", &InferResponse::HasError)
.def("error", &InferResponse::Error);
.def("error", &InferResponse::Error)
.def("parameters", &InferResponse::Parameters);

py::class_<ResponseSender, std::shared_ptr<ResponseSender>>(
module, "InferenceResponseSender")
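The binding above also exposes the stored parameters back to Python as the serialized JSON string via InferenceResponse.parameters(). A small, hypothetical helper for decoding that string is sketched below; note that, per the TODO in request_executor.cc further down, BLS responses do not populate parameters in this commit.

```python
# Hypothetical helper (not part of this commit) for decoding the JSON string
# returned by InferenceResponse.parameters().
import json


def response_parameters(infer_response):
    """Return response parameters as a dict; empty dict if none were set."""
    params = infer_response.parameters()  # e.g. '{"cached": false, "attempts": 1}'
    return json.loads(params) if params else {}
```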
20 changes: 12 additions & 8 deletions src/request_executor.cc
@@ -1,4 +1,4 @@
- // Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ // Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -153,20 +153,22 @@ InferResponseComplete(
output_tensors.clear();
}

+ // TODO: [DLIS-7864] Pass response parameters from BLS response.
if (!infer_payload->IsDecoupled()) {
infer_response = std::make_unique<InferResponse>(
- output_tensors, pb_error, true /* is_last_response */);
+ output_tensors, pb_error, "" /* parameters */,
+ true /* is_last_response */);
} else {
if ((flags & TRITONSERVER_RESPONSE_COMPLETE_FINAL) == 0) {
// Not the last response.
infer_response = std::make_unique<InferResponse>(
- output_tensors, pb_error, false /* is_last_response */,
- userp /* id */);
+ output_tensors, pb_error, "" /* parameters */,
+ false /* is_last_response */, userp /* id */);
} else {
// The last response.
infer_response = std::make_unique<InferResponse>(
- output_tensors, pb_error, true /* is_last_response */,
- userp /* id */);
+ output_tensors, pb_error, "" /* parameters */,
+ true /* is_last_response */, userp /* id */);
}
}

@@ -178,11 +180,13 @@
(flags & TRITONSERVER_RESPONSE_COMPLETE_FINAL) != 0) {
// An empty response may be the last response for decoupled models.
infer_response = std::make_unique<InferResponse>(
- output_tensors, pb_error, true /* is_last_response */, userp /* id */);
+ output_tensors, pb_error, "" /* parameters */,
+ true /* is_last_response */, userp /* id */);
} else {
pb_error = std::make_shared<PbError>("Unexpected empty response.");
infer_response = std::make_unique<InferResponse>(
- output_tensors, pb_error, true /* is_last_response */, userp /* id */);
+ output_tensors, pb_error, "" /* parameters */,
+ true /* is_last_response */, userp /* id */);
}

infer_payload->SetValue(std::move(infer_response));
