-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathPhxPush.cpp
121 lines (102 loc) · 3.15 KB
/
PhxPush.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "PhxPush.h"
#include "PhxChannel.h"
#include "PhxSocket.h"
#include <algorithm>
#include <chrono>
#include <future>
#include <thread>
PhxPush::PhxPush(std::shared_ptr<PhxChannel> channel,
const std::string& event,
nlohmann::json payload) {
this->channel = channel;
this->event = event;
this->payload = payload;
this->receivedResp = nullptr;
this->afterHook = nullptr;
this->sent = false;
}
void PhxPush::send() {
int64_t ref = this->channel->getSocket()->makeRef();
this->refEvent = this->channel->replyEventName(ref);
this->receivedResp = nullptr;
this->sent = false;
// FIXME: Should this be weak?
this->channel->onEvent(
this->refEvent, [this](nlohmann::json message, int64_t ref) {
this->receivedResp = message;
this->matchReceive(message);
this->cancelRefEvent();
this->cancelAfter();
});
this->startAfter();
this->sent = true;
// clang-format off
this->channel->getSocket()->push(
{ { "topic", this->channel->getTopic() },
{ "event", this->event },
{ "payload", this->payload },
{ "ref", ref }
});
// clang-format on
}
std::shared_ptr<PhxPush> PhxPush::onReceive(
const std::string& status, OnMessage callback) {
// receivedResp could actually be a std::string.
if (this->receivedResp.is_object()
&& this->receivedResp["status"] == status) {
callback(this->receivedResp);
}
this->recHooks.emplace_back(status, callback);
return this->shared_from_this();
}
std::shared_ptr<PhxPush> PhxPush::after(int ms, After callback) {
if (this->afterHook) {
// ERROR
}
this->afterInterval = ms;
this->afterHook = callback;
return this->shared_from_this();
}
void PhxPush::cancelRefEvent() {
this->channel->offEvent(this->refEvent);
}
void PhxPush::cancelAfter() {
if (!this->afterHook) {
return;
}
std::thread thread([this]() {
std::lock_guard<std::mutex> guard(this->afterTimerMutex);
this->shouldContinueAfterCallback = false;
});
thread.detach();
}
void PhxPush::startAfter() {
if (!this->afterHook) {
return;
}
// FIXME: Should this be weak?
int interval = this->afterInterval;
std::thread thread([this, interval]() {
// Use sleep_for to wait specified time (or sleep_until).
this->shouldContinueAfterCallback = true;
std::this_thread::sleep_for(std::chrono::seconds{ interval });
std::lock_guard<std::mutex> guard(this->afterTimerMutex);
if (this->shouldContinueAfterCallback) {
this->cancelRefEvent();
this->afterHook();
this->shouldContinueAfterCallback = false;
}
});
thread.detach();
}
void PhxPush::matchReceive(nlohmann::json payload) {
for (int i = 0; i < this->recHooks.size(); i++) {
std::tuple<std::string, OnMessage> tuple = this->recHooks.at(i);
if (std::get<0>(tuple) == payload["status"]) {
std::get<1>(tuple)(payload["response"]);
}
}
}
void PhxPush::setPayload(nlohmann::json payload) {
this->payload = payload;
}