38
38
39
39
class tx_send : public proton ::messaging_handler, proton::transaction_handler {
40
40
private:
41
- proton::sender sender;
41
+ proton::sender sender;
42
42
std::string url;
43
43
int total;
44
44
int batch_size;
45
45
int sent;
46
46
int batch_index = 0 ;
47
47
int current_batch = 0 ;
48
48
int committed = 0 ;
49
- int confirmed = 0 ;
50
-
51
- proton::session session;
52
49
53
50
public:
54
51
tx_send (const std::string &s, int c, int b):
@@ -59,35 +56,34 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
59
56
}
60
57
61
58
void on_session_open (proton::session &s) override {
62
- session = s;
63
- std::cout << " [on_session_open] declare_txn started..." << std::endl;
59
+ std::cout << " New session is open, declaring transaction now..." << std::endl;
64
60
s.declare_transaction (*this );
65
- std::cout << " [on_session_open] declare_txn ended..." << std::endl;
66
61
}
67
62
68
- void on_transaction_declare_failed (proton::session) {}
63
+ void on_transaction_declare_failed (proton::session s) {
64
+ std::cout << " Transaction declarion failed" << std::endl;
65
+ s.connection ().close ();
66
+ exit (-1 );
67
+ }
68
+
69
69
void on_transaction_commit_failed (proton::session s) {
70
- std::cout << " Transaction Commit Failed " << std::endl;
70
+ std::cout << " Transaction commit failed! " << std::endl;
71
71
s.connection ().close ();
72
72
exit (-1 );
73
73
}
74
74
75
75
void on_transaction_declared (proton::session s) override {
76
- std::cout << " [on_transaction_declared] Session: " << (&s)
77
- << std::endl;
78
- std::cout << " [on_transaction_declared] txn is_empty " << (s.txn_is_empty ())
79
- << " \t " << std::endl;
80
- send (sender);
76
+ std::cout << " Transaction is declared" << std::endl;
77
+ send ();
81
78
}
82
79
83
- void on_sendable (proton::sender &s) override {
84
- std::cout << " [OnSendable] session: " << &session
85
- << std::endl;
86
- send (s);
80
+ void on_sendable (proton::sender&) override {
81
+ send ();
87
82
}
88
83
89
- void send (proton::sender &s ) {
84
+ void send () {
90
85
static int unique_id = 10000 ;
86
+ proton::session session = sender.session ();
91
87
while (session.txn_is_declared () && sender.credit () &&
92
88
(committed + current_batch) < total) {
93
89
proton::message msg;
@@ -96,47 +92,42 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
96
92
97
93
msg.id (unique_id++);
98
94
msg.body (m);
99
- std::cout << " ##### [example] transaction send msg: " << msg
100
- << std::endl;
95
+ std::cout << " Sending: " << msg << std::endl;
101
96
session.txn_send (sender, msg);
102
97
current_batch += 1 ;
103
98
if (current_batch == batch_size)
104
99
{
105
- std::cout << " >> Txn attempt commit" << std::endl;
106
100
if (batch_index % 2 == 0 ) {
101
+ std::cout << " Commiting transaction..." << std::endl;
107
102
session.txn_commit ();
108
103
} else {
104
+ std::cout << " Aborting transaction..." << std::endl;
109
105
session.txn_abort ();
110
- }
106
+ }
111
107
batch_index++;
112
108
}
113
109
}
114
110
}
115
111
116
- void on_tracker_accept (proton::tracker &t) override {
117
- confirmed += 1 ;
118
- std::cout << " [example] on_tracker_accept:" << confirmed
119
- << std::endl;
120
- }
121
-
122
112
void on_transaction_committed (proton::session s) override {
123
113
committed += current_batch;
124
114
current_batch = 0 ;
125
- std::cout<< " [OnTxnCommitted] Committed: " << committed << std::endl;
115
+ std::cout << " Transaction commited " << std::endl;
126
116
if (committed == total) {
127
- std::cout << " All messages committed" << std::endl;
117
+ std::cout << " All messages committed, closing connection. " << std::endl;
128
118
s.connection ().close ();
129
119
}
130
120
else {
131
- std::cout << " redlcaring txn " << std::endl;
132
- session .declare_transaction (*this );
121
+ std::cout << " Re-declaring transaction now... " << std::endl;
122
+ s .declare_transaction (*this );
133
123
}
134
124
}
135
125
136
126
void on_transaction_aborted (proton::session s) override {
137
- std::cout << " Meesages Aborted ....." << std::endl;
127
+ std::cout << " Transaction aborted!" << std::endl;
128
+ std::cout << " Re-delaring transaction now..." << std::endl;
138
129
current_batch = 0 ;
139
- session .declare_transaction (*this );
130
+ s .declare_transaction (*this );
140
131
}
141
132
142
133
void on_sender_close (proton::sender &s) override {
0 commit comments