From 1b1e93232c1f9577b311052f80bb371797040718 Mon Sep 17 00:00:00 2001 From: zhangyunping Date: Mon, 3 Jul 2023 19:55:13 +0800 Subject: [PATCH] add meta transmit for brpc mesh --- src/application_protocols/brpc/brpc_codec.cc | 117 ++++++++++++++++--- src/application_protocols/brpc/protocol.cc | 21 +++- src/application_protocols/brpc/protocol.h | 11 +- 3 files changed, 122 insertions(+), 27 deletions(-) diff --git a/src/application_protocols/brpc/brpc_codec.cc b/src/application_protocols/brpc/brpc_codec.cc index ab8d6a07..1b35c9ca 100644 --- a/src/application_protocols/brpc/brpc_codec.cc +++ b/src/application_protocols/brpc/brpc_codec.cc @@ -44,23 +44,104 @@ void BrpcCodec::encode(const MetaProtocolProxy::Metadata& metadata, (void)buffer; } -void BrpcCodec::onError(const MetaProtocolProxy::Metadata& /*metadata*/, - const MetaProtocolProxy::Error& /*error*/, Buffer::Instance& /*buffer*/) { - // BrpcHeader response; - // // Make sure to set the request id if the application protocol has one, otherwise MetaProtocol +void BrpcCodec::onError(const MetaProtocolProxy::Metadata& metadata, + const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) { + ASSERT(buffer.length() == 0); + BrpcHeader response; + // Make sure to set the request id if the application protocol has one, otherwise MetaProtocol // framework will - // // complaint that the id in the error response is not equal to the one in the request - // response.set_pack_flow(metadata.getRequestId()); - // response.set_pack_len(BrpcHeader::HEADER_SIZE); - // switch (error.type) { - // case MetaProtocolProxy::ErrorType::RouteNotFound: - // response.set_rsp_code(static_cast(BrpcCode::NoRoute)); - // break; - // default: - // response.set_rsp_code(static_cast(BrpcCode::Error)); - // break; - // } - // response.encode(buffer); + // complaint that the id in the error response is not equal to the one in the request + + (void)metadata; + //response.set_pack_flow(metadata.getRequestId()); + int16_t error_code = 0; // just for rsp error code + int brpc_error_code = 0; + std::string error_msg = "from_mesh:" + error.message; + switch (error.type) { + case MetaProtocolProxy::ErrorType::RouteNotFound: + error_code = static_cast(BrpcCode::NoRoute); + brpc_error_code = 1019; // EROUTE + break; + case MetaProtocolProxy::ErrorType::ClusterNotFound: + error_code = static_cast(BrpcCode::NoCluster); + brpc_error_code = 1001; // ENOSERVICE + break; + case MetaProtocolProxy::ErrorType::NoHealthyUpstream: + error_code = static_cast(BrpcCode::UnHealthy); + brpc_error_code = 2005; // ECLOSE + break; + case MetaProtocolProxy::ErrorType::BadResponse: + error_code = static_cast(BrpcCode::BadResponse); + brpc_error_code = 2002; // ERESPONSE + break; + case MetaProtocolProxy::ErrorType::Unspecified: + error_code = static_cast(BrpcCode::UnspecifiedError); + brpc_error_code = 2001; // EINTERNAL + break; + case MetaProtocolProxy::ErrorType::OverLimit: + error_code = static_cast(BrpcCode::OverLimit); + brpc_error_code = 2004; // ELIMIT + break; + default: + error_code = static_cast(BrpcCode::Error); + brpc_error_code = 2001; // EINTERNAL + break; + } + + //response.set_rsp_code(error_code); + + aeraki::meta_protocol::brpc::RpcMeta meta; + aeraki::meta_protocol::brpc::RpcResponseMeta* response_meta = meta.mutable_response(); + response_meta->set_error_code(brpc_error_code); + response_meta->set_error_text(error_msg); + //response_meta->set_load_baalancer_code(?); + meta.set_correlation_id(meta_.correlation_id()); + meta.set_compress_type(meta_.compress_type()); + + const int meta_size = meta.ByteSize(); + //response.set_pack_len(BrpcHeader::HEADER_SIZE + meta_size); + int payload_size = 0; + response.set_body_len(meta_size + payload_size); + response.set_meta_len(meta_size); + response.encode(buffer); + + char* meta_buffer = NULL; + char buffer_local[1024]; + if (meta_size < 1024) + { + meta_buffer = buffer_local; + } + else + { + meta_buffer = new char[meta_size]; + } + + ::google::protobuf::io::ArrayOutputStream arr_out(meta_buffer, meta_size); + ::google::protobuf::io::CodedOutputStream coded_out(&arr_out); + meta.SerializeWithCachedSizes(&coded_out); + ASSERT(!coded_out.HadError()); + buffer.add(meta_buffer, meta_size); + + ENVOY_LOG(warn, "brpc onError error={} buffer_len={} detail={}", + static_cast(error.type), static_cast(buffer.length()), + meta.ShortDebugString()); + + if (meta_size < 1024) + { + meta_buffer = NULL; + } + else + { + delete []meta_buffer; + meta_buffer = NULL; + } + + /* + if (span && received_us > 0) + { + response_meta->set_process_time_us(base::cpuwide_time_us() - received_us); + } + */ } BrpcDecodeStatus BrpcCodec::handleState(Buffer::Instance& buffer) { @@ -112,11 +193,11 @@ BrpcDecodeStatus BrpcCodec::decodeBody(Buffer::Instance& buffer) { void BrpcCodec::toMetadata(MetaProtocolProxy::Metadata& metadata) { // metadata.setRequestId(brpc_header_.get_pack_flow()); // metadata.putString("cmd", std::to_string(brpc_header_.get_req_cmd())); - metadata.originMessage().move(*origin_msg_); + metadata.getOriginMessage().move(*origin_msg_); } } // namespace Brpc } // namespace MetaProtocolProxy } // namespace NetworkFilters } // namespace Extensions -} // namespace Envoy +} // namespace Envoy \ No newline at end of file diff --git a/src/application_protocols/brpc/protocol.cc b/src/application_protocols/brpc/protocol.cc index ae21d084..c7b6f6eb 100644 --- a/src/application_protocols/brpc/protocol.cc +++ b/src/application_protocols/brpc/protocol.cc @@ -17,12 +17,20 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) { uint32_t pos = 0; - // todo : Check MAGIC - pos += sizeof(uint32_t); + // Check MAGIC + uint32_t header_magic = 0; + header_magic = buffer.peekBEInt(pos); + if (header_magic != MAGIC) + { + ENVOY_LOG(warn, "Brpc Header magic_error need={} real={}.", MAGIC, header_magic); + } + pos += sizeof(uint32_t); _body_len = buffer.peekBEInt(pos); + pos += sizeof(uint32_t); _meta_len = buffer.peekBEInt(pos); + pos += sizeof(uint32_t); ASSERT(pos == HEADER_SIZE); @@ -31,9 +39,10 @@ bool BrpcHeader::decode(Buffer::Instance& buffer) { } bool BrpcHeader::encode(Buffer::Instance& buffer) { - buffer.writeBEInt(MAGIC); - buffer.writeBEInt(_body_len); - buffer.writeBEInt(_meta_len); + //buffer.writeBEInt(MAGIC); + buffer.add("PRPC", 4); + buffer.writeBEInt(_body_len); + buffer.writeBEInt(_meta_len); return true; } @@ -41,4 +50,4 @@ bool BrpcHeader::encode(Buffer::Instance& buffer) { } // namespace MetaProtocolProxy } // namespace NetworkFilters } // namespace Extensions -} // namespace Envoy +} // namespace Envoy \ No newline at end of file diff --git a/src/application_protocols/brpc/protocol.h b/src/application_protocols/brpc/protocol.h index bd83f708..d0368a2c 100644 --- a/src/application_protocols/brpc/protocol.h +++ b/src/application_protocols/brpc/protocol.h @@ -11,7 +11,12 @@ namespace Brpc { enum class BrpcCode { NoRoute = 1, - Error = 1, + Error = 2, + NoCluster = 3, + UnHealthy = 4, + BadResponse = 5, + UnspecifiedError = 6, + OverLimit = 7 }; struct BrpcHeader : public Logger::Loggable { @@ -26,11 +31,11 @@ struct BrpcHeader : public Logger::Loggable { uint32_t get_body_len() const {return _body_len;}; uint32_t get_meta_len() const {return _meta_len;}; void set_body_len(uint32_t body_len) {_body_len = body_len;}; - void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;}; + void set_meta_len(uint32_t meta_len) {_meta_len = meta_len;}; }; } // namespace Brpc } // namespace MetaProtocolProxy } // namespace NetworkFilters } // namespace Extensions -} // namespace Envoy +} // namespace Envoy \ No newline at end of file