Skip to content

Commit

Permalink
feature: allow limiting the retransmission queue in the server (fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Dec 2, 2016
1 parent 9fdd7df commit 243237e
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 25 deletions.
3 changes: 2 additions & 1 deletion include/ua_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ typedef struct {
UA_UInt32Range lifeTimeCountLimits;
UA_UInt32Range keepAliveCountLimits;
UA_UInt32 maxNotificationsPerPublish;
UA_UInt32 maxRetransmissionQueueSize; /* 0 -> unlimited size */

/* Limits for MonitoredItems */
UA_DoubleRange samplingIntervalLimits;
UA_UInt32Range queueSizeLimits;
UA_UInt32Range queueSizeLimits; /* Negotiated with the client */
} UA_ServerConfig;

/* Add a new namespace to the server. Returns the index of the new namespace */
Expand Down
1 change: 1 addition & 0 deletions plugins/ua_config_standard.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const UA_EXPORT UA_ServerConfig UA_ServerConfig_standard = {
.lifeTimeCountLimits = { .max = 15000, .min = 3 },
.keepAliveCountLimits = { .max = 100, .min = 1 },
.maxNotificationsPerPublish = 1000,
.maxRetransmissionQueueSize = 0, /* unlimited */

/* Limits for MonitoredItems */
.samplingIntervalLimits = { .min = 50.0, .max = 24.0 * 3600.0 * 1000.0 },
Expand Down
12 changes: 1 addition & 11 deletions src/server/ua_services_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -439,17 +439,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
continue;
}
/* Remove the acked transmission from the retransmission queue */
response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
UA_NotificationMessageEntry *pre, *pre_tmp;
TAILQ_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
if(pre->message.sequenceNumber == ack->sequenceNumber) {
TAILQ_REMOVE(&sub->retransmissionQueue, pre, listEntry);
response->results[i] = UA_STATUSCODE_GOOD;
UA_NotificationMessage_deleteMembers(&pre->message);
UA_free(pre);
break;
}
}
response->results[i] = UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
}

/* Queue the publish response */
Expand Down
57 changes: 45 additions & 12 deletions src/server/ua_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptio
new->currentLifetimeCount = 0;
new->lastMonitoredItemId = 0;
new->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
TAILQ_INIT(&new->retransmissionQueue);
LIST_INIT(&new->monitoredItems);
TAILQ_INIT(&new->retransmissionQueue);
new->retransmissionQueueSize = 0;
return new;
}

Expand All @@ -282,6 +283,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
UA_NotificationMessage_deleteMembers(&nme->message);
UA_free(nme);
}
subscription->retransmissionQueueSize = 0;
}

UA_MonitoredItem *
Expand Down Expand Up @@ -327,6 +329,40 @@ countQueuedNotifications(UA_Subscription *sub, UA_Boolean *moreNotifications) {
return notifications;
}

static void
UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
UA_NotificationMessageEntry *entry) {
/* Release the oldest entry if there is not enough space */
if(server->config.maxRetransmissionQueueSize > 0 &&
sub->retransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
UA_NotificationMessageEntry *lastentry =
TAILQ_LAST(&sub->retransmissionQueue, UA_ListOfNotificationMessages);
TAILQ_REMOVE(&sub->retransmissionQueue, lastentry, listEntry);
--sub->retransmissionQueueSize;
UA_NotificationMessage_deleteMembers(&lastentry->message);
UA_free(lastentry);
}

/* Add entry */
TAILQ_INSERT_HEAD(&sub->retransmissionQueue, entry, listEntry);
++sub->retransmissionQueueSize;
}

UA_StatusCode
UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
UA_NotificationMessageEntry *entry, *entry_tmp;
TAILQ_FOREACH_SAFE(entry, &sub->retransmissionQueue, listEntry, entry_tmp) {
if(entry->message.sequenceNumber != sequenceNumber)
continue;
TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry);
--sub->retransmissionQueueSize;
UA_NotificationMessage_deleteMembers(&entry->message);
UA_free(entry);
return UA_STATUSCODE_GOOD;
}
return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
}

static UA_StatusCode
prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
size_t notifications) {
Expand Down Expand Up @@ -466,23 +502,20 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
* be done here, so that the message itself is included in the available
* sequence numbers for acknowledgement. */
retransmission->message = response->notificationMessage;
TAILQ_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
}

/* Get the available sequence numbers from the retransmission queue */
size_t available = 0;
UA_NotificationMessageEntry *nme;
TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry)
++available;
// cppcheck-suppress knownConditionTrueFalse
size_t available = sub->retransmissionQueueSize;
if(available > 0) {
response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
response->availableSequenceNumbersSize = available;
}
size_t i = 0;
TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
++i;
size_t i = 0;
UA_NotificationMessageEntry *nme;
TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
++i;
}
}

/* Send the response */
Expand Down
9 changes: 8 additions & 1 deletion src/server/ua_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct UA_Subscription {
UA_Session *session;
UA_UInt32 lifeTimeCount;
UA_UInt32 maxKeepAliveCount;
UA_Double publishingInterval; // [ms]
UA_Double publishingInterval; /* in ms */
UA_UInt32 subscriptionID;
UA_UInt32 notificationsPerPublish;
UA_Boolean publishingEnabled;
Expand All @@ -100,8 +100,12 @@ struct UA_Subscription {
UA_Guid publishJobGuid;
UA_Boolean publishJobIsRegistered;

/* MonitoredItems */
LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;

/* Retransmission Queue */
TAILQ_HEAD(UA_ListOfNotificationMessages, UA_NotificationMessageEntry) retransmissionQueue;
UA_UInt32 retransmissionQueueSize;
};

UA_Subscription *UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
Expand All @@ -118,6 +122,9 @@ UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID

void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);

UA_StatusCode
UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber);

void
UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
UA_NodeId *sessionToken);
Expand Down

0 comments on commit 243237e

Please sign in to comment.