Skip to content

Commit

Permalink
add meta transmit for brpc mesh
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyunping <[email protected]>
  • Loading branch information
zhangyunping committed Jul 3, 2023
1 parent d2258d3 commit 5906529
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 27 deletions.
117 changes: 99 additions & 18 deletions src/application_protocols/brpc/brpc_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,104 @@ void BrpcCodec::encode(const MetaProtocolProxy::Metadata& metadata,
(void)buffer;
}

void BrpcCodec::onError(const MetaProtocolProxy::Metadata& /*metadata*/,
const MetaProtocolProxy::Error& /*error*/, Buffer::Instance& /*buffer*/) {
// BrpcHeader response;
// // Make sure to set the request id if the application protocol has one, otherwise MetaProtocol
void BrpcCodec::onError(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) {
ASSERT(buffer.length() == 0);
BrpcHeader response;
// Make sure to set the request id if the application protocol has one, otherwise MetaProtocol
// framework will
// // complaint that the id in the error response is not equal to the one in the request
// response.set_pack_flow(metadata.getRequestId());
// response.set_pack_len(BrpcHeader::HEADER_SIZE);
// switch (error.type) {
// case MetaProtocolProxy::ErrorType::RouteNotFound:
// response.set_rsp_code(static_cast<int16_t>(BrpcCode::NoRoute));
// break;
// default:
// response.set_rsp_code(static_cast<int16_t>(BrpcCode::Error));
// break;
// }
// response.encode(buffer);
// complaint that the id in the error response is not equal to the one in the request

(void)metadata;
//response.set_pack_flow(metadata.getRequestId());
int16_t error_code = 0; // just for rsp error code
int brpc_error_code = 0;
std::string error_msg = "from_mesh:" + error.message;
switch (error.type) {
case MetaProtocolProxy::ErrorType::RouteNotFound:
error_code = static_cast<int16_t>(BrpcCode::NoRoute);
brpc_error_code = 1019; // EROUTE
break;
case MetaProtocolProxy::ErrorType::ClusterNotFound:
error_code = static_cast<int16_t>(BrpcCode::NoCluster);
brpc_error_code = 1001; // ENOSERVICE
break;
case MetaProtocolProxy::ErrorType::NoHealthyUpstream:
error_code = static_cast<int16_t>(BrpcCode::UnHealthy);
brpc_error_code = 2005; // ECLOSE
break;
case MetaProtocolProxy::ErrorType::BadResponse:
error_code = static_cast<int16_t>(BrpcCode::BadResponse);
brpc_error_code = 2002; // ERESPONSE
break;
case MetaProtocolProxy::ErrorType::Unspecified:
error_code = static_cast<int16_t>(BrpcCode::UnspecifiedError);
brpc_error_code = 2001; // EINTERNAL
break;
case MetaProtocolProxy::ErrorType::OverLimit:
error_code = static_cast<int16_t>(BrpcCode::OverLimit);
brpc_error_code = 2004; // ELIMIT
break;
default:
error_code = static_cast<int16_t>(BrpcCode::Error);
brpc_error_code = 2001; // EINTERNAL
break;
}

//response.set_rsp_code(error_code);

aeraki::meta_protocol::brpc::RpcMeta meta;
aeraki::meta_protocol::brpc::RpcResponseMeta* response_meta = meta.mutable_response();
response_meta->set_error_code(brpc_error_code);
response_meta->set_error_text(error_msg);
//response_meta->set_load_baalancer_code(?);
meta.set_correlation_id(meta_.correlation_id());
meta.set_compress_type(meta_.compress_type());

const int meta_size = meta.ByteSize();
//response.set_pack_len(BrpcHeader::HEADER_SIZE + meta_size);
int payload_size = 0;
response.set_body_len(meta_size + payload_size);
response.set_meta_len(meta_size);
response.encode(buffer);

char* meta_buffer = NULL;
char buffer_local[1024];
if (meta_size < 1024)
{
meta_buffer = buffer_local;
}
else
{
meta_buffer = new char[meta_size];
}

::google::protobuf::io::ArrayOutputStream arr_out(meta_buffer, meta_size);
::google::protobuf::io::CodedOutputStream coded_out(&arr_out);
meta.SerializeWithCachedSizes(&coded_out);
ASSERT(!coded_out.HadError());
buffer.add(meta_buffer, meta_size);

ENVOY_LOG(warn, "brpc onError error={} buffer_len={} detail={}",
static_cast<int>(error.type), static_cast<int>(buffer.length()),
meta.ShortDebugString());

if (meta_size < 1024)
{
meta_buffer = NULL;
}
else
{
delete []meta_buffer;
meta_buffer = NULL;
}

/*
if (span && received_us > 0)
{
response_meta->set_process_time_us(base::cpuwide_time_us() - received_us);
}
*/
}

BrpcDecodeStatus BrpcCodec::handleState(Buffer::Instance& buffer) {
Expand Down Expand Up @@ -112,11 +193,11 @@ BrpcDecodeStatus BrpcCodec::decodeBody(Buffer::Instance& buffer) {
void BrpcCodec::toMetadata(MetaProtocolProxy::Metadata& metadata) {
// metadata.setRequestId(brpc_header_.get_pack_flow());
// metadata.putString("cmd", std::to_string(brpc_header_.get_req_cmd()));
metadata.originMessage().move(*origin_msg_);
metadata.getOriginMessage().move(*origin_msg_);
}

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy
21 changes: 15 additions & 6 deletions src/application_protocols/brpc/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) {

uint32_t pos = 0;

// todo : Check MAGIC
pos += sizeof(uint32_t);
// Check MAGIC
uint32_t header_magic = 0;
header_magic = buffer.peekBEInt<uint32_t>(pos);
if (header_magic != MAGIC)
{
ENVOY_LOG(warn, "Brpc Header magic_error need={} real={}.", MAGIC, header_magic);
}

pos += sizeof(uint32_t);
_body_len = buffer.peekBEInt<uint32_t>(pos);

pos += sizeof(uint32_t);
_meta_len = buffer.peekBEInt<uint32_t>(pos);

pos += sizeof(uint32_t);

ASSERT(pos == HEADER_SIZE);
Expand All @@ -31,14 +39,15 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) {
}

bool BrpcHeader::encode(Buffer::Instance& buffer) {
buffer.writeBEInt(MAGIC);
buffer.writeBEInt(_body_len);
buffer.writeBEInt(_meta_len);
//buffer.writeBEInt<uint32_t>(MAGIC);
buffer.add("PRPC", 4);
buffer.writeBEInt<uint32_t>(_body_len);
buffer.writeBEInt<uint32_t>(_meta_len);
return true;
}

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy
11 changes: 8 additions & 3 deletions src/application_protocols/brpc/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ namespace Brpc {

enum class BrpcCode {
NoRoute = 1,
Error = 1,
Error = 2,
NoCluster = 3,
UnHealthy = 4,
BadResponse = 5,
UnspecifiedError = 6,
OverLimit = 7
};

struct BrpcHeader : public Logger::Loggable<Logger::Id::filter> {
Expand All @@ -26,11 +31,11 @@ struct BrpcHeader : public Logger::Loggable<Logger::Id::filter> {
uint32_t get_body_len() const {return _body_len;};
uint32_t get_meta_len() const {return _meta_len;};
void set_body_len(uint32_t body_len) {_body_len = body_len;};
void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;};
void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;};
};

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy

0 comments on commit 5906529

Please sign in to comment.