Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdk #78

Draft
wants to merge 22 commits into
base: master
Choose a base branch
from
Draft

Sdk #78

Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5b07ca5
base changes for sdk, bug fix for export_dependency_graph to a .dot file
OmerMajNition Feb 19, 2025
0035b41
first draft of sdk, work in progress
OmerMajNition Feb 19, 2025
fb31d00
reverted back info logs
OmerMajNition Feb 20, 2025
9be2807
command line arguments support added to SDK, copied files used by LF
OmerMajNition Feb 20, 2025
b7ace70
homogeneous and heterogeneous configuration support, config files gen…
OmerMajNition Feb 21, 2025
38ebfca
Parameter changes and homogenoeus heterogeneous config generation
OmerMajNition Feb 26, 2025
438c067
added reacion dependencies feature with a test case
OmerMajNition Feb 27, 2025
3ab09b9
parameters with default values, able to be passed from parent when co…
OmerMajNition Mar 3, 2025
008a4a5
Default parameters support with ReactorBanks
OmerMajNition Mar 3, 2025
46c4346
hello world and Workers examples added for LF weekly meeting
OmerMajNition Mar 4, 2025
a21791b
Workers example with published parameters
OmerMajNition Mar 5, 2025
4ea3411
fixed a bug
OmerMajNition Mar 5, 2025
5058c3c
gradually moving towards unified wiring function call that dumps the …
OmerMajNition Mar 7, 2025
591bc73
added reaction internals class that would help users define reactions…
OmerMajNition Mar 10, 2025
6eb494a
added conveneient macros to ease out reactions definition inside reactor
OmerMajNition Mar 10, 2025
294f2a2
bank index support in ReactionInternals and some more examples ported
OmerMajNition Mar 11, 2025
21a4b65
input to reactor bank input port wiring added
OmerMajNition Mar 11, 2025
5821e42
ported examples and tests to new ReactionInternals design, now reacti…
OmerMajNition Mar 12, 2025
5e1ecf2
fixed compilation issue
OmerMajNition Mar 14, 2025
5927709
ReactionChamber renaming, fix for gcc < 13 compiler versions
OmerMajNition Mar 19, 2025
35ce140
removing the function that would never get hit, gcc < 13 is already f…
OmerMajNition Mar 19, 2025
37055b1
compilation fix for TimePoint cout overload
OmerMajNition Mar 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ configure_file(include/reactor-cpp/config.hh.in include/reactor-cpp/config.hh @O
include(GNUInstallDirs)

add_subdirectory(lib)
add_subdirectory(reactor-sdk)

if(NOT DEFINED LF_REACTOR_CPP_SUFFIX)
add_subdirectory(examples)
endif()
@@ -71,3 +73,13 @@ if (DEFINED LF_REACTOR_CPP_SUFFIX)
else()
install(DIRECTORY include/ DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}")
endif()

if(NOT TARGET uninstall)
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/cmake_uninstall.cmake.in"
"${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake"
IMMEDIATE @ONLY)

add_custom_target(uninstall
COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake)
endif()
21 changes: 21 additions & 0 deletions cmake_uninstall.cmake.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
if(NOT EXISTS "@CMAKE_BINARY_DIR@/install_manifest.txt")
message(FATAL_ERROR "Cannot find install manifest: @CMAKE_BINARY_DIR@/install_manifest.txt")
endif()

file(READ "@CMAKE_BINARY_DIR@/install_manifest.txt" files)
string(REGEX REPLACE "\n" ";" files "${files}")
foreach(file ${files})
message(STATUS "Uninstalling $ENV{DESTDIR}${file}")
if(IS_SYMLINK "$ENV{DESTDIR}${file}" OR EXISTS "$ENV{DESTDIR}${file}")
exec_program(
"@CMAKE_COMMAND@" ARGS "-E remove \"$ENV{DESTDIR}${file}\""
OUTPUT_VARIABLE rm_out
RETURN_VALUE rm_retval
)
if(NOT "${rm_retval}" STREQUAL 0)
message(FATAL_ERROR "Problem when removing $ENV{DESTDIR}${file}")
endif()
else(IS_SYMLINK "$ENV{DESTDIR}${file}" OR EXISTS "$ENV{DESTDIR}${file}")
message(STATUS "File $ENV{DESTDIR}${file} does not exist.")
endif()
endforeach()
1 change: 1 addition & 0 deletions examples/sdk-SrcSink-Fanout/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*build
28 changes: 28 additions & 0 deletions examples/sdk-SrcSink-Fanout/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
cmake_minimum_required(VERSION 3.9)
project(src_sink_fanout VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET src_sink_fanout)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)

include(Sink/SinkReactor.cmake)
include(Source/SourceReactor.cmake)
include(Main/MainReactor.cmake)
include(Config-a/Config-a.cmake)
17 changes: 17 additions & 0 deletions examples/sdk-SrcSink-Fanout/Config-a/Config-a.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include "Config-a.hh"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make a folder for all the sdk stuff:

/examples/sdk/SrcSinkFanout/ConfigA/ConfigA.cc


UserParameters cfg_parameters;

ConfigParameter<int, uint32_t>::ParametersMap UserParameters::homogeneous_config() {
return {
{"Main.Source.iterations", ConfigParameterMetadata<int> { 5 } }
};
}

ConfigParameter<int, uint32_t>::ParametersMap UserParameters::heterogeneous_config() {
return {
{"Main.Source.iterations", ConfigParameterMetadata<int> { 20 } },
{"Main.Sink.n_ports", ConfigParameterMetadata<int> { 2 } }

};
}
16 changes: 16 additions & 0 deletions examples/sdk-SrcSink-Fanout/Config-a/Config-a.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
18 changes: 18 additions & 0 deletions examples/sdk-SrcSink-Fanout/Config-a/Config-a.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef USER_PARAMETERS_H
#define USER_PARAMETERS_H

#include <reactor-sdk/reactor-sdk.hh>
#include <map>
#include <variant>
#include <string>

using namespace sdk;

struct UserParameters : public ConfigParameter<int, uint32_t> {
ConfigParameter<int, uint32_t>::ParametersMap homogeneous_config();
ConfigParameter<int, uint32_t>::ParametersMap heterogeneous_config();
};

extern UserParameters cfg_parameters;

#endif // USER_PARAMETERS_H
29 changes: 29 additions & 0 deletions examples/sdk-SrcSink-Fanout/Main/MainReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "MainReactor.hh"

void MainReactor::construction() {

cout << "Construction Main\n";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you overload cout? or did I miss the using namespace std ?


src = std::make_unique<SourceReactor>("Source", this);
snk = std::make_unique<SinkReactor>("Sink", this);
}

void MainReactor::wiring() {
cout << "Wiring Main\n";

src->req -->> snk->req;
snk->rsp --> src->rsp;
}

void REACTION_SCOPE(MainReactor)::add_reactions(MainReactor *reactor) {
reaction("reaction_1").
triggers(&reactor->startup).
dependencies().
effects().
function(
[this](Startup& startup) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << parameters.alias.value << " fqn:" << fqn() << endl;
}
);
}
16 changes: 16 additions & 0 deletions examples/sdk-SrcSink-Fanout/Main/MainReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
47 changes: 47 additions & 0 deletions examples/sdk-SrcSink-Fanout/Main/MainReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>

#include "Source/SourceReactor.hh"
#include "Sink/SinkReactor.hh"

using namespace sdk;

class MainReactor: public Reactor {
public:
struct Parameters {
string alias = "Src-Sink-Fanout-Example";
};
private:
struct PublishParameters : public SystemParameters<Parameters, string> {
REACTOR_PARAMETER(string, alias, "Alternate name", "another", "another", defaults.alias);

PublishParameters(Reactor *container, Parameters &&param)
: SystemParameters<Parameters, string>(container, std::forward<Parameters>(param)) {
register_parameters (alias);
}
};
PublishParameters parameters;

REACTION_SCOPE_START(MainReactor, PublishParameters)
void add_reactions(MainReactor *reactor);
REACTION_SCOPE_END(this, parameters)

std::unique_ptr<SourceReactor> src;
std::unique_ptr<SinkReactor> snk;

public:
MainReactor(const std::string &name, Environment *env)
: Reactor(name, env), parameters{this, Parameters{}} {}
MainReactor(const std::string &name, Reactor *container)
: Reactor(name, container), parameters{this, Parameters{}} {}

MainReactor(const std::string &name, Environment *env, Parameters && param)
: Reactor(name, env), parameters{this, std::forward<Parameters>(param)} {}
MainReactor(const std::string &name, Reactor *container, Parameters && param)
: Reactor(name, container), parameters{this, std::forward<Parameters>(param)} {}

void construction() override;
void wiring() override;
};

45 changes: 45 additions & 0 deletions examples/sdk-SrcSink-Fanout/Sink/SinkReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "SinkReactor.hh"
using namespace std;

void SinkReactor::construction() {
cout << "Construction Sink n_ports:" << parameters.n_ports.value << "\n";
req.set_width (parameters.n_ports.value);
rsp.set_width (parameters.n_ports.value);
}

void SinkReactor::wiring() {
cout << "Wiring Sink\n";
}

void REACTION_SCOPE(SinkReactor)::add_reactions (SinkReactor *reactor) {
reaction("startup_reaction").
triggers(&reactor->startup).
dependencies().
effects().
function(pass_function(startup_reaction)
);

reaction("process_request").
triggers(&reactor->req).
dependencies().
effects(&reactor->rsp).
function(pass_function(process_request)
);
}



void REACTION_SCOPE(SinkReactor)::startup_reaction (Startup& startup) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << parameters.name.value << " fqn:" << fqn() << endl;
}

void REACTION_SCOPE(SinkReactor)::process_request (MultiportInput<int>& req, MultiportOutput<int>& rsp) {
for (int i = 0; i < parameters.n_ports.value; ++i) {
if (req[i].is_present()) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Received input:" << *req[i].get() << " port:" << i << endl;
rsp[i].set (*req[i].get());
}
}
}
14 changes: 14 additions & 0 deletions examples/sdk-SrcSink-Fanout/Sink/SinkReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SinkReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SinkReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
49 changes: 49 additions & 0 deletions examples/sdk-SrcSink-Fanout/Sink/SinkReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>
using namespace std;
using namespace sdk;

class SinkReactor : public Reactor {
public:

struct Parameters {
string name = "Sink";
int n_ports = 1;
};
private:
struct PublishParameters : public SystemParameters<Parameters, string, int> {
REACTOR_PARAMETER(string, name, "Alternate name", "Sink", "Sink", defaults.name);
REACTOR_PARAMETER (int, n_ports, "Size of multiports", 1, 10, defaults.n_ports);

PublishParameters(Reactor *container, Parameters &&param)
: SystemParameters<Parameters, string, int>(container, std::forward<Parameters>(param)) {
register_parameters (name, n_ports);
}
};
PublishParameters parameters;

REACTION_SCOPE_START(SinkReactor, PublishParameters)
void add_reactions(SinkReactor *reactor);

void startup_reaction (Startup &startup);
void process_request (MultiportInput<int>& req, MultiportOutput<int>& rsp);
REACTION_SCOPE_END(this, parameters)

public:
SinkReactor(const std::string &name, Environment *env)
: Reactor(name, env), parameters{this, Parameters{}} {}
SinkReactor(const std::string &name, Reactor *container)
: Reactor(name, container), parameters{this, Parameters{}} {}

SinkReactor(const std::string &name, Environment *env, Parameters && param)
: Reactor(name, env), parameters{this, std::forward<Parameters>(param)} {}
SinkReactor(const std::string &name, Reactor *container, Parameters && param)
: Reactor(name, container), parameters{this, std::forward<Parameters>(param)} {}

MultiportInput<int> req{"req", this};
MultiportOutput<int> rsp{"rsp", this};

void construction() override;
void wiring() override;
};
61 changes: 61 additions & 0 deletions examples/sdk-SrcSink-Fanout/Source/SourceReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

#include "SourceReactor.hh"
using namespace std;

void SourceReactor::construction() {

cout << "Construction Source iterations:" << parameters.iterations.value << "\n";
}

void SourceReactor::wiring() {
cout << "Wiring Source iterations:" << parameters.iterations.value << "\n";
}

void REACTION_SCOPE(SourceReactor)::add_reactions(SourceReactor *reactor) {
reaction("reaction_1").
triggers(&reactor->startup).
dependencies().
effects(&reactor->sch).
function(
[this](Startup& startup, LogicalAction<int>& sched) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << name << " fqn:" << fqn() << " iterations:" << parameters.iterations.value << endl;
if (itr < parameters.iterations.value) {
sched.schedule (itr, 0ms);
++itr;
}
}
);

reaction("reaction_2").
triggers(&reactor->sch).
dependencies().
effects(&reactor->req).
function(
[this](LogicalAction<int>& sch, Output<int>& req) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Scheduling iteration:" << *sch.get() << endl;
req.set (*sch.get());
}
);

reaction("reaction_3").
triggers(&reactor->rsp).
dependencies().
effects(&reactor->sch).
function(
[this](Input<int>& rsp, LogicalAction<int>& sch) {
if (rsp.is_present()) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Recevied response:" << *rsp.get() << endl;
}

if (itr < parameters.iterations.value) {
sch.schedule (itr, 0ms);
++itr;
} else {
request_stop();
}
}
);
}
13 changes: 13 additions & 0 deletions examples/sdk-SrcSink-Fanout/Source/SourceReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SourceReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SourceReactor.cc"
)

foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
40 changes: 40 additions & 0 deletions examples/sdk-SrcSink-Fanout/Source/SourceReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>
using namespace std;
using namespace sdk;

class SourceReactor : public Reactor {
public:
struct Parameters : public SystemParametersStandalone<int> {
REACTOR_PARAMETER(int, iterations, "Number of iterations", 1, 100, 10);

Parameters(Reactor *container)
: SystemParametersStandalone<int>(container) {
register_parameters (iterations);
}
};
private:
LogicalAction<int> sch{"sch", this};

Parameters parameters{this};

REACTION_SCOPE_START(SourceReactor, Parameters)
std::string name = "Source";
int itr = 0;

void add_reactions(SourceReactor *reactor);
REACTION_SCOPE_END(this, parameters)

public:
SourceReactor(const std::string &name, Environment *env)
: Reactor(name, env) {}
SourceReactor(const std::string &name, Reactor *container)
: Reactor(name, container) {}

Input<int> rsp{"rsp", this};
Output<int> req{"req", this};

void construction() override;
void wiring() override;
};
Binary file added examples/sdk-SrcSink-Fanout/example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
44 changes: 44 additions & 0 deletions examples/sdk-SrcSink-Fanout/graph.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
digraph {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this file should not be added.

rankdir=LR;
subgraph {
rank=same;
Main_reaction_1 [label="Main.reaction_1"];
Main_Source_reaction_1 [label="Main.Source.reaction_1"];
Main_Sink_0_startup_reaction [label="Main.Sink_0.startup_reaction"];
Main_Sink_1_startup_reaction [label="Main.Sink_1.startup_reaction"];
Main_Sink_2_startup_reaction [label="Main.Sink_2.startup_reaction"];
Main_Sink_3_startup_reaction [label="Main.Sink_3.startup_reaction"];
}
subgraph {
rank=same;
Main_Source_reaction_2 [label="Main.Source.reaction_2"];
}
subgraph {
rank=same;
Main_Sink_0_process_request [label="Main.Sink_0.process_request"];
Main_Sink_1_process_request [label="Main.Sink_1.process_request"];
Main_Sink_2_process_request [label="Main.Sink_2.process_request"];
Main_Sink_3_process_request [label="Main.Sink_3.process_request"];
}
subgraph {
rank=same;
Main_Source_reaction_3 [label="Main.Source.reaction_3"];
}
Main_reaction_1 -> Main_Source_reaction_2 [style=invis];
Main_Source_reaction_2 -> Main_Sink_0_process_request [style=invis];
Main_Sink_0_process_request -> Main_Source_reaction_3 [style=invis];
Main_Source_reaction_3 -> Main_Sink_0_process_request
Main_Source_reaction_3 -> Main_Sink_1_process_request
Main_Source_reaction_3 -> Main_Sink_2_process_request
Main_Source_reaction_3 -> Main_Sink_3_process_request
Main_Source_reaction_2 -> Main_Source_reaction_1
Main_Source_reaction_3 -> Main_Source_reaction_2
Main_Sink_0_process_request -> Main_Source_reaction_2
Main_Sink_0_process_request -> Main_Sink_0_startup_reaction
Main_Sink_1_process_request -> Main_Source_reaction_2
Main_Sink_1_process_request -> Main_Sink_1_startup_reaction
Main_Sink_2_process_request -> Main_Source_reaction_2
Main_Sink_2_process_request -> Main_Sink_2_startup_reaction
Main_Sink_3_process_request -> Main_Source_reaction_2
Main_Sink_3_process_request -> Main_Sink_3_startup_reaction
}
51 changes: 51 additions & 0 deletions examples/sdk-SrcSink-Fanout/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

#include <reactor-sdk/reactor-sdk.hh>

#include "Config-a/Config-a.hh"
#include "Main/MainReactor.hh"

using namespace std;
using namespace sdk;

int main(int argc, char **argv) {
cxxopts::Options options("sdk-SrcSink-Fanout", "Multiport source connecting to banked sink reactors");

unsigned workers = std::thread::hardware_concurrency();
bool fast{false};
reactor::Duration timeout = reactor::Duration::max();
bool cfg_gen{false};

// the timeout variable needs to be tested beyond fitting the Duration-type
options
.set_width(120)
.add_options()
("w,workers", "the number of worker threads used by the scheduler", cxxopts::value<unsigned>(workers)->default_value(std::to_string(workers)), "'unsigned'")
("o,timeout", "Time after which the execution is aborted.", cxxopts::value<reactor::Duration>(timeout)->default_value(time_to_string(timeout)), "'FLOAT UNIT'")
("f,fast", "Allow logical time to run faster than physical time.", cxxopts::value<bool>(fast)->default_value("false"))
("c,config-gen", "Generate configuration files for the topology.", cxxopts::value<bool>(cfg_gen)->default_value("false"))
("help", "Print help");

cxxopts::ParseResult result{};
bool parse_error{false};
try {
result = options.parse(argc, argv);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation

} catch (const cxxopts::OptionException& e) {
reactor::log::Error() << e.what();
parse_error = true;
}

// if parameter --help was used or there was a parse error, print help
if (parse_error || result.count("help"))
{
std::cout << options.help({""});
return parse_error ? -1 : 0;
}

std::cout << "parameters - workers:" << workers << " fast:" << (fast ? "True" : "False") << " timeout:" << timeout << " cfg_gen:" << (cfg_gen ? "True" : "False") << std::endl;

Environment sim {&cfg_parameters, workers, fast, timeout, cfg_gen};
auto main = new MainReactor("Main", &sim);

sim.run();
return 0;
}
1 change: 1 addition & 0 deletions examples/sdk-SrcSink/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*build
28 changes: 28 additions & 0 deletions examples/sdk-SrcSink/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
cmake_minimum_required(VERSION 3.9)
project(src_sink VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET src_sink)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)

include(Sink/SinkReactor.cmake)
include(Source/SourceReactor.cmake)
include(Main/MainReactor.cmake)
include(Config-a/Config-a.cmake)
23 changes: 23 additions & 0 deletions examples/sdk-SrcSink/Config-a/Config-a.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "Config-a.hh"

UserParameters cfg_parameters;

ConfigParameter<int, uint32_t, string>::ParametersMap UserParameters::homogeneous_config() {
return {
{"Main.Source.iterations", ConfigParameterMetadata<int> { 5 } },
// {"Main.Sink.name", ConfigParameterMetadata<string> { "Homog Name" } },
};
}

ConfigParameter<int, uint32_t, string>::ParametersMap UserParameters::heterogeneous_config() {
return {
{"Main.Source.iterations", ConfigParameterMetadata<int> { 20 } },
{"Main.Source.n_ports", ConfigParameterMetadata<int> { 4 } },
{"Main.n_sinks", ConfigParameterMetadata<int> { 4 } },
// {"Main.Sink_0.name", ConfigParameterMetadata<string> { "Hetero Name 0" } },
// {"Main.Sink_1.name", ConfigParameterMetadata<string> { "Hetero Name 1" } },
// {"Main.Sink_2.name", ConfigParameterMetadata<string> { "Hetero Name 2" } },
// {"Main.Sink_3.name", ConfigParameterMetadata<string> { "Hetero Name 3" } },

};
}
16 changes: 16 additions & 0 deletions examples/sdk-SrcSink/Config-a/Config-a.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
18 changes: 18 additions & 0 deletions examples/sdk-SrcSink/Config-a/Config-a.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef USER_PARAMETERS_H
#define USER_PARAMETERS_H

#include <reactor-sdk/reactor-sdk.hh>
#include <map>
#include <variant>
#include <string>

using namespace sdk;

struct UserParameters : public ConfigParameter<int, uint32_t, string> {
ConfigParameter<int, uint32_t, string>::ParametersMap homogeneous_config();
ConfigParameter<int, uint32_t, string>::ParametersMap heterogeneous_config();
};

extern UserParameters cfg_parameters;

#endif // USER_PARAMETERS_H
41 changes: 41 additions & 0 deletions examples/sdk-SrcSink/Main/MainReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include "MainReactor.hh"

void MainReactor::construction() {

cout << "Construction Main n_sinks:" << parameters.n_sinks.value << " default n_sinks:" << parameters.defaults.n_sinks << "\n";

src = std::make_unique<SourceReactor>("Source", this);

for (int i = 0; i < parameters.n_sinks.value; i++) {
snk.create_reactor(SinkReactor::Parameters{.name = "Default Sink Name"});
}
}

void MainReactor::wiring() {
cout << "Wiring Main n_sinks:" << parameters.n_sinks.value << " default n_sinks:" << parameters.defaults.n_sinks << "\n";

src->req --> snk.for_each(select_default(snk).req);
// src->req --> snk.for_each(&SinkReactor::req); // alternative
// src->req --> snk.for_each(&snk[0].req); // alternative
// src->req --> snk->*(select_default(snk).req); // alternative
// src->req --> snk->*(&SinkReactor::req); // alternative

snk.for_each(select_default(snk).rsp) --> src->rsp;
// snk.for_each(&SinkReactor::rsp) --> src->rsp; // alternative
// snk.for_each(&snk[0].rsp) --> src->rsp; // alternative
// (snk->*(select_default(snk).rsp)) --> src->rsp; // alternative
// (snk->*(&SinkReactor::rsp)) --> src->rsp; // alternative
}

void REACTION_SCOPE(MainReactor)::add_reactions(MainReactor *reactor) {
reaction("reaction_1").
triggers(&reactor->startup).
dependencies().
effects().
function(
[this](Startup& startup) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << parameters.alias.value << " fqn:" << fqn() << " n_sinks:" << parameters.n_sinks.value << endl;
}
);
}
16 changes: 16 additions & 0 deletions examples/sdk-SrcSink/Main/MainReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
48 changes: 48 additions & 0 deletions examples/sdk-SrcSink/Main/MainReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>

#include "Source/SourceReactor.hh"
#include "Sink/SinkReactor.hh"

using namespace sdk;

class MainReactor: public Reactor {
public:
struct Parameters {
string alias = "Src-Sink-Example";
int n_sinks = 2;
};
private:
struct PublishParameters : public SystemParameters<Parameters, string, int> {
REACTOR_PARAMETER(string, alias, "Alternate name", "another", "another", defaults.alias);
REACTOR_PARAMETER(int, n_sinks, "Sink reactors bank width", 1, 10, defaults.n_sinks);

PublishParameters(Reactor *container, Parameters &&param)
: SystemParameters<Parameters, string, int>(container, std::forward<Parameters>(param)) {
register_parameters (alias, n_sinks);
}
};
PublishParameters parameters;

REACTION_SCOPE_START(MainReactor, PublishParameters)
void add_reactions(MainReactor *reactor);
REACTION_SCOPE_END(this, parameters)

std::unique_ptr<SourceReactor> src;
ReactorBank<SinkReactor> snk{"Sink", this};

public:
MainReactor(const std::string &name, Environment *env)
: Reactor(name, env), parameters{this, Parameters{}} {}
MainReactor(const std::string &name, Reactor *container)
: Reactor(name, container), parameters{this, Parameters{}} {}

MainReactor(const std::string &name, Environment *env, Parameters && param)
: Reactor(name, env), parameters{this, std::forward<Parameters>(param)} {}
MainReactor(const std::string &name, Reactor *container, Parameters && param)
: Reactor(name, container), parameters{this, std::forward<Parameters>(param)} {}

void construction() override;
void wiring() override;
};
40 changes: 40 additions & 0 deletions examples/sdk-SrcSink/Sink/SinkReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "SinkReactor.hh"
using namespace std;

void SinkReactor::construction() {
cout << "Construction Sink\n";
}

void SinkReactor::wiring() {

cout << "Wiring Sink\n";
}

void REACTION_SCOPE(SinkReactor)::add_reactions(SinkReactor *reactor) {
reaction("startup_reaction").
triggers(&reactor->startup).
dependencies().
effects().
function(pass_function(startup_reaction)
);

reaction("process_request").
triggers(&reactor->req).
dependencies().
effects(&reactor->rsp).
function(pass_function(process_request)
);
}



void REACTION_SCOPE(SinkReactor)::startup_reaction (Startup& startup) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << parameters.name.value << " fqn:" << fqn() << endl;
}

void REACTION_SCOPE(SinkReactor)::process_request (Input<int>& req, Output<int>& rsp) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Received input:" << *req.get() << " bank:" << bank_index << endl;
rsp.set (*req.get());
}
14 changes: 14 additions & 0 deletions examples/sdk-SrcSink/Sink/SinkReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SinkReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SinkReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
47 changes: 47 additions & 0 deletions examples/sdk-SrcSink/Sink/SinkReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>
using namespace std;
using namespace sdk;

class SinkReactor : public Reactor {
public:

struct Parameters {
string name = "Sink";
};
private:
struct PublishParameters : public SystemParameters<Parameters, string> {
REACTOR_PARAMETER(string, name, "Alternate name", "Sink", "Sink", defaults.name);

PublishParameters(Reactor *container, Parameters &&param)
: SystemParameters<Parameters, string>(container, std::forward<Parameters>(param)) {
register_parameters (name);
}
};
PublishParameters parameters;

REACTION_SCOPE_START(SinkReactor, PublishParameters)
void add_reactions(SinkReactor *reactor);

void startup_reaction (Startup &startup);
void process_request (Input<int>& req, Output<int>& rsp);
REACTION_SCOPE_END(this, parameters)

public:
SinkReactor(const std::string &name, Environment *env)
: Reactor(name, env), parameters{this, Parameters{}} {}
SinkReactor(const std::string &name, Reactor *container)
: Reactor(name, container), parameters{this, Parameters{}} {}

SinkReactor(const std::string &name, Environment *env, Parameters && param)
: Reactor(name, env), parameters{this, std::forward<Parameters>(param)} {}
SinkReactor(const std::string &name, Reactor *container, Parameters && param)
: Reactor(name, container), parameters{this, std::forward<Parameters>(param)} {}

Input<int> req{"req", this};
Output<int> rsp{"rsp", this};

void construction() override;
void wiring() override;
};
75 changes: 75 additions & 0 deletions examples/sdk-SrcSink/Source/SourceReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@

#include "SourceReactor.hh"
using namespace std;

void SourceReactor::construction() {

cout << "Construction Source n_ports:" << parameters.n_ports.value << "\n";

req.set_width (parameters.n_ports.value);
rsp.set_width (parameters.n_ports.value);
}

void SourceReactor::wiring() {
cout << "Wiring Source n_ports:" << parameters.n_ports.value << "\n";
}

void REACTION_SCOPE(SourceReactor)::add_reactions(SourceReactor *reactor) {
reaction("reaction_1").
triggers(&reactor->startup).
dependencies().
effects(&reactor->sch).
function(
[this](Startup& startup, LogicalAction<int>& sched) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Starting up reaction\n" << "Bank:" << bank_index << " name:" << name << " fqn:" << fqn() << " iterations:" << parameters.iterations.value << endl;
if (itr < parameters.iterations.value) {
sched.schedule (itr, 0ms);
++itr;
}
}
);

reaction("reaction_2").
triggers(&reactor->sch).
dependencies().
effects(&reactor->req).
function(
[this](LogicalAction<int>& sch, MultiportOutput<int>& req) {
for (int i = 0; i < parameters.n_ports.value; ++i) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Scheduling iteration:" << *sch.get() << " out_port:" << i << endl;
req[i].set (*sch.get());
}
}
);

reaction("reaction_3").
triggers(&reactor->rsp).
dependencies().
effects(&reactor->sch).
function(
[this](MultiportInput<int>& rsp, LogicalAction<int>& sch) {
for (int i = 0; i < parameters.n_ports.value; ++i) {
if (rsp[i].is_present()) {
cout << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " <<
"Recevied response:" << *rsp[i].get() << " in_port:" << i << endl;
++rsp_itr;
}
}

if (rsp_itr < parameters.n_ports.value) {
return;
}

rsp_itr = 0;

if (itr < parameters.iterations.value) {
sch.schedule (itr, 0ms);
++itr;
} else {
request_stop();
}
}
);
}
13 changes: 13 additions & 0 deletions examples/sdk-SrcSink/Source/SourceReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SourceReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/SourceReactor.cc"
)

foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
42 changes: 42 additions & 0 deletions examples/sdk-SrcSink/Source/SourceReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>
using namespace std;
using namespace sdk;

class SourceReactor : public Reactor {
public:
struct Parameters : public SystemParametersStandalone<int> {
REACTOR_PARAMETER(int, iterations, "Number of iterations", 1, 100, 10);
REACTOR_PARAMETER(int, n_ports, "Size of multiports", 1, 10, 1);

Parameters(Reactor *container)
: SystemParametersStandalone<int>(container) {
register_parameters (iterations, n_ports);
}
};
private:
LogicalAction<int> sch{"sch", this};

Parameters parameters{this};

REACTION_SCOPE_START(SourceReactor, Parameters)
std::string name = "Source";
int itr = 0;
int rsp_itr = 0;

void add_reactions(SourceReactor *reactor);
REACTION_SCOPE_END(this, parameters)

public:
SourceReactor(const std::string &name, Environment *env)
: Reactor(name, env) {}
SourceReactor(const std::string &name, Reactor *container)
: Reactor(name, container) {}

MultiportInput<int> rsp{"rsp", this};
MultiportOutput<int> req{"req", this};

void construction() override;
void wiring() override;
};
Binary file added examples/sdk-SrcSink/example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
44 changes: 44 additions & 0 deletions examples/sdk-SrcSink/graph.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
digraph {
rankdir=LR;
subgraph {
rank=same;
Main_reaction_1 [label="Main.reaction_1"];
Main_Source_reaction_1 [label="Main.Source.reaction_1"];
Main_Sink_0_startup_reaction [label="Main.Sink_0.startup_reaction"];
Main_Sink_1_startup_reaction [label="Main.Sink_1.startup_reaction"];
Main_Sink_2_startup_reaction [label="Main.Sink_2.startup_reaction"];
Main_Sink_3_startup_reaction [label="Main.Sink_3.startup_reaction"];
}
subgraph {
rank=same;
Main_Source_reaction_2 [label="Main.Source.reaction_2"];
}
subgraph {
rank=same;
Main_Sink_0_process_request [label="Main.Sink_0.process_request"];
Main_Sink_1_process_request [label="Main.Sink_1.process_request"];
Main_Sink_2_process_request [label="Main.Sink_2.process_request"];
Main_Sink_3_process_request [label="Main.Sink_3.process_request"];
}
subgraph {
rank=same;
Main_Source_reaction_3 [label="Main.Source.reaction_3"];
}
Main_reaction_1 -> Main_Source_reaction_2 [style=invis];
Main_Source_reaction_2 -> Main_Sink_0_process_request [style=invis];
Main_Sink_0_process_request -> Main_Source_reaction_3 [style=invis];
Main_Source_reaction_3 -> Main_Sink_0_process_request
Main_Source_reaction_3 -> Main_Sink_1_process_request
Main_Source_reaction_3 -> Main_Sink_2_process_request
Main_Source_reaction_3 -> Main_Sink_3_process_request
Main_Source_reaction_2 -> Main_Source_reaction_1
Main_Source_reaction_3 -> Main_Source_reaction_2
Main_Sink_0_process_request -> Main_Source_reaction_2
Main_Sink_0_process_request -> Main_Sink_0_startup_reaction
Main_Sink_1_process_request -> Main_Source_reaction_2
Main_Sink_1_process_request -> Main_Sink_1_startup_reaction
Main_Sink_2_process_request -> Main_Source_reaction_2
Main_Sink_2_process_request -> Main_Sink_2_startup_reaction
Main_Sink_3_process_request -> Main_Source_reaction_2
Main_Sink_3_process_request -> Main_Sink_3_startup_reaction
}
51 changes: 51 additions & 0 deletions examples/sdk-SrcSink/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

#include <reactor-sdk/reactor-sdk.hh>

#include "Config-a/Config-a.hh"
#include "Main/MainReactor.hh"

using namespace std;
using namespace sdk;

int main(int argc, char **argv) {
cxxopts::Options options("sdk-SrcSink", "Multiport source connecting to banked sink reactors");

unsigned workers = std::thread::hardware_concurrency();
bool fast{false};
reactor::Duration timeout = reactor::Duration::max();
bool cfg_gen{false};

// the timeout variable needs to be tested beyond fitting the Duration-type
options
.set_width(120)
.add_options()
("w,workers", "the number of worker threads used by the scheduler", cxxopts::value<unsigned>(workers)->default_value(std::to_string(workers)), "'unsigned'")
("o,timeout", "Time after which the execution is aborted.", cxxopts::value<reactor::Duration>(timeout)->default_value(time_to_string(timeout)), "'FLOAT UNIT'")
("f,fast", "Allow logical time to run faster than physical time.", cxxopts::value<bool>(fast)->default_value("false"))
("c,config-gen", "Generate configuration files for the topology.", cxxopts::value<bool>(cfg_gen)->default_value("false"))
("help", "Print help");

cxxopts::ParseResult result{};
bool parse_error{false};
try {
result = options.parse(argc, argv);
} catch (const cxxopts::OptionException& e) {
reactor::log::Error() << e.what();
parse_error = true;
}

// if parameter --help was used or there was a parse error, print help
if (parse_error || result.count("help"))
{
std::cout << options.help({""});
return parse_error ? -1 : 0;
}

std::cout << "parameters - workers:" << workers << " fast:" << (fast ? "True" : "False") << " timeout:" << timeout << " cfg_gen:" << (cfg_gen ? "True" : "False") << std::endl;

Environment sim {&cfg_parameters, workers, fast, timeout, cfg_gen};
auto main = new MainReactor("Main", &sim, MainReactor::Parameters{.alias = "Test Param", .n_sinks = 3});

sim.run();
return 0;
}
23 changes: 23 additions & 0 deletions examples/sdk-Workers/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.9)
project(workers VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET workers)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)
463 changes: 463 additions & 0 deletions examples/sdk-Workers/main.cc

Large diffs are not rendered by default.

219 changes: 219 additions & 0 deletions examples/sdk-Workers/src/Workers.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
target Cpp {
fast: false,
// logging: debug
}

// lf_set_destructor(alloc_rsp, cache_entry_destructor);

public preamble {=
#include <stdint.h>
=}

reactor Relay (bank_index:size_t = 0, n_outputs:int = 1) {
input in_req:int;
output out_rsp:int;
output [n_outputs] out_req:int;
input [n_outputs] in_rsp:int;

output all_workers_busy:bool;

state index:int = 0;
state busy:int* = 0;

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
busy = (int*) calloc (n_outputs, sizeof(int));
=}

reaction (in_req) -> all_workers_busy, out_req {=
for (int i = 0; i < n_outputs; ++i, index = (index + 1) % n_outputs) {
if (busy[index] == 0) {
out_req[index].set(*in_req.get());
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << *in_req.get() << " to worker:" << index << std::endl;
busy[index] = 1;
index = (index + 1) % n_outputs;
break;
}
}
int busy_count = 0;
for (int i = 0; i < n_outputs; ++i) {
busy_count = busy[i] ? (busy_count + 1) : busy_count;
}

if (busy_count == n_outputs) {
all_workers_busy.set(true);
}
=}

reaction (in_rsp) -> out_rsp {=
for (int i = 0; i < n_outputs; ++i) {
if (in_rsp[i].is_present()) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Receiving task_id:" << *in_rsp[i].get() << " from worker:" << i << std::endl;
busy[i] = 0;
out_rsp.set(*in_rsp[i].get());
}
}
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}

reactor Worker (bank_index:size_t = 0, processing_delay:time = 2s) {
input req:int;
output rsp:int;

logical action sch_rsp(0):int;

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
=}

reaction (req) -> sch_rsp {=
auto req_ref = *req.get();
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Receiving task_id:" << req_ref << std::endl;
sch_rsp.schedule (req_ref, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(processing_delay)));
=}

reaction (sch_rsp) -> rsp {=
auto req_ref = *sch_rsp.get();
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_ref << std::endl;
rsp.set(req_ref);
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}


reactor Pool (bank_index:size_t = 0, n_workers:int = 1) {

input req:int;
output rsp:int;

output all_workers_busy:bool;

logical action sch_rsp(0):int;

workers = new [n_workers] Worker(processing_delay = 1s);
relay = new Relay(n_outputs = n_workers);

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}

req -> relay.in_req;
relay.out_req -> workers.req;
workers.rsp -> relay.in_rsp;
relay.out_rsp -> rsp;

relay.all_workers_busy -> all_workers_busy;

}

reactor Tasks (bank_index:size_t = 0, n_tasks:int = 10, n_pools:int = 1) {

output[n_pools] req:int;
input[n_pools] rsp:int;

input[n_pools] hybernate:bool;

state req_itr:int = 0;
state rsp_itr:int = 0;
state busy:bool* = 0;

logical action sch(0):int;

reaction (startup) -> sch {=
sch.schedule (-1, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
busy = (bool*) calloc (n_pools, sizeof(bool));
=}

reaction (sch) -> sch, req {=
auto index = *sch.get();
if (index < 0) {
for (int i = 0; i < n_pools; ++i) {
if (busy[i]) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Busy Pool:" << i << std::endl;
continue;
}
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_itr << " to pool:" << i << std::endl;
req[i].set (req_itr++);
}

int busy_count = 0;
for (int i = 0; i < n_pools; ++i) {
busy_count = busy[i] ? (busy_count + 1) : busy_count;
}

if (busy_count == n_pools) {
return;
}
sch.schedule (-1, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
} else {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_itr << " to pool:" << index << std::endl;
req[index].set (req_itr++);
}
=}

reaction (rsp) -> sch {=
for (int i = 0; i < n_pools; ++i) {
if (rsp[i].is_present()) {
++rsp_itr;
if (req_itr < n_tasks) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Scheduling task_id:" << req_itr << " to pool:" << i << std::endl;
sch.schedule (i, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
}
}
}
if (rsp_itr == n_tasks) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< "Terminating Run\n";
request_stop();
}
=}

reaction (hybernate) {=
for (int i = 0; i < n_pools; ++i) {
if (hybernate[i].is_present()) {
busy[i] = *hybernate[i].get();
}
}
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}

main reactor (n_tasks:int = 10, n_pools:int = 2, n_workers:int = 4) {
tasks = new Tasks(n_tasks = n_tasks, n_pools = n_pools);
pool = new [n_pools] Pool(n_workers = n_workers);

tasks.req -> pool.req;
pool.rsp -> tasks.rsp;
pool.all_workers_busy -> tasks.hybernate;
}
23 changes: 23 additions & 0 deletions examples/sdk-WorkersParams/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.9)
project(workers_with_params VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET workers_with_params)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)
567 changes: 567 additions & 0 deletions examples/sdk-WorkersParams/main.cc

Large diffs are not rendered by default.

226 changes: 226 additions & 0 deletions examples/sdk-WorkersParams/src/Workers.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
target Cpp {
fast: false,
// logging: debug
}

// lf_set_destructor(alloc_rsp, cache_entry_destructor);

public preamble {=
#include <stdint.h>
=}

reactor Relay (bank_index:size_t = 0, n_outputs:int = 1) {
input in_req:int;
output out_rsp:int;
output [n_outputs] out_req:int;
input [n_outputs] in_rsp:int;

output all_workers_busy:bool;

state index:int = 0;
state busy:int* = 0;

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
busy = (int*) calloc (n_outputs, sizeof(int));
=}

reaction (in_req) -> all_workers_busy, out_req {=
for (int i = 0; i < n_outputs; ++i, index = (index + 1) % n_outputs) {
if (busy[index] == 0) {
out_req[index].set(*in_req.get());
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << *in_req.get() << " to worker:" << index << std::endl;
busy[index] = 1;
index = (index + 1) % n_outputs;
break;
}
}
int busy_count = 0;
for (int i = 0; i < n_outputs; ++i) {
busy_count = busy[i] ? (busy_count + 1) : busy_count;
}

if (busy_count == n_outputs) {
all_workers_busy.set(true);
}
=}

reaction (in_rsp) -> out_rsp {=
for (int i = 0; i < n_outputs; ++i) {
if (in_rsp[i].is_present()) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Receiving task_id:" << *in_rsp[i].get() << " from worker:" << i << std::endl;
busy[i] = 0;
out_rsp.set(*in_rsp[i].get());
}
}
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}

reactor Worker (bank_index:size_t = 0, processing_delay:time = 2s) {
input req:int;
output rsp:int;

logical action sch_rsp(0):int;

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
=}

reaction (req) -> sch_rsp {=
auto req_ref = *req.get();
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Receiving task_id:" << req_ref << std::endl;
sch_rsp.schedule (req_ref, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(processing_delay)));
=}

reaction (sch_rsp) -> rsp {=
auto req_ref = *sch_rsp.get();
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_ref << std::endl;
rsp.set(req_ref);
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}


reactor Pool (bank_index:size_t = 0, n_workers:int = 1) {

input req:int;
output rsp:int;

output all_workers_busy:bool;

logical action sch_rsp(0):int;

workers = new [n_workers] Worker(processing_delay = 1s);
relay = new Relay(n_outputs = n_workers);

reaction (startup) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}

req -> relay.in_req;
relay.out_req -> workers.req;
workers.rsp -> relay.in_rsp;
relay.out_rsp -> rsp;

relay.all_workers_busy -> all_workers_busy;

}

reactor Tasks (bank_index:size_t = 0, n_tasks:int = 10, n_pools:int = 1) {

output[n_pools] req:int;
input[n_pools] rsp:int;

input[n_pools] hybernate:bool;

state req_itr:int = 0;
state rsp_itr:int = 0;
state busy:bool* = 0;

logical action sch(0):int;

reaction (startup) -> sch {=
sch.schedule (-1, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Startup\n";
busy = (bool*) calloc (n_pools, sizeof(bool));
=}

reaction (sch) -> sch, req {=
if (req_itr == n_tasks) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Tasks queue empty" << std::endl;
return;
}
auto index = *sch.get();
if (index < 0) {
for (int i = 0; i < n_pools; ++i) {
if (busy[i]) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Busy Pool:" << i << std::endl;
continue;
}
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_itr << " to pool:" << i << std::endl;
req[i].set (req_itr++);
}

int busy_count = 0;
for (int i = 0; i < n_pools; ++i) {
busy_count = busy[i] ? (busy_count + 1) : busy_count;
}

if (busy_count == n_pools) {
return;
}
sch.schedule (-1, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
} else {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Sending task_id:" << req_itr << " to pool:" << index << std::endl;
req[index].set (req_itr++);
}
=}

reaction (rsp) -> sch {=
for (int i = 0; i < n_pools; ++i) {
if (rsp[i].is_present()) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< "Received response of task:" << *rsp[i].get() << "\n";
++rsp_itr;
busy[i] = 0;
}
}
if (rsp_itr == n_tasks) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< "Terminating Run\n";
request_stop();
} else {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Scheduling tasks\n";
sch.schedule (-1, std::chrono::duration_cast<reactor::Duration>(std::chrono::nanoseconds(0)));
}
=}

reaction (hybernate) {=
for (int i = 0; i < n_pools; ++i) {
if (hybernate[i].is_present()) {
busy[i] = *hybernate[i].get();
}
}
=}

reaction (shutdown) {=
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Shutdown\n";
=}
}

main reactor (n_tasks:int = 10, n_pools:int = 2, n_workers:int = 4) {
tasks = new Tasks(n_tasks = n_tasks, n_pools = n_pools);
pool = new [n_pools] Pool(n_workers = n_workers);

tasks.req -> pool.req;
pool.rsp -> tasks.rsp;
pool.all_workers_busy -> tasks.hybernate;
}
27 changes: 27 additions & 0 deletions examples/sdk-deadlines/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
cmake_minimum_required(VERSION 3.9)
project(node VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET node)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)

include(Node/NodeReactor.cmake)
include(Main/MainReactor.cmake)
include(Config-a/Config-a.cmake)
18 changes: 18 additions & 0 deletions examples/sdk-deadlines/Config-a/Config-a.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include "Config-a.hh"

UserParameters cfg_parameters;

ConfigParameter<int, Duration>::ParametersMap UserParameters::homogeneous_config() {
return {
};
}

ConfigParameter<int, Duration>::ParametersMap UserParameters::heterogeneous_config() {
return {
{"Main.slow.period", ConfigParameterMetadata<Duration> { 1s } },
{"Main.slow.duration", ConfigParameterMetadata<Duration> { 5s } },
{"Main.n_fast", ConfigParameterMetadata<int> { 3 } },
{"Main.fast_0.period", ConfigParameterMetadata<Duration> { 500ms } },
{"Main.fast_0.duration", ConfigParameterMetadata<Duration> { 10ms } }
};
}
16 changes: 16 additions & 0 deletions examples/sdk-deadlines/Config-a/Config-a.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/Config-a.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
18 changes: 18 additions & 0 deletions examples/sdk-deadlines/Config-a/Config-a.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef USER_PARAMETERS_H
#define USER_PARAMETERS_H

#include <reactor-sdk/reactor-sdk.hh>
#include <map>
#include <variant>
#include <string>

using namespace sdk;

struct UserParameters : public ConfigParameter<int, Duration> {
ConfigParameter<int, Duration>::ParametersMap homogeneous_config();
ConfigParameter<int, Duration>::ParametersMap heterogeneous_config();
};

extern UserParameters cfg_parameters;

#endif // USER_PARAMETERS_H
16 changes: 16 additions & 0 deletions examples/sdk-deadlines/Main/MainReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "MainReactor.hh"

void MainReactor::construction() {

std::cout << "Construction Main n_fast:" << parameters.n_fast.value << "\n";

slow = std::make_unique<NodeReactor>("slow", this);

for (int i = 0; i < parameters.n_fast.value; i++) {
fast.create_reactor();
}
}

void MainReactor::wiring() {
std::cout << "Wiring Main n_sinks:" << parameters.n_fast.value << "\n";
}
16 changes: 16 additions & 0 deletions examples/sdk-deadlines/Main/MainReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/MainReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
34 changes: 34 additions & 0 deletions examples/sdk-deadlines/Main/MainReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>

#include "Node/NodeReactor.hh"

using namespace sdk;

class MainReactor: public Reactor {
public:
struct Parameters : public SystemParametersStandalone<int> {
REACTOR_PARAMETER(int, n_fast, "Number of fast nodes", 1, 10, 2);

Parameters(Reactor *container)
: SystemParametersStandalone<int>(container) {
register_parameters (n_fast);
}
};
private:
Parameters parameters{this};
std::unique_ptr<NodeReactor> slow;
ReactorBank<NodeReactor> fast{"fast", this};

public:
MainReactor(const std::string &name, Environment *env)
: Reactor(name, env) {}
MainReactor(const std::string &name, Reactor *container)
: Reactor(name, container) {}

void construction() override;
void wiring() override;
};


29 changes: 29 additions & 0 deletions examples/sdk-deadlines/Node/NodeReactor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "NodeReactor.hh"

void NodeReactor::construction() {
std::cout << "Construction:" << fqn() << " period:" << parameters.period.value << " duration:" << parameters.duration.value << "\n";
}

void NodeReactor::wiring() {
std::cout << "Assembling Node\n";
}

void NodeReactor::Internals::add_reactions(NodeReactor *reactor) {
reaction("reaction_1").
triggers(&reactor->startup, &reactor->a).
dependencies().
effects().
function(
[this](Startup& startup, LogicalAction<void> &a) {
reactor::log::Info() << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " << fqn() << " reaction executes.";
std::this_thread::sleep_for(parameters.duration.value);
reactor::log::Info() << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " << fqn() << " reaction done.";
a.schedule(parameters.period.value);
}
).deadline (parameters.period.value,
[this](Startup& startup, LogicalAction<void> &a) {
reactor::log::Error() << "(" << get_elapsed_logical_time() << ", " << get_microstep() << "), physical_time: " << get_elapsed_physical_time() << " " << fqn() << " deadline was violated!";
exit(1);
}
);
}
16 changes: 16 additions & 0 deletions examples/sdk-deadlines/Node/NodeReactor.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include_directories(${CMAKE_CURRENT_LIST_DIR})

set(INCLUDE_FILES
"${CMAKE_CURRENT_LIST_DIR}/NodeReactor.hh"
)

set(SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/NodeReactor.cc"
)


foreach(file IN LISTS INCLUDE_FILES)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -include ${file}")
endforeach()

target_sources(${LF_MAIN_TARGET} PRIVATE ${SOURCE_FILES})
38 changes: 38 additions & 0 deletions examples/sdk-deadlines/Node/NodeReactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include <reactor-sdk/reactor-sdk.hh>

using namespace sdk;

class NodeReactor: public Reactor {
public:
struct Parameters : public SystemParametersStandalone<Duration> {
REACTOR_PARAMETER(Duration, period, "Schedule and deadline period", 10ms, 10s, 500ms);
REACTOR_PARAMETER(Duration, duration, "Sleep duration", 5ms, 5s, 10ms);

Parameters(Reactor *container)
: SystemParametersStandalone<Duration>(container) {
register_parameters (period, duration);
}
};

private:
Parameters parameters{this};
LogicalAction<void> a{"a", this};

REACTION_SCOPE_START(NodeReactor, Parameters)
void add_reactions(NodeReactor *reactor);
REACTION_SCOPE_END(this, parameters)


public:
NodeReactor(const std::string &name, Environment *env)
: Reactor(name, env) {}
NodeReactor(const std::string &name, Reactor *container)
: Reactor(name, container) {}

void construction() override;
void wiring() override;
};


51 changes: 51 additions & 0 deletions examples/sdk-deadlines/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

#include <reactor-sdk/reactor-sdk.hh>

#include "Config-a/Config-a.hh"
#include "Main/MainReactor.hh"

using namespace std;
using namespace sdk;

int main(int argc, char **argv) {
cxxopts::Options options("sdk-deadlines", "Multiport source connecting to banked sink reactors");

unsigned workers = std::thread::hardware_concurrency();
bool fast{false};
reactor::Duration timeout = reactor::Duration::max();
bool cfg_gen{false};

// the timeout variable needs to be tested beyond fitting the Duration-type
options
.set_width(120)
.add_options()
("w,workers", "the number of worker threads used by the scheduler", cxxopts::value<unsigned>(workers)->default_value(std::to_string(workers)), "'unsigned'")
("o,timeout", "Time after which the execution is aborted.", cxxopts::value<reactor::Duration>(timeout)->default_value(time_to_string(timeout)), "'FLOAT UNIT'")
("f,fast", "Allow logical time to run faster than physical time.", cxxopts::value<bool>(fast)->default_value("false"))
("c,config-gen", "Generate configuration files for the topology.", cxxopts::value<bool>(cfg_gen)->default_value("false"))
("help", "Print help");

cxxopts::ParseResult result{};
bool parse_error{false};
try {
result = options.parse(argc, argv);
} catch (const cxxopts::OptionException& e) {
reactor::log::Error() << e.what();
parse_error = true;
}

// if parameter --help was used or there was a parse error, print help
if (parse_error || result.count("help"))
{
std::cout << options.help({""});
return parse_error ? -1 : 0;
}

std::cout << "parameters - workers:" << workers << " fast:" << (fast ? "True" : "False") << " timeout:" << timeout << " cfg_gen:" << (cfg_gen ? "True" : "False") << std::endl;

Environment sim {&cfg_parameters, workers, fast, timeout, cfg_gen};
auto main = new MainReactor("Main", &sim);

sim.run();
return 0;
}
23 changes: 23 additions & 0 deletions examples/sdk-hello/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.9)
project(hello VERSION 0.0.0 LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard is cached for visibility in external tools." FORCE)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)

set(LF_MAIN_TARGET hello)

find_package(reactor-cpp PATHS )
find_package(reactor-sdk PATHS )

add_executable(${LF_MAIN_TARGET}
main.cc
)

include_directories(${CMAKE_CURRENT_LIST_DIR})

target_link_libraries(${LF_MAIN_TARGET} reactor-cpp)
target_link_libraries(${LF_MAIN_TARGET} reactor-sdk)

target_compile_options(${LF_MAIN_TARGET} PRIVATE -Wall -Wextra -pedantic)
65 changes: 65 additions & 0 deletions examples/sdk-hello/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@

#include <reactor-sdk/reactor-sdk.hh>

using namespace sdk;

class Hello : public Reactor {
private:
struct Parameters {
};
Parameters parameters;

Timer timer{"timer", this};

class Chamber : public ReactionChamber<Hello, Parameters> {
public:
Chamber(Reactor *reactor, Parameters &params)
: ReactionChamber<Hello, Parameters>(reactor, params) {}
private:

void terminate(Shutdown& shutdown) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Good Bye!\n";
}

void add_reactions (Hello *reactor) {
reaction("reaction_1").
triggers(&reactor->timer).
dependencies().
effects().
function(
[this](Timer& timer) {
std::cout << "(" << get_elapsed_logical_time().count() << ", " << get_microstep() << ") physical_time:" << get_elapsed_physical_time().count()
<< fqn() << " Bank:" << bank_index << " Hello World!\n";
}
);

reaction("reaction_2").
triggers(&reactor->shutdown).
dependencies().
effects().
function(pass_function(terminate));
}
};
Chamber reaction_chamber{this, parameters};

public:
Hello(const std::string &name, Environment *env)
: Reactor(name, env) {}

void construction() {
timer.set_timer (1s, 2s);
}

void wiring() override {

}
};

int main(int argc, char **argv) {
Environment env {nullptr, 4, false, 4s, false};
auto main = new Hello("Hello", &env);

env.run();
return 0;
}
19 changes: 19 additions & 0 deletions examples/sdk-hello/src/HelloWorld.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
target Cpp {
timeout: 4s
}

reactor Hello (bank_index: size_t = 0) {
timer Timer(1s, 2s)

reaction(Timer) {=
std::cout << "Bank:" << bank_index << " Hello World!\n";
=}

reaction(shutdown) {=
std::cout << "Good Bye!\n";
=}
}

main reactor {
hello = new Hello();
}
2 changes: 1 addition & 1 deletion include/reactor-cpp/action.hh
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ public:
};

class Timer : public BaseAction {
private:
protected:
Duration offset_{0};
Duration period_{0};

11 changes: 7 additions & 4 deletions include/reactor-cpp/environment.hh
Original file line number Diff line number Diff line change
@@ -30,10 +30,11 @@ constexpr bool default_fast_fwd_execution = false;
enum class Phase : std::uint8_t {
Construction = 0,
Assembly = 1,
Startup = 2,
Execution = 3,
Shutdown = 4,
Deconstruction = 5
Indexing = 2,
Startup = 3,
Execution = 4,
Shutdown = 5,
Deconstruction = 6
};

class Environment {
@@ -107,7 +108,9 @@ public:
void register_reactor(Reactor* reactor);
void register_port(BasePort* port) noexcept;
void register_input_action(BaseAction* action);
void construct();
void assemble();
void dependency_graph_and_indexes();
auto startup() -> std::thread;
void sync_shutdown();
void async_shutdown();
1 change: 1 addition & 0 deletions include/reactor-cpp/reactor.hh
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ public:
void shutdown() final;

virtual void assemble() = 0;
virtual void construct() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, here is some food for thought:

Currently, the construction phase if for instantiating all the reactors. Here you introduce this new construct method that reactors need to implement, so the question: what should go into the constructor of the class and what should go into this construct function.

and sed /{}/=default


[[nodiscard]] static auto get_physical_time() noexcept -> TimePoint;
[[nodiscard]] auto get_logical_time() const noexcept -> TimePoint;
135 changes: 135 additions & 0 deletions include/reactor-sdk/ConfigParameters.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#pragma once

#include <map>
#include <string>
#include <variant>

namespace sdk
{

template <typename T, typename = void>
struct is_comparable : std::false_type {};

template <typename T>
struct is_comparable<
T,
std::void_t<decltype(std::declval<T>() < std::declval<T>())>
> : std::true_type {};

extern std::map<std::string, std::string> type_convert;
template <typename T>
struct ParameterMetadata;

class ConfigParameterBase {
protected:
virtual void pull_config() = 0;
virtual void display() = 0;
virtual int validate() = 0;

public:
virtual ~ConfigParameterBase() = default;
virtual int pull_config_parameter(const bool &is_heterogeneous, const std::string &key, void *user_param, const std::type_info& ti) = 0;

template<typename T>
int PullConfigParameter(const bool &is_heterogeneous, const std::string &key, ParameterMetadata<T>* user_param) {
return pull_config_parameter(is_heterogeneous, key, static_cast<void*>(user_param), typeid(T));
}
friend class Environment;
};

template <typename T>
struct ConfigParameterMetadata {
std::vector<T> values;

ConfigParameterMetadata(std::initializer_list<T> val) : values(val) {}
};

template <typename... ParameterValueType>
class ConfigParameter : public ConfigParameterBase {
public:
using ParameterValue = std::variant<ConfigParameterMetadata<ParameterValueType>...>;
using ParametersMap = std::map<std::string, ParameterValue>;

virtual ParametersMap homogeneous_config() = 0;
virtual ParametersMap heterogeneous_config() = 0;
int pull_config_parameter(const bool &is_heterogeneous, const std::string &key, void *user_param, const std::type_info& ti) override {
std::map<std::string, ParameterValue> *param_map = is_heterogeneous ? &hetero_param_map : &homoge_param_map;
std::set<std::string> *invalid_keys = is_heterogeneous ? &hetero_invalid_keys : &homoge_invalid_keys;
auto itr_system = param_map->find(key);
if (itr_system != param_map->end()) {
auto v_it = invalid_keys->find(key);
if (v_it != invalid_keys->end()) {
invalid_keys->erase(v_it);
}
std::visit([is_heterogeneous, user_param, &ti, key](auto&& system_param) {
using ContainerType = std::decay_t<decltype(system_param.values)>;
using U = typename ContainerType::value_type;

if (ti == typeid(U)) {
ParameterMetadata<U>* param = static_cast<ParameterMetadata<U>*>(user_param);
if constexpr (is_comparable<U>::value && !std::is_same<U, std::string>::value) {
if ((system_param.values[0] < param->min_value) ||
(system_param.values[0] > param->max_value)) {
reactor::log::Error() << "Error: " << ((is_heterogeneous) ? "Heterogeneous Map" : "Homogeneous Map") << " -- Range mismatch for parameter name: " << key << " value:" << system_param.values[0] <<
" min_value:" << param->min_value << " max_value:" << param->max_value;
std::exit(EXIT_FAILURE);
}
}
param->value = system_param.values[0];

} else {
reactor::log::Error() << "Error: Type mismatch for parameter name: " << key << "\n"
<< "Expected type: " << type_convert[ti.name()]
<< ", Provided type: " << type_convert[typeid(U).name()];
std::exit(EXIT_FAILURE);
}
}, itr_system->second);
return 0;
}
return -1;
}

protected:
std::map<std::string, ParameterValue> homoge_param_map;
std::set<std::string> homoge_invalid_keys;
std::map<std::string, ParameterValue> hetero_param_map;
std::set<std::string> hetero_invalid_keys;
void pull_config() override {
homoge_param_map = homogeneous_config();
for (const auto& entry : homoge_param_map) {
bool result = homoge_invalid_keys.insert(entry.first).second;
assert(result);
}

hetero_param_map = heterogeneous_config();
for (const auto& entry : hetero_param_map) {
bool result = hetero_invalid_keys.insert(entry.first).second;
assert(result);
}
}

int validate() override {
for (const auto &key : hetero_invalid_keys) {
reactor::log::Error() << "Heterogeneous Invalid key:" << key << "\n";
}

for (const auto &key : homoge_invalid_keys) {
reactor::log::Error() << "Homogeneous Invalid key:" << key << "\n";
}
return (hetero_invalid_keys.size() + homoge_invalid_keys.size());
}

void display() override {
for (const auto& entry : hetero_param_map) {
reactor::log::Debug() << "Parameter: " << entry.first;

std::visit([](auto&& param) {
for (auto val : param.values) {
reactor::log::Debug() << "Value: " << val;
}
}, entry.second);
}
}
};

} // namespace sdk
34 changes: 34 additions & 0 deletions include/reactor-sdk/Environment.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"
#include "ConfigParameters.hh"

namespace sdk
{

class Reactor;
class Environment: public reactor::Environment {
private:
std::set<Reactor*> top_tier_reactors;
ConfigParameterBase *config_parameters;
bool cfg_gen = false;

public:
Environment(ConfigParameterBase *sys_param = nullptr, unsigned int num_workers = 1, bool fast_fwd_execution = true,
const reactor::Duration& timeout = reactor::Duration::max(), bool cfg_gen = false);

Environment(const Environment&) = delete;
Environment& operator=(const Environment&) = delete;
void run();

void add_reactor (Reactor* reactor) {
bool result = top_tier_reactors.insert(reactor).second;
reactor_assert(result);
}

ConfigParameterBase *get_config_params() { return config_parameters; }

friend class SystemParameterBase;
};

} // namespace sdk
65 changes: 65 additions & 0 deletions include/reactor-sdk/InputPort.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"

namespace sdk
{

template <typename T>
class Input : public reactor::Input<T> {
class WiringProxy {
public:
WiringProxy(Input& origin) : origin(origin) {}

void operator>(Input<T>& input) {
origin.connect (input);
}

void operator>(MultiportInput<T>& input) {
origin.connect (input);
}

void operator>>(MultiportInput<T>& input) {
origin.connect_fanout (input);
}

template <typename OtherReactorType>
void operator>(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports) {
origin.connect(std::move(other_bank_ports));
}

template <typename OtherReactorType>
void operator>>(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports) {
origin.connect_fanout(std::move(other_bank_ports));
}

private:
Input& origin;
};

void connect(Input<T>& input);
void connect(MultiportInput<T>& input);
void connect_fanout(MultiportInput<T>& input);

template <typename ReactorType>
void connect(ReactorBankInputPortOffset<ReactorType, T> &&other_bank_ports);

template <typename ReactorType>
void connect_fanout(ReactorBankInputPortOffset<ReactorType, T> &&other_bank_ports);

public:
using value_type = T;
Input(const std::string& name, reactor::Reactor* container)
: reactor::Input<T>(name, container) {}

Input(Input&&) noexcept = default;
~Input() {}

WiringProxy operator--(int) {
return WiringProxy(*this);
}
};

} // namespace sdk

#include "impl/InputPort_wiring_impl.hh"
108 changes: 108 additions & 0 deletions include/reactor-sdk/Misc.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"
#include <iomanip>

namespace sdk
{

class Reactor;

template<typename T>
using LogicalAction = reactor::LogicalAction<T>;

using Startup = reactor::StartupTrigger;
using Shutdown = reactor::ShutdownTrigger;

using Duration = reactor::Duration;
using TimePoint = reactor::TimePoint;

#define select_default(obj) &obj[0]

template <typename T>
struct inspect_function_args;

template <typename Ret, typename Class, typename... Args>
struct inspect_function_args<Ret(Class::*)(Args...)> {
static constexpr size_t nargs = sizeof...(Args);
};

template <typename Func, typename Object>
auto bind_function(Object* obj, Func&& func) {
constexpr size_t nargs = inspect_function_args<Func>::nargs;

if constexpr (nargs == 0) {
static_assert(nargs > 0, "Reactors must have one or more parameters");
return nullptr;
} else if constexpr (nargs == 1) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1);
} else if constexpr (nargs == 2) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2);
} else if constexpr (nargs == 3) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
} else if constexpr (nargs == 4) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
} else if constexpr (nargs == 5) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5);
} else if constexpr (nargs == 6) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6);
} else if constexpr (nargs == 7) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7);
} else if constexpr (nargs == 8) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8);
} else if constexpr (nargs == 9) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9);
} else if constexpr (nargs == 10) {
return std::bind(std::forward<Func>(func), obj, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9, std::placeholders::_10);
} else {
static_assert(nargs <= 10, "This needs to be extended as per requirement of more parameters");
return nullptr;
}

}

#define pass_function(func) \
bind_function(this, &std::decay_t<decltype(*this)>::func)

class Timer : public reactor::Timer {
std::string name;
Reactor *reactor;
public:
Timer(const std::string& name, Reactor* container)
: reactor::Timer(name, (reactor::Reactor *) container), name (name), reactor (container){}

void set_timer (Duration period = Duration::zero(), Duration offset = Duration::zero()) {
period_ = period;
offset_ = offset;
}

Timer(Timer&&) noexcept = default;
};

inline auto operator<<(std::ostream& os, Duration dur) -> std::ostream& {
os << dur.count() << " nsecs";
return os;
}

constexpr std::size_t TIME_TO_STR_BUFFER_SIZE_{20};
constexpr std::size_t NANOSECONDS_IN_ONE_SECOND_{1'000'000'000UL};
constexpr std::size_t NANOSECOND_DIGITS_{9};

inline auto operator<<(std::ostream& os, TimePoint tp) -> std::ostream& {
std::array<char, TIME_TO_STR_BUFFER_SIZE_> buf{};
time_t time =
std::chrono::system_clock::to_time_t(std::chrono::time_point_cast<std::chrono::system_clock::duration>(tp));
auto res = std::strftime(buf.data(), sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time));
auto epoch = std::chrono::duration_cast<Duration>(tp.time_since_epoch());

if (res != 0) {
os << buf.data() << '.' << std::setw(NANOSECOND_DIGITS_) << std::setfill('0')
<< epoch.count() % NANOSECONDS_IN_ONE_SECOND_;
} else {
os << "[INVALID TIME]";
}

return os;
}

} // namespace sdk
65 changes: 65 additions & 0 deletions include/reactor-sdk/MultiportInput.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include <string>
#include "reactor-cpp/reactor-cpp.hh"

namespace sdk
{

template<typename T>
class Input;

class Reactor;

template<typename T>
class MultiportInput : public reactor::ModifableMultiport<Input<T>> {
size_t n_inputs;
std::string name;
Reactor *reactor;

class WiringProxy {
public:
WiringProxy(MultiportInput& origin) : origin(origin) {}

void operator>(Input<T>& input) {
origin.connect (input);
}

void operator>(MultiportInput<T>& input) {
origin.connect (input);
}

private:
MultiportInput& origin;
};

void connect(Input<T>& input);
void connect(MultiportInput<T>& input);

public:
using value_type = T;
MultiportInput(const std::string& name, Reactor* container)
: name (name), reactor (container) {}

void set_width (int width)
{
this->reserve(width);
n_inputs = width;
for (int idx = 0; idx < width; idx++) {
std::string input_name = name + "_" + std::to_string(idx);
this->emplace_back(input_name, reactor);
}
}

MultiportInput(MultiportInput&&) noexcept = default;
auto get_nports() -> int { return n_inputs; }

WiringProxy operator--(int) {
return WiringProxy(*this);
}
};


} // namespace sdk

#include "impl/InputMultiport_wiring_impl.hh"
108 changes: 108 additions & 0 deletions include/reactor-sdk/MultiportOutput.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#pragma once

#include <string>
#include "reactor-cpp/reactor-cpp.hh"

namespace sdk
{

template<typename T>
class Input;

template<typename T>
class Output;

template<typename T>
class MultiportOutput;

template<typename T>
class MultiportInput;

class Reactor;

template<typename T>
class MultiportOutput : public reactor::ModifableMultiport<Output<T>> {
size_t n_inputs;
std::string name;
Reactor *reactor;
class WiringProxy {
public:
WiringProxy(MultiportOutput& origin) : origin(origin) {}

void operator>(Input<T>& input) {
origin.connect (input);
}

void operator>(Output<T>& input) {
origin.connect (input);
}

void operator>(MultiportInput<T>& input) {
origin.connect (input);
}

void operator>(MultiportOutput<T>& input) {
origin.connect (input);
}

template <typename ReactorType>
void operator>(std::pair<std::vector<std::unique_ptr<ReactorType>>*, Input<T> ReactorType::*> connections)
{
origin.connect (connections.first, connections.second);
}

template <typename OtherReactorType>
void operator>(ReactorBankInputPort<OtherReactorType, T> &&other_bank_ports) {
origin.connect(std::move(other_bank_ports));
}

template <typename OtherReactorType>
void operator>(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports) {
origin.connect(std::move(other_bank_ports));
}

private:
MultiportOutput& origin;
};

void connect(Input<T>& input);
void connect(Output<T>& input);
void connect(MultiportInput<T>& input);
void connect(MultiportOutput<T>& input);

template <typename ReactorType>
void connect(std::vector<std::unique_ptr<ReactorType>>* reactors, Input<T> ReactorType::*member);

template <typename OtherReactorType>
void connect(ReactorBankInputPort<OtherReactorType, T> &&other_bank_ports);

template <typename OtherReactorType>
void connect(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports);

public:
using value_type = T;
MultiportOutput(const std::string& name, Reactor* container)
: name (name), reactor (container) {}

void set_width (int width)
{
this->reserve(width);
n_inputs = width;
for (int idx = 0; idx < width; idx++) {
std::string input_name = name + "_" + std::to_string(idx);
this->emplace_back(input_name, reactor);
}
}

MultiportOutput(MultiportOutput&&) noexcept = default;
auto get_nports() -> int { return n_inputs; }

WiringProxy operator--(int) {
return WiringProxy(*this);
}
};


} // namespace sdk

#include "impl/OutputMultiport_wiring_impl.hh"
113 changes: 113 additions & 0 deletions include/reactor-sdk/OutputPort.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"

namespace sdk
{

template<typename T>
class Input;

template<typename T>
class Output;

template<typename T>
class MultiportOutput;

template<typename T>
class MultiportInput;

class Reactor;

template <typename T>
class Output : public reactor::Output<T> {
std::set<Output<T>*> accumulated;
bool is_accumulated = false;

class WiringProxy {
public:
WiringProxy(Output& origin) : origin(origin) {}

void operator>(Input<T>& input) {
origin.connect (input);
}

void operator>(Output<T>& input) {
origin.connect (input);
}

void operator>(MultiportInput<T>& input) {
origin.connect (input);
}

void operator>>(MultiportInput<T>& input) {
origin.connect_fanout (input);
}

template <typename ReactorType>
void operator>(std::pair<std::vector<std::unique_ptr<ReactorType>>*, Input<T> ReactorType::*> connections)
{
origin.connect (connections.first, connections.second);
}

template <typename OtherReactorType>
void operator>(ReactorBankInputPort<OtherReactorType, T> &&other_bank_ports) {
origin.connect(std::move(other_bank_ports));
}

template <typename OtherReactorType>
void operator>(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports) {
origin.connect(std::move(other_bank_ports));
}

template <typename OtherReactorType>
void operator>>(ReactorBankInputPortOffset<OtherReactorType, T> &&other_bank_ports) {
origin.connect_fanout(std::move(other_bank_ports));
}

private:
Output& origin;
};

void connect(Input<T>& input);
void connect(Output<T>& input);
void connect(MultiportInput<T>& input);
void connect_fanout(MultiportInput<T>& input);

template <typename ReactorType>
void connect(std::vector<std::unique_ptr<ReactorType>>* reactors, Input<T> ReactorType::*member);

template <typename ReactorType>
void connect(ReactorBankInputPort<ReactorType, T> &&other_bank_ports);

template <typename ReactorType>
void connect(ReactorBankInputPortOffset<ReactorType, T> &&other_bank_ports);

template <typename ReactorType>
void connect_fanout(ReactorBankInputPortOffset<ReactorType, T> &&other_bank_ports);

public:
using value_type = T;
Output(const std::string& name, reactor::Reactor* container)
: reactor::Output<T>(name, container) {}

~Output() {}

Output(Output&&) noexcept = default;

WiringProxy operator--(int) {
return WiringProxy(*this);
}

Output<T>& operator+(Output<T> &output) {
[[maybe_unused]] bool result = accumulated.insert(&output).second;
reactor_assert(result);
is_accumulated = true;
return *this;
}
};


} // namespace sdk

#include "impl/OutputPort_wiring_impl.hh"
365 changes: 365 additions & 0 deletions include/reactor-sdk/Reaction.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,365 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"
#include "ReactionBase.hh"
#include "Reactor.hh"

namespace sdk
{

template <typename Fn, typename InputTuple, typename DependencyTuple, typename OutputTuple>
class Reaction;

template <typename T, template <typename...> class Template>
struct is_specialization : std::false_type
{
};

template <typename... Args, template <typename...> class Template>
struct is_specialization<Template<Args...>, Template> : std::true_type
{
};

template <typename T, template <typename...> class Template>
inline constexpr bool is_specialization_v = is_specialization<T, Template>::value;

// fix for gcc < 13
template <typename T>
constexpr bool templated_false = false;

template <typename InputTuple, typename DependencyTuple, typename OutputTuple>
class ReactionOutput: public ReactionBase
{
private:
InputTuple input_triggers;
DependencyTuple dependencies;
OutputTuple output_triggers;

public:
explicit ReactionOutput(std::string name, Reactor *parent, InputTuple inputs, DependencyTuple deps, OutputTuple outputs)
: ReactionBase (name, parent), input_triggers(std::move(inputs)), dependencies(std::move(deps)), output_triggers(std::move(outputs)) {}
~ReactionOutput() {}

template <typename Fn>
Reaction<Fn, InputTuple, DependencyTuple, OutputTuple> &function(Fn func)
{
if constexpr (std::is_bind_expression<Fn>::value) {
}
else if (sizeof(func) != sizeof(void*)) {
reactor::log::Error() << "Reactor: " << reactor->fqn() << " Reaction: " << name << " Accesses variables outside of its scope";
exit(EXIT_FAILURE);
}

auto ReactionRef = std::make_shared<Reaction<Fn, InputTuple, DependencyTuple, OutputTuple>> (name, reactor, std::move(input_triggers), std::move(dependencies), std::move(output_triggers), std::forward<Fn>(func));
ReactionRef->execute();
return *ReactionRef;
}
};

template <typename InputTuple, typename DependencyTuple>
class ReactionDependency: public ReactionBase
{
private:
InputTuple input_triggers;
DependencyTuple dependencies;

public:
explicit ReactionDependency(std::string name, Reactor *parent, InputTuple inputs, DependencyTuple deps)
: ReactionBase (name, parent), input_triggers(inputs), dependencies(std::move(deps)) {}
~ReactionDependency() {}

template <typename... Outputs>
ReactionOutput<InputTuple, DependencyTuple, std::tuple<Outputs...>> &effects(Outputs&&... outputs)
{
auto output_tuple = std::make_tuple(outputs...);
auto ReactionOutputRef = std::make_shared<ReactionOutput<InputTuple, DependencyTuple, std::tuple<Outputs...>>> (name, reactor, std::move(input_triggers), std::move(dependencies), std::move(output_tuple));
next = ReactionOutputRef;
return *ReactionOutputRef;
}
};

template <typename InputTuple>
class ReactionInput: public ReactionBase
{
private:
InputTuple input_triggers;

public:
explicit ReactionInput(std::string name, Reactor *parent, InputTuple inputs)
: ReactionBase (name, parent), input_triggers(std::move(inputs)) {}
~ReactionInput() {}

template <typename... Dependencies>
ReactionDependency<InputTuple, std::tuple<Dependencies...>> &dependencies(Dependencies&&... deps)
{
auto deps_tuple = std::make_tuple(deps...);
auto ReactionDependenciesRef = std::make_shared<ReactionDependency<InputTuple, std::tuple<Dependencies...>>> (name, reactor, std::move(input_triggers), std::move(deps_tuple));
next = ReactionDependenciesRef;
return *ReactionDependenciesRef;
}
};

class ReactionName: public ReactionBase {
public:
explicit ReactionName(std::string name, Reactor *parent)
: ReactionBase (name, parent) {}
~ReactionName() = default;

template <typename... Inputs>
ReactionInput<std::tuple<Inputs...>> &triggers(Inputs&&... inputs)
{
auto input_tuple = std::make_tuple(inputs...);
auto ReactionInputRef = std::make_shared<ReactionInput<std::tuple<Inputs...>>> (name, reactor, std::move(input_tuple));
next = ReactionInputRef;
return *ReactionInputRef;
}
};

template <typename Fn, typename InputTuple, typename DependencyTuple, typename OutputTuple>
class Reaction: public ReactionBase
{
private:
InputTuple input_triggers;
DependencyTuple dependencies;
OutputTuple output_triggers;
Fn user_function;
std::unique_ptr<reactor::Reaction> reaction;

template <typename Reaction, typename Trigger>
void set_input_trigger(Reaction &reaction, Trigger &&trigger)
{
if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, MultiportInput>)
{
for (auto& port : *trigger) {
reaction.declare_trigger(&port);
}
}
else if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, MultiportOutput>)
{
for (auto& port : *trigger) {
reaction.declare_trigger(&port);
}
}
else {
reaction.declare_trigger(trigger);
}
}

template <typename Reaction, typename... Triggers>
void set_input_triggers(std::unique_ptr<Reaction> &reaction, const std::tuple<Triggers...> &inputs)
{
std::apply([this, &reaction](auto &&...input)
{
(void)this;
(..., set_input_trigger(*reaction, std::forward<decltype(input)>(input)));
},
inputs);
}

template <typename Reaction, typename Trigger>
void set_dependency(Reaction &reaction, Trigger &&trigger)
{
if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, MultiportInput>)
{
for (auto& port : *trigger) {
reaction.declare_dependency(&port);
}
}
else {
reaction.declare_dependency(trigger);
}
}

template <typename Reaction, typename... Dependencies>
void set_dependencies(std::unique_ptr<Reaction> &reaction, const std::tuple<Dependencies...> &deps)
{
std::apply([this, &reaction](auto &&...dep)
{
(void)this;
(..., set_dependency(*reaction, std::forward<decltype(dep)>(dep)));
},
deps);
}

template <typename Reaction, typename Trigger>
void set_output_trigger(Reaction &reaction, Trigger &&trigger)
{
if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, Output>)
{
reaction.declare_antidependency(trigger);
} else if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, Input>)
{
reaction.declare_antidependency(trigger);
}
else if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, reactor::LogicalAction>)
{
reaction.declare_schedulable_action(trigger);
}
else if constexpr (is_specialization_v<std::remove_pointer_t<std::decay_t<Trigger>>, MultiportOutput>)
{
for (auto& port : *trigger) {
reaction.declare_antidependency(&port);
}
}
else
{
static_assert(templated_false<Trigger>, "Unsupported trigger type");
}
}

template <typename Reaction, typename... Triggers>
void set_output_triggers(std::unique_ptr<Reaction> &reaction, const std::tuple<Triggers...> &outputs)
{
std::apply([this, &reaction](auto &&...output)
{
(void)this;
(..., set_output_trigger(*reaction, std::forward<decltype(output)>(output)));
},
outputs);
}

public:
Reaction(std::string name, Reactor *parent, InputTuple inputs, DependencyTuple deps, OutputTuple outputs, Fn func)
: ReactionBase(name, parent), input_triggers(std::move(inputs)), dependencies(std::move(deps)), output_triggers(std::move(outputs)), user_function(std::forward<Fn>(func)) { /* std::cout << "Creating Reaction\n"; */ }
~Reaction() {}

void execute () {
int priority = reactor->get_priority();
reactor->add_to_reaction_map(name, shared_from_this());
reactor->validate_reaction (user_function, input_triggers, dependencies, output_triggers);

auto reactor_func = [func = std::move(user_function), this]()
{
(void)this;
auto apply_to_dereferenced = [](auto&& func, auto&& tuple) {
return std::apply(
[&](auto*... ptrs) {
return std::invoke(std::forward<decltype(func)>(func), (*ptrs)...);
},
std::forward<decltype(tuple)>(tuple));
};

apply_to_dereferenced(func, std::tuple_cat(this->input_triggers, this->dependencies, this->output_triggers));
};

reaction = std::make_unique<reactor::Reaction>(name, priority, reactor, reactor_func);

set_input_triggers(reaction, input_triggers);
set_dependencies(reaction, dependencies);
set_output_triggers(reaction, output_triggers);
}

template <typename Dfn>
void deadline(reactor::Duration deadline_period, Dfn fn)
{
reactor->validate_reaction (fn, input_triggers, dependencies, output_triggers);

auto deadline_func = [func = std::move(fn), this]()
{
(void)this;
auto apply_to_dereferenced = [](auto&& func, auto&& tuple) {
return std::apply(
[&](auto*... ptrs) {
return std::invoke(std::forward<decltype(func)>(func), (*ptrs)...);
},
std::forward<decltype(tuple)>(tuple));
};

apply_to_dereferenced(func, std::tuple_cat(this->input_triggers, this->dependencies, this->output_triggers));
};

reaction->set_deadline(deadline_period, deadline_func);
}
};

template <typename ReactorType>
class ReactionChamberParameterless : public ReactionBase {
ReactorType *reactor_;
protected:
const size_t &bank_index = reactor_->bank_index;
public:
ReactionChamberParameterless(Reactor *owner)
: ReactionBase("reaction-internals-parameterless", owner), reactor_((ReactorType*) owner) {
reactor_->add_reaction_internals(this);
}

ReactionName &reaction (const std::string name) {
auto ReactionNameRef = std::make_shared<ReactionName>(name, reactor);
next = ReactionNameRef;
return *ReactionNameRef;
}

virtual void add_reactions(ReactorType *reactor) = 0;
virtual void assemble() override {
add_reactions(reactor_);
}

auto fqn() const noexcept -> const std::string& { return reactor_->fqn(); }
auto get_elapsed_logical_time() const noexcept -> Duration { return reactor_->get_elapsed_logical_time(); }
auto get_microstep() const noexcept -> reactor::mstep_t { return reactor_->get_microstep(); }
auto get_elapsed_physical_time() const noexcept -> Duration { return reactor_->get_elapsed_physical_time(); }
auto get_physical_time() noexcept -> reactor::TimePoint { return reactor_->get_physical_time(); }
auto get_logical_time() const noexcept -> reactor::TimePoint { return reactor_->get_logical_time(); }
auto get_tag() const noexcept -> reactor::Tag { return reactor_->get_tag(); }
void request_stop() { reactor_->environment()->sync_shutdown(); }
};

template <typename ReactorType, typename ParameterType>
class ReactionChamber : public ReactionBase {
ReactorType *reactor_;

protected:
const ParameterType &parameters;
const size_t &bank_index = reactor_->bank_index;
public:
ReactionChamber(Reactor *owner, ParameterType &param)
: ReactionBase("reaction-internals", owner), reactor_((ReactorType*) owner), parameters(param) {
reactor_->add_reaction_internals(this);
}

ReactionName &reaction (const std::string name) {
auto ReactionNameRef = std::make_shared<ReactionName>(name, reactor);
next = ReactionNameRef;
return *ReactionNameRef;
}

virtual void add_reactions(ReactorType *reactor) = 0;
virtual void assemble() override {
add_reactions(reactor_);
}

auto fqn() const noexcept -> const std::string& { return reactor_->fqn(); }
auto get_elapsed_logical_time() const noexcept -> Duration { return reactor_->get_elapsed_logical_time(); }
auto get_microstep() const noexcept -> reactor::mstep_t { return reactor_->get_microstep(); }
auto get_elapsed_physical_time() const noexcept -> Duration { return reactor_->get_elapsed_physical_time(); }
auto get_physical_time() noexcept -> reactor::TimePoint { return reactor_->get_physical_time(); }
auto get_logical_time() const noexcept -> reactor::TimePoint { return reactor_->get_logical_time(); }
auto get_tag() const noexcept -> reactor::Tag { return reactor_->get_tag(); }
void request_stop() { reactor_->environment()->sync_shutdown(); }
};

#define REACTION_SCOPE_START(ReactorType, ParamType) \
class Internals : public ReactionChamber<ReactorType, ParamType> { \
public: \
Internals(Reactor *reactor, ParamType &params) \
: ReactionChamber<ReactorType, ParamType>(reactor, params) {} \
private:

#define REACTION_SCOPE_END(reactor, param) \
}; \
Internals reaction_internals{reactor, param};

#define REACTION_SCOPE_START_NO_PARAMS(ReactorType) \
class Internals : public ReactionChamberParameterless<ReactorType> { \
public: \
Internals(Reactor *reactor) \
: ReactionChamberParameterless<ReactorType>(reactor) {} \
private:

#define REACTION_SCOPE_END_NO_PARAMS(reactor) \
}; \
Internals reaction_internals{reactor};

#define REACTION_SCOPE(ReactorType) ReactorType::Internals

} // namespace sdk
22 changes: 22 additions & 0 deletions include/reactor-sdk/ReactionBase.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

namespace sdk
{
class Reactor;

class ReactionBase : public std::enable_shared_from_this<ReactionBase>
{
public:
Reactor *reactor;
std::shared_ptr<ReactionBase> next;
std::string name;

public:
ReactionBase(std::string name, Reactor *parent)
: reactor(parent), next(nullptr), name(name) {}
virtual ~ReactionBase() = default;

virtual void assemble() {}
};

} // namespace sdk
155 changes: 155 additions & 0 deletions include/reactor-sdk/Reactor.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#pragma once

#include "reactor-cpp/reactor-cpp.hh"
#include <string.h>
#include "ReactionBase.hh"
#include "SystemParameterBase.hh"
#include "Environment.hh"

namespace sdk
{

template <typename InputTuple>
class ReactionInput;

template <typename Fn, typename InputTuple, typename DependencyTuple, typename OutputTuple>
class Reaction;

template<typename T>
class Input;

template<typename T>
class Output;

template<typename T>
class MultiportOutput;

template<typename T>
class MultiportInput;

class Timer;

template <typename T>
struct trigger_value_type;

template <typename T>
struct trigger_value_type<reactor::LogicalAction<T> *>
{
using type = reactor::LogicalAction<T>&;
};

template <typename T>
struct trigger_value_type<Input<T> *>
{
using type = Input<T>&;
};

template <typename T>
struct trigger_value_type<MultiportInput<T> *>
{
using type = MultiportInput<T>&;
};

template <typename T>
struct trigger_value_type<MultiportOutput<T> *>
{
using type = MultiportOutput<T>&;
};

template <typename T>
struct trigger_value_type<Output<T> *>
{
using type = Output<T>&;
};

template <>
struct trigger_value_type<reactor::StartupTrigger *>
{
using type = reactor::StartupTrigger&;
};

template <>
struct trigger_value_type<reactor::ShutdownTrigger *>
{
using type = reactor::ShutdownTrigger&;
};

template <>
struct trigger_value_type<Timer *>
{
using type = Timer&;
};

class Reactor : public reactor::Reactor
{
protected:
reactor::StartupTrigger startup{"startup", this};
reactor::ShutdownTrigger shutdown{"shutdown", this};

private:
size_t bank_index_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see so this is so every reactor knows what index he has inside a bank? (Like the example I gave couple of months ago?)

SystemParameterBase *p_param = nullptr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/p_param/system_parameters_/

Environment *env{nullptr};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicate. You can get it via the environment() function.

/env/env_/ (https://google.github.io/styleguide/cppguide.html)

Reactor *parent{nullptr};
std::unordered_map<std::string, std::shared_ptr<ReactionBase>> reaction_map;
ReactionBase *reaction_internals_;
int priority = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does the priority field?

std::set<Reactor*> child_reactors;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inside the Reactor class you have the reactor_ field which keeps track of the child reactors.

std::string homog_name = "";

void add_child(Reactor* reactor);
void add_to_reaction_map (std::string &name, std::shared_ptr<ReactionBase> reaction);
int get_priority() { return priority++;}

template <typename Fn, typename... InputTriggers, typename... Dependencies, typename... OutputTriggers>
void validate_reaction(Fn func, std::tuple<InputTriggers...> inputs, std::tuple<Dependencies...> deps, std::tuple<OutputTriggers...> outputs) {
(void)func;
(void)inputs;
(void)deps;
(void)outputs;
static_assert(
std::is_invocable_v<
Fn,
typename trigger_value_type<InputTriggers>::type...,
typename trigger_value_type<Dependencies>::type...,
typename trigger_value_type<OutputTriggers>::type...
>,
"Reaction function parameters must match the declared input and output types.");
}

void populate_params(std::set<std::string> &types, std::map<std::string, std::string> &homog_map_entries, std::map<std::string, std::string> &hetero_map_entries);

public:
const size_t &bank_index = bank_index_;

Reactor(const std::string &name, Environment *env);
Reactor(const std::string &name, Reactor *container);

void add_reaction_internals (ReactionBase* internals) {
reaction_internals_ = internals;
}

static std::string BankName(const std::string& name);
static std::string HomogName(const std::string& name);

void set_param (SystemParameterBase *param) { p_param = param; }

Environment *get_env() { return env; }

auto homog_fqn() const noexcept -> const std::string& { return homog_name; }

virtual void construction() = 0;
virtual void wiring() = 0;
void construct() override;
void assemble() override;

template <typename Fn, typename InputTuple, typename DependencyTuple, typename OutputTuple>
friend class Reaction;

template <typename ReactorType>
friend class ReactorBank;

friend class Environment;
};

} // namespace sdk
Loading