Skip to content

Commit 6fd1434

Browse files
committed
Add tx_recv
1 parent 3834f4b commit 6fd1434

File tree

2 files changed

+125
-1
lines changed

2 files changed

+125
-1
lines changed

cpp/examples/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ foreach(example
6161
service_bus
6262
multithreaded_client
6363
multithreaded_client_flow_control
64-
tx_send)
64+
tx_send
65+
tx_recv)
6566
add_executable(${example} ${example}.cpp)
6667
target_link_libraries(${example} Proton::cpp Threads::Threads)
6768
endforeach()

cpp/examples/tx_recv.cpp

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
#include "options.hpp"
23+
24+
#include <proton/connection.hpp>
25+
#include <proton/container.hpp>
26+
#include <proton/message.hpp>
27+
#include <proton/message_id.hpp>
28+
#include <proton/messaging_handler.hpp>
29+
#include <proton/types.hpp>
30+
#include <proton/transaction.hpp>
31+
32+
#include <iostream>
33+
#include <map>
34+
#include <string>
35+
36+
#include <chrono>
37+
#include <thread>
38+
39+
class tx_recv : public proton::messaging_handler, proton::transaction_handler {
40+
private:
41+
proton::receiver receiver;
42+
std::string url;
43+
int expected;
44+
int batch_size;
45+
int current_batch = 0;
46+
int committed = 0;
47+
48+
proton::session session;
49+
public:
50+
tx_recv(const std::string &s, int c, int b):
51+
url(s), expected(c), batch_size(b) {}
52+
53+
void on_container_start(proton::container &c) override {
54+
receiver = c.open_receiver(url);
55+
}
56+
57+
void on_session_open(proton::session &s) override {
58+
session = s;
59+
std::cout << "Session open, declare_txn" << std::endl;
60+
s.declare_transaction(*this);
61+
}
62+
63+
void on_transaction_declare_failed(proton::session) {}
64+
void on_transaction_commit_failed(proton::session s) {
65+
std::cout << "Transaction Commit Failed" << std::endl;
66+
s.connection().close();
67+
exit(-1);
68+
}
69+
70+
void on_transaction_declared(proton::session s) override {
71+
std::cout << "Transaction is declared!" << (&s)
72+
<< std::endl;
73+
receiver.add_credit(batch_size);
74+
}
75+
76+
void on_message(proton::delivery &d, proton::message &msg) override {
77+
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
78+
session.txn_accept(d);
79+
current_batch += 1;
80+
if(current_batch == batch_size) {
81+
}
82+
}
83+
84+
void on_transaction_committed(proton::session s) override {
85+
committed += current_batch;
86+
current_batch = 0;
87+
std::cout<<"Transaction Committed:"<< committed<< std::endl;
88+
if(committed == expected) {
89+
std::cout << "All messages committed" << std::endl;
90+
s.connection().close();
91+
}
92+
else {
93+
s.declare_transaction(*this);
94+
}
95+
}
96+
97+
};
98+
99+
int main(int argc, char **argv) {
100+
std::string address("127.0.0.1:5672/examples");
101+
int message_count = 6;
102+
int batch_size = 3;
103+
example::options opts(argc, argv);
104+
105+
opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
106+
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT");
107+
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE");
108+
109+
try {
110+
opts.parse();
111+
112+
tx_recv recv(address, message_count, batch_size);
113+
proton::container(recv).run();
114+
115+
return 0;
116+
} catch (const example::bad_option& e) {
117+
std::cout << opts << std::endl << e.what() << std::endl;
118+
} catch (const std::exception& e) {
119+
std::cerr << e.what() << std::endl;
120+
}
121+
122+
return 1;
123+
}

0 commit comments

Comments
 (0)