Library for collecting UDP-notif protocol messages defined in the IETF draft draft-ietf-netconf-udp-notif-06.
See INSTALL
The collector reads and parses UDP-notif protocol messages received on an IP address and port given as parameters. Each message is returned in a struct that gives direct access to the message buffer and its metadata.
The API is in unyte_udp_collector.h:
- int unyte_udp_create_socket(char *address, char *port, uint64_t buffer_size) from unyte_udp_utils.h: helper that creates a socket and binds it to an address and port.
- int unyte_udp_create_interface_bound_socket(char *interface, char *address, char *port, uint64_t buffer_size) from unyte_udp_utils.h: helper that creates a socket, binds it to an interface using the SO_BINDTODEVICE option, and binds it to an address and port.
- unyte_udp_collector_t *unyte_udp_start_collector(unyte_udp_options_t *options) from unyte_udp_collector.h: initializes the UDP-notif message collector. It accepts a struct with different options: socket_fd of the socket to listen on, recvmmsg_vlen (the vlen used on the recvmmsg syscall, meaning how many messages to receive on every syscall; 10 by default)...
- void *unyte_udp_queue_read(unyte_udp_queue_t *queue) from unyte_udp_queue.h: reads from a queue a struct holding the message buffer and metadata.
- int unyte_udp_free_all(unyte_seg_met_t *seg) from unyte_udp_collector.h: frees all the structs used for a received message.
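To give an idea of what a socket helper of this kind typically involves, here is a minimal sketch in plain POSIX C. It is not the library's implementation: the name example_create_udp_socket, the use of getaddrinfo to resolve the address, and the use of SO_RCVBUF to request the buffer size are assumptions made for illustration.

```c
#include <assert.h>
#include <limits.h>
#include <netdb.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (not part of the library): creates a UDP socket,
// requests a receive buffer of buffer_size bytes and binds the socket
// to address:port. Returns the socket fd, or -1 on error.
int example_create_udp_socket(const char *address, const char *port, uint64_t buffer_size)
{
  assert(address != NULL && port != NULL);

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;     // accept IPv4 or IPv6 literals
  hints.ai_socktype = SOCK_DGRAM;  // UDP
  hints.ai_flags = AI_NUMERICHOST; // address must be numeric, no DNS lookup

  struct addrinfo *res = NULL;
  if (getaddrinfo(address, port, &hints, &res) != 0)
    return -1;

  int fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (fd >= 0)
  {
    // setsockopt expects an int; the kernel may clamp the requested size
    int size = buffer_size > INT_MAX ? INT_MAX : (int)buffer_size;
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) < 0 ||
        bind(fd, res->ai_addr, res->ai_addrlen) < 0)
    {
      close(fd);
      fd = -1;
    }
  }
  freeaddrinfo(res);
  return fd;
}
```

A socket created this way, or by any other means, is what the socket_fd option expects.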
Simple usage example:
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
// include installed library headers
#include <unyte-udp-notif/unyte_udp_collector.h>
#define PORT "10001"
#define ADDR "192.168.0.17"
int main()
{
  // Initialize socket and bind it to the address
  int socketfd = unyte_udp_create_socket(ADDR, PORT, DEFAULT_SK_BUFF_SIZE);
  // Initialize collector options
  unyte_udp_options_t options = {0};
  // add socket fd reference to options
  options.socket_fd = socketfd;
  // if argument set to 0, defaults are used
  options.recvmmsg_vlen = 0;       // vlen parameter for recvmmsg. Default: 10
  options.output_queue_size = 0;   // output queue size. Default: 1000
  options.nb_parsers = 0;          // number of parsers threads to instantiate. Default: 10
  options.socket_buff_size = 0;    // user socket buffer size in bytes. Default: 20971520 (20MB)
  options.parsers_queue_size = 0;  // parser queue size. Default: 500
  options.msg_dst_ip = false;      // destination IP not parsed from IP packet to improve performance. Default: false
  options.legacy = false;          // Use legacy UDP pub channel protocol: draft-ietf-netconf-udp-pub-channel-05. Default: false.
                                   // For legacy UDP pub channel: /!\ Used encoding types identifiers are taken from IANA.
  
  // Initialize collector
  unyte_udp_collector_t *collector = unyte_udp_start_collector(&options);
  // Example with an infinite loop; change the break condition to be able to free everything gracefully
  while (1)
  {
    // Read message on queue
    unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_udp_queue_read(collector->queue);
    // TODO: Process the UDP-notif message here
    printf("unyte_udp_get_version: %u\n", unyte_udp_get_version(seg));
    printf("unyte_udp_get_space: %u\n", unyte_udp_get_space(seg));
    printf("unyte_udp_get_media_type: %u\n", unyte_udp_get_media_type(seg));
    printf("unyte_udp_get_header_length: %u\n", unyte_udp_get_header_length(seg));
    printf("unyte_udp_get_message_length: %u\n", unyte_udp_get_message_length(seg));
    printf("unyte_udp_get_observation_domain_id: %u\n", unyte_udp_get_observation_domain_id(seg));
    printf("unyte_udp_get_message_id: %u\n", unyte_udp_get_message_id(seg));
    printf("unyte_udp_get_src[family]: %u\n", unyte_udp_get_src(seg)->ss_family);               // AF_INET for IPv4 or AF_INET6 for IPv6
    printf("unyte_udp_get_dest_addr[family]: %u\n", unyte_udp_get_dest_addr(seg)->ss_family);   // AF_INET for IPv4 or AF_INET6 for IPv6
    printf("unyte_udp_get_payload: %s\n", unyte_udp_get_payload(seg));
    printf("unyte_udp_get_payload_length: %u\n", unyte_udp_get_payload_length(seg));
    // Free UDP-notif message after
    unyte_udp_free_all(seg);
  }
  // To shut down the collector, just close the socket.
  close(*collector->sockfd);
  // wait for main_thread to finish
  pthread_join(*collector->main_thread, NULL);
  // Freeing collector mallocs and last messages for every queue if there is any message not consumed
  unyte_udp_free_collector(collector);
  return 0;
}
To process the message data, all the headers, metadata and payload are found in the struct unyte_seg_met_t defined in unyte_udp_utils.h:
typedef struct unyte_segment_with_metadata
{
  unyte_metadata_t *metadata; // source/port
  unyte_header_t *header;     // UDP-notif headers
  char *payload;              // payload of message
} unyte_seg_met_t;
Getters are available to read these fields:
- uint8_t unyte_udp_get_version(unyte_seg_met_t *message): header encoding version
- uint8_t unyte_udp_get_space(unyte_seg_met_t *message): space of the media type
- uint8_t unyte_udp_get_media_type(unyte_seg_met_t *message): identifier indicating the media type used for the Notification Message
- uint16_t unyte_udp_get_header_length(unyte_seg_met_t *message): length of the message header in octets
- uint16_t unyte_udp_get_message_length(unyte_seg_met_t *message): total length of the message within one UDP datagram, measured in octets, including the message header
- uint32_t unyte_udp_get_observation_domain_id(unyte_seg_met_t *message): observation domain id of the message
- uint32_t unyte_udp_get_message_id(unyte_seg_met_t *message): message id of the message
- struct sockaddr_storage *unyte_udp_get_src(unyte_seg_met_t *message): source IP and port of the message. Could be IPv4 or IPv6.
- struct sockaddr_storage *unyte_udp_get_dest_addr(unyte_seg_met_t *message): collector address. Could be IPv4 or IPv6.
- char *unyte_udp_get_payload(unyte_seg_met_t *message): payload buffer
- uint16_t unyte_udp_get_payload_length(unyte_seg_met_t *message): payload length
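Since the source and destination addresses are returned as struct sockaddr_storage and can be IPv4 or IPv6, printing them requires a check on ss_family. The sketch below shows one way to do it with standard socket calls only. The helper name example_format_sockaddr is hypothetical and not part of the library.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

// Hypothetical helper (not part of the library): writes the textual IP
// address into ip (ip_len bytes; INET6_ADDRSTRLEN is always enough) and
// the port, in host byte order, into *port.
// Returns 0 on success, -1 on unsupported family or too-small buffer.
int example_format_sockaddr(const struct sockaddr_storage *addr, char *ip, size_t ip_len, uint16_t *port)
{
  assert(addr != NULL && ip != NULL && port != NULL);
  memset(ip, 0, ip_len); // leave an empty string in ip on failure

  if (addr->ss_family == AF_INET)
  {
    const struct sockaddr_in *v4 = (const struct sockaddr_in *)addr;
    *port = ntohs(v4->sin_port);
    return inet_ntop(AF_INET, &v4->sin_addr, ip, (socklen_t)ip_len) ? 0 : -1;
  }
  if (addr->ss_family == AF_INET6)
  {
    const struct sockaddr_in6 *v6 = (const struct sockaddr_in6 *)addr;
    *port = ntohs(v6->sin6_port);
    return inet_ntop(AF_INET6, &v6->sin6_addr, ip, (socklen_t)ip_len) ? 0 : -1;
  }
  return -1; // neither IPv4 nor IPv6
}
```

With the getters above, such a helper could be called as example_format_sockaddr(unyte_udp_get_src(seg), ip, sizeof(ip), &port), with ip declared as char ip[INET6_ADDRSTRLEN].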
A monitoring thread can be started to monitor packet loss and packets received out of order.
To activate this thread, set the monitoring thread queue size (monitoring_queue_size) in the collector options:
typedef struct
{
  int socket_fd;                // socket file descriptor
  ...
  uint monitoring_queue_size;   // monitoring queue size; set it to activate the monitoring thread. Default: 0. Recommended: 500.
  uint monitoring_delay;        // monitoring period in seconds. Default: 5 seconds
} unyte_udp_options_t;
Every monitoring_delay seconds, the thread sends the counters of all observation domain ids.
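To illustrate what loss and reordering counters can look like, here is a minimal sketch that derives both from the message ids seen for a single observation domain. It is an assumption about the general technique, not the library's counters: the type example_counter_t and the function example_counter_update are hypothetical, and the sketch ignores message id wraparound.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical per-observation-domain counters (not the library's struct)
typedef struct
{
  int initialized;             // 0 until the first message is seen
  uint32_t highest_message_id; // highest message id seen so far
  uint64_t received;           // messages seen
  uint64_t lost;               // ids skipped and not (yet) seen
  uint64_t reordered;          // messages that arrived after a higher id
} example_counter_t;

// Updates the counters with a newly received message id.
// Wraparound of the 32-bit id is deliberately not handled.
void example_counter_update(example_counter_t *c, uint32_t message_id)
{
  assert(c != NULL);
  c->received++;
  if (!c->initialized)
  {
    c->initialized = 1;
    c->highest_message_id = message_id;
  }
  else if (message_id > c->highest_message_id)
  {
    // every id skipped between the two is counted as lost for now
    c->lost += message_id - c->highest_message_id - 1;
    c->highest_message_id = message_id;
  }
  else if (message_id < c->highest_message_id)
  {
    // a late message: it was counted as lost earlier, so undo that
    c->reordered++;
    if (c->lost > 0)
      c->lost--;
  }
  // an id equal to the highest one is a duplicate and only counts as received
}
```

Feeding the ids 1, 2, 5, 4, 6 into a zero-initialized counter leaves one message lost (id 3) and one reordered (id 4).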
The thread types are defined in monitoring_worker.h:
- PARSER_WORKER: worker in charge of parsing the segments. It reassembles the segmented messages or saves them in memory.
- LISTENER_WORKER: worker in charge of receiving the bytes from the socket. It calls the recvmmsg() syscall to receive multiple messages at once.
Two cases are possible when monitoring packet loss:
- Drops on PARSER_WORKER: the client consuming the parsed messages is not consuming them fast enough. You may want to multithread the client consuming the collector->queue (output_queue) or increase the output_queue_size option to avoid packet drops on spikes.
- Drops on LISTENER_WORKER: the N parsers are not consuming fast enough and the LISTENER_WORKER is pushing to the input_queue faster than the parsers can read. You may want to increase the number of parsers instantiated (nb_parsers) or increase the parsers_queue_size option to avoid packet drops on spikes.
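Both cases come down to a bounded queue whose producer is faster than its consumer. The sketch below shows the mechanism with a small single-threaded ring buffer whose push fails and counts a drop when the queue is full. It is a simplified illustration, not the library's queue: all names are hypothetical, and a real multi-threaded queue also needs synchronization.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define EXAMPLE_QUEUE_SIZE 4 // tiny on purpose, to make drops easy to trigger

// Hypothetical bounded queue (not the library's unyte_udp_queue_t)
typedef struct
{
  void *items[EXAMPLE_QUEUE_SIZE];
  size_t head;      // index of the next item to pop
  size_t tail;      // index of the next free slot
  size_t count;     // items currently stored
  uint64_t dropped; // pushes rejected because the queue was full
} example_queue_t;

// Non-blocking push: returns 0 on success, -1 if the item was dropped.
int example_queue_try_push(example_queue_t *q, void *item)
{
  assert(q != NULL);
  if (q->count == EXAMPLE_QUEUE_SIZE)
  {
    q->dropped++; // the consumer did not keep up
    return -1;
  }
  q->items[q->tail] = item;
  q->tail = (q->tail + 1) % EXAMPLE_QUEUE_SIZE;
  q->count++;
  return 0;
}

// Pops the oldest item, or returns NULL if the queue is empty.
void *example_queue_pop(example_queue_t *q)
{
  assert(q != NULL);
  if (q->count == 0)
    return NULL;
  void *item = q->items[q->head];
  q->head = (q->head + 1) % EXAMPLE_QUEUE_SIZE;
  q->count--;
  return item;
}
```

A larger queue absorbs longer bursts, and more consumers drain it faster, which is why the two remedies above are a bigger queue size or more consuming threads.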
The library can support the legacy UDP-notif protocol specified in draft-ietf-netconf-udp-pub-channel-05.
An example is available in client_legacy_proto.c.
To use this legacy protocol, enable the legacy flag in the collector options:
typedef struct
{
  int socket_fd;            // socket file descriptor
  ...
  bool legacy;              // legacy udp-notif: draft-ietf-netconf-udp-pub-channel-05.
} unyte_udp_options_t;
Limitations of udp-pub-channel-05:
- The same output struct unyte_seg_met_t is given to the user.
- Flags from the protocol are not parsed.
- No options are possible and thus no segmentation is supported.
- The media type identifiers are taken from IANA instead of the draft to maintain consistency across the different pipelines. IANA codes can be checked in the main draft.
- Google protobuf is returned as RESERVED(0) encoding type.
 
The sender allows the user to send UDP-notif protocol messages to a specified IP address and port. It cuts the message into protocol segments if it is larger than the MTU specified in the parameters.
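As a rough illustration of the segmentation arithmetic, the sketch below estimates how many segments a buffer needs for a given MTU. It rests on stated assumptions rather than on the library's code: a 12-octet fixed header and a 4-octet segmentation option (as laid out in the UDP-notif draft), and the MTU treated as the maximum size of one UDP-notif message, ignoring IP and UDP headers. The function name is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

// Assumed sizes in octets; check them against the draft version you target
#define EXAMPLE_HEADER_LEN 12u    // fixed UDP-notif header
#define EXAMPLE_SEG_OPTION_LEN 4u // segmentation option, only on segments

// Hypothetical helper (not part of the library): returns the number of
// segments needed to send buffer_len octets, or 0 if mtu is too small
// to carry any payload in a segment.
uint32_t example_segment_count(uint32_t buffer_len, uint32_t mtu)
{
  // the whole message fits in one datagram: no segmentation option needed
  if ((uint64_t)buffer_len + EXAMPLE_HEADER_LEN <= mtu)
    return 1;

  if (mtu <= EXAMPLE_HEADER_LEN + EXAMPLE_SEG_OPTION_LEN)
    return 0;

  uint32_t per_segment = mtu - EXAMPLE_HEADER_LEN - EXAMPLE_SEG_OPTION_LEN;
  assert(per_segment > 0);

  // ceiling division, done in 64 bits to avoid overflow
  return (uint32_t)(((uint64_t)buffer_len + per_segment - 1) / per_segment);
}
```

Under these assumptions, a 97-octet buffer with an MTU of 200 fits in a single message, while a 1000-octet buffer with the same MTU carries 184 payload octets per segment and needs 6 segments.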
The API is in unyte_sender.h.
The message to send has the following structure:
typedef struct unyte_message
{
  uint used_mtu;                  // MTU to use for cutting the message to segments
  void *buffer;                   // pointer to buffer to send
  uint buffer_len;                // length of the buffer to send
  // UDP-notif
  uint8_t version : 3;            // UDP-notif protocol version
  uint8_t space : 1;              // UDP-notif protocol space
  uint8_t media_type : 4;         // UDP-notif protocol media type
  uint32_t observation_domain_id; // UDP-notif protocol observation domain id
  uint32_t message_id;            // UDP-notif protocol message id
} unyte_message_t;
Simple usage of the sender:
#include <stdio.h>
#include <stdlib.h>
#include <unyte-udp-notif/unyte_sender.h>
#define PORT "10001"
#define ADDR "192.168.0.17"
#define MTU 1500
int main()
{
  // Initialize sender options
  unyte_sender_options_t options = {0};
  options.address = ADDR;
  options.port = PORT;
  options.default_mtu = MTU;
  // Initialize the sender: it connects the socket to the address and port in options
  struct unyte_sender_socket *sender_sk = unyte_start_sender(&options);
  // pointer to the buffer to send
  char *string_to_send = "Hello world1! Hello world2! Hello world3! Hello world4! Hello world5! Hello world6! Hello world7!";
  
  // unyte message struct to send
  unyte_message_t *message = (unyte_message_t *)malloc(sizeof(unyte_message_t));
  message->buffer = string_to_send;
  message->buffer_len = 97;
  // UDP-notif
  message->version = 0;
  message->space = 0;
  message->media_type = UNYTE_MEDIATYPE_YANG_JSON; // json but sending string
  message->observation_domain_id = 1000;
  message->message_id = 2147483669;
  message->used_mtu = 200; // If set to 0, the default mtu set on options is used, else, this one is used
  // Send the message
  unyte_send(sender_sk, message);
  // Freeing message and socket
  free(message);
  free_sender_socket(sender_sk);
  return 0;
}
There are some samples implemented during the development of the project:
- client_sample.c: simple example for minimal usage of the collector library.
- client_monitoring.c: sample implementing the monitoring thread to read packet statistics.
- client_socket.c: example using a custom socket instead of creating a new one from the library.
- client_legacy_proto.c: example using a collector for the legacy UDP-notif protocol: draft-ietf-netconf-udp-pub-channel-05.
- client_interface_bind_socket.c: example using a socket bound to an interface, IP and port.
- sender_sample.c: simple example for minimal usage of the sender library.
- sender_json.c: sample reading a JSON file and sending the bytes with the library.
- sender_json_bind_interface.c: sample reading a JSON file and sending the bytes with the library to a specific interface.
- sender_custom_encoding.c: sample configuring a custom space and encoding type.
- sender_cbor.c: sample reading a CBOR (RFC7049) file and sending the bytes with the library.
- eBPF/client_ebpf_user.c: example with a custom eBPF load balancer.
See Docker docs
See License