
Commit 9e8e320

resolve comments

1 parent: b144fc6

9 files changed, +353 -64 lines changed


qa/L0_backend_python/decoupled/decoupled_test.py

Lines changed: 122 additions & 37 deletions
@@ -325,62 +325,137 @@ def test_decoupled_execute_cancel(self):
         self.assertIn("[execute_cancel] Request cancelled at ", log_text)

     def test_decoupled_bls_cancel(self):
-        model_name = "decoupled_bls_cancel"
+        model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
         input_value = 1
         max_sum_value = 10
+        ignore_cancel = False
         user_data = UserData()
+        for model_name in model_names:
+            with self._shm_leak_detector.Probe() as shm_probe:
+                with grpcclient.InferenceServerClient(
+                    f"{_tritonserver_ipaddr}:8001"
+                ) as client:
+                    client.start_stream(callback=partial(callback, user_data))
+                    input_data = np.array([input_value], dtype=np.int32)
+                    max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                    ignore_cancel_data = np.array([ignore_cancel], dtype=np.bool_)
+                    inputs = [
+                        grpcclient.InferInput(
+                            "INPUT",
+                            input_data.shape,
+                            np_to_triton_dtype(input_data.dtype),
+                        ),
+                        grpcclient.InferInput(
+                            "MAX_SUM",
+                            max_sum_data.shape,
+                            np_to_triton_dtype(max_sum_data.dtype),
+                        ),
+                        grpcclient.InferInput(
+                            "IGNORE_CANCEL",
+                            ignore_cancel_data.shape,
+                            np_to_triton_dtype(ignore_cancel_data.dtype),
+                        ),
+                    ]
+                    inputs[0].set_data_from_numpy(input_data)
+                    inputs[1].set_data_from_numpy(max_sum_data)
+                    inputs[2].set_data_from_numpy(ignore_cancel_data)
+                    client.async_stream_infer(model_name, inputs)
+
+                    # Check the results of the decoupled model using BLS
+                    def check_result(result):
+                        # Make sure the result is not an exception
+                        self.assertIsNot(type(result), InferenceServerException)
+                        is_cancelled = result.as_numpy("IS_CANCELLED")
+                        self.assertTrue(
+                            is_cancelled[0],
+                            "error: expected the request to be cancelled",
+                        )

-        with self._shm_leak_detector.Probe() as shm_probe:
-            with grpcclient.InferenceServerClient(
-                f"{_tritonserver_ipaddr}:8001"
-            ) as client:
-                client.start_stream(callback=partial(callback, user_data))
-                input_data = np.array([input_value], dtype=np.int32)
-                max_sum_data = np.array([max_sum_value], dtype=np.int32)
-                inputs = [
-                    grpcclient.InferInput(
-                        "INPUT", input_data.shape, np_to_triton_dtype(input_data.dtype)
-                    ),
-                    grpcclient.InferInput(
-                        "MAX_SUM",
-                        max_sum_data.shape,
-                        np_to_triton_dtype(max_sum_data.dtype),
-                    ),
-                ]
-                inputs[0].set_data_from_numpy(input_data)
-                inputs[1].set_data_from_numpy(max_sum_data)
-                client.async_stream_infer(model_name, inputs)
+                        sum_data = result.as_numpy("SUM")
+                        self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                        self.assertTrue(
+                            np.array_equal(sum_data, max_sum_data),
+                            "error: expected output {} to match input {}".format(
+                                sum_data, max_sum_data
+                            ),
+                        )

-                # Check the results of the decoupled model using BLS
-                def check_result(result):
-                    # Make sure the result is not an exception
-                    self.assertIsNot(type(result), InferenceServerException)
+                    result = user_data._completed_requests.get()
+                    check_result(result)

-                    sum_data = result.as_numpy("SUM")
-                    self.assertIsNotNone(sum_data, "error: expected 'SUM'")
-                    self.assertTrue(
-                        np.array_equal(sum_data, max_sum_data),
-                        "error: expected output {} to match input {}".format(
-                            sum_data, max_sum_data
+    def test_decoupled_bls_ignore_cancel(self):
+        model_names = ["decoupled_bls_cancel", "decoupled_bls_async_cancel"]
+        input_value = 1
+        max_sum_value = 10
+        ignore_cancel = True
+        user_data = UserData()
+        for model_name in model_names:
+            with self._shm_leak_detector.Probe() as shm_probe:
+                with grpcclient.InferenceServerClient(
+                    f"{_tritonserver_ipaddr}:8001"
+                ) as client:
+                    client.start_stream(callback=partial(callback, user_data))
+                    input_data = np.array([input_value], dtype=np.int32)
+                    max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                    ignore_cancel_data = np.array([ignore_cancel], dtype=np.bool_)
+                    inputs = [
+                        grpcclient.InferInput(
+                            "INPUT",
+                            input_data.shape,
+                            np_to_triton_dtype(input_data.dtype),
                         ),
-                    )
+                        grpcclient.InferInput(
+                            "MAX_SUM",
+                            max_sum_data.shape,
+                            np_to_triton_dtype(max_sum_data.dtype),
+                        ),
+                        grpcclient.InferInput(
+                            "IGNORE_CANCEL",
+                            ignore_cancel_data.shape,
+                            np_to_triton_dtype(ignore_cancel_data.dtype),
+                        ),
+                    ]
+                    inputs[0].set_data_from_numpy(input_data)
+                    inputs[1].set_data_from_numpy(max_sum_data)
+                    inputs[2].set_data_from_numpy(ignore_cancel_data)
+                    client.async_stream_infer(model_name, inputs)
+
+                    # Check the results of the decoupled model using BLS
+                    def check_result(result):
+                        # Make sure the result is not an exception
+                        self.assertIsNot(type(result), InferenceServerException)
+                        is_cancelled = result.as_numpy("IS_CANCELLED")
+                        self.assertFalse(
+                            is_cancelled[0],
+                            "error: expected the request not being cancelled",
+                        )

-        result = user_data._completed_requests.get()
-        check_result(result)
+                        sum_data = result.as_numpy("SUM")
+                        self.assertIsNotNone(sum_data, "error: expected 'SUM'")
+                        self.assertTrue(
+                            sum_data > max_sum_data,
+                            "error: expected sum_data {} to be greater than max_sum_data {}".format(
+                                sum_data, max_sum_data
+                            ),
+                        )
+
+                    result = user_data._completed_requests.get()
+                    check_result(result)

-    def test_decoupled_bls_async_cancel(self):
-        model_name = "decoupled_bls_async_cancel"
+    def test_decoupled_bls_cancel_after_completion(self):
+        model_name = "decoupled_bls_cancel_after_complete"
         input_value = 1
         max_sum_value = 10
+        ignore_cancel = False
         user_data = UserData()
-
         with self._shm_leak_detector.Probe() as shm_probe:
             with grpcclient.InferenceServerClient(
                 f"{_tritonserver_ipaddr}:8001"
             ) as client:
                 client.start_stream(callback=partial(callback, user_data))
                 input_data = np.array([input_value], dtype=np.int32)
                 max_sum_data = np.array([max_sum_value], dtype=np.int32)
+                ignore_cancel_data = np.array([ignore_cancel], dtype=np.bool_)
                 inputs = [
                     grpcclient.InferInput(
                         "INPUT", input_data.shape, np_to_triton_dtype(input_data.dtype)
@@ -390,15 +465,25 @@ def test_decoupled_bls_async_cancel(self):
                         max_sum_data.shape,
                         np_to_triton_dtype(max_sum_data.dtype),
                     ),
+                    grpcclient.InferInput(
+                        "IGNORE_CANCEL",
+                        ignore_cancel_data.shape,
+                        np_to_triton_dtype(ignore_cancel_data.dtype),
+                    ),
                 ]
                 inputs[0].set_data_from_numpy(input_data)
                 inputs[1].set_data_from_numpy(max_sum_data)
+                inputs[2].set_data_from_numpy(ignore_cancel_data)
                 client.async_stream_infer(model_name, inputs)

                 # Check the results of the decoupled model using BLS
                 def check_result(result):
                     # Make sure the result is not an exception
                     self.assertIsNot(type(result), InferenceServerException)
+                    is_cancelled = result.as_numpy("IS_CANCELLED")
+                    self.assertTrue(
+                        is_cancelled[0], "error: expected the request to be cancelled"
+                    )

                     sum_data = result.as_numpy("SUM")
                     self.assertIsNotNone(sum_data, "error: expected 'SUM'")

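Note: the tests above rely on a queue-backed `UserData` helper and a streaming `callback` that are defined elsewhere in decoupled_test.py and are not part of this diff. A minimal sketch of that pattern, with the details assumed rather than quoted from the repository:

```python
import queue


class UserData:
    def __init__(self):
        # Completed streaming results (or InferenceServerException objects) land here.
        self._completed_requests = queue.Queue()


def callback(user_data, result, error):
    # gRPC stream callback: store either the error or the result for the test to inspect.
    if error:
        user_data._completed_requests.put(error)
    else:
        user_data._completed_requests.put(result)
```

Each test then blocks on `user_data._completed_requests.get()` and passes the result to its local `check_result` helper.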
qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/1/model.py

Lines changed: 17 additions & 13 deletions
@@ -30,26 +30,30 @@
 
 
 class TritonPythonModel:
-    """This model will send a decoupled bls request to 'response_sender_until_cancelled' model
-    The model will start adding the response from the model.
-    When the MAX_SUM is reached. The model will call the response iterarior cancel() method to
-    cancel the response stream.
-    The number of response should not reach MAX_NUMBER_OF_RESPONSE.
+    """
+    This model sends a decoupled bls inference request to 'response_sender_until_cancelled'
+    model, and sums up its responses.
+    Once the MAX_SUM is reached, the model will call the response iterator's
+    cancel() method to cancel the response stream.
+    If the IGNORE_CANCEL is set to True, the 'response_sender_until_cancelled' model will not honor
+    the request cancellation and keep sending the output to the model.
+    The number of total responses should not reach MAX_NUMBER_OF_RESPONSE.
     """

     async def execute(self, requests):
         max_sum = (
             pb_utils.get_input_tensor_by_name(requests[0], "MAX_SUM").as_numpy().flat[0]
         )
         input = pb_utils.get_input_tensor_by_name(requests[0], "INPUT")
+        ignore_cancel = pb_utils.get_input_tensor_by_name(requests[0], "IGNORE_CANCEL")
         delay = pb_utils.Tensor("DELAY", np.array([50], dtype=np.int32))
         max_num_response = pb_utils.Tensor(
             "MAX_NUMBER_OF_RESPONSE", np.array([100], dtype=np.int32)
         )

         infer_request = pb_utils.InferenceRequest(
             model_name="response_sender_until_cancelled",
-            inputs=[input, max_num_response, delay],
+            inputs=[input, max_num_response, delay, ignore_cancel],
             requested_output_names=["OUTPUT"],
         )

@@ -69,18 +73,18 @@ async def execute(self, requests):
             out = pb_utils.get_output_tensor_by_name(
                 infer_response, "OUTPUT"
             ).as_numpy()[0]
-            if response_sum + out > max_sum:
-                response_stream.cancel()
-            else:
-                response_sum += out

-        if error is None and not is_cancelled:
-            error = pb_utils.TritonError("request is not cancelled!")
+            response_sum += out
+            if response_sum >= max_sum:
+                response_stream.cancel()

         responses = [
             pb_utils.InferenceResponse(
                 output_tensors=[
-                    pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32))
+                    pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32)),
+                    pb_utils.Tensor(
+                        "IS_CANCELLED", np.array([is_cancelled], dtype=np.bool_)
+                    ),
                 ],
                 error=error,
             )

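Note: this async model differs from the sync decoupled_bls_cancel model below mainly in how the decoupled BLS call is issued inside `execute()`. The call site sits in unchanged context that this diff does not show, so the two lines below are a sketch based on the Triton Python backend BLS API rather than a quote of the file:

```python
# Sync variant (decoupled_bls_cancel): blocking BLS call that returns a response iterator.
response_stream = infer_request.exec(decoupled=True)

# Async variant (decoupled_bls_async_cancel): awaitable BLS call inside `async def execute`.
response_stream = await infer_request.async_exec(decoupled=True)
```

In both variants the commit makes the same change: call `response_stream.cancel()` once the running sum reaches MAX_SUM, and report the outcome through the new IS_CANCELLED output instead of setting a TritonError when the request was not cancelled.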
qa/L0_backend_python/decoupled/models/decoupled_bls_async_cancel/config.pbtxt

Lines changed: 10 additions & 0 deletions
@@ -37,13 +37,23 @@ input [
     name: "MAX_SUM"
     data_type: TYPE_INT32
     dims: [ 1 ]
+  },
+  {
+    name: "IGNORE_CANCEL"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
   }
 ]
 output [
   {
     name: "SUM"
     data_type: TYPE_INT32
     dims: [ 1 ]
+  },
+  {
+    name: "IS_CANCELLED"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
   }
 ]

qa/L0_backend_python/decoupled/models/decoupled_bls_cancel/1/model.py

Lines changed: 17 additions & 13 deletions
@@ -29,26 +29,30 @@
 
 
 class TritonPythonModel:
-    """This model will send a decoupled bls request to 'response_sender_until_cancelled' model
-    The model will start adding the response from the model.
-    When the MAX_SUM is reached. The model will call the response iterarior cancel() method to
-    cancel the response stream.
-    The number of response should not reach MAX_NUMBER_OF_RESPONSE.
+    """
+    This model sends a decoupled bls inference request to 'response_sender_until_cancelled'
+    model, and sums up its responses.
+    Once the MAX_SUM is reached, the model will call the response iterator's
+    cancel() method to cancel the response stream.
+    If the IGNORE_CANCEL is set to True, the 'response_sender_until_cancelled' model will not honor
+    the request cancellation and keep sending the output to the model.
+    The number of total responses should not reach MAX_NUMBER_OF_RESPONSE.
     """

     def execute(self, requests):
         max_sum = (
             pb_utils.get_input_tensor_by_name(requests[0], "MAX_SUM").as_numpy().flat[0]
         )
         input = pb_utils.get_input_tensor_by_name(requests[0], "INPUT")
+        ignore_cancel = pb_utils.get_input_tensor_by_name(requests[0], "IGNORE_CANCEL")
         delay = pb_utils.Tensor("DELAY", np.array([50], dtype=np.int32))
         max_num_response = pb_utils.Tensor(
             "MAX_NUMBER_OF_RESPONSE", np.array([100], dtype=np.int32)
         )

         infer_request = pb_utils.InferenceRequest(
             model_name="response_sender_until_cancelled",
-            inputs=[input, max_num_response, delay],
+            inputs=[input, max_num_response, delay, ignore_cancel],
             requested_output_names=["OUTPUT"],
         )

@@ -68,18 +72,18 @@ def execute(self, requests):
             out = pb_utils.get_output_tensor_by_name(
                 infer_response, "OUTPUT"
             ).as_numpy()[0]
-            if response_sum + out > max_sum:
-                response_stream.cancel()
-            else:
-                response_sum += out

-        if error is None and not is_cancelled:
-            error = pb_utils.TritonError("request is not cancelled!")
+            response_sum += out
+            if response_sum >= max_sum:
+                response_stream.cancel()

         responses = [
             pb_utils.InferenceResponse(
                 output_tensors=[
-                    pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32))
+                    pb_utils.Tensor("SUM", np.array([response_sum], dtype=np.int32)),
+                    pb_utils.Tensor(
+                        "IS_CANCELLED", np.array([is_cancelled], dtype=np.bool_)
+                    ),
                 ],
                 error=error,
            )

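Note: taken together, the hunks in this model replace the old "error if the request was not cancelled" behavior with a plain report of cancellation through IS_CANCELLED. The loop scaffolding (obtaining the response stream and detecting cancellation) lives in unchanged lines outside these hunks, so the sketch below is an approximation under stated assumptions, not a quote of the file; in particular, the `has_error()` check standing in for the real cancellation detection is a guess:

```python
import triton_python_backend_utils as pb_utils

# Inside TritonPythonModel.execute(), after building infer_request as shown above.
# max_sum comes from the MAX_SUM input parsed earlier in execute().
response_stream = infer_request.exec(decoupled=True)

response_sum = 0
is_cancelled = False
error = None
for infer_response in response_stream:
    if infer_response.has_error():
        # Assumed: a cancelled (or failed) stream surfaces as an error response.
        is_cancelled = True
        break
    out = pb_utils.get_output_tensor_by_name(infer_response, "OUTPUT").as_numpy()[0]
    response_sum += out
    if response_sum >= max_sum:
        # Cancel the decoupled BLS response stream once the target sum is reached.
        response_stream.cancel()
```

When IGNORE_CANCEL is True the downstream model keeps streaming despite the cancel call, so the sum is expected to exceed MAX_SUM, which is exactly what test_decoupled_bls_ignore_cancel asserts.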
qa/L0_backend_python/decoupled/models/decoupled_bls_cancel/config.pbtxt

Lines changed: 10 additions & 0 deletions
@@ -37,13 +37,23 @@ input [
     name: "MAX_SUM"
     data_type: TYPE_INT32
     dims: [ 1 ]
+  },
+  {
+    name: "IGNORE_CANCEL"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
   }
 ]
 output [
   {
     name: "SUM"
     data_type: TYPE_INT32
     dims: [ 1 ]
+  },
+  {
+    name: "IS_CANCELLED"
+    data_type: TYPE_BOOL
+    dims: [ 1 ]
   }
 ]
