diff --git a/.gitignore b/.gitignore index 67bc9601..9f3df18d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ cmake-build-debug cmake-build-release .idea result +Makefile diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5f05c960..91e20c03 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -7,3 +7,4 @@ add_subdirectory(count) add_subdirectory(ports) add_subdirectory(hello) add_subdirectory(power_train) +add_subdirectory(multiport_mutation) \ No newline at end of file diff --git a/examples/count/main.cc b/examples/count/main.cc index e0c6e92b..0b1e51f0 100644 --- a/examples/count/main.cc +++ b/examples/count/main.cc @@ -5,7 +5,7 @@ using namespace reactor; using namespace std::chrono_literals; -class Count : public Reactor { +class Count final : public Reactor { private: // actions Timer timer{"timer", this}; diff --git a/examples/hello/main.cc b/examples/hello/main.cc index 3b02fd89..2f6c88f8 100644 --- a/examples/hello/main.cc +++ b/examples/hello/main.cc @@ -6,7 +6,7 @@ using namespace reactor; using namespace std::chrono_literals; -class Hello : public Reactor { +class Hello final : public Reactor { private: // actions Timer timer{"timer", this, 1s, 2s}; diff --git a/examples/multiport_mutation/CMakeLists.txt b/examples/multiport_mutation/CMakeLists.txt new file mode 100644 index 00000000..b19dd98a --- /dev/null +++ b/examples/multiport_mutation/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(mutation_multiports main.cc) +target_link_libraries(mutation_multiports reactor-cpp) +add_dependencies(examples mutation_multiports) diff --git a/examples/multiport_mutation/consumer.hh b/examples/multiport_mutation/consumer.hh new file mode 100644 index 00000000..97fc2d12 --- /dev/null +++ b/examples/multiport_mutation/consumer.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef MULTIPORT_MUTATION_CONSUMER_HH +#define MULTIPORT_MUTATION_CONSUMER_HH + +#include +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Consumer final : public Reactor { // NOLINT + class Inner : public Scope { + Inner(Reactor* reactor, std::size_t index) + : Scope(reactor) + , index_(index) {} + std::size_t index_ = 0; + + void reaction_1(const Input& in) const { + // std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n'; + } + + friend Consumer; + }; + + Inner _lf_inner; + Reaction handle{"handle", 1, this, [this]() { _lf_inner.reaction_1(this->in); }}; + +public: + Consumer(const std::string& name, Environment* env, std::size_t index) + : Reactor(name, env) + , _lf_inner(this, index) { + std::cout << "creating instance of consumer" << '\n'; + } + ~Consumer() override { std::cout << "Consumer Object is deleted" << '\n'; }; + + Input in{"in", this}; // NOLINT + + void assemble() override { handle.declare_trigger(&in); } +}; + +#endif // MULTIPORT_MUTATION_CONSUMER_HH diff --git a/examples/multiport_mutation/load_balancer.hh b/examples/multiport_mutation/load_balancer.hh new file mode 100644 index 00000000..eaeb6375 --- /dev/null +++ b/examples/multiport_mutation/load_balancer.hh @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef MULTIPORT_MUTATION_LOAD_BALANCER_HH +#define MULTIPORT_MUTATION_LOAD_BALANCER_HH + +#include +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class LoadBalancer final : public Reactor { // NOLINT + class Inner : public MutableScope { + explicit Inner(Reactor* reactor) + : MutableScope(reactor) {} + + // reaction bodies + static void reaction_1(const Input& inbound, Output& scale_bank, + Multiport>& outbound) { + if (std::rand() % 15 == 0) { // NOLINT + scale_bank.set(std::rand() % 20 + 1); // NOLINT + } + const unsigned outbound_port = std::rand() % outbound.size(); // NOLINT + outbound[outbound_port].set(inbound.get()); + } + + friend LoadBalancer; + }; + + Inner _lf_inner; + Reaction process{"process", 1, this, [this]() { Inner::reaction_1(this->inbound, this->scale_bank, this->out); }}; + +public: + LoadBalancer(const std::string& name, Environment* env) + : Reactor(name, env) + , _lf_inner(this) { + out.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + out.create_new_port(); + } + } + ~LoadBalancer() override = default; + + ModifableMultiport> out{"out", this}; // NOLINT + std::size_t out_size_ = 0; + + Input inbound{"inbound", this}; // NOLINT + Output scale_bank{"scale_bank", this}; // NOLINT + + void assemble() override { + for (auto& _lf_port : out) { + process.declare_antidependency(&_lf_port); + } + process.declare_trigger(&inbound); + } +}; + +#endif // MULTIPORT_MUTATION_LOAD_BALANCER_HH diff --git a/examples/multiport_mutation/main.cc b/examples/multiport_mutation/main.cc new file mode 100644 index 00000000..62f80ba9 --- /dev/null +++ b/examples/multiport_mutation/main.cc @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#include +#include + +#include + +#include "./consumer.hh" +#include "./load_balancer.hh" +#include "./multiport_to_bank.hh" +#include "./producer.hh" + +class Deployment final : public Reactor { // NOLINT + + std::unique_ptr producer_; + std::unique_ptr load_balancer_; + std::vector> consumers_; + + Reaction scale_bank{"scale_bank", 1, this, + [this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }}; + + class Inner : public MutableScope { + int state = 0; + + public: + explicit Inner(Reactor* reactor) + : MutableScope(reactor) {} + void reaction_1(const Input& scale, std::vector>& reactor_bank, + ModifableMultiport>& load_balancer) { + std::size_t new_size = *scale.get(); + + std::function lambda = [](Environment* env, std::size_t index) { + std::string _lf_inst_name = "consumer_" + std::to_string(index); + return std::make_unique(_lf_inst_name, env, index); + }; + + std::function get_input_port = [](const std::unique_ptr& consumer) { return &consumer->in; }; + const auto rescale = std::make_shared>( + &load_balancer, &reactor_bank, get_input_port, lambda, new_size); + + add_to_transaction(rescale); + + commit_transaction(true); + } + + friend LoadBalancer; + }; + + Inner _inner; + +public: + Deployment(const std::string& name, Environment* env) + : Reactor(name, env) + , _inner(this) + , producer_(std::make_unique("producer", environment())) + , load_balancer_(std::make_unique("load_balancer", environment())) { + std::cout << "creating instance of deployment" << '\n'; + consumers_.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + std::string _lf_inst_name = "consumer_" + std::to_string(_lf_idx); + consumers_.push_back(std::make_unique(_lf_inst_name, environment(), _lf_idx)); + } + } + ~Deployment() override = default; + + Input scale{"scale", this}; // NOLINT + + void assemble() override { + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + environment()->draw_connection(load_balancer_->out[_lf_idx], consumers_[_lf_idx]->in, ConnectionProperties{}); + environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{}); + } + environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{}); + scale_bank.declare_trigger(&this->scale); + } +}; + +auto main() -> int { + Environment env{4, true}; + auto deployment = std::make_unique("c1", &env); + env.optimize(); + env.assemble(); + auto thread = env.startup(); + thread.join(); + return 0; +} diff --git a/examples/multiport_mutation/multiport_to_bank.hh b/examples/multiport_mutation/multiport_to_bank.hh new file mode 100644 index 00000000..a9f234f0 --- /dev/null +++ b/examples/multiport_mutation/multiport_to_bank.hh @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef MULTIPORT_MUTATION_MULTIPORT_TO_BANK_HH +#define MULTIPORT_MUTATION_MULTIPORT_TO_BANK_HH + +#include +#include +#include +#include +#include +#include +#include + +#include "../../lib/mutations/bank.cc" +#include "../../lib/mutations/connection.cc" +#include "../../lib/mutations/multiport.cc" + +#include + +namespace reactor { + +template class ResizeMultiportToBank : public Mutation { + ModifableMultiport>* multiport_; + std::vector>* bank_; + std::function*(const std::unique_ptr&)> get_input_port_; + std::function(Environment* env, std::size_t index)> create_lambda_; + std::size_t new_size_ = 0; + +public: + ResizeMultiportToBank(ModifableMultiport>* multiport, + std::vector>* bank, + std::function*(const std::unique_ptr&)> get_input_port, + std::function(Environment* env, std::size_t index)> create_lambda, + std::size_t new_size) + : multiport_(multiport) + , bank_(bank) + , get_input_port_(get_input_port) + , create_lambda_(create_lambda) + , new_size_(new_size) {} + + ~ResizeMultiportToBank() = default; + auto run() -> MutationResult { + if (multiport_->size() != bank_->size()) { + return NotMatchingBankSize; + } + auto old_size = multiport_->size(); + + if (new_size_ > old_size) { + auto change_multiport_size = std::make_shared>(multiport_, new_size_); + + change_multiport_size->run(); + + auto change_bank_size = std::make_shared>>( + bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); + + change_bank_size->run(); + + for (auto i = old_size; i < new_size_; i++) { + auto add_conn = std::make_shared, Input>>( + &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true); + + add_conn->run(); + } + } else if (new_size_ < old_size) { + for (auto i = old_size - 1; i >= new_size_; i--) { + auto add_conn = std::make_shared, Input>>( + &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false); + + add_conn->run(); + } + + auto change_multiport_size = std::make_shared>(multiport_, new_size_); + + change_multiport_size->run(); + + auto change_bank_size = std::make_shared>>( + bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); + + change_bank_size->run(); + } + + return Success; + } + + auto rollback() -> MutationResult { return Success; } +}; +} // namespace reactor + +#endif // MULTIPORT_MUTATION_MULTIPORT_TO_BANK_HH diff --git a/examples/multiport_mutation/producer.hh b/examples/multiport_mutation/producer.hh new file mode 100644 index 00000000..914b7edf --- /dev/null +++ b/examples/multiport_mutation/producer.hh @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef MULTIPORT_MUTATION_PRODUCER_HH +#define MULTIPORT_MUTATION_PRODUCER_HH + +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Producer final : public Reactor { // NOLINT +private: + Timer timer{"timer", this, 1s, 1s}; + Reaction r_timer{"r_timer", 1, this, [this]() { _lf_inner.reaction_1(this->value); }}; + + class Inner : public Scope { + unsigned int counter_ = 0; + + void reaction_1([[maybe_unused]] Output& out) { + // std::cout << "producing value:" << counter_ << "\n"; + out.set(counter_++); + } + + explicit Inner(Reactor* reactor) + : Scope(reactor) {} + + friend Producer; + }; + + Inner _lf_inner; + +public: + Producer(const std::string& name, Environment* env) + : Reactor(name, env) + , _lf_inner(this) { + std::cout << "creating instance of producer\n"; + } + Producer() = delete; + ~Producer() override = default; + + Output value{"value", this}; // NOLINT + + void assemble() override { + r_timer.declare_trigger(&timer); + r_timer.declare_antidependency(&value); + } +}; + +#endif // MULTIPORT_MUTATION_PRODUCER_HH diff --git a/examples/ports/main.cc b/examples/ports/main.cc index 671d89b7..2b39332f 100644 --- a/examples/ports/main.cc +++ b/examples/ports/main.cc @@ -5,13 +5,13 @@ using namespace reactor; using namespace std::chrono_literals; -class Trigger : public Reactor { +class Trigger final : public Reactor { private: Timer timer; Reaction r_timer{"r_timer", 1, this, [this]() { on_timer(); }}; public: - Trigger(const std::string& name, Environment* env, Duration period) + Trigger(const std::string& name, Environment* env, const Duration period) : Reactor(name, env) , timer{"timer", this, period, Duration::zero()} {} @@ -25,7 +25,7 @@ class Trigger : public Reactor { void on_timer() { trigger.set(); } }; -class Counter : public Reactor { +class Counter final : public Reactor { private: int value_{0}; Reaction r_trigger{"r_trigger", 1, this, [this]() { on_trigger(); }}; @@ -49,7 +49,7 @@ class Counter : public Reactor { } }; -class Printer : public Reactor { +class Printer final : public Reactor { private: Reaction r_value{"r_value", 1, this, [this]() { on_value(); }}; @@ -64,10 +64,10 @@ class Printer : public Reactor { r_value.declare_trigger(&value); } - void on_value() { std::cout << this->name() << ": " << *value.get() << '\n'; } + void on_value() const { std::cout << this->name() << ": " << *value.get() << '\n'; } }; -class Adder : public Reactor { +class Adder final : public Reactor { private: Reaction r_add{"r_add", 1, this, [this]() { add(); }}; diff --git a/examples/power_train/main.cc b/examples/power_train/main.cc index 05efe57f..b55ebf2c 100644 --- a/examples/power_train/main.cc +++ b/examples/power_train/main.cc @@ -1,10 +1,8 @@ -#include - #include "reactor-cpp/reactor-cpp.hh" using namespace reactor; -class LeftPedal : public Reactor { +class LeftPedal final : public Reactor { public: // ports Output angle{"angle", this}; // NOLINT @@ -30,7 +28,7 @@ class LeftPedal : public Reactor { } }; -class RightPedal : public Reactor { +class RightPedal final : public Reactor { public: // ports Output angle{"angle", this}; // NOLINT @@ -60,7 +58,7 @@ class RightPedal : public Reactor { } }; -class BrakeControl : public Reactor { +class BrakeControl final : public Reactor { public: // ports Input angle{"angle", this}; // NOLINT @@ -81,7 +79,7 @@ class BrakeControl : public Reactor { } }; -class EngineControl : public Reactor { +class EngineControl final : public Reactor { public: // ports Input angle{"angle", this}; // NOLINT @@ -118,7 +116,7 @@ class EngineControl : public Reactor { } }; -class Brake : public Reactor { +class Brake final : public Reactor { public: // ports Input force{"force", this}; // NOLINT @@ -136,7 +134,7 @@ class Brake : public Reactor { void assemble() override { r1.declare_trigger(&force); } }; -class Engine : public Reactor { +class Engine final : public Reactor { public: // ports Input torque{"torque", this}; // NOLINT diff --git a/include/reactor-cpp/assert.hh b/include/reactor-cpp/assert.hh index 339eee8b..4dc7fa72 100644 --- a/include/reactor-cpp/assert.hh +++ b/include/reactor-cpp/assert.hh @@ -13,7 +13,6 @@ #include "reactor-cpp/fwd.hh" #include -#include #include #include @@ -71,7 +70,7 @@ public: : std::runtime_error(build_message(msg)) {} }; -constexpr void validate([[maybe_unused]] bool condition, [[maybe_unused]] const std::string_view message) { +constexpr void validate([[maybe_unused]] const bool condition, [[maybe_unused]] const std::string_view message) { if constexpr (runtime_validation) { if (!condition) { print_backtrace(); @@ -80,8 +79,8 @@ constexpr void validate([[maybe_unused]] bool condition, [[maybe_unused]] const } } -template constexpr auto extract_value(E enum_value) -> typename std::underlying_type_t { - return static_cast>(enum_value); +template constexpr auto extract_value(E enum_value) -> std::underlying_type_t { + return static_cast>(enum_value); } void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] Phase phase); diff --git a/include/reactor-cpp/connection.hh b/include/reactor-cpp/connection.hh index 106738fc..f58e0f3d 100644 --- a/include/reactor-cpp/connection.hh +++ b/include/reactor-cpp/connection.hh @@ -17,7 +17,6 @@ #include "logical_time.hh" #include "port.hh" #include "reaction.hh" -#include "reactor.hh" #include "time.hh" #include "time_barrier.hh" @@ -69,7 +68,7 @@ protected: return [this](const BasePort& port) { // We know that port must be of type Port auto& typed_port = reinterpret_cast&>(port); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) - if constexpr (std::is_same::value) { + if constexpr (std::is_same_v) { this->schedule(); } else { this->schedule(std::move(typed_port.get())); @@ -81,7 +80,7 @@ public: void setup() noexcept override { Action::setup(); - if constexpr (std::is_same::value) { + if constexpr (std::is_same_v) { for (auto port : this->downstream_ports()) { port->set(); } @@ -134,7 +133,7 @@ public: // without locking. auto tag = Tag::from_logical_time(scheduler->logical_time()); [[maybe_unused]] bool result{false}; - if constexpr (std::is_same::value) { + if constexpr (std::is_same_v) { result = this->schedule_at(tag); } else { result = this->schedule_at(std::move(typed_port.get()), tag); diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 825b3bba..3ad536f7 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -19,6 +19,7 @@ #include "reactor-cpp/logging.hh" #include "reactor-cpp/time.hh" #include "scheduler.hh" +#include "transaction.hh" namespace reactor { @@ -33,7 +34,8 @@ enum class Phase : std::uint8_t { Startup = 2, Execution = 3, Shutdown = 4, - Deconstruction = 5 + Deconstruction = 5, + Mutation = 6 }; class Environment { @@ -74,7 +76,7 @@ private: Graph graph_{}; Graph optimized_graph_{}; - void build_dependency_graph(Reactor* reactor); + void build_dependency_graph(const Reactor* reactor); void calculate_indexes(); std::mutex shutdown_mutex_{}; @@ -102,11 +104,28 @@ public: } } + template void remove_connection(Port& source, Port& sink) { this->remove_connection(&source, &sink); } + + template void remove_connection(Port* source, Port* sink) { + if (top_environment_ == nullptr || top_environment_ == this) { + log::Debug() << "removing connection: " << source->fqn() << " --> " << sink->fqn(); + graph_.remove_edge(source, sink); + } else { + top_environment_->remove_connection(source, sink); + } + } + + void start_mutation() { phase_ = Phase::Mutation; } + void stop_mutation() { phase_ = Phase::Execution; } + void optimize(); void register_reactor(Reactor* reactor); + void unregister_reactor(Reactor* reactor); void register_port(BasePort* port) noexcept; + void unregister_port(BasePort* reactor) noexcept; void register_input_action(BaseAction* action); + void assemble(); auto startup() -> std::thread; void sync_shutdown(); @@ -133,6 +152,7 @@ public: [[nodiscard]] auto max_reaction_index() const noexcept -> unsigned int { return max_reaction_index_; } friend Scheduler; + friend Transaction; }; } // namespace reactor diff --git a/include/reactor-cpp/graph.hh b/include/reactor-cpp/graph.hh index 4009e169..1a9a9613 100644 --- a/include/reactor-cpp/graph.hh +++ b/include/reactor-cpp/graph.hh @@ -114,15 +114,15 @@ public: return tree; } - [[nodiscard]] auto get_destinations(E source) const noexcept -> std::vector> { - return graph_[source]; - } + auto remove_edge(E source, E destinations) noexcept { + if (graph_.find(source) == std::end(graph_)) { + return; + } + auto conns = std::find_if(std::begin(graph_[source]), std::end(graph_[source]), + [destinations](auto val) { return val.second == destinations; }); - [[nodiscard]] auto get_upstream(E vertex) const noexcept -> std::optional { - for (const auto& [source, sinks] : graph_) { - if (sinks.second.contains(vertex)) { - return source; - } + if (conns != std::end(graph_[source])) { + graph_[source].erase(conns); } } diff --git a/include/reactor-cpp/logging.hh b/include/reactor-cpp/logging.hh index 4ba08434..8b661338 100644 --- a/include/reactor-cpp/logging.hh +++ b/include/reactor-cpp/logging.hh @@ -10,13 +10,11 @@ #define REACTOR_CPP_LOGGING_HH #include "reactor-cpp/config.hh" -#include "reactor-cpp/time.hh" -#include + #include #include #include #include -#include namespace reactor::log { diff --git a/include/reactor-cpp/multiport.hh b/include/reactor-cpp/multiport.hh index 3e66cee5..cd5ba3a7 100644 --- a/include/reactor-cpp/multiport.hh +++ b/include/reactor-cpp/multiport.hh @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "assert.hh" @@ -22,9 +23,13 @@ namespace reactor { class BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions,-warnings-as-errors +protected: + std::atomic size_{0}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes + std::vector present_ports_{}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes + private: - std::atomic size_{0}; - std::vector present_ports_{}; + std::string name_{}; + Reactor* container_ = nullptr; // record that the port with the given index has been set void set_present(std::size_t index); @@ -46,8 +51,13 @@ protected: void register_port(BasePort& port, size_t idx); public: - BaseMultiport() = default; + BaseMultiport(std::string name, Reactor* container) + : name_(std::move(name)) + , container_(container) {} ~BaseMultiport() = default; + + [[nodiscard]] auto name() const noexcept -> const std::string& { return name_; } + [[nodiscard]] auto container() const noexcept -> Reactor* { return container_; } }; template > @@ -55,6 +65,12 @@ class Multiport : public BaseMultiport { // NOLINT cppcoreguidelines-special-mem protected: std::vector ports_{}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes + template void emplace_back(Args&&... args) noexcept { + static_assert(std::is_constructible_v); + ports_.emplace_back(std::forward(args)...); + register_port(ports_.back(), ports_.size() - 1); + } + public: using value_type = typename A::value_type; using size_type = typename A::size_type; @@ -62,7 +78,8 @@ public: using iterator = typename std::vector::iterator; using const_iterator = typename std::vector::const_iterator; - Multiport() noexcept = default; + Multiport(const std::string& name, Reactor* container) noexcept + : BaseMultiport(name, container) {} ~Multiport() noexcept = default; auto operator==(const Multiport& other) const noexcept -> bool { @@ -81,6 +98,14 @@ public: auto size() const noexcept -> size_type { return ports_.size(); }; [[nodiscard]] auto empty() const noexcept -> bool { return ports_.empty(); }; + void resize(std::size_t new_size) { + reactor_assert(size() >= new_size); + + for (auto i = size(); i > new_size; i--) { + ports_.pop_back(); + present_ports_.pop_back(); + } + } [[nodiscard]] auto present_indices_unsorted() const noexcept -> std::vector { return std::vector(std::begin(present_ports()), std::begin(present_ports()) + present_ports_size()); @@ -95,6 +120,9 @@ public: template > class ModifableMultiport : public Multiport { public: + ModifableMultiport(std::string name, Reactor* container) + : Multiport(name, container) {} + void reserve(std::size_t size) noexcept { this->ports_.reserve(size); this->present_ports_reserve(size); @@ -105,9 +133,10 @@ public: this->register_port(this->ports_.back(), this->ports_.size() - 1); } - template void emplace_back(Args&&... args) noexcept { - this->ports_.emplace_back(std::forward(args)...); - this->register_port(this->ports_.back(), this->ports_.size() - 1); + void create_new_port() noexcept { + std::string _lf_port_name = this->name() + "_" + std::to_string(this->size()); + Reactor* container = this->container(); + this->emplace_back(_lf_port_name, container); } }; } // namespace reactor diff --git a/include/reactor-cpp/mutations.hh b/include/reactor-cpp/mutations.hh new file mode 100644 index 00000000..f1872a9d --- /dev/null +++ b/include/reactor-cpp/mutations.hh @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_MUTATIONS_HH +#define REACTOR_CPP_MUTATIONS_HH + +#include + +namespace reactor { +class Reactor; +class Environment; + +enum MutationResult : std::int8_t { + Success = 0, + NotMatchingBankSize = 1, +}; + +class Mutation { +public: + Mutation() = default; + Mutation(const Mutation& other) = default; + Mutation(Mutation&& other) = default; + virtual ~Mutation() = default; + auto operator=(const Mutation& other) -> Mutation& = default; + auto operator=(Mutation&& other) -> Mutation& = default; + + virtual auto run() -> MutationResult = 0; + virtual auto rollback() -> MutationResult = 0; +}; + +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_HH diff --git a/include/reactor-cpp/mutations/bank.hh b/include/reactor-cpp/mutations/bank.hh new file mode 100644 index 00000000..d8075dfd --- /dev/null +++ b/include/reactor-cpp/mutations/bank.hh @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_MUTATIONS_BANK_HH +#define REACTOR_CPP_MUTATIONS_BANK_HH + +#include + +#include "../mutations.hh" +#include "../reactor.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationChangeBankSize final : public Mutation { + std::vector* bank_ = nullptr; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + Environment* env_ = nullptr; + std::function create_lambda_; + + void change_size(std::size_t new_size); + +public: + MutationChangeBankSize() = default; + MutationChangeBankSize(const MutationChangeBankSize& other) noexcept + : bank_(other.bank_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) + , env_(other.env_) + , create_lambda_(other.create_lambda_) {} + MutationChangeBankSize(MutationChangeBankSize&& other) noexcept + : bank_(other.bank_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) + , env_(other.env_) + , create_lambda_(other.create_lambda_) {} + explicit MutationChangeBankSize(std::vector* bank, Environment* env, std::size_t size, + std::function create_lambda); + ~MutationChangeBankSize() override = default; + auto operator=(const MutationChangeBankSize& other) -> MutationChangeBankSize& = default; + auto operator=(MutationChangeBankSize&& other) -> MutationChangeBankSize& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_BANK_HH diff --git a/include/reactor-cpp/mutations/connection.hh b/include/reactor-cpp/mutations/connection.hh new file mode 100644 index 00000000..b042657c --- /dev/null +++ b/include/reactor-cpp/mutations/connection.hh @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_MUTATIONS_CONNECTION_HH +#define REACTOR_CPP_MUTATIONS_CONNECTION_HH + +#include "../mutations.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationAddConnection : public Mutation { +private: + A* source_; + B* sink_; + bool add_connection_ = true; + bool connection_ = false; + Environment* env_ = nullptr; + +public: + explicit MutationAddConnection(A* source, B* sink, Environment* env, bool add_connection); + MutationAddConnection(const MutationAddConnection& other) + : source_(other.source_) + , sink_(other.sink_) + , add_connection_(other.add_connection_) + , connection_(other.connection_) + , env_(other.env_) {} + MutationAddConnection(MutationAddConnection&& other) noexcept + : source_(other.source_) + , sink_(other.sink_) + , connection_(other.connection_) + , env_(other.env_) {} + MutationAddConnection() = default; + ~MutationAddConnection() override = default; + auto operator=(const MutationAddConnection& other) -> MutationAddConnection& = default; + auto operator=(MutationAddConnection&& other) -> MutationAddConnection& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_CONNECTION_HH diff --git a/include/reactor-cpp/mutations/multiport.hh b/include/reactor-cpp/mutations/multiport.hh new file mode 100644 index 00000000..f90b5298 --- /dev/null +++ b/include/reactor-cpp/mutations/multiport.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_MUTATIONS_MULTIPORT_HH +#define REACTOR_CPP_MUTATIONS_MULTIPORT_HH + +#include "../multiport.hh" +#include "../mutations.hh" +#include "../port.hh" + +namespace reactor { +class Reactor; +class Environment; + +template class MutationChangeOutputMultiportSize : public Mutation { +private: + ModifableMultiport>* multiport_ = nullptr; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + + void change_size(std::size_t new_size); + +public: + MutationChangeOutputMultiportSize(ModifableMultiport>* multiport, std::size_t size); + MutationChangeOutputMultiportSize() = default; + MutationChangeOutputMultiportSize(const MutationChangeOutputMultiportSize& other) + : multiport_(other.multiport_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) {} + MutationChangeOutputMultiportSize(MutationChangeOutputMultiportSize&& other) noexcept + : multiport_(other.multiport_) + , desired_size_(other.desired_size_) + , size_before_application_(other.size_before_application_) {} + ~MutationChangeOutputMultiportSize() override = default; + auto operator=(const MutationChangeOutputMultiportSize& other) -> MutationChangeOutputMultiportSize& = default; + auto operator=(MutationChangeOutputMultiportSize&& other) -> MutationChangeOutputMultiportSize& = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} // namespace reactor + +#endif // REACTOR_CPP_MUTATIONS_MULTIPORT_HH diff --git a/include/reactor-cpp/port.hh b/include/reactor-cpp/port.hh index fd3048da..10834be5 100644 --- a/include/reactor-cpp/port.hh +++ b/include/reactor-cpp/port.hh @@ -103,6 +103,17 @@ public: void register_set_callback(const PortCallback& callback); void register_clean_callback(const PortCallback& callback); + void overwrite(const BasePort& other) noexcept { + anti_dependencies_ = other.anti_dependencies_; + dependencies_ = other.dependencies_; + triggers_ = other.triggers_; + inward_binding_ = other.inward_binding_; + outward_bindings_ = other.outward_bindings_; + set_callback_ = other.set_callback_; + clean_callback_ = other.clean_callback_; + type_ = other.type_; + } + friend class Reaction; friend class Scheduler; }; @@ -168,7 +179,7 @@ public: void shutdown() final {} }; -template class Input : public Port { // NOLINT(cppcoreguidelines-special-member-functions) +template class Input final : public Port { // NOLINT(cppcoreguidelines-special-member-functions) public: Input(const std::string& name, Reactor* container) : Port(name, PortType::Input, container) {} @@ -176,12 +187,12 @@ public: Input(Input&&) noexcept = default; }; -template class Output : public Port { // NOLINT(cppcoreguidelines-special-member-functions) +template class Output final : public Port { // NOLINT(cppcoreguidelines-special-member-functions) public: Output(const std::string& name, Reactor* container) : Port(name, PortType::Output, container) {} - Output(Output&&) noexcept = default; + Output(Output&&) noexcept = default; }; } // namespace reactor diff --git a/include/reactor-cpp/reaction.hh b/include/reactor-cpp/reaction.hh index 84b8e539..37e7864e 100644 --- a/include/reactor-cpp/reaction.hh +++ b/include/reactor-cpp/reaction.hh @@ -61,7 +61,7 @@ public: void startup() final {} void shutdown() final {} - void trigger(); + void trigger() const; void set_index(unsigned index); template void set_deadline(Dur deadline, const std::function& handler) { diff --git a/include/reactor-cpp/reactor.hh b/include/reactor-cpp/reactor.hh index 02b698dd..10d055d0 100644 --- a/include/reactor-cpp/reactor.hh +++ b/include/reactor-cpp/reactor.hh @@ -10,7 +10,6 @@ #define REACTOR_CPP_REACTOR_HH #include -#include #include #include "action.hh" @@ -35,10 +34,16 @@ private: void register_reaction(Reaction* reaction); void register_reactor(Reactor* reactor); + void unregister_action(BaseAction* action); + void unregister_input(BasePort* port); + void unregister_output(BasePort* port); + void unregister_reaction(Reaction* reaction); + void unregister_reactor(Reactor* reactor); + public: Reactor(const std::string& name, Reactor* container); Reactor(const std::string& name, Environment* environment); - ~Reactor() override = default; + ~Reactor() override; void register_connection(std::unique_ptr&& connection); [[nodiscard]] auto actions() const noexcept -> const auto& { return actions_; } diff --git a/include/reactor-cpp/reactor_element.hh b/include/reactor-cpp/reactor_element.hh index d78bfeff..5e05eb4b 100644 --- a/include/reactor-cpp/reactor_element.hh +++ b/include/reactor-cpp/reactor_element.hh @@ -11,8 +11,6 @@ #define REACTOR_CPP_REACTOR_ELEMENT_HH #include -#include -#include #include #include @@ -20,6 +18,9 @@ namespace reactor { class ReactorElement { // NOLINT(cppcoreguidelines-special-member-functions) +public: + enum class Type : std::uint8_t { Action, Port, Reaction, Reactor, Input, Output }; + private: std::string name_{}; std::string fqn_{}; @@ -27,15 +28,14 @@ private: // The reactor owning this element Reactor* container_{nullptr}; Environment* environment_{}; + Type type_; auto fqn_detail(std::stringstream& string_stream) const noexcept -> std::stringstream&; public: - enum class Type : std::uint8_t { Action, Port, Reaction, Reactor, Input, Output }; - ReactorElement(const std::string& name, Type type, Reactor* container); ReactorElement(const std::string& name, Type type, Environment* environment); - virtual ~ReactorElement() = default; + virtual ~ReactorElement(); // not copyable, but movable ReactorElement(const ReactorElement&) = delete; diff --git a/include/reactor-cpp/scopes.hh b/include/reactor-cpp/scopes.hh new file mode 100644 index 00000000..fe1de9f4 --- /dev/null +++ b/include/reactor-cpp/scopes.hh @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2024 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_SCOPES_HH +#define REACTOR_CPP_SCOPES_HH + +#include "environment.hh" +#include "logical_time.hh" +#include "reactor.hh" +#include "transaction.hh" + +namespace reactor { + +class Scope { +private: + Reactor* reactor_; + +public: + explicit Scope(Reactor* reactor) + : reactor_(reactor) {} + + [[nodiscard]] static auto get_physical_time() noexcept -> TimePoint { return Reactor::get_physical_time(); } + [[nodiscard]] auto get_tag() const noexcept -> Tag { return reactor_->get_tag(); } + [[nodiscard]] auto get_logical_time() const noexcept -> TimePoint { return reactor_->get_logical_time(); } + [[nodiscard]] auto get_microstep() const noexcept -> mstep_t { return reactor_->get_microstep(); } + [[nodiscard]] auto get_elapsed_logical_time() const noexcept -> Duration { + return reactor_->get_elapsed_logical_time(); + } + [[nodiscard]] auto get_elapsed_physical_time() const noexcept -> Duration { + return reactor_->get_elapsed_physical_time(); + } + [[nodiscard]] auto environment() const noexcept -> Environment* { return reactor_->environment(); } + void request_stop() const { environment()->sync_shutdown(); } +}; + +class MutableScope : public Scope { +public: + Transaction transaction_; + Reactor* reactor_; + Environment* env_ = nullptr; + + explicit MutableScope(Reactor* reactor) + : Scope(reactor) + , transaction_(reactor) + , reactor_(reactor) + , env_(reactor->environment()) {} + MutableScope(const MutableScope& other) + : Scope(other.reactor_) + , transaction_(other.transaction_) + , reactor_(other.reactor_) + , env_(other.env_) {} + MutableScope(MutableScope&& other) noexcept + : Scope(other.reactor_) + , transaction_(std::move(other.transaction_)) + , reactor_(other.reactor_) + , env_(other.env_) {} + ~MutableScope() = default; + auto operator=(const MutableScope& other) -> MutableScope& = default; + auto operator=(MutableScope&& other) -> MutableScope& = default; + + void commit_transaction(bool recalculate = false); + void add_to_transaction(const std::shared_ptr& mutation); +}; + +} // namespace reactor + +#endif // REACTOR_CPP_SCOPES_HH diff --git a/include/reactor-cpp/semaphore.hh b/include/reactor-cpp/semaphore.hh index bc198547..d4330696 100644 --- a/include/reactor-cpp/semaphore.hh +++ b/include/reactor-cpp/semaphore.hh @@ -9,7 +9,6 @@ #ifndef REACTOR_CPP_SEMAPHORE_HH #define REACTOR_CPP_SEMAPHORE_HH -#include #include #include diff --git a/include/reactor-cpp/statistics.hh b/include/reactor-cpp/statistics.hh index 5d9d8930..19875ac9 100644 --- a/include/reactor-cpp/statistics.hh +++ b/include/reactor-cpp/statistics.hh @@ -11,7 +11,6 @@ #include -#include "reactor-cpp/config.hh" #include "reactor-cpp/logging.hh" namespace reactor { @@ -23,7 +22,7 @@ private: #else constexpr static bool enabled_{false}; #endif - // NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables) + // NOLINT BEGIN(cppcoreguidelines-avoid-non-const-global-variables) inline static std::atomic_size_t reactor_instances_{0}; inline static std::atomic_size_t connections_{0}; inline static std::atomic_size_t reactions_{0}; @@ -34,7 +33,7 @@ private: inline static std::atomic_size_t triggered_actions_{0}; inline static std::atomic_size_t set_ports_{0}; inline static std::atomic_size_t scheduled_actions_{0}; - // NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables) + // NOLINT END(cppcoreguidelines-avoid-non-const-global-variables) static void increment(std::atomic_size_t& counter) { if constexpr (enabled_) { @@ -42,18 +41,31 @@ private: } } + static void decrement(std::atomic_size_t& counter) { + if constexpr (enabled_) { + counter.fetch_sub(1, std::memory_order_release); + } + } + public: static void increment_reactor_instances() { increment(reactor_instances_); } static void increment_connections() { increment(connections_); } static void increment_reactions() { increment(reactions_); } static void increment_actions() { increment(actions_); } static void increment_ports() { increment(ports_); } + static void increment_processed_events() { increment(processed_events_); } static void increment_processed_reactions() { increment(processed_reactions_); } static void increment_triggered_actions() { increment(triggered_actions_); } static void increment_set_ports() { increment(set_ports_); } static void increment_scheduled_actions() { increment(scheduled_actions_); } + static void decrement_reactor_instances() { decrement(reactor_instances_); } + static void decrement_connections() { decrement(connections_); } + static void decrement_reactions() { decrement(reactions_); } + static void decrement_actions() { decrement(actions_); } + static void decrement_ports() { decrement(ports_); } + static auto reactor_instances() { return reactor_instances_.load(std::memory_order_acquire); } static auto connections() { return connections_.load(std::memory_order_acquire); } static auto reactions() { return reactions_.load(std::memory_order_acquire); } diff --git a/include/reactor-cpp/time_barrier.hh b/include/reactor-cpp/time_barrier.hh index a5874106..bafa132c 100644 --- a/include/reactor-cpp/time_barrier.hh +++ b/include/reactor-cpp/time_barrier.hh @@ -9,7 +9,6 @@ #ifndef REACTOR_CPP_TIME_BARRIER_HH #define REACTOR_CPP_TIME_BARRIER_HH -#include "fwd.hh" #include "logical_time.hh" #include "scheduler.hh" #include "time.hh" diff --git a/include/reactor-cpp/trace.hh b/include/reactor-cpp/trace.hh index 5031b7b4..3f514197 100644 --- a/include/reactor-cpp/trace.hh +++ b/include/reactor-cpp/trace.hh @@ -11,11 +11,6 @@ // workaround than an actual solution, but apparently clang-tidy does not allow // something nicer at the moment. -#include - -#include "reactor-cpp/config.hh" -#include "reactor-cpp/logical_time.hh" - // Only enable tracing if REACTOR_CPP_TRACE is set. // Also, disable tracing if clang analytics are run as it produces many errors. #if defined(REACTOR_CPP_TRACE) && !defined(__clang_analyzer__) diff --git a/include/reactor-cpp/transaction.hh b/include/reactor-cpp/transaction.hh new file mode 100644 index 00000000..d3683c61 --- /dev/null +++ b/include/reactor-cpp/transaction.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#ifndef REACTOR_CPP_TRANSACTION_HH +#define REACTOR_CPP_TRANSACTION_HH + +#include +#include + +#include "mutations.hh" + +namespace reactor { +class Reactor; +class Environment; + +class Transaction { +private: + Environment* environment_ = nullptr; + Reactor* parent_ = nullptr; + std::vector> mutations_{}; + +public: + explicit Transaction(Reactor* parent); + Transaction(const Transaction& other) = default; + Transaction(Transaction&& other) = default; + auto operator=(const Transaction& other) -> Transaction& = default; + auto operator=(Transaction&& other) -> Transaction& = default; + ~Transaction() = default; + + void push_back(const std::shared_ptr& mutation); + auto execute(bool recalculate = false) -> MutationResult; +}; +} // namespace reactor +#endif // REACTOR_CPP_TRANSACTION_HH diff --git a/include/reactor-cpp/value_ptr.hh b/include/reactor-cpp/value_ptr.hh index a6d55089..47199e1e 100644 --- a/include/reactor-cpp/value_ptr.hh +++ b/include/reactor-cpp/value_ptr.hh @@ -18,8 +18,6 @@ #include #include -#include "reactor-cpp/logging.hh" - namespace reactor { namespace detail { @@ -459,8 +457,7 @@ public: // get_mutable_copy() friend class ImmutableValuePtr; - // Give the factory function make_mutable_value() access to the private - // constructor + // Give the factory function make_mutable_value() access to the private constructor template friend auto reactor::make_mutable_value(Args&&... args) -> reactor::MutableValuePtr; }; @@ -468,7 +465,7 @@ public: template class ImmutableValuePtr { public: /// A type alias that adds ``const`` to ``T`` - using const_T = typename std::add_const_t; + using const_T = std::add_const_t; private: T value_{}; diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 6f2ffd7d..f7001e53 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -10,6 +10,11 @@ set(SOURCE_FILES time.cc multiport.cc reactor_element.cc + scopes.cc + transaction.cc + mutations/bank.cc + mutations/multiport.cc + mutations/connection.cc ) if(REACTOR_CPP_TRACE) @@ -22,7 +27,7 @@ else() set(REACTOR_CPP_INCLUDE "include") endif() -add_library(${LIB_TARGET} SHARED ${SOURCE_FILES}) +add_library(${LIB_TARGET} STATIC ${SOURCE_FILES}) target_include_directories(${LIB_TARGET} PUBLIC "$" "$" diff --git a/lib/action.cc b/lib/action.cc index 04a52a41..ac18cb02 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -15,7 +15,7 @@ namespace reactor { -BaseAction::BaseAction(const std::string& name, Environment* environment, bool logical, Duration min_delay) +BaseAction::BaseAction(const std::string& name, Environment* environment, const bool logical, const Duration min_delay) : ReactorElement(name, ReactorElement::Type::Action, environment) , min_delay_(min_delay) , logical_(logical) { @@ -27,9 +27,8 @@ void BaseAction::register_trigger(Reaction* reaction) { reactor_assert(this->environment() == reaction->environment()); assert_phase(this, Phase::Assembly); validate(this->container() == reaction->container(), - "Action triggers must belong to the same reactor as the triggered " - "reaction"); - [[maybe_unused]] bool result = triggers_.insert(reaction).second; + "Action triggers must belong to the same reactor as the triggered reaction"); + [[maybe_unused]] const bool result = triggers_.insert(reaction).second; reactor_assert(result); } @@ -40,7 +39,7 @@ void BaseAction::register_scheduler(Reaction* reaction) { // the reaction must belong to the same reactor as this action validate(this->container() == reaction->container(), "Scheduable actions must belong to the same reactor as the " "triggered reaction"); - [[maybe_unused]] bool result = schedulers_.insert(reaction).second; + [[maybe_unused]] const bool result = schedulers_.insert(reaction).second; reactor_assert(result); } @@ -62,8 +61,8 @@ void Timer::cleanup() noexcept { BaseAction::cleanup(); // schedule the timer again if (period_ != Duration::zero()) { - Tag now = Tag::from_logical_time(environment()->logical_time()); - Tag next = now.delay(period_); + const Tag now = Tag::from_logical_time(environment()->logical_time()); + const Tag next = now.delay(period_); environment()->scheduler()->schedule_sync(this, next); } } @@ -74,7 +73,7 @@ ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container) void ShutdownTrigger::setup() noexcept { BaseAction::setup(); } void ShutdownTrigger::shutdown() { - Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); + const Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); environment()->scheduler()->schedule_sync(this, tag); } diff --git a/lib/assert.cc b/lib/assert.cc index 96aeb581..0c0facf2 100644 --- a/lib/assert.cc +++ b/lib/assert.cc @@ -18,10 +18,10 @@ auto ValidationError::build_message(const std::string_view msg) noexcept -> std: return string_stream.str(); } -void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] Phase phase) { +void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] const Phase phase) { if constexpr (runtime_assertion) { if (ptr->environment()->phase() != phase) { - auto enum_value_to_name = [](Phase phase) -> std::string { + auto enum_value_to_name = [](const Phase phase) -> std::string { const std::map conversation_map = { {Phase::Construction, "Construction"}, {Phase::Assembly, "Assembly"}, {Phase::Startup, "Startup"}, {Phase::Execution, "Execution"}, diff --git a/lib/environment.cc b/lib/environment.cc index 52c0b6bf..e616e6c4 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -24,7 +24,7 @@ namespace reactor { -Environment::Environment(unsigned int num_workers, bool fast_fwd_execution, const Duration& timeout) +Environment::Environment(const unsigned int num_workers, const bool fast_fwd_execution, const Duration& timeout) : log_("Environment") , num_workers_(num_workers) , fast_fwd_execution_(fast_fwd_execution) @@ -41,23 +41,32 @@ Environment::Environment(const std::string& name, Environment* containing_enviro , top_environment_(containing_environment_->top_environment_) , scheduler_(this) , timeout_(containing_environment->timeout()) { - [[maybe_unused]] bool result = containing_environment->contained_environments_.insert(this).second; + [[maybe_unused]] const bool result = containing_environment->contained_environments_.insert(this).second; reactor_assert(result); } void Environment::register_reactor(Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->phase() == Phase::Construction, "Reactors may only be registered during construction phase!"); + validate(this->phase() == Phase::Construction || this->phase() == Phase::Mutation, + "Reactors may only be registered during construction phase!"); validate(reactor->is_top_level(), "The environment may only contain top level reactors!"); - [[maybe_unused]] bool result = top_level_reactors_.insert(reactor).second; + [[maybe_unused]] const bool result = top_level_reactors_.insert(reactor).second; reactor_assert(result); } +void Environment::unregister_reactor(Reactor* reactor) { + reactor_assert(reactor != nullptr); + validate(this->phase() == Phase::Construction || this->phase() == Phase::Mutation, + "Reactors may only be unregistered during construction phase!"); + validate(reactor->is_top_level(), "The environment may only contain top level reactors!"); + top_level_reactors_.erase(reactor); +} + void Environment::register_input_action(BaseAction* action) { reactor_assert(action != nullptr); validate(this->phase() == Phase::Construction || this->phase() == Phase::Assembly, "Input actions may only be registered during construction or assembly phase!"); - [[maybe_unused]] bool result = input_actions_.insert(action).second; + [[maybe_unused]] const bool result = input_actions_.insert(action).second; reactor_assert(result); run_forever_ = true; } @@ -144,7 +153,7 @@ void Environment::assemble() { // NOLINT(readability-function-cognitive-complexi } } -void Environment::build_dependency_graph(Reactor* reactor) { +void Environment::build_dependency_graph(const Reactor* reactor) { // obtain dependencies from each contained reactor for (auto* sub_reactor : reactor->reactors()) { build_dependency_graph(sub_reactor); @@ -160,7 +169,7 @@ void Environment::build_dependency_graph(Reactor* reactor) { // connect all reactions_ this reaction depends on for (auto* reaction : reactor->reactions()) { for (auto* dependency : reaction->dependencies()) { - auto* source = dependency; + const auto* source = dependency; while (source->has_inward_binding()) { source = source->inward_binding(); } @@ -176,7 +185,7 @@ void Environment::build_dependency_graph(Reactor* reactor) { auto next = std::next(iterator); while (next != priority_map.end()) { dependencies_.emplace_back(next->second, iterator->second); - iterator++; + ++iterator; next = std::next(iterator); } } @@ -211,7 +220,7 @@ void Environment::async_shutdown() { sync_shutdown(); } -auto dot_name([[maybe_unused]] ReactorElement* reactor_element) -> std::string { +auto dot_name([[maybe_unused]] const ReactorElement* reactor_element) -> std::string { std::string fqn{reactor_element->fqn()}; std::replace(fqn.begin(), fqn.end(), '.', '_'); return fqn; diff --git a/lib/logical_time.cc b/lib/logical_time.cc index c17ca2f4..cce3a289 100644 --- a/lib/logical_time.cc +++ b/lib/logical_time.cc @@ -28,7 +28,7 @@ auto Tag::from_logical_time(const LogicalTime& logical_time) noexcept -> Tag { return {logical_time.time_point(), logical_time.micro_step()}; } -auto Tag::delay(Duration offset) const noexcept -> Tag { +auto Tag::delay(const Duration offset) const noexcept -> Tag { if (offset == Duration::zero()) { validate(this->micro_step_ != std::numeric_limits::max(), "Microstep overflow detected!"); return {this->time_point_, this->micro_step_ + 1}; @@ -36,7 +36,7 @@ auto Tag::delay(Duration offset) const noexcept -> Tag { return {this->time_point_ + offset, 0}; } -auto Tag::subtract(Duration offset) const noexcept -> Tag { +auto Tag::subtract(const Duration offset) const noexcept -> Tag { if (offset == Duration::zero()) { return decrement(); } diff --git a/lib/multiport.cc b/lib/multiport.cc index c6674cb1..62c7501e 100644 --- a/lib/multiport.cc +++ b/lib/multiport.cc @@ -20,16 +20,15 @@ auto reactor::BaseMultiport::get_set_callback(std::size_t index) noexcept -> rea }; } -void reactor::BaseMultiport::set_present(std::size_t index) { - auto calculated_index = size_.fetch_add(1, std::memory_order_relaxed); +void reactor::BaseMultiport::set_present(const std::size_t index) { + const auto calculated_index = size_.fetch_add(1, std::memory_order_relaxed); reactor_assert(calculated_index < present_ports_.size()); present_ports_[calculated_index] = index; } -void reactor::BaseMultiport::register_port(BasePort& port, size_t idx) { - // need to add one new slot t the present list +void reactor::BaseMultiport::register_port(BasePort& port, const size_t idx) { reactor_assert(this->present_ports_.size() == idx); this->present_ports_.emplace_back(0); diff --git a/lib/mutations/bank.cc b/lib/mutations/bank.cc new file mode 100644 index 00000000..9db8caf3 --- /dev/null +++ b/lib/mutations/bank.cc @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#include "reactor-cpp/mutations/bank.hh" +#include "reactor-cpp/action.hh" + +template +reactor::MutationChangeBankSize::MutationChangeBankSize( + std::vector* bank, Environment* env, const std::size_t size, + std::function create_lambda) + : bank_(bank) + , desired_size_(size) + , env_(env) + , create_lambda_(std::move(create_lambda)) {} + +template void reactor::MutationChangeBankSize::change_size(std::size_t new_size) { + auto current_size = bank_->size(); + + if (current_size >= new_size) { // down-size + bank_->resize(new_size); + + } else { // up-size + bank_->reserve(new_size); + + for (auto i = 0; i < new_size - current_size; i++) { + bank_->push_back(create_lambda_(env_, current_size + i)); + (*bank_)[bank_->size() - 1]->assemble(); + } + } +} +template auto reactor::MutationChangeBankSize::run() -> MutationResult { + size_before_application_ = bank_->size(); + change_size(desired_size_); + return Success; +} + +template auto reactor::MutationChangeBankSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/mutations/connection.cc b/lib/mutations/connection.cc new file mode 100644 index 00000000..0b3b6bde --- /dev/null +++ b/lib/mutations/connection.cc @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#include "reactor-cpp/mutations/connection.hh" +#include "reactor-cpp/reactor.hh" + +template +reactor::MutationAddConnection::MutationAddConnection(A* source, B* sink, Environment* env, bool add_connection) + : source_(source) + , sink_(sink) + , add_connection_(add_connection) + , env_(env) {} + +template auto reactor::MutationAddConnection::run() -> MutationResult { + if (add_connection_) { + env_->draw_connection(source_, sink_, ConnectionProperties{}); + } else { + env_->remove_connection(source_, sink_); + } + + return Success; +} + +template auto reactor::MutationAddConnection::rollback() -> MutationResult { + if (add_connection_) { + env_->remove_connection(source_, sink_); + } else { + env_->draw_connection(source_, sink_, ConnectionProperties{}); + } + + return Success; +} diff --git a/lib/mutations/multiport.cc b/lib/mutations/multiport.cc new file mode 100644 index 00000000..b194b633 --- /dev/null +++ b/lib/mutations/multiport.cc @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2025 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#include "reactor-cpp/mutations/multiport.hh" + +template +reactor::MutationChangeOutputMultiportSize::MutationChangeOutputMultiportSize( + ModifableMultiport>* multiport, const std::size_t size) + : multiport_(multiport) + , desired_size_(size) {} + +template void reactor::MutationChangeOutputMultiportSize::change_size(std::size_t new_size) { + auto current_size = multiport_->size(); + if (current_size >= new_size) { + // down-size + multiport_->resize(new_size); + + } else { + // up-size + multiport_->reserve(new_size); + for (auto i = 0; i < new_size - current_size; i++) { + std::string port_name_ = multiport_->name() + "_" + std::to_string(current_size + i); + multiport_->create_new_port(); + (*multiport_)[i + current_size].overwrite((*multiport_)[0]); + } + } +} +template auto reactor::MutationChangeOutputMultiportSize::run() -> MutationResult { + size_before_application_ = multiport_->size(); + change_size(desired_size_); + return Success; +} + +template auto reactor::MutationChangeOutputMultiportSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/port.cc b/lib/port.cc index 7260c661..3514c973 100644 --- a/lib/port.cc +++ b/lib/port.cc @@ -16,11 +16,12 @@ namespace reactor { -void BasePort::register_dependency(Reaction* reaction, bool is_trigger) noexcept { +void BasePort::register_dependency(Reaction* reaction, const bool is_trigger) noexcept { reactor_assert(reaction != nullptr); reactor_assert(this->environment() == reaction->environment()); validate(!this->has_outward_bindings(), "Dependencies may no be declared on ports with an outward binding!"); - assert_phase(this, Phase::Assembly); + validate(this->environment()->phase() == Phase::Assembly || this->environment()->phase() == Phase::Mutation, + "Dependencies for Ports can only be declared during assembly or muttion phase"); if (this->is_input()) { validate(this->container() == reaction->container(), "Dependent input ports must belong to the same reactor as the " @@ -53,7 +54,7 @@ void BasePort::register_antidependency(Reaction* reaction) noexcept { "Antidependent input ports must belong to a contained reactor"); } - [[maybe_unused]] bool result = anti_dependencies_.insert(reaction).second; + [[maybe_unused]] const bool result = anti_dependencies_.insert(reaction).second; reactor_assert(result); } @@ -91,7 +92,7 @@ void Port::instantiate_connection_to(const ConnectionProperties& propertie reactor_assert(properties.type_ != ConnectionType::Normal); Environment* enclave = downstream[0]->environment(); - auto index = this->container()->number_of_connections(); + const auto index = this->container()->number_of_connections(); if (properties.type_ == ConnectionType::Delayed) { connection = std::make_unique>( diff --git a/lib/reaction.cc b/lib/reaction.cc index 697ab52b..bd04e061 100644 --- a/lib/reaction.cc +++ b/lib/reaction.cc @@ -17,7 +17,7 @@ namespace reactor { -Reaction::Reaction(const std::string& name, int priority, Reactor* container, std::function body) +Reaction::Reaction(const std::string& name, const int priority, Reactor* container, std::function body) : ReactorElement(name, ReactorElement::Type::Reaction, container) , priority_(priority) , body_(std::move(std::move(body))) { @@ -31,7 +31,7 @@ void Reaction::declare_trigger(BaseAction* action) { validate(this->container() == action->container(), "Action triggers must belong to the same reactor as the triggered " "reaction"); - [[maybe_unused]] bool result = action_triggers_.insert(action).second; + [[maybe_unused]] const bool result = action_triggers_.insert(action).second; reactor_assert(result); action->register_trigger(this); } @@ -43,7 +43,7 @@ void Reaction::declare_schedulable_action(BaseAction* action) { validate(this->container() == action->container(), "Scheduable actions must belong to the same reactor as the " "triggered reaction"); - [[maybe_unused]] bool result = scheduable_actions_.insert(action).second; + [[maybe_unused]] const bool result = scheduable_actions_.insert(action).second; reactor_assert(result); action->register_scheduler(this); } @@ -51,7 +51,8 @@ void Reaction::declare_schedulable_action(BaseAction* action) { void Reaction::declare_trigger(BasePort* port) { reactor_assert(port != nullptr); reactor_assert(this->environment() == port->environment()); - assert_phase(this, Phase::Assembly); + validate(this->environment()->phase() == Phase::Assembly || this->environment()->phase() == Phase::Mutation, + "Ports can only be declared as a trigger during Assembly or Mutation Phase"); if (port->is_input()) { validate(this->container() == port->container(), @@ -82,7 +83,7 @@ void Reaction::declare_dependency(BasePort* port) { "Dependent output ports must belong to a contained reactor"); } - [[maybe_unused]] bool result = dependencies_.insert(port).second; + [[maybe_unused]] const bool result = dependencies_.insert(port).second; reactor_assert(result); port->register_dependency(this, false); } @@ -100,16 +101,15 @@ void Reaction::declare_antidependency(BasePort* port) { "Antidependent input ports must belong to a contained reactor"); } - [[maybe_unused]] bool result = antidependencies_.insert(port).second; + [[maybe_unused]] const bool result = antidependencies_.insert(port).second; reactor_assert(result); port->register_antidependency(this); } -void Reaction::trigger() { +void Reaction::trigger() const { if (has_deadline()) { reactor_assert(deadline_handler_ != nullptr); - auto lag = Reactor::get_physical_time() - container()->get_logical_time(); - if (lag > deadline_) { + if (const auto lag = Reactor::get_physical_time() - container()->get_logical_time(); lag > deadline_) { deadline_handler_(); return; } @@ -118,7 +118,7 @@ void Reaction::trigger() { body_(); } -void Reaction::set_deadline_impl(Duration deadline, const std::function& handler) { +void Reaction::set_deadline_impl(const Duration deadline, const std::function& handler) { reactor_assert(!has_deadline()); reactor_assert(handler != nullptr); this->deadline_ = deadline; @@ -126,7 +126,8 @@ void Reaction::set_deadline_impl(Duration deadline, const std::functionenvironment()->phase() == Phase::Assembly, "Reaction indexes may only be set during assembly phase!"); + validate(this->environment()->phase() == Phase::Assembly || this->environment()->phase() == Phase::Mutation, + "Reaction indexes may only be set during assembly phase!"); this->index_ = index; } diff --git a/lib/reactor.cc b/lib/reactor.cc index f42488b0..459f10bc 100644 --- a/lib/reactor.cc +++ b/lib/reactor.cc @@ -24,56 +24,105 @@ Reactor::Reactor(const std::string& name, Environment* environment) environment->register_reactor(this); } +Reactor::~Reactor() = default; + void Reactor::register_action([[maybe_unused]] BaseAction* action) { reactor_assert(action != nullptr); reactor::validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Assembly, "Actions can only be registered during construction phase!"); - [[maybe_unused]] bool result = actions_.insert(action).second; + [[maybe_unused]] const bool result = actions_.insert(action).second; reactor_assert(result); Statistics::increment_actions(); } +void Reactor::unregister_action([[maybe_unused]] BaseAction* action) { + reactor_assert(action != nullptr); + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Assembly, + "Actions can only be registered during construction phase!"); + actions_.erase(action); + Statistics::decrement_actions(); +} + void Reactor::register_input(BasePort* port) { reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; + [[maybe_unused]] const bool result = inputs_.insert(port).second; reactor_assert(result); Statistics::increment_ports(); } +void Reactor::unregister_input(BasePort* port) { + reactor_assert(port != nullptr); + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, + "Ports can only be registered during construction phase!"); + const std::size_t number_of_elements = inputs_.erase(port); + reactor_assert(number_of_elements > 0); + Statistics::decrement_ports(); +} + void Reactor::register_output(BasePort* port) { reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; + [[maybe_unused]] const bool result = outputs_.insert(port).second; reactor_assert(result); Statistics::increment_ports(); } +void Reactor::unregister_output(BasePort* port) { + reactor_assert(port != nullptr); + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Mutation, + "Ports can only be registered during construction phase!"); + outputs_.erase(port); + Statistics::decrement_ports(); +} + void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { reactor_assert(reaction != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); - [[maybe_unused]] bool result = reactions_.insert(reaction).second; + [[maybe_unused]] const bool result = reactions_.insert(reaction).second; reactor_assert(result); Statistics::increment_reactions(); } +void Reactor::unregister_reaction([[maybe_unused]] Reaction* reaction) { + reactor_assert(reaction != nullptr); + + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, + "Reactions can only be registered during construction phase!"); + reactions_.erase(reaction); + Statistics::decrement_reactions(); +} + void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); - [[maybe_unused]] bool result = reactors_.insert(reactor).second; + [[maybe_unused]] const bool result = reactors_.insert(reactor).second; reactor_assert(result); Statistics::increment_reactor_instances(); } +void Reactor::unregister_reactor([[maybe_unused]] Reactor* reactor) { + reactor_assert(reactor != nullptr); + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, + "Reactions can only be registered during construction phase!"); + reactors_.erase(reactor); + Statistics::decrement_reactor_instances(); +} + void Reactor::register_connection([[maybe_unused]] std::unique_ptr&& connection) { reactor_assert(connection != nullptr); - [[maybe_unused]] auto result = connections_.insert(std::move(connection)).second; + [[maybe_unused]] const auto result = connections_.insert(std::move(connection)).second; reactor_assert(result); } diff --git a/lib/reactor_element.cc b/lib/reactor_element.cc index 32db319f..e6b2de0b 100644 --- a/lib/reactor_element.cc +++ b/lib/reactor_element.cc @@ -18,13 +18,14 @@ namespace reactor { -ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type type, Reactor* container) +ReactorElement::ReactorElement(const std::string& name, const ReactorElement::Type type, Reactor* container) : name_(name) - , container_(container) { + , container_(container) + , type_(type) { reactor_assert(container != nullptr); this->environment_ = container->environment(); reactor_assert(this->environment_ != nullptr); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction || this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); // We need a reinterpret_cast here as the derived class is not yet created @@ -64,14 +65,15 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ fqn_ = string_stream.str(); } -ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type type, Environment* environment) +ReactorElement::ReactorElement(const std::string& name, const ReactorElement::Type type, Environment* environment) : name_(name) , fqn_(name) - , environment_(environment) { + , environment_(environment) + , type_(type) { reactor_assert(environment != nullptr); validate(type == Type::Reactor || type == Type::Action, "Only reactors and actions can be owned by the environment!"); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction || this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); @@ -86,4 +88,37 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ break; } } + +ReactorElement::~ReactorElement() { + switch (type_) { + case Type::Action: + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + container_->unregister_action(reinterpret_cast(this)); + break; + case Type::Input: + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + container_->unregister_input(reinterpret_cast(this)); + break; + case Type::Output: + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + container_->unregister_output(reinterpret_cast(this)); + break; + case Type::Reaction: + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + container_->unregister_reaction(reinterpret_cast(this)); + break; + case Type::Reactor: + if (container_ == nullptr) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + environment_->unregister_reactor(reinterpret_cast(this)); + } else { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + container_->unregister_reactor(reinterpret_cast(this)); + } + break; + default: + break; + } +} + } // namespace reactor diff --git a/lib/scheduler.cc b/lib/scheduler.cc index 5c3532ad..63ab759e 100644 --- a/lib/scheduler.cc +++ b/lib/scheduler.cc @@ -104,7 +104,7 @@ auto ReadyQueue::pop() -> Reaction* { // FIXME: Protect against underflow? } - auto pos = old_size - 1; + const auto pos = old_size - 1; return queue_[pos]; } @@ -115,8 +115,8 @@ void ReadyQueue::fill_up(std::vector& ready_reactions) { // update the atomic size counter and release the semaphore to wake up // waiting worker threads - auto new_size = static_cast(queue_.size()); - auto old_size = size_.exchange(new_size, std::memory_order_acq_rel); + const auto new_size = static_cast(queue_.size()); + const auto old_size = size_.exchange(new_size, std::memory_order_acq_rel); // calculate how many workers to wake up. -old_size indicates the number of // workers who started waiting since the last update. @@ -128,11 +128,10 @@ void ReadyQueue::fill_up(std::vector& ready_reactions) { // one worker running running, new_size - running_workers indicates the // number of additional workers needed to process all reactions. waiting_workers_ += -old_size; - std::ptrdiff_t running_workers{num_workers_ - waiting_workers_}; - auto workers_to_wakeup = std::min(waiting_workers_, new_size - running_workers); + const std::ptrdiff_t running_workers{num_workers_ - waiting_workers_}; // wakeup other workers_ - if (workers_to_wakeup > 0) { + if (auto workers_to_wakeup = std::min(waiting_workers_, new_size - running_workers); workers_to_wakeup > 0) { waiting_workers_ -= workers_to_wakeup; log_.debug() << "Wakeup " << workers_to_wakeup << " workers"; sem_.release(static_cast(workers_to_wakeup)); @@ -158,8 +157,7 @@ auto EventQueue::extract_next_event() -> ActionListPtr { auto EventQueue::insert_event_at(const Tag& tag) -> const ActionListPtr& { auto shared_lock = std::shared_lock(mutex_); - auto event_it = event_queue_.find(tag); - if (event_it == event_queue_.end()) { + if (const auto event_it = event_queue_.find(tag); event_it == event_queue_.end()) { shared_lock.unlock(); { auto unique_lock = std::unique_lock(mutex_); @@ -344,8 +342,8 @@ void Scheduler::next() { // NOLINT(readability-function-cognitive-complexity) if (stop_) { continue_execution_ = false; log_.debug() << "Shutting down the scheduler"; - Tag t_next = Tag::from_logical_time(logical_time_).delay(); - if (!event_queue_.empty() && t_next == event_queue_.next_tag()) { + if (Tag t_next = Tag::from_logical_time(logical_time_).delay(); + !event_queue_.empty() && t_next == event_queue_.next_tag()) { log_.debug() << "Trigger the last round of reactions including all " "shutdown reactions"; triggered_actions_ = event_queue_.extract_next_event(); @@ -359,7 +357,7 @@ void Scheduler::next() { // NOLINT(readability-function-cognitive-complexity) // synchronize with physical time if not in fast forward mode if (!environment_->fast_fwd_execution()) { log_.debug() << "acquire tag " << t_next << " from physical time barrier"; - bool result = PhysicalTimeBarrier::acquire_tag( + const bool result = PhysicalTimeBarrier::acquire_tag( t_next, lock, this, [&t_next, this]() { return t_next != event_queue_.next_tag(); }); // If acquire tag returns false, then a new event was inserted into the queue and we need to start over if (!result) { @@ -372,7 +370,7 @@ void Scheduler::next() { // NOLINT(readability-function-cognitive-complexity) bool result{true}; for (auto* action : environment_->input_actions_) { log_.debug() << "acquire tag " << t_next << " from input action " << action->fqn(); - bool inner_result = + const bool inner_result = action->acquire_tag(t_next, lock, [&t_next, this]() { return t_next != event_queue_.next_tag(); }); // If the wait was aborted or if the next tag changed in the meantime, // we need to break from the loop and continue with the main loop. diff --git a/lib/scopes.cc b/lib/scopes.cc new file mode 100644 index 00000000..011caeaa --- /dev/null +++ b/lib/scopes.cc @@ -0,0 +1,12 @@ + + +#include "reactor-cpp/scopes.hh" + +void reactor::MutableScope::add_to_transaction(const std::shared_ptr& mutation) { + transaction_.push_back(mutation); +} + +void reactor::MutableScope::commit_transaction(const bool recalculate) { + (void)recalculate; + transaction_.execute(recalculate); +} \ No newline at end of file diff --git a/lib/time.cc b/lib/time.cc index 34f58b35..249e984a 100644 --- a/lib/time.cc +++ b/lib/time.cc @@ -25,12 +25,12 @@ constexpr std::size_t NANOSECOND_DIGITS{9}; inline namespace operators { -auto operator<<(std::ostream& os, TimePoint tp) -> std::ostream& { +auto operator<<(std::ostream& os, const TimePoint tp) -> std::ostream& { std::array buf{}; - time_t time = + const time_t time = std::chrono::system_clock::to_time_t(std::chrono::time_point_cast(tp)); - auto res = std::strftime(buf.data(), sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time)); - auto epoch = std::chrono::duration_cast(tp.time_since_epoch()); + const auto res = std::strftime(buf.data(), sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time)); + const auto epoch = std::chrono::duration_cast(tp.time_since_epoch()); if (res != 0) { os << buf.data() << '.' << std::setw(NANOSECOND_DIGITS) << std::setfill('0') @@ -42,19 +42,19 @@ auto operator<<(std::ostream& os, TimePoint tp) -> std::ostream& { return os; } -auto operator<<(std::ostream& os, std::chrono::seconds dur) -> std::ostream& { +auto operator<<(std::ostream& os, const std::chrono::seconds dur) -> std::ostream& { os << dur.count() << " secs"; return os; } -auto operator<<(std::ostream& os, std::chrono::milliseconds dur) -> std::ostream& { +auto operator<<(std::ostream& os, const std::chrono::milliseconds dur) -> std::ostream& { os << dur.count() << " msecs"; return os; } -auto operator<<(std::ostream& os, std::chrono::microseconds dur) -> std::ostream& { +auto operator<<(std::ostream& os, const std::chrono::microseconds dur) -> std::ostream& { os << dur.count() << " usecs"; return os; } -auto operator<<(std::ostream& os, std::chrono::nanoseconds dur) -> std::ostream& { +auto operator<<(std::ostream& os, const std::chrono::nanoseconds dur) -> std::ostream& { os << dur.count() << " nsecs"; return os; } diff --git a/lib/transaction.cc b/lib/transaction.cc new file mode 100644 index 00000000..f6d9c6cf --- /dev/null +++ b/lib/transaction.cc @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2019 TU Dresden + * All rights reserved. + * + * Authors: + * Tassilo Tanneberger + */ + +#include "reactor-cpp/transaction.hh" +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/reactor.hh" + +reactor::Transaction::Transaction(Reactor* parent) + : environment_(parent->environment()) + , parent_(parent) {} + +auto reactor::Transaction::execute(const bool recalculate) -> MutationResult { + this->environment_->start_mutation(); + + std::size_t index = 0; + for (const auto& mutation : mutations_) { + if (mutation->run() != Success) { + break; + } + + index++; + } + + if (index != mutations_.size()) { + for (std::size_t i = 0; i < index; i++) { + mutations_[index - i]->rollback(); + } + } + + if (recalculate) { + this->environment_->calculate_indexes(); + } + + this->environment_->stop_mutation(); + + mutations_.clear(); + return Success; +} + +void reactor::Transaction::push_back(const std::shared_ptr& mutation) { + mutations_.push_back(mutation); +} \ No newline at end of file