Skip to content

Commit

Permalink
Log OBS calls duration + various improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
avoitenko-logitech committed Jul 26, 2024
1 parent 71ed172 commit eb95cd7
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 41 deletions.
20 changes: 12 additions & 8 deletions include/ipc-client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@
#include "ipc.hpp"
#include "ipc-socket.hpp"

typedef void (*call_return_t)(void *data, const std::vector<ipc::value> &rval);
typedef void (*call_return_t)(void *data, const std::vector<ipc::value> &rval, std::chrono::high_resolution_clock::duration obs_call_duration);
extern call_return_t g_fn;
extern void *g_data;
extern int64_t g_cbid;

typedef void (*call_on_freez_t)(bool freez_detected, std::string app_state_path, std::string call_name, int timeout);
typedef void (*call_on_freeze_t)(const std::string& app_state_path, const std::string& call_name, int total_time, int obs_time);

namespace ipc {
class client {
public:
using call_on_disconnect_t = std::function<void()>;

client(){};
virtual ~client(){};

// |disconnectionCallback| is called when the server disconnection is detected.
// If the callback is not set or you use the other constructor,
// the client will just call |exit(1)| when the server disconnects.
static std::shared_ptr<client> create(const std::string &socketPath, call_on_disconnect_t disconnectionCallback);

static std::shared_ptr<client> create(std::string socketPath);
client(){};
virtual ~client(){};

// Stop all internal threads and the background disconnection detection.
// Call this if you do not plan to use the object anymore
Expand All @@ -55,9 +55,13 @@ class client {

virtual std::vector<ipc::value> call_synchronous_helper(const std::string &cname, const std::string &fname, const std::vector<ipc::value> &args) = 0;

void set_freeze_callback(call_on_freeze_t cb, std::string app_state);

protected:
std::string m_app_state_path;
call_on_freeze_t m_freeze_cb = nullptr;

private:
std::atomic_bool m_shutting_down = false;
call_on_freez_t freez_cb = nullptr;
std::string app_state_path;
void set_freez_callback(call_on_freez_t cb, std::string app_state);
};
}
8 changes: 5 additions & 3 deletions include/ipc-server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
#include "ipc.hpp"
#include "ipc-class.hpp"
#include "ipc-server-instance.hpp"
#include "ipc-socket.hpp"

#include <chrono>
#include <functional>
#include <list>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <functional>
#include "ipc-socket.hpp"

namespace ipc {
class server_instance;
Expand Down Expand Up @@ -111,7 +113,7 @@ class server {

public: // Client -> Server
bool client_call_function(int64_t cid, const std::string &cname, const std::string &fname, std::vector<ipc::value> &args, std::vector<ipc::value> &rval,
std::string &errormsg);
std::string &errormsg, std::chrono::high_resolution_clock::duration& call_duration);

friend class server_instance;
};
Expand Down
1 change: 1 addition & 0 deletions include/ipc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ struct function_call {

struct function_reply {
ipc::value uid = ipc::value(0ull);
ipc::value obs_call_duration_ms = ipc::value(std::uint32_t(0u));
std::vector<ipc::value> values;
ipc::value error = ipc::value("");

Expand Down
2 changes: 1 addition & 1 deletion source/apple/ipc-client-osx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,4 @@ bool ipc::client_osx::cancel(int64_t const &id)
return m_cb.erase(id) != 0;
}

void ipc::client::set_freez_callback(call_on_freez_t cb, std::string app_state) {}
void ipc::client::set_freeze_callback(call_on_freeze_t cb, std::string app_state) {}
3 changes: 2 additions & 1 deletion source/apple/ipc-server-instance-osx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ void ipc::server_instance_osx::worker_rep()
msgs.pop();
msg_mtx.unlock();

std::chrono::high_resolution_clock::duration call_duration;
proc_rval.resize(0);
success = m_parent->client_call_function(m_clientId, fnc_call_msg.class_name.value_str, fnc_call_msg.function_name.value_str,
fnc_call_msg.arguments, proc_rval, proc_error);
fnc_call_msg.arguments, proc_rval, proc_error, call_duration);

// Set
fnc_reply_msg.uid = fnc_call_msg.uid;
Expand Down
4 changes: 3 additions & 1 deletion source/ipc-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ bool ipc::server::register_collection(std::shared_ptr<ipc::collection> cls)
}

bool ipc::server::client_call_function(int64_t cid, const std::string &cname, const std::string &fname, std::vector<ipc::value> &args,
std::vector<ipc::value> &rval, std::string &errormsg)
std::vector<ipc::value> &rval, std::string &errormsg, std::chrono::high_resolution_clock::duration& call_duration)
{
if (m_classes.count(cname) == 0) {
errormsg = "Class '" + cname + "' is not registered.";
Expand All @@ -279,7 +279,9 @@ bool ipc::server::client_call_function(int64_t cid, const std::string &cname, co
m_preCallback.first(cname, fname, args, m_preCallback.second);
}

const auto start = std::chrono::high_resolution_clock::now();
fnc->call(cid, args, rval);
call_duration = std::chrono::high_resolution_clock::now() - start;

if (m_postCallback.first) {
m_postCallback.first(cname, fname, rval, m_postCallback.second);
Expand Down
7 changes: 5 additions & 2 deletions source/ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ size_t ipc::message::function_call::deserialize(std::vector<char> &buf, size_t o
size_t ipc::message::function_reply::size()
{
size_t size = sizeof(size_t) + uid.size() /* timestamp */
+ error.size() /* error */
+ sizeof(uint32_t) /* values */;
+ obs_call_duration_ms.size() /* call duration */
+ error.size() /* error */
+ sizeof(uint32_t) /* values */;

for (ipc::value &v : values) {
size += v.size();
Expand All @@ -213,6 +214,7 @@ size_t ipc::message::function_reply::serialize(std::vector<char> &buf, size_t of
noffset += sizeof(size_t);

noffset += uid.serialize(buf, noffset);
noffset += obs_call_duration_ms.serialize(buf, noffset);
noffset += error.serialize(buf, noffset);

reinterpret_cast<uint32_t &>(buf[noffset]) = (uint32_t)this->values.size();
Expand All @@ -238,6 +240,7 @@ size_t ipc::message::function_reply::deserialize(std::vector<char> &buf, size_t
size_t noffset = offset + sizeof(size_t);

noffset += uid.deserialize(buf, noffset);
noffset += obs_call_duration_ms.deserialize(buf, noffset);
noffset += error.deserialize(buf, noffset);

uint32_t cnt = reinterpret_cast<uint32_t &>(buf[noffset]);
Expand Down
56 changes: 32 additions & 24 deletions source/windows/ipc-client-win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ call_return_t g_fn = NULL;
void *g_data = NULL;
int64_t g_cbid = NULL;

static const auto freeze_timeout = std::chrono::seconds(15);

using namespace std::placeholders;

std::shared_ptr<ipc::client> ipc::client::create(const std::string &socketPath, call_on_disconnect_t disconnectionCallback)
Expand Down Expand Up @@ -101,9 +103,9 @@ bool ipc::client_win::call(const std::string &cname, const std::string &fname, s
return false;
}

while ((ec = write_op->wait(std::chrono::seconds(15))) == os::error::TimedOut) {
if (freez_cb)
freez_cb(false, app_state_path, cname + "::" + fname + " sync", 15000);
while ((ec = write_op->wait(freeze_timeout)) == os::error::TimedOut) {
if (m_freeze_cb)
m_freeze_cb(m_app_state_path, cname + "::" + fname + " sync", 15000, -1);
}

if (ec != os::error::Success) {
Expand All @@ -124,15 +126,17 @@ std::vector<ipc::value> ipc::client_win::call_synchronous_helper(const std::stri
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();

std::vector<ipc::value> values;
std::chrono::high_resolution_clock::duration obs_call_duration = std::chrono::milliseconds(-1);
} cd;

auto cb = [](void *data, const std::vector<ipc::value> &rval) {
auto cb = [](void *data, const std::vector<ipc::value> &rval, std::chrono::high_resolution_clock::duration obs_call_duration) {
CallData &cd = *static_cast<CallData *>(data);

// This copies the data off of the reply thread to the main thread.
cd.values.reserve(rval.size());
std::copy(rval.begin(), rval.end(), std::back_inserter(cd.values));

cd.obs_call_duration = obs_call_duration;
cd.called = true;
cd.sgn->signal();
};
Expand All @@ -143,34 +147,38 @@ std::vector<ipc::value> ipc::client_win::call_synchronous_helper(const std::stri
return {};
}

static std::chrono::nanoseconds freez_timeout = std::chrono::seconds(1);
bool freez_flagged = false;
while (cd.sgn->wait(freez_timeout) == os::error::TimedOut) {
if (freez_flagged)
continue;
freez_flagged = true;

int t = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - cd.start).count();

if (freez_cb)
freez_cb(true, app_state_path, cname + "::" + fname, t);
static const auto long_call_timeout = std::chrono::milliseconds(100);
bool long_call_flagged = false;
bool freeze_flagged = false;
while (cd.sgn->wait(long_call_timeout) == os::error::TimedOut) {
long_call_flagged = true;

// Logging of probable freeze
const auto total_time = (std::chrono::high_resolution_clock::now() - cd.start);
if (!freeze_flagged && total_time > freeze_timeout) {
freeze_flagged = true;
m_freeze_cb(m_app_state_path, cname + "::" + fname, std::chrono::duration_cast<std::chrono::milliseconds>(total_time).count(), -1);
}
}
if (freez_flagged) {
int t = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - cd.start).count();
if (freez_cb)
freez_cb(false, app_state_path, cname + "::" + fname, t);

if (long_call_flagged) {
const int total_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - cd.start).count();
const int obs_time = std::chrono::duration_cast<std::chrono::milliseconds>(cd.obs_call_duration).count();
if (m_freeze_cb)
m_freeze_cb(m_app_state_path, cname + "::" + fname, total_time, obs_time);
}

if (!cd.called) {
cancel(cbid);
return {};
}
return std::move(cd.values);
}

void ipc::client::set_freez_callback(call_on_freez_t cb, std::string app_state)
void ipc::client::set_freeze_callback(call_on_freeze_t cb, std::string app_state)
{
freez_cb = cb;
app_state_path = app_state;
m_freeze_cb = cb;
m_app_state_path = app_state;
}

void ipc::client_win::worker()
Expand Down Expand Up @@ -214,7 +222,7 @@ void ipc::client_win::worker()
{
std::unique_lock<std::mutex> ulock(m_lock);
for (auto &cb : m_cb) {
cb.second.first(cb.second.second, proc_rval);
cb.second.first(cb.second.second, proc_rval, std::chrono::milliseconds(0));
}

m_cb.clear();
Expand Down Expand Up @@ -279,7 +287,7 @@ void ipc::client_win::read_callback_msg(os::error ec, size_t size)
}

// Call Callback
cb.first(cb.second, fnc_reply_msg.values);
cb.first(cb.second, fnc_reply_msg.values, std::chrono::milliseconds(fnc_reply_msg.obs_call_duration_ms.value_union.ui32));

// Remove cb entry
m_cb.erase(fnc_reply_msg.uid.value_union.ui64);
Expand Down
4 changes: 3 additions & 1 deletion source/windows/ipc-server-instance-win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,14 @@ void ipc::server_instance_win::read_callback_msg(os::error ec, size_t size)
}

// Execute
std::chrono::high_resolution_clock::duration call_duration;
proc_rval.resize(0);
success = m_parent->client_call_function(m_clientId, fnc_call_msg.class_name.value_str, fnc_call_msg.function_name.value_str, fnc_call_msg.arguments,
proc_rval, proc_error);
proc_rval, proc_error, call_duration);

// Set
fnc_reply_msg.uid = fnc_call_msg.uid;
fnc_reply_msg.obs_call_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(call_duration).count();
std::swap(proc_rval, fnc_reply_msg.values); // Fast "copy" of parameters.
if (!success) {
fnc_reply_msg.error = ipc::value(proc_error);
Expand Down

0 comments on commit eb95cd7

Please sign in to comment.