Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add meta transmit for brpc mesh #118

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 99 additions & 18 deletions src/application_protocols/brpc/brpc_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,104 @@ void BrpcCodec::encode(const MetaProtocolProxy::Metadata& metadata,
(void)buffer;
}

void BrpcCodec::onError(const MetaProtocolProxy::Metadata& /*metadata*/,
const MetaProtocolProxy::Error& /*error*/, Buffer::Instance& /*buffer*/) {
// BrpcHeader response;
// // Make sure to set the request id if the application protocol has one, otherwise MetaProtocol
void BrpcCodec::onError(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) {
ASSERT(buffer.length() == 0);
BrpcHeader response;
// Make sure to set the request id if the application protocol has one, otherwise MetaProtocol
// framework will
// // complaint that the id in the error response is not equal to the one in the request
// response.set_pack_flow(metadata.getRequestId());
// response.set_pack_len(BrpcHeader::HEADER_SIZE);
// switch (error.type) {
// case MetaProtocolProxy::ErrorType::RouteNotFound:
// response.set_rsp_code(static_cast<int16_t>(BrpcCode::NoRoute));
// break;
// default:
// response.set_rsp_code(static_cast<int16_t>(BrpcCode::Error));
// break;
// }
// response.encode(buffer);
// complaint that the id in the error response is not equal to the one in the request

(void)metadata;
//response.set_pack_flow(metadata.getRequestId());
int16_t error_code = 0; // just for rsp error code
int brpc_error_code = 0;
std::string error_msg = "from_mesh:" + error.message;
switch (error.type) {
case MetaProtocolProxy::ErrorType::RouteNotFound:
error_code = static_cast<int16_t>(BrpcCode::NoRoute);
brpc_error_code = 1019; // EROUTE
break;
case MetaProtocolProxy::ErrorType::ClusterNotFound:
error_code = static_cast<int16_t>(BrpcCode::NoCluster);
brpc_error_code = 1001; // ENOSERVICE
break;
case MetaProtocolProxy::ErrorType::NoHealthyUpstream:
error_code = static_cast<int16_t>(BrpcCode::UnHealthy);
brpc_error_code = 2005; // ECLOSE
break;
case MetaProtocolProxy::ErrorType::BadResponse:
error_code = static_cast<int16_t>(BrpcCode::BadResponse);
brpc_error_code = 2002; // ERESPONSE
break;
case MetaProtocolProxy::ErrorType::Unspecified:
error_code = static_cast<int16_t>(BrpcCode::UnspecifiedError);
brpc_error_code = 2001; // EINTERNAL
break;
case MetaProtocolProxy::ErrorType::OverLimit:
error_code = static_cast<int16_t>(BrpcCode::OverLimit);
brpc_error_code = 2004; // ELIMIT
break;
default:
error_code = static_cast<int16_t>(BrpcCode::Error);
brpc_error_code = 2001; // EINTERNAL
break;
}

//response.set_rsp_code(error_code);

aeraki::meta_protocol::brpc::RpcMeta meta;
aeraki::meta_protocol::brpc::RpcResponseMeta* response_meta = meta.mutable_response();
response_meta->set_error_code(brpc_error_code);
response_meta->set_error_text(error_msg);
//response_meta->set_load_baalancer_code(?);
meta.set_correlation_id(meta_.correlation_id());
meta.set_compress_type(meta_.compress_type());

const int meta_size = meta.ByteSize();
//response.set_pack_len(BrpcHeader::HEADER_SIZE + meta_size);
int payload_size = 0;
response.set_body_len(meta_size + payload_size);
response.set_meta_len(meta_size);
response.encode(buffer);

char* meta_buffer = NULL;
char buffer_local[1024];
if (meta_size < 1024)
{
meta_buffer = buffer_local;
}
else
{
meta_buffer = new char[meta_size];
}

::google::protobuf::io::ArrayOutputStream arr_out(meta_buffer, meta_size);
::google::protobuf::io::CodedOutputStream coded_out(&arr_out);
meta.SerializeWithCachedSizes(&coded_out);
ASSERT(!coded_out.HadError());
buffer.add(meta_buffer, meta_size);

ENVOY_LOG(warn, "brpc onError error={} buffer_len={} detail={}",
static_cast<int>(error.type), static_cast<int>(buffer.length()),
meta.ShortDebugString());

if (meta_size < 1024)
{
meta_buffer = NULL;
}
else
{
delete []meta_buffer;
meta_buffer = NULL;
}

/*
if (span && received_us > 0)
{
response_meta->set_process_time_us(base::cpuwide_time_us() - received_us);
}
*/
}

BrpcDecodeStatus BrpcCodec::handleState(Buffer::Instance& buffer) {
Expand Down Expand Up @@ -112,11 +193,11 @@ BrpcDecodeStatus BrpcCodec::decodeBody(Buffer::Instance& buffer) {
void BrpcCodec::toMetadata(MetaProtocolProxy::Metadata& metadata) {
// metadata.setRequestId(brpc_header_.get_pack_flow());
// metadata.putString("cmd", std::to_string(brpc_header_.get_req_cmd()));
metadata.originMessage().move(*origin_msg_);
metadata.getOriginMessage().move(*origin_msg_);
}

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy
21 changes: 15 additions & 6 deletions src/application_protocols/brpc/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) {

uint32_t pos = 0;

// todo : Check MAGIC
pos += sizeof(uint32_t);
// Check MAGIC
uint32_t header_magic = 0;
header_magic = buffer.peekBEInt<uint32_t>(pos);
if (header_magic != MAGIC)
{
ENVOY_LOG(warn, "Brpc Header magic_error need={} real={}.", MAGIC, header_magic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方 header MAGIC 不对,是不是应该报错并返回错误

}

pos += sizeof(uint32_t);
_body_len = buffer.peekBEInt<uint32_t>(pos);

pos += sizeof(uint32_t);
_meta_len = buffer.peekBEInt<uint32_t>(pos);

pos += sizeof(uint32_t);

ASSERT(pos == HEADER_SIZE);
Expand All @@ -31,14 +39,15 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) {
}

bool BrpcHeader::encode(Buffer::Instance& buffer) {
buffer.writeBEInt(MAGIC);
buffer.writeBEInt(_body_len);
buffer.writeBEInt(_meta_len);
//buffer.writeBEInt<uint32_t>(MAGIC);
buffer.add("PRPC", 4);
buffer.writeBEInt<uint32_t>(_body_len);
buffer.writeBEInt<uint32_t>(_meta_len);
return true;
}

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy
11 changes: 8 additions & 3 deletions src/application_protocols/brpc/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ namespace Brpc {

enum class BrpcCode {
NoRoute = 1,
Error = 1,
Error = 2,
NoCluster = 3,
UnHealthy = 4,
BadResponse = 5,
UnspecifiedError = 6,
OverLimit = 7
};

struct BrpcHeader : public Logger::Loggable<Logger::Id::filter> {
Expand All @@ -26,11 +31,11 @@ struct BrpcHeader : public Logger::Loggable<Logger::Id::filter> {
uint32_t get_body_len() const {return _body_len;};
uint32_t get_meta_len() const {return _meta_len;};
void set_body_len(uint32_t body_len) {_body_len = body_len;};
void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;};
void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;};
};

} // namespace Brpc
} // namespace MetaProtocolProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
} // namespace Envoy