Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ extern "C" {
/**
* Maximum allowed code string length
*/
#define MAX_STATUS_CODE_STRING_LEN 256
#define MAX_STATUS_CODE_STRING_LEN 8

/**
* Maximum allowed message description length
Expand Down
185 changes: 69 additions & 116 deletions src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1140,17 +1140,10 @@ STATUS getIceConfigLws(PSignalingClient pSignalingClient, UINT64 time)
UNUSED_PARAM(time);

PRequestInfo pRequestInfo = NULL;
CHAR url[MAX_URI_CHAR_LEN + 1];
CHAR paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
CHAR url[MAX_URI_CHAR_LEN + 1], paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
PLwsCallInfo pLwsCallInfo = NULL;
PCHAR pResponseStr;
jsmn_parser parser;
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
jsmntok_t* pToken;
UINT32 i, strLen, resultLen, configCount = 0, tokenCount;
INT32 j;
UINT64 ttl;
BOOL jsonInIceServerList = FALSE;
UINT32 resultLen;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
CHK(pSignalingClient->channelEndpointHttps[0] != '\0', STATUS_INTERNAL_ERROR);
Expand Down Expand Up @@ -2018,162 +2011,127 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
return retStatus;
}

STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT32 messageLen)
STATUS parseSignalingMessage(PCHAR pMessage, UINT32 messageLen, PReceivedSignalingMessage pReceivedSignalingMessage)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
jsmn_parser parser;
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
jsmntok_t* pToken;
UINT32 i, strLen, outLen = MAX_SIGNALING_MESSAGE_LEN;
UINT32 tokenCount;
INT32 j;
PSignalingMessageWrapper pSignalingMessageWrapper = NULL;
TID receivedTid = INVALID_TID_VALUE;
BOOL parsedMessageType = FALSE, parsedStatusResponse = FALSE, jsonInIceServerList = FALSE;
PSignalingMessage pOngoingMessage;
UINT64 ttl;
INT32 tokenCount;
UINT32 i, strLen, outLen, printResult;
BOOL parsedMessageType = FALSE, parsedStatusResponse = FALSE;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
CHK(pMessage != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);
CHK(messageLen <= MAX_SIGNALING_MESSAGE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);

// If we have a signalingMessage and if there is a correlation id specified then the response should be non-empty
if (pMessage == NULL || messageLen == 0) {
if (BLOCK_ON_CORRELATION_ID) {
// Get empty correlation id message from the ongoing if exists
CHK_STATUS(signalingGetOngoingMessage(pSignalingClient, EMPTY_STRING, EMPTY_STRING, &pOngoingMessage));
if (pOngoingMessage == NULL) {
DLOGW("Received an empty body for a message with no correlation id which has been already removed from the queue. Warning 0x%08x",
STATUS_SIGNALING_RECEIVE_EMPTY_DATA_NOT_SUPPORTED);
} else {
CHK_STATUS(signalingRemoveOngoingMessage(pSignalingClient, EMPTY_STRING));
}
}

// Check if anything needs to be done
CHK_WARN(pMessage != NULL && messageLen != 0, retStatus, "Signaling received an empty message");
}
MEMSET(pReceivedSignalingMessage, 0x00, SIZEOF(ReceivedSignalingMessage));
pReceivedSignalingMessage->signalingMessage.messageType = SIGNALING_MESSAGE_TYPE_UNKNOWN;

// Parse the response
jsmn_init(&parser);
tokenCount = jsmn_parse(&parser, pMessage, messageLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);

CHK(NULL != (pSignalingMessageWrapper = (PSignalingMessageWrapper) MEMCALLOC(1, SIZEOF(SignalingMessageWrapper))), STATUS_NOT_ENOUGH_MEMORY);

pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;

// Loop through the tokens and extract the stream description
for (i = 1; i < tokenCount; i++) {
if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "senderClientId")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_SIGNALING_CLIENT_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId, pMessage + tokens[i + 1].start, strLen);
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId[MAX_SIGNALING_CLIENT_ID_LEN] = '\0';
printResult = SNPRINTF(pReceivedSignalingMessage->signalingMessage.peerClientId, MAX_SIGNALING_CLIENT_ID_LEN + 1, "%.*s", strLen,
pMessage + tokens[i + 1].start);
CHK(printResult >= 0 && printResult <= MAX_SIGNALING_CLIENT_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
i++;
} else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messageType")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_SIGNALING_MESSAGE_TYPE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK_STATUS(getMessageTypeFromString(pMessage + tokens[i + 1].start, strLen,
&pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));

CHK_STATUS(getMessageTypeFromString(pMessage + tokens[i + 1].start, strLen, &pReceivedSignalingMessage->signalingMessage.messageType));
parsedMessageType = TRUE;
i++;
} else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messagePayload")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_SIGNALING_MESSAGE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
outLen = SIZEOF(pReceivedSignalingMessage->signalingMessage.payload);
// Base64 method will set outLen <= original input outLen
CHK_STATUS(base64Decode(pMessage + tokens[i + 1].start, strLen, (PBYTE) (pReceivedSignalingMessage->signalingMessage.payload), &outLen));

// Base64 decode the message
CHK_STATUS(base64Decode(pMessage + tokens[i + 1].start, strLen,
(PBYTE) (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload), &outLen));
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload[MAX_SIGNALING_MESSAGE_LEN] = '\0';
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payloadLen = outLen;
// Need to manually null-terminate the output of base64Decode
pReceivedSignalingMessage->signalingMessage.payload[outLen] = '\0';
pReceivedSignalingMessage->signalingMessage.payloadLen = outLen;
i++;
} else if (!parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusResponse")) {
parsedStatusResponse = TRUE;
i++;
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "correlationId")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_CORRELATION_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId, pMessage + tokens[i + 1].start, strLen);
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId[MAX_CORRELATION_ID_LEN] = '\0';

printResult = SNPRINTF(pReceivedSignalingMessage->signalingMessage.correlationId, MAX_CORRELATION_ID_LEN + 1, "%.*s", strLen,
pMessage + tokens[i + 1].start);
CHK(printResult >= 0 && printResult <= MAX_CORRELATION_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
i++;
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "errorType")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_ERROR_TYPE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.errorType, pMessage + tokens[i + 1].start, strLen);
pSignalingMessageWrapper->receivedSignalingMessage.errorType[MAX_ERROR_TYPE_STRING_LEN] = '\0';

printResult =
SNPRINTF(pReceivedSignalingMessage->errorType, MAX_ERROR_TYPE_STRING_LEN + 1, "%.*s", strLen, pMessage + tokens[i + 1].start);
CHK(printResult >= 0 && printResult <= MAX_ERROR_TYPE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
i++;
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusCode")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_STATUS_CODE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);

// Parse the status code
CHK_STATUS(STRTOUI32(pMessage + tokens[i + 1].start, pMessage + tokens[i + 1].end, 10,
&pSignalingMessageWrapper->receivedSignalingMessage.statusCode));

CHK_STATUS(STRTOUI32(pMessage + tokens[i + 1].start, pMessage + tokens[i + 1].end, 10, &pReceivedSignalingMessage->statusCode));
i++;
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "description")) {
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_MESSAGE_DESCRIPTION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.description, pMessage + tokens[i + 1].start, strLen);
pSignalingMessageWrapper->receivedSignalingMessage.description[MAX_MESSAGE_DESCRIPTION_LEN] = '\0';

printResult =
SNPRINTF(pReceivedSignalingMessage->description, MAX_MESSAGE_DESCRIPTION_LEN + 1, "%.*s", strLen, pMessage + tokens[i + 1].start);
CHK(printResult >= 0 && printResult <= MAX_MESSAGE_DESCRIPTION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
i++;
} else if (!jsonInIceServerList &&
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_OFFER &&
compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "IceServerList")) {
jsonInIceServerList = TRUE;

CHK(tokens[i + 1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK(tokens[i + 1].size <= MAX_ICE_CONFIG_COUNT, STATUS_SIGNALING_MAX_ICE_CONFIG_COUNT);

// Zero the ice configs
MEMSET(&pSignalingClient->iceConfigs, 0x00, MAX_ICE_CONFIG_COUNT * SIZEOF(IceConfigInfo));
pSignalingClient->iceConfigCount = 0;
} else if (jsonInIceServerList) {
pToken = &tokens[i];
if (pToken->type == JSMN_OBJECT) {
pSignalingClient->iceConfigCount++;
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Username")) {
strLen = (UINT32) (pToken[1].end - pToken[1].start);
CHK(strLen <= MAX_ICE_CONFIG_USER_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName, pMessage + pToken[1].start, strLen);
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName[MAX_ICE_CONFIG_USER_NAME_LEN] = '\0';
i++;
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Password")) {
strLen = (UINT32) (pToken[1].end - pToken[1].start);
CHK(strLen <= MAX_ICE_CONFIG_CREDENTIAL_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].password, pMessage + pToken[1].start, strLen);
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName[MAX_ICE_CONFIG_CREDENTIAL_LEN] = '\0';
i++;
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Ttl")) {
CHK_STATUS(STRTOUI64(pMessage + pToken[1].start, pMessage + pToken[1].end, 10, &ttl));
}
}

// NOTE: Ttl value is in seconds
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].ttl = ttl * HUNDREDS_OF_NANOS_IN_A_SECOND;
i++;
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Uris")) {
// Expect an array of elements
CHK(pToken[1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK(pToken[1].size <= MAX_ICE_CONFIG_URI_COUNT, STATUS_SIGNALING_MAX_ICE_URI_COUNT);
for (j = 0; j < pToken[1].size; j++) {
strLen = (UINT32) (pToken[j + 2].end - pToken[j + 2].start);
CHK(strLen <= MAX_ICE_CONFIG_URI_LEN, STATUS_SIGNALING_MAX_ICE_URI_LEN);
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j], pMessage + pToken[j + 2].start, strLen);
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j][MAX_ICE_CONFIG_URI_LEN] = '\0';
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uriCount++;
}
CHK(parsedMessageType, STATUS_SIGNALING_INVALID_MESSAGE_TYPE);

i += pToken[1].size + 1;
CleanUp:
CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
}

STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT32 messageLen)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT32 i, strLen;
PSignalingMessageWrapper pSignalingMessageWrapper = NULL;
TID receivedTid = INVALID_TID_VALUE;
PSignalingMessage pOngoingMessage;

CHK(pSignalingClient != NULL, STATUS_NULL_ARG);

// If we have a signalingMessage and if there is a correlation id specified then the response should be non-empty
if (pMessage == NULL || messageLen == 0) {
if (BLOCK_ON_CORRELATION_ID) {
// Get empty correlation id message from the ongoing if exists
CHK_STATUS(signalingGetOngoingMessage(pSignalingClient, EMPTY_STRING, EMPTY_STRING, &pOngoingMessage));
if (pOngoingMessage == NULL) {
DLOGW("Received an empty body for a message with no correlation id which has been already removed from the queue. Warning 0x%08x",
STATUS_SIGNALING_RECEIVE_EMPTY_DATA_NOT_SUPPORTED);
} else {
CHK_STATUS(signalingRemoveOngoingMessage(pSignalingClient, EMPTY_STRING));
}
}

// Check if anything needs to be done
CHK_WARN(pMessage != NULL && messageLen != 0, retStatus, "Signaling received an empty message");
}

// Message type is a mandatory field.
CHK(parsedMessageType, STATUS_SIGNALING_INVALID_MESSAGE_TYPE);
// Parse the response
CHK(NULL != (pSignalingMessageWrapper = (PSignalingMessageWrapper) MEMCALLOC(1, SIZEOF(SignalingMessageWrapper))), STATUS_NOT_ENOUGH_MEMORY);
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;
CHK_STATUS(parseSignalingMessage(pMessage, messageLen, &pSignalingMessageWrapper->receivedSignalingMessage));

pSignalingMessageWrapper->pSignalingClient = pSignalingClient;

switch (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType) {
Expand Down Expand Up @@ -2250,11 +2208,6 @@ STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT
DLOGD("Client received message of type: %s",
getMessageTypeInString(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));

// Validate and process the ice config
if (jsonInIceServerList && STATUS_FAILED(validateIceConfiguration(pSignalingClient))) {
DLOGW("Failed to validate the ICE server configuration received with an Offer");
}

#ifdef ENABLE_KVS_THREADPOOL
// This would fail if threadpool was not created
CHK_STATUS(threadpoolContextPush(receiveLwsMessageWrapper, pSignalingMessageWrapper));
Expand Down
18 changes: 18 additions & 0 deletions src/source/Signaling/LwsApiCalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,24 @@ STATUS configureLwsLogging(UINT32 kvsLogLevel);
*/
STATUS parseIceConfigResponse(PCHAR, UINT32, UINT8, PIceConfigInfo, PUINT32);

/**
* Parses the signaling message from a JSON response string.
* The payload is base64-decoded in the output.
* See <a href="https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/async-message-reception-api.html">Asynchronous message
* reception</a>.
*
* @param[in] pMessage JSON of the signaling message.
* @param[in] messageLen Length of the JSON string (excluding null-terminator).
* @param[out] pReceivedSignalingMessage Pointer to receive the parsed signaling message.
*
* @return STATUS code of the execution:
* - STATUS_SUCCESS: Successfully parsed ICE configuration.
* - STATUS_NULL_ARG: Invalid NULL argument provided.
* - STATUS_SIGNALING_INVALID_MESSAGE_TYPE: If the required field 'messageType' is missing for non error response messages.
* - STATUS_INVALID_API_CALL_RETURN_JSON: Malformed JSON or missing other required fields.
*/
STATUS parseSignalingMessage(PCHAR, UINT32, PReceivedSignalingMessage);

#ifdef __cplusplus
}
#endif
Expand Down
Loading
Loading