-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue-549: Moved Kafka time producer from CDASimAdapter to CARMA-Stre… #550
Changes from all commits
d1bbb0a
7169c45
5819a5f
07c3533
6e611fb
c1e8bea
cc76a51
892fedb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,19 @@ namespace tmx::utils | |
* @param broker_str network address of kafka broker. | ||
*/ | ||
explicit kafka_producer_worker(const std::string &brokers); | ||
/** | ||
* @brief Destroy the kafka producer worker object. Calls stop on producer to clean up resources. | ||
*/ | ||
virtual ~kafka_producer_worker(); | ||
// Rule of 5 because destructor is define (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f) | ||
// delete copy constructor | ||
kafka_producer_worker(kafka_producer_worker& other) = delete; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of deleting the copy and move constructor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rule of 5 is a rule in C++ that if you define one of these you need to explicitly define all to avoid issues with compiler generated versions. This is to address a Code Smell. (https://www.codementor.io/@sandesh87/the-rule-of-five-in-c-1pdgpzb04f) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left a comment as well |
||
// delete copy assignment | ||
kafka_producer_worker& operator=(const kafka_producer_worker& other) = delete; | ||
// delete move constructor | ||
kafka_producer_worker(kafka_producer_worker &&producer) = delete; | ||
// delete move assignment | ||
kafka_producer_worker const & operator=(kafka_producer_worker &&producer) = delete; | ||
/** | ||
* @brief Initialize kafka_producer_worker. This method must be called before send! | ||
* | ||
|
@@ -100,16 +113,12 @@ namespace tmx::utils | |
/** | ||
* @brief Stop running kafka producer. | ||
*/ | ||
virtual void stop(); | ||
void stop(); | ||
/** | ||
* @brief Print current configurations. | ||
*/ | ||
virtual void printCurrConf(); | ||
/** | ||
* @brief Destroy the kafka producer worker object | ||
* | ||
*/ | ||
virtual ~kafka_producer_worker() = default; | ||
|
||
}; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
#include "gtest/gtest.h" | ||
#include <librdkafka/rdkafkacpp.h> | ||
|
||
/** | ||
* @brief Kafka Test Environment which allows for Setup/Teardown configuration at the | ||
* test program level. Teardown waits on all rd_kafka_t objects to be destroyed. | ||
*/ | ||
class KafkaTestEnvironment : public ::testing::Environment { | ||
public: | ||
~KafkaTestEnvironment() override {} | ||
|
||
// Override this to define how to set up the environment. | ||
void SetUp() override {} | ||
|
||
// Override this to define how to tear down the environment. | ||
void TearDown() override { | ||
std::cout << "Waiting for all RDKafka objects to be destroyed!" << std::endl; | ||
// Wait for all rd_kafka_t objects to be destroyed | ||
auto error = RdKafka::wait_destroyed(5000); | ||
if (error == RdKafka::ERR__TIMED_OUT) { | ||
std::cout << "Wait destroy attempted timed out!" << std::endl; | ||
} | ||
else { | ||
std::cout << "All Objects are destroyed!" << std::endl; | ||
} | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the if statement to only apply filter when it is in simulation mode? Is it possible for CDAAdapterPlugin to broadcast TimeSyncMessage when it is not in simulation mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it is not possible and also not valid. TimeSync messages are only valid when we are running in simulation. Otherwise we use machine time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The AddMessageFilter is event based and should only be triggered when there is Timsync message sent to the TMX bus in simulation mode.
so the _simulation_mode should always be true. Can we remove this if check statement? The same to the HandleTimeSyncMessage() function. There is no need to check if _simulation_mode true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is incorrect. Simulation mode will be false in real world deployments. Here the PluginClockAwareClient will not subscribe to TimeSync messages so this is valid in my opinion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, the CDASimAdapter will only be active in simulation mode, which means we will only broadcast TimeSync messages in Simulation Mode. The PluginClockAwareClient is a PluginClient that is able to configurably use Simulation time or real time. It will only subscribe to TimeSync messages when in simulation mode. So on the broadcast side the plugin will not be active to produce time sync messages if not in simulation. On the filter side, even if something else did produce a time sync message, we will not add a filter for it if not in simulation mode.