-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy pathClientCoreState.hpp
357 lines (308 loc) · 16.3 KB
/
ClientCoreState.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
/*
* Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
/**
* @file ClientCoreState.hpp
* @brief
*
*/
#pragma once
#include <condition_variable>
#include <chrono>
#include "util/Core_EXPORTS.hpp"
#include "util/Utf8String.hpp"
#include "util/memory/stl/Map.hpp"
#include "util/memory/stl/Queue.hpp"
#include "Action.hpp"
#include "ResponseCode.hpp"
#include "NetworkConnection.hpp"
/**
* Default sleep duration between each execution of Client Core thread operations
*/
#define DEFAULT_CORE_THREAD_SLEEP_DURATION_MS 100
/**
* Max size of the queue
*/
#define DEFAULT_MAX_QUEUE_SIZE 16
namespace awsiotsdk {
/**
* @brief MQTT Disconnect Callback Context Data
*
* This class can be used to provide customer context data to be provided with each disconnect callback.
* Uses a pure virtual destructor to allow for polymorphism
*/
class DisconnectCallbackContextData {
public:
virtual ~DisconnectCallbackContextData() = 0;
};
/**
* @brief MQTT Reconnect Callback Context Data
*
* This class can be used to provide customer context data to be provided with each reconnect callback.
* Uses a pure virtual destructor to allow for polymorphism
*/
class ReconnectCallbackContextData {
public:
virtual ~ReconnectCallbackContextData() = 0;
};
/**
* @brief MQTT Resubscribe Callback Context Data
*
* This class can be used to provide customer context data to be provided with each resubscribe callback.
* Uses a pure virtual destructor to allow for polymorphism
*/
class ResubscribeCallbackContextData {
public:
virtual ~ResubscribeCallbackContextData() = 0;
};
/**
* @brief Client Core State Class
*
* Defining a class for the Core Client State.
* This class is responsible for maintaing the state information for the core client
* It can also be extended to provide state information to Actions.
* It contains the action queue, an action registry and also keeps track of
* the Common Action State as well as the Network connection
*
*/
class ClientCoreState : public ActionState {
protected:
/**
* @brief Pending Ack Data Class
*
* Defining an internal class for storing information about Pending Acks.
*
*/
class PendingAckData {
public:
std::chrono::system_clock::time_point time_of_request_; ///< Time at which the request was sent
ActionData::AsyncAckNotificationHandlerPtr p_async_ack_handler_; ///< Handler to which response must be sent
};
std::atomic<uint16_t> next_action_id_; ///< Atomic, ID of the next Action that will be enqueued
std::atomic_int cur_core_threads_; ///< Atomic, Count of currently running core threads
std::atomic_int max_hardware_threads_; ///< Atomic, Count of the maximum allowed hardware threads
std::atomic_size_t max_queue_size_; ///< Atomic, Current configured max queue size
std::chrono::seconds ack_timeout_; ///< Timeout for pending Acks, older Acks are deleted with a failed response
std::mutex register_action_lock_; ///< Mutex for Register Action Request flow
std::mutex ack_map_lock_; ///< Mutex for Ack Map operations
// Used to perform blocking sync actions
std::mutex sync_action_request_lock_; ///< Mutex for Sync Action Request flow
std::mutex sync_action_response_lock_; ///< Mutex for Sync Action Response flow
std::condition_variable sync_action_response_wait_; ///< Condition variable used to wake up calling thread on Sync Action response
ResponseCode sync_action_response_; ///< Variable to store received Sync Action response
std::atomic_bool process_queued_actions_; ///< Atomic, indicates whether currently queued Actions should be processed or not
std::shared_ptr<std::atomic_bool> continue_execution_; ///< Atomic, Used to synchronize running threads, false value causes running threads to stop
util::Map<ActionType, std::unique_ptr<Action>> action_map_; ///< Map containing currently initialized Action Instances
util::Map<uint16_t, std::unique_ptr<PendingAckData>> pending_ack_map_; ///< Map containing currently pending Acks
util::Map<ActionType, Action::CreateHandlerPtr> action_create_handler_map_; ///< Map containing currently registered Action Types and corrosponding Factories
util::Queue<std::pair<ActionType, std::shared_ptr<ActionData>>> outbound_action_queue_; ///< Queue of outbound actions
/**
* @brief Internal Action Handler for Sync Action responses
*
* @param action_id - ID of the Action that response was received for
* @param rc - Received response
*/
void SyncActionHandler(uint16_t action_id, ResponseCode rc);
public:
/**
* @brief Define Handler for Disconnect Callbacks
*
* This handler is used to provide notification to the application when a disconnect occurs
* NOTE: This handler should be NON-BLOCKING
*/
typedef std::function<ResponseCode(util::String mqtt_client_id,
std::shared_ptr<DisconnectCallbackContextData> p_app_handler_data)> ApplicationDisconnectCallbackPtr;
ApplicationDisconnectCallbackPtr disconnect_handler_ptr_; ///< Pointer to the Application Disconnect Callback
std::shared_ptr<DisconnectCallbackContextData> p_disconnect_app_handler_data_; ///< Data to be passed to the Application Handler
/**
* @brief Define Handler for Reconnect Callbacks
*
* This handler is used to provide notification to the application when a reconnect occurs
* NOTE: This handler should be NON-BLOCKING
*/
typedef std::function<ResponseCode(util::String mqtt_client_id,
std::shared_ptr<ReconnectCallbackContextData> p_app_handler_data,
ResponseCode reconnect_result)> ApplicationReconnectCallbackPtr;
ApplicationReconnectCallbackPtr reconnect_handler_ptr_; ///< Pointer to the Application Reconnect Callback
std::shared_ptr<ReconnectCallbackContextData> p_reconnect_app_handler_data_; ///< Data to be passed to the Application Handler
/**
* @brief Define Handler for Resubscribe Callbacks
*
* This handler is used to provide notification to the application when a resubscribe occurs.
* NOTE: This handler should be NON-BLOCKING
*/
typedef std::function<ResponseCode(util::String mqtt_client_id,
std::shared_ptr<ResubscribeCallbackContextData> p_app_handler_data,
ResponseCode resubscribe_result)> ApplicationResubscribeCallbackPtr;
ApplicationResubscribeCallbackPtr resubscribe_handler_ptr_; ///< Pointer to the Application Resubscribe Callback
std::shared_ptr<ResubscribeCallbackContextData> p_resubscribe_app_handler_data_; ///< Data to be passed to the Application Handler
/**
* @brief Network connection instance to use for this instance of the Client
* This is shared between Actions, public to avoid multiple shared pointer operations while passing as argument by ClientCore
*/
std::shared_ptr<NetworkConnection> p_network_connection_;
/**
* @brief Overload for Get next Action ID
* @return uint16_t Action ID
*/
virtual uint16_t GetNextActionId() {
if (UINT16_MAX == next_action_id_) {
next_action_id_ = 1;
return UINT16_MAX;
}
return next_action_id_++;
};
/**
* @brief Get current value of maximum action queue size
* @return size_t max_queue_size_
*/
size_t GetMaxActionQueueSize() { return max_queue_size_; }
/**
* @brief Set max size for action queue
* @param size_t max_queue_size
*/
void SetMaxActionQueueSize(size_t max_queue_size) { max_queue_size_ = max_queue_size; }
/**
* @brief Get pointer to sync point used for execution status of the Core instance
*
* This sync point is used to indicate SDK is still continuing execution. Set to false when exiting
*
* @return std::shared_ptr<std::atomic_bool> pointer to the sync point
*/
std::shared_ptr<std::atomic_bool> GetCoreExecutionSyncPoint() { return continue_execution_; }
/**
* @brief Sets whether the Client is allowed to process queue actions
* @param process_queued_actions value to set it to
*/
void SetProcessQueuedActions(bool process_queued_actions) { process_queued_actions_ = process_queued_actions; }
/**
* @brief Get whether the Client can process queued actions
* @return boolean value indicating status
*/
bool CanProcessQueuedActions() { return process_queued_actions_; }
/**
* @brief Process the outbound action queue
*
* This function processes the actions queued up in the Outbound action queue.
* The function accepts a Sync point that can be used to control execution in a separate thread.
* If the value is set to false for the sync point, the function will perform one action from the queue.
* This puts the running thread to sleep if there are no queued up actions.
* DO NOT call from main thread unless you have a separate thread to queue up actions
*
* @param thread_task_out_sync
*/
void ProcessOutboundActionQueue(std::shared_ptr<std::atomic_bool> thread_task_out_sync);
/**
* @brief Perform Action in Blocking Mode
*
* This API will perform the Action in Blocking mode. The timeout for the action to give a valid response
* is provided as an argument. This API stops processing of all outbound actions until Response is received
* for the requested Action Type
*
* @param action_type - Type of the Action to be executed. Must be registered
* @param action_data - Action Data to be passed as argument to the Action instance
* @param action_reponse_timeout - Timeout for this API call
* @return ResponseCode indicating result of the API call
*/
ResponseCode PerformAction(ActionType action_type, std::shared_ptr<ActionData> action_data,
std::chrono::milliseconds action_reponse_timeout);
/**
* @brief Register Action for execution by Client Core
*
* This function allows Actions to be registered to be executed at a later stage by Client Core.
* Actions must be registered before PerformAction can be called using the Action Type.
* This also applies to Creating Action runners which allow running Actions in dedicated Thread Tasks.
* Only one Action can be registered to each Action Type. If a second call is made with the same Action Type,
* the previous registration will be overwritten
*
* @param action_type - Type of the Action that will be created using the provided handler
* @param p_action_create_handler - Factory method pointer which returns an Action instance
* @param p_action_state - Shared_ptr to use as argument for Action create
* @return ResponseCode indicating result of the API call
*/
ResponseCode RegisterAction(ActionType action_type, Action::CreateHandlerPtr p_action_create_handler,
std::shared_ptr<ActionState> p_action_state);
/**
* @brief Get the Create Factory Method for the specified action type
*
* @param action_type - Type of the Action for which the handler is required
* @param p_action_create_handler[out] - Create factory method for this Action
* @return ResponseCode indicating result of the API call
*/
ResponseCode GetActionCreateHandler(ActionType action_type, Action::CreateHandlerPtr *p_action_create_handler);
/**
* @brief Enqueue Action for processing in Outbound Queue
*
* @param action_type - Type of the Action
* @param action_data - Data to be passed to perform Action
* @param action_id_out[out] - Action ID that was assigned to this action by the Client
* @return ResponseCode indicating result of the API call
*/
ResponseCode EnqueueOutboundAction(ActionType action_type, std::shared_ptr<ActionData> action_data,
uint16_t &action_id_out);
/**
* @brief Register Ack Handler for provided action id
* @param action_id - Action ID
* @param p_async_ack_handler - Handler to call on response
* @return ResponseCode indicating result of the API call
*/
ResponseCode RegisterPendingAck(uint16_t action_id,
ActionData::AsyncAckNotificationHandlerPtr p_async_ack_handler);
/**
* @brief Delete Ack Handler for specified Action ID
* @param action_id - Action ID
*/
void DeletePendingAck(uint16_t action_id);
/**
* @brief Call registered Ack handler if it exists for specified Packet id
* @param action_id - Action ID
* @param rc - Response Code to pass to the Handler if found
*/
void ForwardReceivedAck(uint16_t action_id, ResponseCode rc);
/**
* @brief Delete all expired Acks
*
* Deletes all Acks where the timeouts have expired. Responds with Code indicating request timeout
*/
void DeleteExpiredAcks();
/**
* @brief Clears all registered Actions
*
* Utility method to remove all registered actions by the client.
* Also helps in breaking out of cyclic reference introduced when ::RegisterAction is called.
*/
void ClearRegisteredActions();
/**
* @brief Clears all pending outbound Actions.
*
* Utility method to remove all pending outbound actions registered by the client.
* Also helps in breaking out of cyclic reference introduced when ::EnqueueOutboundAction is called.
*/
void ClearOutboundActionQueue();
/**
* @brief Default Constructor
*/
ClientCoreState();
/**
* @brief Destructor
*/
virtual ~ClientCoreState();
// Rule of 5 stuff
// Contains data for running thread tasks, should not be moved or copied
ClientCoreState(const ClientCoreState &) = delete; // Copy constructor
ClientCoreState(ClientCoreState &&) = delete; // Move constructor
ClientCoreState &operator=(const ClientCoreState &) & = delete; // Copy assignment operator
ClientCoreState &operator=(ClientCoreState &&) & = delete; // Move assignment operator
};
}