Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions Plugins/nosReflect/Nodes/FrameRateConverter.nosnode
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"nodes": [
{
"class_name": "FrameRateConverter",
"menu_info": {
"category": "Type",
"display_name": "Frame Rate Converter",
"name_aliases": [
"circular buffer",
"circular queue",
"ring buffer"
]
},
"node": {
"name": "FrameRateConverter",
"display_name": "Frame Rate Converter",
"description": "Features a ring buffer that outputs data at a different rate than input rate.",
"contents_type": "Job",
"pins": [
{
"name": "Thread",
"type_name": "nos.exe",
"show_as": "INPUT_PIN",
"can_show_as": "INPUT_PIN_ONLY"
},
{
"name": "Input",
"type_name": "nos.Generic",
"show_as": "INPUT_PIN",
"can_show_as": "INPUT_PIN_ONLY"
},
{
"name": "Capacity",
"type_name": "uint",
"show_as": "PROPERTY",
"can_show_as": "INPUT_PIN_OR_PROPERTY",
"description": "Number of frames to buffer. Will be larger if Ratio requires it.",
"data": 2,
"min": 1
},
{
"name": "Output",
"type_name": "nos.Generic",
"show_as": "OUTPUT_PIN",
"can_show_as": "OUTPUT_PIN_ONLY",
"live": true
},
{
"name": "Ratio",
"type_name": "nos.fb.vec2u",
"show_as": "PROPERTY",
"can_show_as": "INPUT_PIN_OR_PROPERTY",
"data": {
"x": 1,
"y": 1
},
"min": {
"x": 1,
"y": 1
}
}
]
}
}
],
"schema_version": "1.4-v1"
}
13 changes: 11 additions & 2 deletions Plugins/nosReflect/Source/BoundedQueueNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@
namespace nos::reflect
{

using CopyingBoundedQueueNode = RingBufferNodeBase<CopyingSlot, ServeMode::Immediate>;
using BoundedObjectQueueNode = RingBufferNodeBase<ObjectSlot, ServeMode::Immediate>;
struct CopyingBoundedQueueNode : RingBufferNodeBase<CopyingSlot>
{
CopyingBoundedQueueNode()
: RingBufferNodeBase(RingBufferServeMode::ServeImmediately) {}
};

struct BoundedObjectQueueNode : RingBufferNodeBase<ObjectSlot>
{
BoundedObjectQueueNode()
: RingBufferNodeBase(RingBufferServeMode::ServeImmediately) {}
};

nosResult RegisterCopyingBoundedQueue(nosNodeFunctions* funcs)
{
Expand Down
308 changes: 308 additions & 0 deletions Plugins/nosReflect/Source/FrameRateConverter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
#include "RingBuffer.hpp"

namespace nos::reflect
{

struct FrameRateConverterNode : NodeContext
{
struct Slot
{
ObjectRef Object;
uint64_t FrameNumber = 0;
};

nos::Name TypeName = NSN_TypeNameGeneric;

RingBuffer<Slot> Ring;
uint32_t Capacity = 1;
uint32_t EffectiveCapacity = 1;
uint32_t RemainingRepeatCount = 0;
fb::vec2u Ratio = {1, 1};

enum class StatusType
{
Ratio,
Capacity,
};

std::unordered_map<StatusType, fb::TNodeStatusMessage> StatusMessages;

bool CapacityUpdatedViaPathCommand = false;

FrameRateConverterNode() : Ring(1, RingBufferServeMode::WaitUntilFull)
{
Ring.Reset(EffectiveCapacity);
AddPinValueWatcher<uint32_t>(NOS_NAME("Capacity"), std::bind(&FrameRateConverterNode::OnCapacityPinValueChanged, this, std::placeholders::_1, std::placeholders::_2));
AddPinValueWatcher<fb::vec2u>(NOS_NAME("Ratio"), std::bind(&FrameRateConverterNode::OnRatioPinValueChanged, this, std::placeholders::_1, std::placeholders::_2));
}

void OnRatioPinValueChanged(fb::vec2u const* newRatio, std::optional<fb::vec2u const*> oldRatio)
{
if (*newRatio == Ratio)
return;
if (newRatio->x() == 0 || newRatio->y() == 0)
{
nosEngine.LogW("%s: Ratio components cannot be 0.", GetItemPath(NodeId).value_or("<unknown>").c_str());
SetPinValue(NOS_NAME("Ratio"), Ratio);
return;
}
Ratio = *newRatio;
auto lcm = std::lcm(Ratio.x(), Ratio.y());
auto requiredEffectiveCapacity = lcm * Capacity;
if (requiredEffectiveCapacity != EffectiveCapacity)
{
EffectiveCapacity = requiredEffectiveCapacity;
}
SendPathRestart(NodeId);
}

void OnCapacityPinValueChanged(uint32_t const* newCapacity, std::optional<uint32_t const*> oldCapacity)
{
if (*newCapacity == Capacity)
return;
Capacity = std::max(1u, *newCapacity);
EffectiveCapacity = std::lcm(Ratio.x(), Ratio.y()) * Capacity;
if (*newCapacity != Capacity)
{
nosEngine.LogW("%s: Capacity cannot be %u.",
GetItemPath(NodeId).value_or("<unknown>").c_str(),
*newCapacity);
SetPinValue(NOS_NAME("Capacity"), Capacity);
return;
}
if (!CapacityUpdatedViaPathCommand)
{
nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity};
nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange);
CapacityUpdatedViaPathCommand = false;
}
SendPathRestart(NSN_Input);
}

void SendRingStats(const char* state) const
{
auto nodeName = NodeName.AsString();
nosEngine.WatchLog((nodeName + " Size").c_str(), std::to_string(Ring.GetSize()).c_str());
nosEngine.WatchLog((nodeName + " Capacity").c_str(), std::to_string(Ring.GetCapacity()).c_str());
nosEngine.WatchLog((nodeName + " State").c_str(), state);
}

void OnPathStart() override
{
nosEngine.LogD("%s: Effective Capacity set to %u", GetItemPath(NodeId).value_or("<unknown>").c_str(), EffectiveCapacity);
SetStatus(StatusType::Capacity,
"Capacity: " + std::to_string(Capacity) + " (Effective: " + std::to_string(EffectiveCapacity) + ")",
fb::NodeStatusMessageType::INFO);
Ring.Reset(EffectiveCapacity);
RemainingRepeatCount = Capacity - 1;
SendScheduleRequest(Capacity);
}

void OnPathStop() override { Ring.Shutdown(); }

nosResult OnCreate(nosFbNodePtr node) override
{
for (auto pin : *node->pins())
{
auto name = nos::Name(pin->name()->c_str());
if (NSN_Output == name)
{
if (pin->type_name()->c_str() == NSN_TypeNameGeneric.AsString())
continue;
SetType(nos::Name(pin->type_name()->c_str()));
}
}
return NOS_RESULT_SUCCESS;
}

nosResult CopyFrom(nosCopyFromInfo* cpy) override
{
SendRingStats("Pre Begin Pop");
if (Ring.GetMode() == RingBufferServeMode::WaitUntilFull)
{
if (RemainingRepeatCount > 0)
{
--RemainingRepeatCount;
return NOS_RESULT_SUCCESS;
}
}
std::vector<ObjectRef> outputObjectRefs;
uint64_t frameNumber = 0;
uint32_t popCount = Ratio.y();
std::optional<std::vector<Slot*>> maybeSrcSlots;
{
ScopedProfilerEvent _({.Name = "Wait For Read"});
maybeSrcSlots = Ring.BeginPop(popCount, 100);
}
if (maybeSrcSlots)
{
auto& srcSlots = *maybeSrcSlots;
for (auto& srcSlot : srcSlots)
outputObjectRefs.push_back(std::move(srcSlot->Object));
frameNumber = srcSlots[0]->FrameNumber;
Ring.EndPop(popCount);
SendRingStats("Post Begin Pop");
}
else if (Ring.IsShuttingDown())
{
return NOS_RESULT_FAILED;
}
else
{
// Timeout
return NOS_RESULT_PENDING;
}
if (!outputObjectRefs.empty())
{
// Convert ObjectRefs to IDs for the API call
std::vector<nosObjectId> outputObjects;
outputObjects.reserve(outputObjectRefs.size());
for (const auto& ref : outputObjectRefs)
outputObjects.push_back(ref.GetObjectId());

ObjectRef outputArrayObject;
auto res = nosEngine.ObjectAPI->CreateArrayObject(
TypeName, outputObjects.data(), outputObjects.size(), &outputArrayObject.GetStorage());
if (res != NOS_RESULT_SUCCESS)
return res;
SetPinObject(NSN_Output, outputArrayObject);
cpy->ShouldSetSourceFrameNumber = true;
cpy->FrameNumber = frameNumber;
SendScheduleRequest(1);
return NOS_RESULT_SUCCESS;
}
return NOS_RESULT_PENDING;
}

void OnPathCommand(const nosPathCommand* command) override
{
switch (command->Event)
{
case NOS_RING_SIZE_CHANGE: {
if (command->RingSize == 0)
{
nosEngine.LogW((GetDisplayName() + " capacity cannot be 0.").c_str());
return;
}
CapacityUpdatedViaPathCommand = true;
SetPinValue(NOS_NAME("Capacity"), command->RingSize);
break;
}
default: return;
}
}

nosResult OnResolvePinDataTypes(nosResolvePinDataTypesParams* params) override
{
TypeInfo typeInfo(params->IncomingTypeName);
if (typeInfo->BaseType != NOS_BASE_TYPE_ARRAY)
{
strncpy(params->OutErrorMessage, "Connected pin type must be an array type!", 42);
return NOS_RESULT_FAILED;
}
return NOS_RESULT_SUCCESS;
}

void OnPinUpdated(nosPinUpdate const* update) override
{
if (TypeName != NSN_TypeNameGeneric)
return;
if (update->UpdatedField == NOS_PIN_FIELD_TYPE_NAME)
{
if (update->PinName != NSN_Input)
return;
SetType(update->TypeName);
}
}

void SetType(nos::Name typeName)
{
TypeName = typeName;
SendPathRestart(NodeId);
}

nosResult ExecuteNode(NodeExecuteParams const& params) override
{
if (NSN_TypeNameGeneric == TypeName)
return NOS_RESULT_FAILED;
ArrayObjectRef inputArrayObject = params.GetPinObject(NSN_Input);
if (!inputArrayObject)
return NOS_RESULT_FAILURE;

SendRingStats("Pre Push");
uint32_t pushCount = Ratio.x();
std::optional<std::vector<Slot*>> maybeDstSlots;
{
ScopedProfilerEvent _({.Name = "Wait For Empty Slot"});
maybeDstSlots = Ring.BeginPush(pushCount, 100);
}
if (maybeDstSlots)
{
auto& dstSlots = *maybeDstSlots;
auto inSize = inputArrayObject.GetSize();
if (inSize != pushCount)
{
SetStatus(StatusType::Ratio,
"Input array size (" + std::to_string(inSize) + ") does not match required input size (" +
std::to_string(pushCount) + ")!",
fb::NodeStatusMessageType::FAILURE);
Ring.EndPush(pushCount);
SendRingStats("Post Push");
return NOS_RESULT_FAILED;
}
// TODO: Maybe a more understandable message here?
SetStatus(StatusType::Ratio, "In " + std::to_string(Ratio.x()) + ":" + std::to_string(Ratio.y()) + " Out", fb::NodeStatusMessageType::INFO);
uint32_t i = 0;
for (auto& elem : inputArrayObject)
{
auto& dstSlot = dstSlots[i++];
dstSlot->Object = elem;
dstSlot->FrameNumber = params.FrameNumber;
}
Ring.EndPush(pushCount);
SendRingStats("Post Push");
return NOS_RESULT_SUCCESS;
}
if (Ring.IsShuttingDown())
return NOS_RESULT_FAILED;
// Timeout
return NOS_RESULT_PENDING;
}

void SetStatus(StatusType statusType, std::string const& message, fb::NodeStatusMessageType messageType)
{
auto msg = fb::TNodeStatusMessage{{}, message, messageType};
if (StatusMessages[statusType] != msg)
{
StatusMessages[statusType] = msg;
UpdateStatus();
}
}

void ClearStatus(StatusType statusType)
{
auto it = StatusMessages.find(statusType);
if (it != StatusMessages.end())
{
StatusMessages.erase(it);
UpdateStatus();
}
}

void UpdateStatus()
{
ClearNodeStatusMessages();
std::vector<fb::TNodeStatusMessage> messages;
for (auto const& [_, msg] : StatusMessages)
messages.push_back(msg);
SetNodeStatusMessages(messages);
}
};

nosResult RegisterFrameRateConverter(nosNodeFunctions* node)
{
NOS_BIND_NODE_CLASS(NOS_NAME("FrameRateConverter"), FrameRateConverterNode, node)
return NOS_RESULT_SUCCESS;
}

} // namespace nos::reflect
Loading