@@ -82,6 +82,56 @@ class RpcDecoder {
8282 return send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ()) == packer.size ();
8383 }
8484
85+ MsgPack::str_t fetch_rpc_method (){
86+
87+ if (!packet_incoming ()){return " " ;}
88+
89+ if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {
90+ return " " ; // No RPC
91+ }
92+
93+ MsgPack::Unpacker unpacker;
94+
95+ unpacker.clear ();
96+ if (!unpacker.feed (_raw_buffer, _packet_size)) { // feed should not fail at this point
97+ consume (_packet_size);
98+ reset_packet ();
99+ return " " ;
100+ };
101+
102+ int msg_type;
103+ int msg_id;
104+ MsgPack::str_t method;
105+ MsgPack::arr_size_t req_size;
106+
107+ if (!unpacker.deserialize (req_size, msg_type)) {
108+ consume (_packet_size);
109+ reset_packet ();
110+ return " " ; // Header not unpackable
111+ }
112+
113+ if (msg_type == CALL_MSG && req_size.size () == REQUEST_SIZE) {
114+ if (!unpacker.deserialize (msg_id, method)) {
115+ consume (_packet_size);
116+ reset_packet ();
117+ return " " ; // Method not unpackable
118+ }
119+ } else if (msg_type == NOTIFY_MSG && req_size.size () == NOTIFY_SIZE) {
120+ if (!unpacker.deserialize (method)) {
121+ consume (_packet_size);
122+ reset_packet ();
123+ return " " ; // Method not unpackable
124+ }
125+ } else {
126+ consume (_packet_size);
127+ reset_packet ();
128+ return " " ; // Invalid request size/type
129+ }
130+
131+ return method;
132+
133+ }
134+
85135 size_t get_request (uint8_t * buffer, size_t buffer_size) {
86136
87137 if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {
@@ -151,6 +201,8 @@ class RpcDecoder {
151201
152202 inline size_t size () const {return _bytes_stored;}
153203
204+ friend class DecoderTester ;
205+
154206private:
155207 ITransport& _transport;
156208 uint8_t _raw_buffer[BufferSize];
@@ -163,8 +215,17 @@ class RpcDecoder {
163215
164216 inline bool buffer_empty () const { return _bytes_stored == 0 ;}
165217
218+ // This is a blocking send, under the assumption _transport.write will always succeed eventually
166219 inline size_t send (const uint8_t * data, const size_t size) {
167- return _transport.write (data, size);
220+
221+ size_t offset = 0 ;
222+
223+ while (offset < size) {
224+ size_t bytes_written = _transport.write (data + offset, size - offset);
225+ offset += bytes_written;
226+ }
227+
228+ return offset;
168229 }
169230
170231 size_t pop_packet (uint8_t * buffer, size_t buffer_size) {
@@ -188,22 +249,19 @@ class RpcDecoder {
188249 _packet_size = 0 ;
189250 }
190251
191- size_t consume (size_t size) {
192-
193- if (size > _bytes_stored) return 0 ;
194-
195- const size_t remaining_bytes = _bytes_stored - size;
196-
197- // Shift remaining data forward (manual memmove for compatibility)
198- for (size_t i = 0 ; i < remaining_bytes; i++) {
199- _raw_buffer[i] = _raw_buffer[size + i];
200- }
201-
202- _bytes_stored = remaining_bytes;
203-
204- return size;
252+ size_t consume (size_t size, size_t offset = 0 ) {
253+ // Boundary checks
254+ if (offset + size > _bytes_stored || size == 0 ) return 0 ;
255+
256+ size_t remaining_bytes = _bytes_stored - size;
257+ for (size_t i=offset; i<remaining_bytes; i++){
258+ _raw_buffer[i] = _raw_buffer[i+size];
205259 }
206260
261+ _bytes_stored = remaining_bytes;
262+ return size;
263+ }
264+
207265};
208266
209267#endif
0 commit comments