Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Create Network driver instead of directly using sockets. #330

Draft
wants to merge 289 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
289 commits
Select commit Hold shift + click to select a range
878715e
Try fixing net function for git history
Jakio815 Feb 1, 2024
e16f8a0
Remove net_wait_for_federates for history
Jakio815 Feb 1, 2024
2daa206
Fix function names to original names
Jakio815 Feb 1, 2024
6b654a7
Fix send_rejects
Jakio815 Feb 1, 2024
a61d2d2
Fix respond_to_erroneous_connections
Jakio815 Feb 1, 2024
b1f7841
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 1, 2024
d24cab6
Minor fix
Jakio815 Feb 1, 2024
183285e
Minor fix
Jakio815 Feb 1, 2024
7605169
Minor fix
Jakio815 Feb 1, 2024
2cac773
Temporarily add create_real_time_tcp_socket_errexit()
Jakio815 Feb 2, 2024
5a62b8d
Fix clock_synchronization thread
Jakio815 Feb 2, 2024
91ea5a1
Add breaks to switch case
Jakio815 Feb 2, 2024
3a95d81
Handle_timed message
Jakio815 Feb 2, 2024
0fbfda4
Fix handle adress query and address ad
Jakio815 Feb 3, 2024
eda21c0
Fix debug message
Jakio815 Feb 5, 2024
7e9f360
Fix read_netdrv to make it stateful
Jakio815 Feb 8, 2024
c2f6f0a
Fixed tagged message bugs
Jakio815 Feb 8, 2024
7d6f14f
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 8, 2024
c400a13
Fix handle_federate_resign && fix comments && add message_type unders…
Jakio815 Feb 8, 2024
c66be9b
Fix send_failed signal && add resign msg_type
Jakio815 Feb 9, 2024
5e7e796
Fix authenticated bugs
Jakio815 Feb 9, 2024
0f9a5c1
Erase unused code
Jakio815 Feb 9, 2024
1a5aa83
Add timed message forwarding large buffers
Jakio815 Feb 9, 2024
46906c1
Fix return of read messages to return the bytes read, not success and…
Jakio815 Feb 9, 2024
68178c4
Change read return type to ssize_t
Jakio815 Feb 12, 2024
cbfd302
Make inner switch case to be handled in a different helper function.
Jakio815 Feb 14, 2024
5da5cef
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 14, 2024
31461aa
Make socket_open to api
Jakio815 Feb 15, 2024
054a8f3
Add some comments
Jakio815 Feb 15, 2024
5643b28
Fix netdrv_accept fixing /DistributedPhysicalActionUpstream.lf
Jakio815 Feb 15, 2024
b5e42bc
Add commits
Jakio815 Feb 15, 2024
4acfdce
Add helper functions
Jakio815 Feb 15, 2024
8f33200
Starting changes to federate.c
Jakio815 Feb 15, 2024
2099dcd
Finished until lf_create_server()
Jakio815 Feb 15, 2024
059c6c4
Finished lf_handle_p2p_connections_from_federates
Jakio815 Feb 16, 2024
db2ef8b
Minor fix
Jakio815 Feb 16, 2024
cd8532c
Finished lf_connect_to_federate
Jakio815 Feb 16, 2024
4a12bbd
Fix handling messsages
Jakio815 Feb 16, 2024
9569c96
Fix federate's p2p tagged message and similiar messages
Jakio815 Feb 19, 2024
9946584
Fix socket open and netdrv free
Jakio815 Feb 20, 2024
1d81a9b
Minor fix
Jakio815 Feb 20, 2024
58af408
Fix bug on tagged message
Jakio815 Feb 20, 2024
6189256
Fix clock sync
Jakio815 Feb 23, 2024
474c7c1
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 23, 2024
9d2c3d5
Fix proto
Jakio815 Feb 23, 2024
5ead7cc
Return -1 when priv is not initialized yet and try to get the port nu…
byeonggiljun Feb 24, 2024
05abbb5
Add ADDRESS_QUERY_REPLY msg
Jakio815 Feb 24, 2024
1c3ae84
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 24, 2024
1d46d89
Fix handle address query when port is unavailable.
Jakio815 Feb 24, 2024
a0e9931
Fix proto initializztion
Jakio815 Feb 25, 2024
ad1ee04
Fix probe message
Jakio815 Feb 25, 2024
59e4ffe
Remove unnecessary socket related structs
Jakio815 Feb 25, 2024
eada726
Minor changes
Jakio815 Feb 27, 2024
dd4085c
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Feb 27, 2024
a5b491a
Add include to clock_sync.h
Jakio815 Mar 12, 2024
a205b2f
Fix stack smashing for RTI
Jakio815 Mar 15, 2024
a053a54
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Mar 15, 2024
50faf05
Run clang-format
Jakio815 Mar 15, 2024
25b021f
Fix wrong emerged
Jakio815 Mar 15, 2024
a45d5c8
Fix merge conflict error
Jakio815 Mar 15, 2024
8401fd8
Finally fixed bug.... The federation ID should have a maximum value.
Jakio815 Mar 18, 2024
d2ff61b
Fix hmac authentication MSG read.
Jakio815 Mar 22, 2024
79b8c70
Minor fix of comments
Jakio815 Mar 22, 2024
af06b33
Add gitmodules adding sst-c-api
Jakio815 Mar 23, 2024
9a5550e
Add sst-c-api as submodule
Jakio815 Mar 23, 2024
b83f35b
Add lf_sst_support.c .h bases
Jakio815 Mar 23, 2024
586128c
Revert mistakes
Jakio815 Mar 25, 2024
bd5de3a
Add empty mqtt support .c .h
Jakio815 Mar 27, 2024
42274f7
Remove unneeded functions yet for sst_support.c
Jakio815 Mar 27, 2024
ea976ea
Clean net_util.h
Jakio815 Mar 27, 2024
bbd39de
Clean net_common.h
Jakio815 Mar 27, 2024
d16f4f7
Clean rti_remote.h
Jakio815 Mar 27, 2024
aed4c89
Make socket_common. h .c && Implement create_server(), which is used …
Jakio815 Mar 27, 2024
ce7c565
Apply create_server() to rti and fed.
Jakio815 Mar 27, 2024
f4c4d24
Minor fix
Jakio815 Mar 27, 2024
60a06ec
Cleanup socket_support.c & .h
Jakio815 Mar 27, 2024
11096d7
Cleanup net_util .h .c
Jakio815 Mar 27, 2024
db69d8f
Update accept_connection to establilsh_communication_session
Jakio815 Mar 27, 2024
0cde9f0
Add skeleton for lf_mqtt_support.c
Jakio815 Mar 27, 2024
55449d1
Change all netdrv_accepts to establish_communication_session
Jakio815 Mar 27, 2024
7395a99
Fix comments
Jakio815 Mar 27, 2024
f8f5c7b
Fix not to use clock_netdrv on rti.
Jakio815 Mar 27, 2024
d8bc0de
Fix clock_synchronization_thread
Jakio815 Mar 27, 2024
77b6d66
Minor cleanup
Jakio815 Mar 27, 2024
c61fafa
Enable MQTT build
chanijjani Mar 28, 2024
e36e41d
All tests except FeedbackDelay5.lf works
Jakio815 Mar 28, 2024
5640ec9
Formatting federate.c
Jakio815 Mar 28, 2024
2e44179
Fixed federate compile
Jakio815 Mar 28, 2024
3693035
Merge branch 'netdriver' of https://github.com/lf-lang/reactor-c into…
chanijjani Mar 28, 2024
4626b95
Fix merge conflicts
Jakio815 Mar 28, 2024
bc0e864
RTI now compiles selectively by COMM_TYPE keyword
Jakio815 Mar 29, 2024
5f3f3d6
Add cmake to reactor-c cmake
Jakio815 Mar 29, 2024
6f8d8c1
Start sst support
Jakio815 Mar 29, 2024
050df43
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
chanijjani Mar 29, 2024
2f009ef
Enable MQTT build
chanijjani Mar 29, 2024
56129b1
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Mar 29, 2024
3d526a0
Change reactor-c CMakelists.txt, Add network apis.
Jakio815 Mar 29, 2024
6643bb7
Comment out federated/network/CMakeLists.txt
Jakio815 Mar 29, 2024
0613db7
Change path for core/Cmakelists.txt
Jakio815 Mar 29, 2024
6840b65
Fix federate build
chanijjani Mar 29, 2024
6193944
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
chanijjani Mar 29, 2024
51665b3
Change netdriver to drv in arguments
Jakio815 Mar 29, 2024
79af83a
Move create server to lf_socket support.c && Move socket_open to sock…
Jakio815 Mar 29, 2024
adbc4f6
Revert commented out parts
Jakio815 Mar 29, 2024
0f4706b
Make sst build work
Jakio815 Mar 29, 2024
193c096
Remove server_type_t temporarily for compile...
Jakio815 Mar 30, 2024
aae595b
Add create_server and establish_communication_session for SST
Jakio815 Mar 30, 2024
a628dcb
Remove gitmodules and sst-c-api
Jakio815 Mar 30, 2024
be7c1d1
Fix errors during lfc-dev build
chanijjani Mar 30, 2024
09df989
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
chanijjani Mar 30, 2024
39256a3
Minor fix on sst support.c
Jakio815 Mar 31, 2024
5816734
Remove comments
Jakio815 Mar 31, 2024
9693579
Fix handle clock sync warnings and errors.
Jakio815 Apr 1, 2024
025b53e
Apply formatting
Jakio815 Apr 1, 2024
771958b
Re-link lov-level-platform-impl to lf-network-api
Jakio815 Apr 1, 2024
194c9cc
Add skeleton for netdrv_connect, peek_from_netdrv at lf_socket_suppor…
Jakio815 Apr 1, 2024
41ecd1f
Enable comm-type target property
chanijjani Apr 1, 2024
7bd0048
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
chanijjani Apr 1, 2024
43ee9df
Revert reactor-c/core/CMakelists.
Jakio815 Apr 1, 2024
1831e93
Fix handle tagged_messages
Jakio815 Apr 1, 2024
8732830
Remove unused header includes in main.c
Jakio815 Apr 1, 2024
9ce244d
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
chanijjani Apr 1, 2024
194f6f6
Merge branch 'netdriver' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Apr 1, 2024
45e4992
Minor fix
Jakio815 Apr 2, 2024
1e5e5c0
Add SST configs
Jakio815 Apr 2, 2024
b2b8eaa
Add set_specified_port()
Jakio815 Apr 2, 2024
5cf168a
Minor fix
Jakio815 Apr 2, 2024
9d2d66d
Change logic at lf_connect_to_rti
Jakio815 Apr 2, 2024
9da6645
Minor fix
Jakio815 Apr 2, 2024
3f3a125
Update sst support. establish comm, netdrv connect. Still needs fix
Jakio815 Apr 2, 2024
8e8cda3
Minor fix
Jakio815 Apr 4, 2024
cfbcd7c
Fix fed config
Jakio815 Apr 4, 2024
26c4f4a
Fix fed_config
Jakio815 Apr 4, 2024
8aa92a9
Fix netdrv->open to get federtate ID
Jakio815 Apr 4, 2024
e7e3c27
Change logic for federate's connect.
Jakio815 Apr 5, 2024
2fbb8dc
Change sst_open, get federate ID. && Add read and write functions usi…
Jakio815 Apr 5, 2024
1be65ef
Minor fix
Jakio815 Apr 5, 2024
f5150b9
Fix build issues on Mac
hokeun Apr 5, 2024
3add411
" Fix header includes"
Jakio815 Apr 5, 2024
aedef8c
Add #include <unistd.h>
Jakio815 Apr 5, 2024
c4bf6a4
Fixed to read on mesasge based, not going over the next message.
Jakio815 Apr 6, 2024
5d649ef
Minor fix
Jakio815 Apr 6, 2024
d89f8f3
Fix call to undeclared function write; ISO C99 and later do not suppo…
hokeun Apr 6, 2024
0fba8c6
Fix fatal error: sst-c-api/c_api.h file not found by adding include_d…
hokeun Apr 6, 2024
b3dc2fc
Fix fatal error: sst-c-api/c_api.h file not found b
hokeun Apr 6, 2024
543617d
Fix compiler warnings
hokeun Apr 6, 2024
491a671
Fix error: passing 'unsigned char *' to parameter of type 'char *' co…
hokeun Apr 6, 2024
4680193
Minor fix - remove commented out functions
Jakio815 Apr 6, 2024
e842d32
Get federate's sst config path as command line
Jakio815 Apr 6, 2024
542dd1b
Change federate config name
Jakio815 Apr 6, 2024
06284da
Add --sst options to federate
Jakio815 Apr 7, 2024
6667bc0
Add SSTskeleton.config
Jakio815 Apr 7, 2024
a4153b1
Minor fix -remove unneeded configs.
Jakio815 Apr 7, 2024
c634add
Warning fix
Jakio815 Apr 7, 2024
4a7a07b
Add RTI options to get -sst as sstconfig path.
Jakio815 Apr 7, 2024
6d687d6
Fix a function not returnning anything
hokeun Apr 8, 2024
4ce3526
Revert wrong commit
hokeun Apr 8, 2024
baf5723
Add mqtt netdrv_init()
Jakio815 Apr 8, 2024
2d2bce9
Change netdrv_init() to get federate id and federation id.
Jakio815 Apr 8, 2024
91781d2
Remove type/ header from each support. files.
Jakio815 Apr 8, 2024
13e4b7f
Add each support.h headers to the c files.
Jakio815 Apr 8, 2024
82a450c
Add address, qos, timeout values.
Jakio815 Apr 8, 2024
0492207
Move qos address, timeout values to .c file.
Jakio815 Apr 8, 2024
6995ca3
Add create server and read to mqtt support.
Jakio815 Apr 9, 2024
769772c
Minor fix
Jakio815 Apr 9, 2024
d22b32e
Implement mqtt support. Almost working....
Jakio815 Apr 9, 2024
65329b7
Implement read, write, connect... so on...
Jakio815 Apr 9, 2024
17bd272
Add mqtt join and ack messages
Jakio815 Apr 9, 2024
fd431d4
Fix bugs and add mqtt join and ack protocols.
Jakio815 Apr 9, 2024
7a789cb
Remove function pointers of open and close.
Jakio815 Apr 9, 2024
5832b51
Reorganize code and change some names.
Jakio815 Apr 9, 2024
9818da8
Add netdriver.c to take out common functions of the read and write cl…
Jakio815 Apr 9, 2024
e435ca2
Reorganize sst support.c
Jakio815 Apr 9, 2024
b08aee7
Reorganize
Jakio815 Apr 9, 2024
1b08f0f
Reorganize
Jakio815 Apr 9, 2024
d1444a9
Minor fix
Jakio815 Apr 10, 2024
2aac754
Fix mqtt support. Remove base64 encoding decoding and it's libraries.
Jakio815 Apr 27, 2024
822530b
Fix read to return 0 on connection lost.
Jakio815 Apr 27, 2024
3422b81
Fix rti config
Jakio815 Apr 28, 2024
2691170
Add skeleton for openssl_required definition.
Jakio815 Apr 28, 2024
e13376b
Add temporary configs.
Jakio815 Apr 28, 2024
a14801b
Make a common connect to socket function.
Jakio815 Apr 28, 2024
e959a67
Minor fix
Jakio815 Apr 28, 2024
28d9278
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Apr 28, 2024
862febd
Fix conflicts
Jakio815 Apr 28, 2024
873dd8d
Fix unsused variable errors.
Jakio815 Apr 28, 2024
8315cb4
Add resign message
Jakio815 Apr 28, 2024
bbe209a
FIx sst read
Jakio815 Apr 29, 2024
f7d8a72
Remove library
Jakio815 May 11, 2024
e3ac4f2
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 May 11, 2024
4b4d2b2
Fix to return code
Jakio815 May 12, 2024
a59d915
Fix to get server_type_t when create_server()
Jakio815 May 13, 2024
35e0711
Minor fix
Jakio815 May 13, 2024
d4eeded
Add initialize_common_netdrv() to not repeat code && Apply server_type_t
Jakio815 May 13, 2024
9b0ca53
Add initialize_common_netdrv
Jakio815 May 13, 2024
c58bbad
Minor fix on netdriver CMake.
Jakio815 May 14, 2024
187dee4
Add send_address_advertisement_to_RTI && minor formatting.
Jakio815 May 14, 2024
2556451
Minor comments and formatting.
Jakio815 May 14, 2024
b1b030b
Fix declaration of send_address_advertisement_to_RTI
Jakio815 May 14, 2024
199ec22
Change function names to connector and listener
Jakio815 May 14, 2024
ca645a6
Move send_address_advertisement_to_RTI() back to federate.c
Jakio815 May 14, 2024
5362a71
Change multiple term changes.
Jakio815 May 14, 2024
3b3e550
Fix connect_to_socket with timeout time and better errors
Jakio815 May 14, 2024
327068b
Fix federate ID to uint16_t
Jakio815 May 15, 2024
c54d449
Create function create_topic_federation_id_A_to_B to make a topic nam…
Jakio815 May 15, 2024
60d155e
Add set_MQTT_target_ID.
Jakio815 May 15, 2024
f86d82f
Add stdint.h to mqtt_support.h
Jakio815 May 15, 2024
19856a5
Minor fixes.
Jakio815 May 16, 2024
23acbe5
Set back federate ID to int
Jakio815 May 16, 2024
707fd1e
Fix bugs, add comments.
Jakio815 May 16, 2024
b442b81
Make connect retries for MQTT connect()
Jakio815 May 17, 2024
63ba4b3
Minor comments.
Jakio815 May 17, 2024
eb61921
Minor change on MQTT connect.
Jakio815 May 17, 2024
499be8b
Minor fix
Jakio815 May 20, 2024
90efa9a
Add retries for connect and subscribe
Jakio815 May 20, 2024
b029972
Add retry for the total logic itself.
Jakio815 May 21, 2024
72a6c03
Take the read outside from the while loop
Jakio815 May 21, 2024
00dc72f
Fix to establish communication first, then do the fedID etc.. hanshake.
Jakio815 May 21, 2024
1b0e985
Remove MQTT connect with retries.
Jakio815 May 21, 2024
ca5798e
Change message name from MQTT_RTI_RESIGNED to MQTT_RESIGNED.
Jakio815 May 21, 2024
28e6999
Better close_netdrv() on MQTT support && Minor fix
Jakio815 May 21, 2024
ab44dc3
Update comments from socket to netdriver
Jakio815 May 21, 2024
69e1403
Update create_TCP_server for better port handling.
Jakio815 May 21, 2024
2b25a95
Minor fix.
Jakio815 May 21, 2024
2d93e2c
Add MQTTClient_receive to retry.
Jakio815 May 21, 2024
0202275
Minor fixes. Enable handling resign.
Jakio815 May 22, 2024
a48e850
Fix Clock-sync to work on MQTT.
Jakio815 May 22, 2024
9150fe4
Fix many comments and bugs.
Jakio815 May 23, 2024
1043170
Add better logs && Fix names for conflicts on topic and ClientID names.
Jakio815 May 24, 2024
f9c0d03
Add checking for drv is open.
Jakio815 May 24, 2024
74b2ef3
Fix checking if netdrv to rti is not NULL
Jakio815 May 24, 2024
5fe49a7
Move netdrv_mutex to netdriver.c
Jakio815 May 24, 2024
d71f9d0
Add LF_MUTEX_INIT(&netdrv_mutex); for also the rti use the same mutex…
Jakio815 May 24, 2024
32b459b
Minor fix
Jakio815 May 24, 2024
5ca9fe2
Remove log
Jakio815 May 24, 2024
d677b75
Add mutex and error handling. Still getting segfaults on termination.
Jakio815 May 24, 2024
1ad0a34
Reveal mutex locks..
Jakio815 May 24, 2024
30fef48
Removing close netdrivers on netdrv_close_on_error
Jakio815 May 24, 2024
c27ff6b
Move _lf_termination_executed to netdriver.c for checking the status …
Jakio815 May 30, 2024
9f3cd86
Remove old configs.
Jakio815 Jun 18, 2024
eddbe73
Add comments.
Jakio815 Jun 19, 2024
afb4c44
Apply Erling's fix on clearing out chunks.
Jakio815 Jun 20, 2024
ff613cf
Checked buffer length for read
Jakio815 Jun 20, 2024
abac681
Change bytes_read to 0 for close() situations.
Jakio815 Jun 21, 2024
9f4b822
Minor fix
Jakio815 Jun 25, 2024
18a1c16
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Jun 25, 2024
7e5997a
Fix RTI build
Jakio815 Jun 26, 2024
02b5b0d
Merge branch 'main' of github.com:lf-lang/reactor-c into netdriver
Jakio815 Nov 25, 2024
dd590b8
Temp commit.
Jakio815 Dec 9, 2024
2abafc9
Minor fix.
Jakio815 Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix many comments and bugs.
Jakio815 committed May 23, 2024
commit 9150fe42e77178e0e8d19ffeed9abdf2a53f612b
14 changes: 9 additions & 5 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
@@ -1649,6 +1649,7 @@ void send_address_advertisement_to_RTI(netdrv_t* fed_netdrv, netdrv_t* rti_netdr
* @return int
*/
static void get_remote_federate_info_from_RTI(uint16_t remote_federate_id, netdrv_t* rti_netdrv, netdrv_t* fed_netdrv) {
// Do not send port for MQTT. It only needs to know the target federate's ID.
#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST)
// The buffer is used for both sending and receiving replies.
// The size is what is needed for receiving replies.
@@ -1705,10 +1706,10 @@ static void get_remote_federate_info_from_RTI(uint16_t remote_federate_id, netdr
// Must set as specified port. Or else, the port will be increased when connecting to the other federate.
set_specified_port(fed_netdrv, port);
#elif defined(COMM_TYPE_MQTT)
// Do not send port for MQTT. It only needs to know the target federate's ID.
set_target_id(fed_netdrv, remote_federate_id);
if (rti_netdrv == NULL) {
} // JUST TO PASS COMPILER.
// // Do not send port for MQTT. It only needs to know the target federate's ID.
// set_target_id(fed_netdrv, remote_federate_id);
// if (rti_netdrv == NULL) {
// } // JUST TO PASS COMPILER.
#endif
}

@@ -1722,6 +1723,9 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {

// Initialize the netdriver to connect the remote federate.
netdrv_t* netdrv = initialize_netdrv(_lf_my_fed_id, federation_metadata.federation_id);
#if defined(COMM_TYPE_MQTT)
set_target_id(netdrv, remote_federate_id);
#endif
create_connector(netdrv);
get_remote_federate_info_from_RTI(remote_federate_id, _fed.netdrv_to_rti, netdrv);
result = connect_to_netdrv(netdrv);
@@ -1793,11 +1797,11 @@ void lf_connect_to_rti(const char* hostname, int port) {

// Initialize netdriver to rti.
_fed.netdrv_to_rti = initialize_netdrv(_lf_my_fed_id, federation_metadata.federation_id); // set memory.
create_connector(_fed.netdrv_to_rti);
set_host_name(_fed.netdrv_to_rti, hostname);
set_port(_fed.netdrv_to_rti, uport);
set_specified_port(_fed.netdrv_to_rti, port);
set_target_id(_fed.netdrv_to_rti, -1);
create_connector(_fed.netdrv_to_rti);

if (connect_to_netdrv(_fed.netdrv_to_rti) < 0) {
lf_print_error_and_exit("Failed to connect() to RTI after %d tries.", CONNECT_MAX_RETRIES);
228 changes: 112 additions & 116 deletions core/federated/network/lf_mqtt_support.c
Original file line number Diff line number Diff line change
@@ -21,10 +21,9 @@
static MQTT_priv_t* MQTT_priv_init();
static char* create_topic_federation_id_listener_id(const char* federation_id, int listener_id);
static char* create_topic_federation_id_A_to_B(const char* federation_id, int A, int B);
static void set_MQTTServer_id(MQTT_priv_t* MQTT_priv, int my_id, int client_id);
static void set_MQTTClient_id(MQTT_priv_t* MQTT_priv, int client_id);
// int MQTT_connect_with_retry(MQTTClient client, MQTTClient_connectOptions* conn_opts);
// int MQTT_subscribe_with_retry(MQTTClient client, const char* topic, int qos);
static void set_MQTTClient_id(MQTT_priv_t* MQTT_priv, int my_id, int target_id);
int MQTT_connect_with_retry(MQTTClient client, MQTTClient_connectOptions* conn_opts);
int MQTT_subscribe_with_retry(MQTTClient client, const char* topic, int qos);

void log_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) {
(void)level; // Explicitly mark the parameter as unused
@@ -64,6 +63,9 @@ void close_netdrv(netdrv_t* drv) {
lf_print("Failed to disconnect, return code %d.", rc);
}
MQTTClient_destroy(&MQTT_priv->client);
if (MQTT_priv->topic_name) {
free(MQTT_priv->topic_name);
}
}

/**
@@ -77,39 +79,43 @@ void close_netdrv(netdrv_t* drv) {
* @return int
*/
int create_listener(netdrv_t* drv, server_type_t server_type, uint16_t port) {
MQTT_priv_t* MQTT_priv = (MQTT_priv_t*)drv->priv;
if (server_type == RTI) {
} // JUST TO PASS COMPILER.

}
else {

}
if (port == 0) {
} // JUST TO PASS COMPILER.
MQTT_priv_t* MQTT_priv = (MQTT_priv_t*)drv->priv;

// Target is not available for listeners. We set it to -2 if it is uninitialized or unavailable. This is used when
// close_netdrv() is called.
MQTT_priv->target_id = -2;
// If RTI calls this, it will be -1. If federate server calls, it will be it's federate ID.
set_MQTTServer_id(MQTT_priv, drv->my_federate_id, drv->my_federate_id);
set_MQTTClient_id(MQTT_priv, drv->my_federate_id, drv->my_federate_id);

int rc;

if ((rc = MQTTClient_create(&MQTT_priv->client, ADDRESS, MQTT_priv->client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL)) !=
MQTTCLIENT_SUCCESS) {
lf_print_error_and_exit("Failed to create client, return code %d.", rc);
lf_print_error_and_exit("Failed to create client during create_listener(), return code %d.", rc);
}

MQTT_priv->conn_opts.keepAliveInterval = MQTTkeepAliveInterval;
MQTT_priv->conn_opts.cleansession = MQTTcleansession;
if ((rc = MQTTClient_connect(MQTT_priv->client, &MQTT_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
LF_PRINT_DEBUG("Connecting MQTTClient %s to broker.", MQTT_priv->client_id);
if ((rc = MQTT_connect_with_retry(MQTT_priv->client, &MQTT_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
MQTTClient_destroy(&MQTT_priv->client);
lf_print_error_and_exit("Failed to connect, return code %d. Check if MQTT broker is available.", rc);
lf_print_error_and_exit(
"Failed to connect during create_listener(), return code %d. Check if MQTT broker is available.", rc);
}
MQTT_priv->topic_name = (const char*)create_topic_federation_id_listener_id(drv->federation_id, drv->my_federate_id);
if ((rc = MQTTClient_subscribe(MQTT_priv->client, MQTT_priv->topic_name, QOS)) != MQTTCLIENT_SUCCESS) {
LF_PRINT_DEBUG("Connected MQTTClient %s to broker, return code %d.",MQTT_priv->client_id, rc);
MQTT_priv->topic_name = create_topic_federation_id_listener_id(drv->federation_id, drv->my_federate_id);
LF_PRINT_DEBUG("Subscribing on topic %s.", MQTT_priv->topic_name);
if ((rc = MQTT_subscribe_with_retry(MQTT_priv->client, MQTT_priv->topic_name, QOS)) != MQTTCLIENT_SUCCESS) {
MQTTClient_disconnect(MQTT_priv->client, TIMEOUT);
MQTTClient_destroy(&MQTT_priv->client);
lf_print_error_and_exit("Failed to subscribe, return code %d.", rc);
lf_print_error_and_exit("Failed to subscribe during create_listener(), return code %d.", rc);
}
int64_t current_physical_time = lf_time_physical();
LF_PRINT_LOG("Subscribing on topic %s.", MQTT_priv->topic_name);
LF_PRINT_LOG("PHYSICAL_TIME ON SUBSCRIBING TOPIC: " PRINTF_TIME, current_physical_time);

LF_PRINT_DEBUG("Subscribed on topic %s, return code %d.", MQTT_priv->topic_name, rc);
return 1;
}

@@ -146,56 +152,43 @@ netdrv_t* establish_communication_session(netdrv_t* listener_netdrv) {
uint16_t fed_id = extract_uint16(buffer + 1);
LF_PRINT_LOG("Received MSG_TYPE_MQTT_JOIN message from federate %d.", fed_id);

// The conncetor netdriver connects to the broker.
// The connector netdriver connects to the broker.
connector_nedrv->my_federate_id = (int)fed_id;
LF_PRINT_DEBUG("Setting up MQTTServer_id for federate %d.", fed_id);
set_MQTTServer_id(connector_priv, listener_netdrv->my_federate_id, connector_nedrv->my_federate_id);
LF_PRINT_DEBUG("Setup MQTTServer_id for federate %d as %s.", fed_id, connector_priv->client_id);
LF_PRINT_DEBUG("Setting up MQTTClient_id to target federate %d.", fed_id);
// If RTI calls this, it will be RTI_targetfedID. If federate calls this, it will be myfedID_targetfedID
set_MQTTClient_id(connector_priv, listener_netdrv->my_federate_id, connector_nedrv->my_federate_id);
LF_PRINT_DEBUG("Setup MQTTClient_id to target federate %d as %s.", fed_id, connector_priv->client_id);

LF_PRINT_DEBUG("Creating topic for federate %d.", fed_id);
LF_PRINT_DEBUG("Creating topic to target federate %d.", fed_id);
// Subscribe to topic: federationID_fedID_to_listenerID
// This is the channel where the federate sends messages to the listener.
topic_to_subscribe =
create_topic_federation_id_A_to_B(connector_nedrv->federation_id, fed_id, listener_netdrv->my_federate_id);

LF_PRINT_DEBUG("Creating MQTTClient for federate %d.", fed_id);
LF_PRINT_DEBUG("Creating MQTTClient to target federate %d.", fed_id);
if ((rc = MQTTClient_create(&connector_priv->client, ADDRESS, connector_priv->client_id, MQTTCLIENT_PERSISTENCE_NONE,
NULL)) != MQTTCLIENT_SUCCESS) {
lf_print_error("Failed to create client, return code %d.", rc);
lf_print_error_and_exit("Failed to create client during establish_communication_session(), return code %d.", rc);
}
connector_priv->conn_opts.keepAliveInterval = MQTTkeepAliveInterval;
connector_priv->conn_opts.cleansession = MQTTcleansession;
instant_t start_connect = lf_time_physical();
while (1) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
break;
}
LF_PRINT_DEBUG("Connecting MQTTClient for federate %d.", fed_id);
if ((rc = MQTTClient_connect(connector_priv->client, &connector_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
MQTTClient_destroy(&connector_priv->client);
free(topic_to_subscribe);
lf_print_error("Failed to connect, return code %d.", rc);
continue;
}
LF_PRINT_DEBUG("Connected, return code %d.", rc);
LF_PRINT_LOG("Starting subscribe");
if ((rc = MQTTClient_subscribe(connector_priv->client, (const char*)topic_to_subscribe, QOS)) !=
MQTTCLIENT_SUCCESS) {
MQTTClient_disconnect(connector_priv->client, TIMEOUT);
MQTTClient_destroy(&connector_priv->client);
free(topic_to_subscribe);
lf_print_error("Failed to subscribe, return code %d.", rc);
continue;
}
LF_PRINT_DEBUG("Subscribed on topic %s, return code %d.", topic_to_subscribe, rc);
break;

LF_PRINT_DEBUG("Connecting MQTTClient %s to broker.", connector_priv->client_id);
if ((rc = MQTT_connect_with_retry(connector_priv->client, &connector_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
MQTTClient_destroy(&connector_priv->client);
lf_print_error_and_exit("Failed to connect during establish_communication_session(), return code %d.", rc);
}
LF_PRINT_DEBUG("Connected MQTTClient %s to broker, return code %d.", connector_priv->client_id, rc);
LF_PRINT_DEBUG("Subscribing on topic %s.", topic_to_subscribe);
if ((rc = MQTT_subscribe_with_retry(connector_priv->client, (const char*)topic_to_subscribe, QOS)) != MQTTCLIENT_SUCCESS) {
MQTTClient_disconnect(connector_priv->client, TIMEOUT);
MQTTClient_destroy(&connector_priv->client);
lf_print_error_and_exit("Failed to subscribe during establish_communication_session(), return code %d.", rc);
}
LF_PRINT_DEBUG("Subscribed on topic %s, return code %d.", topic_to_subscribe, rc);

// Step2: The listener sends a MSG_TYPE_MQTT_ACCEPT message to the federate.
// Publish to topic: federationID_listenerID_to_fedID
connector_priv->topic_name = (const char*)create_topic_federation_id_A_to_B(connector_nedrv->federation_id,
listener_netdrv->my_federate_id, fed_id);
connector_priv->topic_name =
create_topic_federation_id_A_to_B(connector_nedrv->federation_id, listener_netdrv->my_federate_id, fed_id);
buffer[0] = MSG_TYPE_MQTT_ACCEPT;
encode_uint16((uint16_t)connector_nedrv->my_federate_id, buffer + 1);
LF_PRINT_LOG("Publishing MSG_TYPE_MQTT_ACCEPT message on topic %s.", connector_priv->topic_name);
@@ -219,19 +212,19 @@ netdrv_t* establish_communication_session(netdrv_t* listener_netdrv) {
*/
void create_connector(netdrv_t* drv) {
MQTT_priv_t* MQTT_priv = (MQTT_priv_t*)drv->priv;
set_MQTTClient_id(MQTT_priv, drv->my_federate_id);
// Only federates call this. It will be myfedID_RTI or myfedID_targetfedID.
set_MQTTClient_id(MQTT_priv, drv->my_federate_id, MQTT_priv->target_id);
int rc;
if ((rc = MQTTClient_create(&MQTT_priv->client, ADDRESS, MQTT_priv->client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL)) !=
MQTTCLIENT_SUCCESS) {
lf_print_error_and_exit("Failed to create client, return code %d.", rc);
}

MQTT_priv->conn_opts.keepAliveInterval = MQTTkeepAliveInterval;
MQTT_priv->conn_opts.cleansession = MQTTcleansession;
if ((rc = MQTTClient_connect(MQTT_priv->client, &MQTT_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
LF_PRINT_DEBUG("Connecting MQTTClient %s to broker.", MQTT_priv->client_id);
if ((rc = MQTT_connect_with_retry(MQTT_priv->client, &MQTT_priv->conn_opts)) != MQTTCLIENT_SUCCESS) {
MQTTClient_destroy(&MQTT_priv->client);
lf_print_error_and_exit("Failed to connect, return code %d. Check if MQTT broker is available.", rc);
lf_print_error_and_exit("Failed to connect during create_connector(), return code %d. Check if MQTT broker is available.", rc);
}
LF_PRINT_DEBUG("Connected MQTTClient %s to broker, return code %d.", MQTT_priv->client_id, rc);
}
/**
* @brief Federate publishes it's federate ID to "{Federation_ID}_RTI", then subscribes to
@@ -252,20 +245,22 @@ int connect_to_netdrv(netdrv_t* drv) {
char* topic_to_subscribe =
create_topic_federation_id_A_to_B(drv->federation_id, MQTT_priv->target_id, drv->my_federate_id);

if ((rc = MQTTClient_subscribe(MQTT_priv->client, (const char*)topic_to_subscribe, QOS)) != MQTTCLIENT_SUCCESS) {
LF_PRINT_DEBUG("Subscribing on topic %s.", topic_to_subscribe);
if ((rc = MQTT_subscribe_with_retry(MQTT_priv->client, (const char*)topic_to_subscribe, QOS)) != MQTTCLIENT_SUCCESS) {
MQTTClient_disconnect(MQTT_priv->client, TIMEOUT);
MQTTClient_destroy(&MQTT_priv->client);
free(topic_to_subscribe);
lf_print_error_and_exit("Failed to subscribe, return code %d.", rc);
lf_print_error_and_exit("Failed to subscribe during connect_to_netdrv(), return code %d.", rc);
}
LF_PRINT_LOG("Subscribed on topic %s.", topic_to_subscribe);
LF_PRINT_DEBUG("Subscribed on topic %s, return code %d.", topic_to_subscribe, rc);

// Step1: The federate sends a MSG_TYPE_MQTT_JOIN message including it's federateID to the listener.
// Publish to topic: federationID_listenerID
MQTT_priv->topic_name = (const char*)create_topic_federation_id_listener_id(drv->federation_id, MQTT_priv->target_id);
MQTT_priv->topic_name = create_topic_federation_id_listener_id(drv->federation_id, MQTT_priv->target_id);
unsigned char buffer[1 + sizeof(uint16_t)];
buffer[0] = MSG_TYPE_MQTT_JOIN;
encode_uint16((uint16_t)drv->my_federate_id, buffer + 1);

// The connect_to_netdrv() can be called in the federates before the establish_communication_session() is called in
// the RTI. The MQTT QOS2 ensures the message to arrive to the subscribed client, but this can be called even before
// the netdriver was initialized in the RTI side. Thus, the federate must retry sending messages to the RTI until it
@@ -274,7 +269,7 @@ int connect_to_netdrv(netdrv_t* drv) {
instant_t start_connect = lf_time_physical();
while (1) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
lf_print_error("Failed to handshake with target with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
return -1;
}
LF_PRINT_LOG("Publishing MSG_TYPE_MQTT_JOIN message with federateID %d on topic %s.", drv->my_federate_id,
@@ -336,7 +331,7 @@ int connect_to_netdrv(netdrv_t* drv) {
// Step3: Send MSG_TYPE_MQTT_ACCEPT_ACK message to the listener.
// Publish to topic: federationID_fedID_to_listenorID
MQTT_priv->topic_name =
(const char*)create_topic_federation_id_A_to_B(drv->federation_id, drv->my_federate_id, MQTT_priv->target_id);
create_topic_federation_id_A_to_B(drv->federation_id, drv->my_federate_id, MQTT_priv->target_id);
buffer[0] = MSG_TYPE_MQTT_ACCEPT_ACK;
write_to_netdrv_fail_on_error(drv, 1, buffer, NULL,
"Failed to write MSG_TYPE_MQTT_ACCEPT_ACK_to RTI for connection through MQTT.");
@@ -384,7 +379,7 @@ ssize_t read_from_netdrv(netdrv_t* drv, unsigned char* buffer, size_t buffer_len
MQTTClient_message* message = NULL;
int rc;
int bytes_read;
LF_PRINT_LOG("RECEIVING message from federateID %d", drv->my_federate_id);
// LF_PRINT_LOG("RECEIVING message from federateID %d", drv->my_federate_id);
instant_t start_receive = lf_time_physical();
while (1) {
if (CHECK_TIMEOUT(start_receive, CONNECT_TIMEOUT)) {
@@ -395,17 +390,19 @@ ssize_t read_from_netdrv(netdrv_t* drv, unsigned char* buffer, size_t buffer_len
rc = MQTTClient_receive(MQTT_priv->client, &topicName, &topicLen, &message, 10000);
if (rc != MQTTCLIENT_SUCCESS) {
lf_print_error("Failed to receive message, return code %d.", rc);
lf_sleep(CONNECT_RETRY_INTERVAL);
continue;
} else if (message == NULL) {
// This means the call succeeded but no message was received within the timeout
lf_print_log("No message received within the MQTTClient_receive() timeout period.");
lf_sleep(CONNECT_RETRY_INTERVAL);
continue;
} else {
// Successfully received a message
lf_print_log("Successfully received message, return code %d.", rc);
memcpy(buffer, (unsigned char*)message->payload, message->payloadlen);
bytes_read = message->payloadlen;
LF_PRINT_LOG("RECEIVED message from federateID %d", drv->my_federate_id);
// LF_PRINT_LOG("RECEIVED message from federateID %d", drv->my_federate_id);
if (buffer[0] == MQTT_RESIGNED) {
LF_PRINT_LOG("Received MQTT_RESIGNED message from federateID %d", drv->my_federate_id);
bytes_read = 0;
@@ -489,6 +486,9 @@ static MQTT_priv_t* MQTT_priv_init() {
memset(MQTT_priv, 0, sizeof(MQTT_priv_t));
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
memcpy(&MQTT_priv->conn_opts, &conn_opts, sizeof(MQTTClient_connectOptions));
MQTT_priv->conn_opts.keepAliveInterval = MQTTkeepAliveInterval;
MQTT_priv->conn_opts.cleansession = MQTTcleansession;
MQTT_priv->conn_opts.connectTimeout = MQTTconnectTimeout;
return MQTT_priv;
}

@@ -515,7 +515,7 @@ static char* create_topic_federation_id_listener_id(const char* federation_id, i
if (listener_id == -1) {
snprintf(result, max_length, "%s_RTI", federation_id);
} else {
snprintf(result, max_length, "%s_fed__%d", federation_id, listener_id);
snprintf(result, max_length, "%s_fed_%d", federation_id, listener_id);
}
return result;
}
@@ -556,54 +556,50 @@ static char* create_topic_federation_id_A_to_B(const char* federation_id, int A,
return result;
}

static void set_MQTTServer_id(MQTT_priv_t* MQTT_priv, int my_id, int client_id) {
if (my_id == -1 && client_id == -1) {
static void set_MQTTClient_id(MQTT_priv_t* MQTT_priv, int my_id, int target_id) {
if (my_id == -1 && target_id == -1) {
strcat(MQTT_priv->client_id, "RTI_RTI");
} else if (my_id == -1) {
sprintf(MQTT_priv->client_id, "RTI_%d", client_id);
sprintf(MQTT_priv->client_id, "RTI_fed_%d", target_id);
} else if (target_id == -1) {
sprintf(MQTT_priv->client_id, "fed_%d_RTI", my_id);
} else {
sprintf(MQTT_priv->client_id, "fed%d_fed%d", my_id, client_id);
sprintf(MQTT_priv->client_id, "fed_%d_fed_%d", my_id, target_id);
}
}

int MQTT_connect_with_retry(MQTTClient client, MQTTClient_connectOptions* conn_opts) {
int rc = -1;
instant_t start_connect = lf_time_physical();
while (1) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
break;
}
if ((rc = MQTTClient_connect(client, conn_opts)) != MQTTCLIENT_SUCCESS) {
lf_print_error("Failed to connect, return code %d. Retrying to connect.", rc);
lf_sleep(CONNECT_RETRY_INTERVAL);
continue;
}
break;
}
return rc;
}

static void set_MQTTClient_id(MQTT_priv_t* MQTT_priv, int client_id) { sprintf(MQTT_priv->client_id, "%d", client_id); }

// int MQTT_connect_with_retry(MQTTClient client, MQTTClient_connectOptions* conn_opts) {
// int rc;
// int retries = 0;
// instant_t start_connect = lf_time_physical();
// while (1) {
// if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
// lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
// break;
// }
// if ((rc = MQTTClient_connect(client, conn_opts)) != MQTTCLIENT_SUCCESS) {
// LF_PRINT_LOG("Failed to connect to MQTT broker, return code %d.", rc);
// retries++;
// if (retries >= MAX_RETRIES) {
// LF_PRINT_LOG("Max retries reached. Giving up.");
// return rc;
// }
// lf_print_warning("Could not connect. Will try again every " PRINTF_TIME " nanoseconds.",
// CONNECT_RETRY_INTERVAL); lf_sleep(CONNECT_RETRY_INTERVAL);
// }
// return MQTTCLIENT_SUCCESS;
// }

// int MQTT_subscribe_with_retry(MQTTClient client, const char* topic, int qos) {
// int rc;
// int retries = 0;

// while ((rc = MQTTClient_subscribe(client, topic, qos)) != MQTTCLIENT_SUCCESS) {
// LF_PRINT_LOG("Failed to subscribe to MQTT broker, return code %d.", rc);
// retries++;
// if (retries >= MAX_RETRIES) {
// LF_PRINT_LOG("Max retries reached. Giving up.");
// return rc;
// }
// lf_print_warning("Could not subscribe to topic. Will try again every " PRINTF_TIME " nanoseconds.",
// CONNECT_RETRY_INTERVAL);
// lf_sleep(CONNECT_RETRY_INTERVAL);
// }
// return MQTTCLIENT_SUCCESS;
// }
int MQTT_subscribe_with_retry(MQTTClient client, const char* topic, int qos) {
int rc = -1;
instant_t start_connect = lf_time_physical();
while (1) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error("Failed to subscribe with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
break;
}
if ((rc = MQTTClient_subscribe(client, topic, qos)) != MQTTCLIENT_SUCCESS) {
lf_print_error("Failed to subscribe, return code %d. Retrying to subscribe.", rc);
lf_sleep(CONNECT_RETRY_INTERVAL);
continue;
}
break;
}
return rc;
}
4 changes: 3 additions & 1 deletion include/core/federated/network/type/lf_mqtt_support.h
Original file line number Diff line number Diff line change
@@ -6,14 +6,16 @@

#define MQTTkeepAliveInterval 20
#define MQTTcleansession 1
#define MQTTconnectTimeout 2

#define MQTT_RESIGNED 88

typedef struct MQTT_priv_t {
MQTTClient client;
MQTTClient_connectOptions conn_opts; // = MQTTClient_connectOptions_initializer;
const char* topic_name;
char* topic_name;
char client_id[20];

int target_id; // Must be int. Not uint_16_t. -1 stands for RTI, -2 means uninitialized.
} MQTT_priv_t;