Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Common/CRC32/pabb_CRC32_AVR8.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ extern const PROGMEM uint32_t PABB_CRC32_TABLE8[];

void pabb_crc32_buffer(uint32_t* crc, const void* data, uint8_t length);

static inline void pabb_crc32_write_to_message(void* data, size_t full_message_length){
static inline void pabb_crc32_write_to_message(uint32_t seed, void* data, size_t full_message_length){
char* ptr = (char*)data;
size_t length_before_crc = full_message_length - sizeof(uint32_t);
uint32_t* crc = (uint32_t*)(ptr + length_before_crc);
*crc = 0xffffffff;
*crc = seed;
pabb_crc32_buffer(crc, ptr, length_before_crc);
}

Expand Down
3 changes: 1 addition & 2 deletions Common/CRC32/pabb_CRC32_Basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ extern const uint32_t PABB_CRC32_TABLE8[];

void pabb_crc32_buffer(uint32_t* crc, const void* data, uint8_t length);

static inline void pabb_crc32_write_to_message(void* data, size_t full_message_length){
static inline void pabb_crc32_write_to_message(uint32_t crc, void* data, size_t full_message_length){
char* ptr = (char*)data;
size_t length_before_crc = full_message_length - sizeof(uint32_t);
uint32_t crc = 0xffffffff;
pabb_crc32_buffer(&crc, ptr, length_before_crc);
memcpy(ptr + length_before_crc, &crc, sizeof(uint32_t));
}
Expand Down
3 changes: 1 addition & 2 deletions Common/CRC32/pabb_CRC32_x86_SSE4.1.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ static inline void pabb_crc32_buffer(uint32_t* crc, const void* data, size_t len
*crc = tmp;
}

static inline void pabb_crc32_write_to_message(void* data, size_t full_message_length){
static inline void pabb_crc32_write_to_message(uint32_t crc, void* data, size_t full_message_length){
char* ptr = (char*)data;
size_t length_before_crc = full_message_length - sizeof(uint32_t);
uint32_t crc = 0xffffffff;
pabb_crc32_buffer(&crc, ptr, length_before_crc);
memcpy(ptr + length_before_crc, &crc, sizeof(uint32_t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "Common/Cpp/PrettyPrint.h"
//#include "Common/Cpp/Exceptions.h"
#include "Common/PABotBase2/PABotBase2CC_MessageDumper.h"
#include "CommonTools/Random.h"
//#include "PABotBase2_ConnectionDebug.h"
#include "PABotBase2CC_ReliableStreamConnection.h"

Expand All @@ -32,7 +33,7 @@ ReliableStreamConnection::ReliableStreamConnection(
, m_unreliable_connection(unreliable_connection)
, m_retransmit_timeout(retransmit_timeout)
, m_print_lock(print_lock)
, m_reliable_sender(*this, 20)
, m_reliable_sender(*this, 24, random_u32())
, m_log_everything(log_everything)
// , m_version_verified(false)
, m_remote_protocol_compatible(false)
Expand Down Expand Up @@ -123,7 +124,11 @@ void ReliableStreamConnection::on_recv(const void* data, size_t bytes){
cout << "ReliableStreamConnection::on_recv(): " << bytes << endl;
}
#endif
m_parser.push_bytes(*this, (const uint8_t*)data, bytes);
m_parser.push_bytes(
*this,
m_reliable_sender.session_id(),
(const uint8_t*)data, bytes
);
}


Expand Down Expand Up @@ -165,14 +170,22 @@ size_t ReliableStreamConnection::unreliable_send(const void* data, size_t bytes)
// Send Path
//

bool ReliableStreamConnection::reset(WallDuration timeout){
bool ReliableStreamConnection::reset(bool random_session_id, WallDuration timeout){
{
std::lock_guard<Mutex> lg(m_lock);
m_reliable_sender.reset();
if (random_session_id){
m_reliable_sender.reset(m_reliable_sender.session_id() + 1);
}else{
m_reliable_sender.reset(0xffffffff);
}
m_parser.reset();
m_stream_coalescer.reset();
throw_if_cancelled();
m_reliable_sender.send_packet(PABB2_CONNECTION_OPCODE_ASK_RESET, 0, nullptr);
if (random_session_id){
m_reliable_sender.send_reset();
}else{
m_reliable_sender.send_packet(PABB2_CONNECTION_OPCODE_ASK_RESET, 0, nullptr);
}
}
m_cv.notify_all();
return wait_for_pending(timeout);
Expand Down Expand Up @@ -217,7 +230,7 @@ void ReliableStreamConnection::send_ack(uint8_t seqnum, uint8_t opcode){
packet.header.seqnum = seqnum;
packet.header.packet_bytes = sizeof(packet);
packet.header.opcode = opcode;
pabb_crc32_write_to_message(&packet, sizeof(packet));
pabb_crc32_write_to_message(m_reliable_sender.session_id(), &packet, sizeof(packet));
unreliable_send(&packet, sizeof(packet));
}
void ReliableStreamConnection::send_ack_u16(uint8_t seqnum, uint8_t opcode, uint16_t data){
Expand All @@ -231,7 +244,7 @@ void ReliableStreamConnection::send_ack_u16(uint8_t seqnum, uint8_t opcode, uint
packet.header.packet_bytes = sizeof(packet);
packet.header.opcode = opcode;
packet.header.data = data;
pabb_crc32_write_to_message(&packet, sizeof(packet));
pabb_crc32_write_to_message(m_reliable_sender.session_id(), &packet, sizeof(packet));
unreliable_send(&packet, sizeof(packet));
}

Expand Down Expand Up @@ -376,6 +389,7 @@ void ReliableStreamConnection::on_packet(const PacketHeader* packet){
case PABB2_CONNECTION_OPCODE_INFO_LABEL_H32:
case PABB2_CONNECTION_OPCODE_INFO_LABEL_U32:
case PABB2_CONNECTION_OPCODE_INFO_LABEL_I32:
// case PABB2_CONNECTION_OPCODE_WRONG_SESSION:
// cout << "Received ack" << endl;
if (!m_log_everything){
m_logger.log("[RSC]: Receive: (0x" + tostr_hex(packet->opcode) + ") " + tostr(packet), COLOR_PURPLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ReliableStreamConnection final
}
virtual bool cancel(std::exception_ptr exception) noexcept override;

bool reset(WallDuration timeout = WallDuration::max());
bool reset(bool random_session_id, WallDuration timeout = WallDuration::max());

bool remote_protocol_is_compatible() const{
return m_remote_protocol_compatible;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ bool ReliableStreamConnectionFW::run_recv_events(const WallDuration& timeout){
? POLL_RATE
: timeout;

const PacketHeader* header = m_parser.pull_bytes(m_unreliable_connection, adjusted_timeout);
const PacketHeader* header = m_parser.pull_bytes(
m_unreliable_connection,
m_reliable_sender.session_id(),
adjusted_timeout
);
if (header == nullptr){
return false;
}
Expand Down Expand Up @@ -149,22 +153,33 @@ bool ReliableStreamConnectionFW::run_recv_events(const WallDuration& timeout){

// Now handle the different opcodes.
uint8_t opcode = header->opcode & PABB2_CONNECTION_OPCODE_MASK;

switch (opcode){
case PABB2_CONNECTION_OPCODE_ASK_RESET:
case PABB2_CONNECTION_OPCODE_ASK_RESET:{
if (header->packet_bytes < sizeof(PacketHeader_u32)){
return true;
}

const PacketHeader_u32* packet = (const PacketHeader_u32*)header;

#ifdef PABB2_SUPPORTS_PRINTF_LOGGING
printf("Resetting to session ID: %zx\n", (size_t)packet->data);
#endif
m_stream_ready = false;
m_send_is_currently_full = false;
m_reliable_sender.reset();
m_reliable_sender.reset(packet->data);
m_parser.reset();
m_stream_coalescer.reset();
m_stream_coalescer.push_packet(0);
#ifdef PABB2_ENABLE
issue_reset_to_all();
#endif
m_reliable_sender.send_oob_packet_empty(
header->seqnum,
PABB2_CONNECTION_OPCODE_RET_RESET
);
#ifdef PABB2_ENABLE
issue_reset_to_all();
#endif
return true;
}
case PABB2_CONNECTION_OPCODE_ASK_VERSION:
m_stream_coalescer.push_packet(header->seqnum);
m_reliable_sender.send_oob_packet_u32(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace PABotBase2{

const PacketHeader* PacketParser::pull_bytes(
UnreliableStreamConnectionPolling& connection,
const uint32_t& session_id,
WallDuration timeout
){
const uint8_t MIN_PACKET_SIZE = sizeof(PacketHeader) + sizeof(uint32_t);
Expand Down Expand Up @@ -114,7 +115,9 @@ const PacketHeader* PacketParser::pull_bytes(

// Verify the CRC.

uint32_t actual_crc = 0xffffffff;
uint32_t actual_crc = header->opcode == PABB2_CONNECTION_OPCODE_ASK_RESET
? 0xffffffff
: session_id;
pabb_crc32_buffer(&actual_crc, m_buffer, packet_bytes - sizeof(uint32_t));

uint32_t expected_crc;
Expand All @@ -137,6 +140,7 @@ const PacketHeader* PacketParser::pull_bytes(

void PacketParser::push_bytes(
PacketRunner& packet_runner,
const uint32_t& session_id,
const uint8_t* data, size_t bytes
){
// cout << std::string((const char*)data, bytes) << endl;
Expand Down Expand Up @@ -206,7 +210,9 @@ void PacketParser::push_bytes(

// Verify the CRC.

uint32_t actual_crc = 0xffffffff;
uint32_t actual_crc = header->opcode == PABB2_CONNECTION_OPCODE_ASK_RESET
? 0xffffffff
: session_id;
pabb_crc32_buffer(&actual_crc, m_buffer, packet_bytes - sizeof(uint32_t));

uint32_t expected_crc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct PacketParser{
//
const PacketHeader* pull_bytes(
UnreliableStreamConnectionPolling& connection,
const uint32_t& session_id,
WallDuration timeout
);

Expand All @@ -72,6 +73,7 @@ struct PacketParser{
//
void push_bytes(
PacketRunner& packet_runner,
const uint32_t& session_id,
const uint8_t* data, size_t bytes
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,29 @@ namespace PABotBase2{
#define PABB2_CONNECTION_RETRANSMIT_FLAG 0x80
#define PABB2_CONNECTION_OPCODE_MASK 0x7f

#define PABB2_CONNECTION_OPCODE_INVALID 0x00


//
// Special
// Packets
//

using SessionId = uint32_t;

struct PABB_PACK PacketHeader{
uint8_t magic_number;
uint8_t seqnum;
uint8_t packet_bytes;
uint8_t opcode;
};

#define PABB2_CONNECTION_OPCODE_INVALID 0x00

#define PABB2_CONNECTION_OPCODE_ASK_RESET 0x01
#define PABB2_CONNECTION_OPCODE_RET_RESET 0x41


struct PABB_PACK PacketHeader_u8{
uint8_t magic_number;
uint8_t seqnum;
uint8_t packet_bytes;
uint8_t opcode;
struct PABB_PACK PacketHeader_u8 : PacketHeader{
uint8_t data;
};
struct PABB_PACK PacketHeader_u16{
uint8_t magic_number;
uint8_t seqnum;
uint8_t packet_bytes;
uint8_t opcode;
struct PABB_PACK PacketHeader_u16 : PacketHeader{
uint16_t data;
};
struct PABB_PACK PacketHeader_u32{
uint8_t magic_number;
uint8_t seqnum;
uint8_t packet_bytes;
uint8_t opcode;
struct PABB_PACK PacketHeader_u32 : PacketHeader{
uint32_t data;
};

Expand All @@ -84,6 +70,9 @@ struct PABB_PACK PacketHeader_u32{
// Requests (acks required)
//

#define PABB2_CONNECTION_OPCODE_ASK_RESET 0x01
#define PABB2_CONNECTION_OPCODE_RET_RESET 0x41

#define PABB2_CONNECTION_OPCODE_ASK_VERSION 0x02
#define PABB2_CONNECTION_OPCODE_RET_VERSION 0x42

Expand All @@ -99,11 +88,7 @@ struct PABB_PACK PacketHeader_u32{
#define PABB2_CONNECTION_OPCODE_ASK_STREAM_DATA 0x12
#define PABB2_CONNECTION_OPCODE_RET_STREAM_DATA 0x52
#define PABB2_CONNECTION_OPCODE_ASK_STREAM_REQUEST 0x13 // Unused for now.
struct PABB_PACK PacketHeaderData{
uint8_t magic_number;
uint8_t seqnum;
uint8_t packet_bytes;
uint8_t opcode;
struct PABB_PACK PacketHeaderData : PacketHeader{
uint16_t stream_offset;
};

Expand Down Expand Up @@ -135,7 +120,6 @@ struct PABB_PACK PacketHeaderData{
#define PABB2_CONNECTION_OPCODE_UNKNOWN_OPCODE 0x32



}
}

Expand Down
Loading
Loading