Skip to content

Commit

Permalink
Add template type on WFTaskFactory::send_by_name()/signal_by_name(). (#…
Browse files Browse the repository at this point in the history
…1677)

* Add template type on WFTaskFactory::send_by_name()/signal_by_name().

* Remove unused parameter name.
  • Loading branch information
Barenboim authored Jan 3, 2025
1 parent d791011 commit 5fbe73f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 44 deletions.
5 changes: 3 additions & 2 deletions docs/about-conditional.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ public:
static WFConditional *create_conditional(const std::string& cond_name, SubTask *task, void **msgbuf);
static int signal_by_name(const std::string& cond_name, void *msg);
static int signal_by_name(const std::string& cond_name, void *msg, size_t max);
static int signal_by_name(const std::string& cond_name, void *const msg[], size_t max);
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[], size_t max);
};
~~~
我们看到,与普通条件任务唯一区别是,命名条件任务创建时,需要传入一个cond_name。
而signal_by_name()接口,默认将msg发送到所有在这个名称上等待的条件任务,将它们全部唤醒。
也可以通过max参数指定唤醒的最大任务数。此时,msg还可以是一个数组,可给不同的条件任务发送不同的消息。
也可以通过max参数指定唤醒的最大任务数。此时,msg还可以是一个指针数组,可给不同的条件任务发送不同的消息。
任何一个signal_by_name的重载函数,其返回值都是表示实际唤醒的条件任务个数。
这就相当于实现了观察者模式。

Expand Down
2 changes: 1 addition & 1 deletion src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
snprintf(name, 64, "%p.cgroup", member);
member->mutex.unlock();

WFTaskFactory::signal_by_name(name, (void *)NULL, max);
WFTaskFactory::signal_by_name(name, NULL, max);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions src/factory/WFTaskFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ int WFTaskFactory::send_by_name(const std::string& name, void *msg,
return __mailbox_map.send(name, &msg, max, 0);
}

template<>
int WFTaskFactory::send_by_name(const std::string& name, void *const msg[],
size_t max)
{
Expand Down Expand Up @@ -903,6 +904,7 @@ int WFTaskFactory::signal_by_name(const std::string& name, void *msg,
return __conditional_map.signal(name, &msg, max, 0);
}

template<>
int WFTaskFactory::signal_by_name(const std::string& name, void *const msg[],
size_t max)
{
Expand Down
41 changes: 5 additions & 36 deletions src/factory/WFTaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <time.h>
#include <stdint.h>
#include <utility>
#include <functional>
#include <openssl/ssl.h>
Expand Down Expand Up @@ -254,8 +253,7 @@ class WFTaskFactory

/* Count by a counter's name. When count_by_name(), it's safe to count
* exceeding target_value. When multiple counters share a same name,
* this operation will be performed on the first created. If no counter
* matches the name, nothing is performed. */
* this operation will be performed on the first created. */
static int count_by_name(const std::string& counter_name)
{
return WFTaskFactory::count_by_name(counter_name, 1);
Expand Down Expand Up @@ -296,7 +294,8 @@ class WFTaskFactory
static int send_by_name(const std::string& mailbox_name, void *msg,
size_t max);

static int send_by_name(const std::string& mailbox_name, void *const msg[],
template<typename T>
static int send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max);

public:
Expand Down Expand Up @@ -331,7 +330,8 @@ class WFTaskFactory
static int signal_by_name(const std::string& cond_name, void *msg,
size_t max);

static int signal_by_name(const std::string& cond_name, void *const msg[],
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[],
size_t max);

public:
Expand Down Expand Up @@ -409,37 +409,6 @@ class WFTaskFactory
task->sub_series()->set_last_task(last);
return task;
}

private:
/* Some compilers don't declare 'nullptr_t' although required by C++11. */
using nullptr_t = std::nullptr_t;

public:
/* The following functions are for overload resolution only. */

static int send_by_name(const std::string& mailbox_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)msg, max);
}

static int send_by_name(const std::string& mailbox_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)0, max);
}

static int signal_by_name(const std::string& cond_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)msg, max);
}

static int signal_by_name(const std::string& cond_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)0, max);
}
};

template<class REQ, class RESP>
Expand Down
30 changes: 25 additions & 5 deletions src/factory/WFTaskFactory.inl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ WFTaskFactory::create_dynamic_task(dynamic_create_t create)
return new __WFDynamicTask(std::move(create));
}

template<>
int WFTaskFactory::send_by_name(const std::string&, void *const *, size_t);

template<typename T>
int WFTaskFactory::send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *const *)msg, max);
}

template<>
int WFTaskFactory::signal_by_name(const std::string&, void *const *, size_t);

template<typename T>
int WFTaskFactory::signal_by_name(const std::string& cond_name, T *const msg[],
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *const *)msg, max);
}

template<class REQ, class RESP, typename CTX = bool>
class WFComplexClientTask : public WFClientTask<REQ, RESP>
{
Expand Down Expand Up @@ -709,7 +729,7 @@ void WFTaskFactory::reset_go_task(WFGoTask *task, FUNC&& func, ARGS&&... args)

template<> inline
WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(WFGlobal::get_exec_queue(queue_name),
WFGlobal::get_compute_executor(),
Expand All @@ -719,7 +739,7 @@ WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds,
WFGlobal::get_exec_queue(queue_name),
Expand All @@ -729,21 +749,21 @@ WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,

template<> inline
WFGoTask *WFTaskFactory::create_go_task(ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(queue, executor, nullptr);
}

template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds, queue, executor, nullptr);
}

template<> inline
void WFTaskFactory::reset_go_task(WFGoTask *task, nullptr_t&& func)
void WFTaskFactory::reset_go_task(WFGoTask *task, std::nullptr_t&&)
{
((__WFGoTask *)task)->set_go_func(nullptr);
}
Expand Down

0 comments on commit 5fbe73f

Please sign in to comment.