Skip to content
This repository was archived by the owner on Oct 11, 2023. It is now read-only.

Commit 31beab9

Browse files
authored
Update pipeline op & event tests to follow new pattern (#104)
* Update pipeline op & event tests to follow new pattern, make /request_id use consistent * Add documentation * flake8 and test fixes * code-review feedback
1 parent baa0660 commit 31beab9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+555
-695
lines changed

azure-iot-device/azure/iot/device/common/pipeline/pipeline_events_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class IotResponseEvent(PipelineEvent):
5656
"""
5757

5858
def __init__(self, request_id, status_code, response_body):
59-
super(IotResponseEvent, self).__init()
59+
super(IotResponseEvent, self).__init__()
6060
self.request_id = request_id
6161
self.status_code = status_code
6262
self.response_body = response_body

azure-iot-device/azure/iot/device/common/pipeline/pipeline_stages_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ def _handle_pipeline_event(self, event):
664664
op.response_body = event.response_body
665665
else:
666666
logger.warning(
667-
"IotResponseEvent with unknown RID received. Nothing to do. Dropping"
667+
"IotResponseEvent with unknown request_id received. Nothing to do. Dropping"
668668
)
669669
else:
670670
super(CoordinateRequestAndResponse, self)._handle_pipeline_event(event)

azure-iot-device/azure/iot/device/iothub/pipeline/mqtt_topic_iothub.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ def get_method_topic_for_publish(request_id, status):
5151
:return: The topic for publishing method responses. It is of the format
5252
"$iothub/methods/res/<status>/?$rid=<requestId>
5353
"""
54-
return "$iothub/methods/res/{status}/?$rid={rid}".format(
55-
status=urllib.parse.quote_plus(status), rid=urllib.parse.quote_plus(request_id)
54+
return "$iothub/methods/res/{status}/?$rid={request_id}".format(
55+
status=urllib.parse.quote_plus(status), request_id=urllib.parse.quote_plus(request_id)
5656
)
5757

5858

azure-iot-device/azure/iot/device/iothub/pipeline/pipeline_ops_iothub.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,13 @@ class GetTwin(PipelineOperation):
146146
:type twin: Twin
147147
"""
148148

149-
def __init__(self):
149+
def __init__(self, callback=None):
150150
"""
151151
Initializer for GetTwin objects.
152152
"""
153-
super(GetTwin, self).__init()
153+
super(GetTwin, self).__init__(callback=callback)
154154
self.twin = None
155+
self.needs_connection = True
155156

156157

157158
class PatchTwinReportedProperties(PipelineOperation):
@@ -160,12 +161,13 @@ class PatchTwinReportedProperties(PipelineOperation):
160161
IoT Hub or Azure IoT Edge Hub service.
161162
"""
162163

163-
def __init__(self, patch):
164+
def __init__(self, patch, callback=None):
164165
"""
165166
Initializer for PatchTwinReportedProperties object
166167
167168
:param patch: The reported properties patch to send to the service.
168169
:type patch: dict, str, int, float, bool, or None (JSON compatible values)
169170
"""
170-
super(PatchTwinReportedProperties, self).__init()
171+
super(PatchTwinReportedProperties, self).__init__(callback=callback)
171172
self.patch = patch
173+
self.needs_connection = True

azure-iot-device/azure/iot/device/iothub/pipeline/pipeline_stages_iothub_mqtt.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,21 +154,21 @@ def _handle_pipeline_event(self, event):
154154
)
155155

156156
elif mqtt_topic_iothub.is_method_topic(topic):
157-
rid = mqtt_topic_iothub.get_method_request_id_from_topic(topic)
157+
request_id = mqtt_topic_iothub.get_method_request_id_from_topic(topic)
158158
method_name = mqtt_topic_iothub.get_method_name_from_topic(topic)
159159
method_received = MethodRequest(
160-
request_id=rid,
160+
request_id=request_id,
161161
name=method_name,
162162
payload=json.loads(event.payload.decode("utf-8")),
163163
)
164164
self.handle_pipeline_event(pipeline_events_iothub.MethodRequest(method_received))
165165

166166
elif mqtt_topic_iothub.is_twin_response_topic(topic):
167-
rid = mqtt_topic_iothub.get_twin_request_id_from_topic(topic)
167+
request_id = mqtt_topic_iothub.get_twin_request_id_from_topic(topic)
168168
status_code = mqtt_topic_iothub.get_twin_status_code_from_topic(topic)
169169
self.handle_pipeline_event(
170170
pipeline_events_base.IotResponseEvent(
171-
request_id=rid, status_code=status_code, response_body=event.payload
171+
request_id=request_id, status_code=status_code, response_body=event.payload
172172
)
173173
)
174174

azure-iot-device/azure/iot/device/provisioning/internal/polling_machine.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,11 @@ def _send_register_request(self, event_data):
186186
logger.info("Sending registration request")
187187
self._set_query_timer()
188188

189-
rid = str(uuid.uuid4())
189+
request_id = str(uuid.uuid4())
190190

191-
self._operations[rid] = constant.PUBLISH_TOPIC_REGISTRATION.format(rid)
191+
self._operations[request_id] = constant.PUBLISH_TOPIC_REGISTRATION.format(request_id)
192192
self._request_response_provider.send_request(
193-
rid=rid,
193+
request_id=request_id,
194194
request_payload=" ",
195195
operation_id=None,
196196
callback_on_response=self._on_register_response_received,
@@ -203,22 +203,24 @@ def _query_operation_status(self, event_data):
203203
logger.info("Querying operation status from polling machine")
204204
self._set_query_timer()
205205

206-
rid = str(uuid.uuid4())
206+
request_id = str(uuid.uuid4())
207207
result = event_data.args[0].args[0]
208208

209209
operation_id = result.operation_id
210-
self._operations[rid] = constant.PUBLISH_TOPIC_QUERYING.format(rid, operation_id)
210+
self._operations[request_id] = constant.PUBLISH_TOPIC_QUERYING.format(
211+
request_id, operation_id
212+
)
211213
self._request_response_provider.send_request(
212-
rid=rid,
214+
request_id=request_id,
213215
request_payload=" ",
214216
operation_id=operation_id,
215217
callback_on_response=self._on_query_response_received,
216218
)
217219

218-
def _on_register_response_received(self, rid, status_code, key_values_dict, response):
220+
def _on_register_response_received(self, request_id, status_code, key_values_dict, response):
219221
"""
220222
The function to call in case of a response from a registration request.
221-
:param rid: The id of the original register request.
223+
:param request_id: The id of the original register request.
222224
:param status_code: The status code in the response.
223225
:param key_values_dict: The dictionary containing the query parameters of the returned topic.
224226
:param response: The complete response from the service.
@@ -228,21 +230,21 @@ def _on_register_response_received(self, rid, status_code, key_values_dict, resp
228230
retry_after = (
229231
None if "retry-after" not in key_values_dict else str(key_values_dict["retry-after"][0])
230232
)
231-
intermediate_registration_result = RegistrationQueryStatusResult(rid, retry_after)
233+
intermediate_registration_result = RegistrationQueryStatusResult(request_id, retry_after)
232234

233235
if int(status_code, 10) >= 429:
234-
del self._operations[rid]
236+
del self._operations[request_id]
235237
self._trig_wait(intermediate_registration_result)
236238
elif int(status_code, 10) >= 300: # pure failure
237239
self._registration_error = ValueError("Incoming message failure")
238240
self._trig_error()
239241
else: # successful case, transition into complete or poll status
240-
self._process_successful_response(rid, retry_after, response)
242+
self._process_successful_response(request_id, retry_after, response)
241243

242-
def _on_query_response_received(self, rid, status_code, key_values_dict, response):
244+
def _on_query_response_received(self, request_id, status_code, key_values_dict, response):
243245
"""
244246
The function to call in case of a response from a polling/query request.
245-
:param rid: The id of the original query request.
247+
:param request_id: The id of the original query request.
246248
:param status_code: The status code in the response.
247249
:param key_values_dict: The dictionary containing the query parameters of the returned topic.
248250
:param response: The complete response from the service.
@@ -253,12 +255,12 @@ def _on_query_response_received(self, rid, status_code, key_values_dict, respons
253255
retry_after = (
254256
None if "retry-after" not in key_values_dict else str(key_values_dict["retry-after"][0])
255257
)
256-
intermediate_registration_result = RegistrationQueryStatusResult(rid, retry_after)
258+
intermediate_registration_result = RegistrationQueryStatusResult(request_id, retry_after)
257259

258260
if int(status_code, 10) >= 429:
259-
if rid in self._operations:
260-
publish_query_topic = self._operations[rid]
261-
del self._operations[rid]
261+
if request_id in self._operations:
262+
publish_query_topic = self._operations[request_id]
263+
del self._operations[request_id]
262264
topic_parts = publish_query_topic.split("$")
263265
key_values_publish_topic = urllib.parse.parse_qs(
264266
topic_parts[POS_QUERY_PARAM_PORTION]
@@ -273,17 +275,17 @@ def _on_query_response_received(self, rid, status_code, key_values_dict, respons
273275
self._registration_error = ValueError("Incoming message failure")
274276
self._trig_error()
275277
else: # successful status code case, transition into complete or another poll status
276-
self._process_successful_response(rid, retry_after, response)
278+
self._process_successful_response(request_id, retry_after, response)
277279

278-
def _process_successful_response(self, rid, retry_after, response):
280+
def _process_successful_response(self, request_id, retry_after, response):
279281
"""
280282
Fucntion to call in case of 200 response from the service
281-
:param rid: The request id
283+
:param request_id: The request id
282284
:param retry_after: The time after which to try again.
283285
:param response: The complete response
284286
"""
285-
del self._operations[rid]
286-
successful_result = self._decode_json_response(rid, retry_after, response)
287+
del self._operations[request_id]
288+
successful_result = self._decode_json_response(request_id, retry_after, response)
287289
if successful_result.status == "assigning":
288290
self._trig_wait(successful_result)
289291
elif successful_result.status == "assigned" or successful_result.status == "failed":
@@ -389,17 +391,17 @@ def _decode_complete_json_response(self, query_result, response):
389391
)
390392

391393
registration_result = RegistrationResult(
392-
rid=query_result.rid,
394+
request_id=query_result.request_id,
393395
operation_id=query_result.operation_id,
394396
status=query_result.status,
395397
registration_state=registration_state,
396398
)
397399
return registration_result
398400

399-
def _decode_json_response(self, rid, retry_after, response):
401+
def _decode_json_response(self, request_id, retry_after, response):
400402
"""
401403
Decodes the json response for operation id and status
402-
:param rid: The request id.
404+
:param request_id: The request id.
403405
:param retry_after: The time in secs after which to retry.
404406
:param response: The complete response from the service.
405407
"""
@@ -410,7 +412,7 @@ def _decode_json_response(self, rid, retry_after, response):
410412
)
411413
status = None if "status" not in decoded_result else str(decoded_result["status"])
412414

413-
return RegistrationQueryStatusResult(rid, retry_after, operation_id, status)
415+
return RegistrationQueryStatusResult(request_id, retry_after, operation_id, status)
414416

415417
def _on_disconnect_completed_error(self):
416418
logger.info("on_disconnect_completed for Device Provisioning Service")

azure-iot-device/azure/iot/device/provisioning/internal/registration_query_status_result.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,22 @@ class RegistrationQueryStatusResult(object):
1515
from the provisioning service.
1616
"""
1717

18-
def __init__(self, rid=None, retry_after=None, operation_id=None, status=None):
18+
def __init__(self, request_id=None, retry_after=None, operation_id=None, status=None):
1919
"""
20-
:param rid: The request id to which the response is being obtained
20+
:param request_id: The request id to which the response is being obtained
2121
:param retry_after : Number of secs after which to retry again.
2222
:param operation_id: The id of the operation as returned by the initial registration request.
2323
:param status: The status of the registration process.
2424
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
2525
from the provisioning service.
2626
"""
27-
self._request_id = rid
27+
self._request_id = request_id
2828
self._operation_id = operation_id
2929
self._status = status
3030
self._retry_after = retry_after
3131

3232
@property
33-
def rid(self):
33+
def request_id(self):
3434
return self._request_id
3535

3636
@property

azure-iot-device/azure/iot/device/provisioning/internal/request_response_provider.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,19 @@ def __init__(self, provisioning_pipeline):
2828

2929
self._pending_requests = {}
3030

31-
def send_request(self, rid, request_payload, operation_id=None, callback_on_response=None):
31+
def send_request(
32+
self, request_id, request_payload, operation_id=None, callback_on_response=None
33+
):
3234
"""
3335
Sends a request
34-
:param rid: Id of the request
36+
:param request_id: Id of the request
3537
:param request_payload: The payload of the request.
3638
:param operation_id: A id of the operation in case it is an ongoing process.
3739
:param callback_on_response: callback which is called when response comes back for this request.
3840
"""
39-
self._pending_requests[rid] = callback_on_response
41+
self._pending_requests[request_id] = callback_on_response
4042
self._provisioning_pipeline.send_request(
41-
rid=rid,
43+
request_id=request_id,
4244
request_payload=request_payload,
4345
operation_id=operation_id,
4446
callback=self._on_publish_completed,
@@ -64,10 +66,10 @@ def disable_responses(self, callback=None):
6466
callback = self._on_unsubscribe_completed
6567
self._provisioning_pipeline.disable_responses(callback=callback)
6668

67-
def _receive_response(self, rid, status_code, key_value_dict, response_payload):
69+
def _receive_response(self, request_id, status_code, key_value_dict, response_payload):
6870
"""
6971
Handler that processes the response from the service.
70-
:param rid: The id of the request which is being responded to.
72+
:param request_id: The id of the request which is being responded to.
7173
:param status_code: The status code inside the response
7274
:param key_value_dict: A dictionary of keys mapped to a list of values extracted from the topic of the response.
7375
:param response_payload: String payload of the message received.
@@ -79,13 +81,13 @@ def _receive_response(self, rid, status_code, key_value_dict, response_payload):
7981
# """
8082
logger.info("Received response {}:".format(response_payload))
8183

82-
if rid in self._pending_requests:
83-
callback = self._pending_requests[rid]
84+
if request_id in self._pending_requests:
85+
callback = self._pending_requests[request_id]
8486
# Only send the status code and the extracted topic
85-
callback(rid, status_code, key_value_dict, response_payload)
86-
del self._pending_requests[rid]
87+
callback(request_id, status_code, key_value_dict, response_payload)
88+
del self._pending_requests[request_id]
8789

88-
# TODO : What happens when rid if not there ? trigger error ?
90+
# TODO : What happens when request_id if not there ? trigger error ?
8991

9092
def _on_connection_state_change(self, new_state):
9193
"""Handler to be called by the pipeline upon a connection state change."""

azure-iot-device/azure/iot/device/provisioning/models/registration_result.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ class RegistrationResult(object):
1616
from the provisioning service.
1717
"""
1818

19-
def __init__(self, rid, operation_id, status, registration_state=None):
19+
def __init__(self, request_id, operation_id, status, registration_state=None):
2020
"""
21-
:param rid: The request id to which the response is being obtained
21+
:param request_id: The request id to which the response is being obtained
2222
:param operation_id: The id of the operation as returned by the initial registration request.
2323
:param status: The status of the registration process.
2424
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
2525
:param registration_state : Details like device id, assigned hub , date times etc returned
2626
from the provisioning service.
2727
"""
28-
self._request_id = rid
28+
self._request_id = request_id
2929
self._operation_id = operation_id
3030
self._status = status
3131
self._registration_state = registration_state

azure-iot-device/azure/iot/device/provisioning/pipeline/mqtt_topic.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,24 @@ def get_topic_for_subscribe():
2424
return _get_topic_base() + "res/#"
2525

2626

27-
def get_topic_for_register(rid):
27+
def get_topic_for_register(request_id):
2828
"""
2929
return the topic string used to publish telemetry
3030
"""
31-
return (_get_topic_base() + "PUT/iotdps-register/?$rid={rid}").format(rid=rid)
31+
return (_get_topic_base() + "PUT/iotdps-register/?$rid={request_id}").format(
32+
request_id=request_id
33+
)
3234

3335

34-
def get_topic_for_query(rid, operation_id):
36+
def get_topic_for_query(request_id, operation_id):
3537
"""
3638
:return: The topic for cloud to device messages.It is of the format
3739
"devices/<deviceid>/messages/devicebound/#"
3840
"""
3941
return (
40-
_get_topic_base() + "GET/iotdps-get-operationstatus/?$rid={rid}&operationId={operation_id}"
41-
).format(rid=rid, operation_id=operation_id)
42+
_get_topic_base()
43+
+ "GET/iotdps-get-operationstatus/?$rid={request_id}&operationId={operation_id}"
44+
).format(request_id=request_id, operation_id=operation_id)
4245

4346

4447
def get_topic_for_response():

0 commit comments

Comments
 (0)