Skip to content

Commit

Permalink
Refactor rx layer (#875)
Browse files Browse the repository at this point in the history
* refactor: rx transport layer

* refactor: rx session layer
  • Loading branch information
jean-roland authored Feb 3, 2025
1 parent bbb343c commit e5ec8b2
Show file tree
Hide file tree
Showing 6 changed files with 594 additions and 548 deletions.
20 changes: 20 additions & 0 deletions src/session/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,24 @@ void _z_liveliness_clear(_z_session_t *zn) {
_z_session_mutex_unlock(zn);
}

#else // Z_FEATURE_LIVELINESS == 0

z_result_t _z_liveliness_process_token_declare(_z_session_t *zn, const _z_n_msg_declare_t *decl) {
_ZP_UNUSED(zn);
_ZP_UNUSED(decl);
return _Z_RES_OK;
}

z_result_t _z_liveliness_process_token_undeclare(_z_session_t *zn, const _z_n_msg_declare_t *decl) {
_ZP_UNUSED(zn);
_ZP_UNUSED(decl);
return _Z_RES_OK;
}

z_result_t _z_liveliness_process_declare_final(_z_session_t *zn, const _z_n_msg_declare_t *decl) {
_ZP_UNUSED(zn);
_ZP_UNUSED(decl);
return _Z_RES_OK;
}

#endif // Z_FEATURE_LIVELINESS == 1
3 changes: 1 addition & 2 deletions src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
z_result_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability) {
z_result_t ret = _Z_RES_OK;

// TODO check body to know where to dispatch

// Memory cleaning must be done in the feature layer
if (push->_body._is_put) {
_z_msg_put_t *put = &push->_body._body._put;
ret = _z_trigger_subscriptions_put(zn, &push->_key, &put->_payload, &put->_encoding, &put->_commons._timestamp,
Expand Down
4 changes: 0 additions & 4 deletions src/session/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
z_result_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t *key, _z_msg_reply_t *reply) {
z_result_t ret = _Z_RES_OK;

// TODO check id to know where to dispatch

#if Z_FEATURE_QUERY == 1
ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put,
(reply->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE));
Expand All @@ -39,8 +37,6 @@ z_result_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t
z_result_t _z_trigger_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *error) {
z_result_t ret = _Z_RES_OK;

// TODO check id to know where to dispatch

#if Z_FEATURE_QUERY == 1
ret = _z_trigger_query_reply_err(zn, id, error);
#else
Expand Down
246 changes: 140 additions & 106 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,133 +36,167 @@
#include "zenoh-pico/utils/logging.h"

/*------------------ Handle message ------------------*/
z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, uint16_t local_peer_id) {
z_result_t ret = _Z_RES_OK;
_z_session_t *zn = _Z_RC_IN_VAL(zsrc);

switch (msg->_tag) {
case _Z_N_DECLARE: {
_z_n_msg_declare_t *decl = &msg->_body._declare;
_Z_DEBUG("Handling _Z_N_DECLARE: %i", decl->_decl._tag);
switch (decl->_decl._tag) {
case _Z_DECL_KEXPR: {
if (_z_register_resource(zn, &decl->_decl._body._decl_kexpr._keyexpr,
decl->_decl._body._decl_kexpr._id, local_peer_id) == 0) {
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}
} break;
case _Z_UNDECL_KEXPR: {
_z_unregister_resource(zn, decl->_decl._body._undecl_kexpr._id, local_peer_id);
} break;
case _Z_DECL_SUBSCRIBER: {
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_DECL_QUERYABLE: {
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_DECL_TOKEN: {
#if Z_FEATURE_LIVELINESS == 1
_z_liveliness_process_token_declare(zn, decl);
#endif
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_UNDECL_SUBSCRIBER: {
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_UNDECL_QUERYABLE: {
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_UNDECL_TOKEN: {
#if Z_FEATURE_LIVELINESS == 1
_z_liveliness_process_token_undeclare(zn, decl);
#endif
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_DECL_FINAL: {
#if Z_FEATURE_LIVELINESS == 1
_z_liveliness_process_declare_final(zn, decl);
#endif
// Check that interest id is valid
if (!decl->has_interest_id) {
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_z_interest_process_declare_final(zn, decl->_interest_id);
} break;
static z_result_t _z_handle_declare_inner(_z_session_t *zn, _z_n_msg_declare_t *decl, uint16_t local_peer_id) {
switch (decl->_decl._tag) {
case _Z_DECL_KEXPR:
if (_z_register_resource(zn, &decl->_decl._body._decl_kexpr._keyexpr, decl->_decl._body._decl_kexpr._id,
local_peer_id) == 0) {
return _Z_ERR_ENTITY_DECLARATION_FAILED;
}
_z_n_msg_declare_clear(&msg->_body._declare);
} break;
break;

case _Z_N_PUSH: {
_Z_DEBUG("Handling _Z_N_PUSH");
_z_n_msg_push_t *push = &msg->_body._push;
ret = _z_trigger_push(zn, push, msg->_reliability);
} break;
case _Z_UNDECL_KEXPR:
_z_unregister_resource(zn, decl->_decl._body._undecl_kexpr._id, local_peer_id);
break;

case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST");
_z_n_msg_request_t *req = &msg->_body._request;
switch (req->_tag) {
case _Z_REQUEST_QUERY: {
case _Z_DECL_SUBSCRIBER:
return _z_interest_process_declares(zn, &decl->_decl);

case _Z_DECL_QUERYABLE:
return _z_interest_process_declares(zn, &decl->_decl);

case _Z_DECL_TOKEN:
_Z_RETURN_IF_ERR(_z_liveliness_process_token_declare(zn, decl));
return _z_interest_process_declares(zn, &decl->_decl);

case _Z_UNDECL_SUBSCRIBER:
return _z_interest_process_undeclares(zn, &decl->_decl);

case _Z_UNDECL_QUERYABLE:
return _z_interest_process_undeclares(zn, &decl->_decl);

case _Z_UNDECL_TOKEN:
_Z_RETURN_IF_ERR(_z_liveliness_process_token_undeclare(zn, decl));
return _z_interest_process_undeclares(zn, &decl->_decl);

case _Z_DECL_FINAL:
_Z_RETURN_IF_ERR(_z_liveliness_process_declare_final(zn, decl));
// Check that interest id is valid
if (!decl->has_interest_id) {
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
return _z_interest_process_declare_final(zn, decl->_interest_id);

default:
_Z_INFO("Received unknown declare tag: %d\n", decl->_decl._tag);
break;
}
return _Z_RES_OK;
}

static z_result_t _z_handle_declare(_z_session_t *zn, _z_n_msg_declare_t *decl, uint16_t local_peer_id) {
z_result_t ret = _z_handle_declare_inner(zn, decl, local_peer_id);
_z_n_msg_declare_clear(decl);
return ret;
}

static z_result_t _z_handle_request(_z_session_rc_t *zsrc, _z_session_t *zn, _z_n_msg_request_t *req,
z_reliability_t reliability) {
switch (req->_tag) {
case _Z_REQUEST_QUERY:
#if Z_FEATURE_QUERYABLE == 1
_z_msg_query_t *query = &req->_body._query;
ret = _z_trigger_queryables(zsrc, query, &req->_key, (uint32_t)req->_rid);
// Memory cleaning must be done in the feature layer
return _z_trigger_queryables(zsrc, &req->_body._query, &req->_key, (uint32_t)req->_rid);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported");
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported");
_z_n_msg_request_clear(req);
break;
#endif
} break;
case _Z_REQUEST_PUT: {

case _Z_REQUEST_PUT: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_put_t put = req->_body._put;
ret = _z_trigger_subscriptions_put(zn, &req->_key, &put._payload, &put._encoding,
&put._commons._timestamp, req->_ext_qos, &put._attachment,
msg->_reliability);
_z_msg_put_t put = req->_body._put;
// Memory cleaning must be done in the feature layer
_Z_RETURN_IF_ERR(_z_trigger_subscriptions_put(zn, &req->_key, &put._payload, &put._encoding,
&put._commons._timestamp, req->_ext_qos, &put._attachment,
reliability));
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
case _Z_REQUEST_DEL: {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
z_result_t ret = _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
#if Z_FEATURE_SUBSCRIPTION == 0
_z_n_msg_request_clear(req);
#endif
return ret;
}
case _Z_REQUEST_DEL: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_del_t del = req->_body._del;
ret = _z_trigger_subscriptions_del(zn, &req->_key, &del._commons._timestamp, req->_ext_qos,
&del._attachment, msg->_reliability);
_z_msg_del_t del = req->_body._del;
// Memory cleaning must be done in the feature layer
_Z_RETURN_IF_ERR(_z_trigger_subscriptions_del(zn, &req->_key, &del._commons._timestamp, req->_ext_qos,
&del._attachment, reliability));
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
}
} break;
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
z_result_t ret = _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
#if Z_FEATURE_SUBSCRIPTION == 0
_z_n_msg_request_clear(req);
#endif
return ret;
}

case _Z_N_RESPONSE: {
default:
_Z_INFO("Received unknown request tag: %d\n", req->_tag);
_z_n_msg_request_clear(req);
break;
}
return _Z_RES_OK;
}

static z_result_t _z_handle_response(_z_session_t *zn, _z_n_msg_response_t *resp) {
#if Z_FEATURE_QUERY == 1
switch (resp->_tag) {
case _Z_RESPONSE_BODY_REPLY:
// Memory cleaning must be done in the feature layer
return _z_trigger_reply_partial(zn, resp->_request_id, &resp->_key, &resp->_body._reply);
case _Z_RESPONSE_BODY_ERR:
// Memory cleaning must be done in the feature layer
return _z_trigger_reply_err(zn, resp->_request_id, &resp->_body._err);
default:
_Z_INFO("Received unknown response tag: %d\n", resp->_tag);
_z_n_msg_response_clear(resp);
break;
}
#else
_z_n_msg_response_clear(resp);
#endif
return _Z_RES_OK;
}

z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, uint16_t local_peer_id) {
z_result_t ret = _Z_RES_OK;
_z_session_t *zn = _Z_RC_IN_VAL(zsrc);

switch (msg->_tag) {
case _Z_N_DECLARE:
_Z_DEBUG("Handling _Z_N_DECLARE: %i", msg->_body._declare._decl._tag);
ret = _z_handle_declare(zn, &msg->_body._declare, local_peer_id);
break;

case _Z_N_PUSH:
_Z_DEBUG("Handling _Z_N_PUSH");
ret = _z_trigger_push(zn, &msg->_body._push, msg->_reliability);
break;

case _Z_N_REQUEST:
_Z_DEBUG("Handling _Z_N_REQUEST");
ret = _z_handle_request(zsrc, zn, &msg->_body._request, msg->_reliability);
break;

case _Z_N_RESPONSE:
_Z_DEBUG("Handling _Z_N_RESPONSE");
_z_n_msg_response_t *response = &msg->_body._response;
switch (response->_tag) {
case _Z_RESPONSE_BODY_REPLY: {
_z_msg_reply_t *reply = &response->_body._reply;
ret = _z_trigger_reply_partial(zn, response->_request_id, &response->_key, reply);
} break;
case _Z_RESPONSE_BODY_ERR: {
_z_msg_err_t *error = &response->_body._err;
ret = _z_trigger_reply_err(zn, response->_request_id, error);
} break;
}
} break;
ret = _z_handle_response(zn, &msg->_body._response);
break;

case _Z_N_RESPONSE_FINAL: {
case _Z_N_RESPONSE_FINAL:
_Z_DEBUG("Handling _Z_N_RESPONSE_FINAL");
ret = _z_trigger_reply_final(zn, &msg->_body._response_final);
_z_n_msg_response_final_clear(&msg->_body._response_final);
} break;
break;

case _Z_N_INTEREST: {
_Z_DEBUG("Handling _Z_N_INTEREST");
_z_n_msg_interest_t *interest = &msg->_body._interest;

bool not_final = ((interest->_interest.flags & _Z_INTEREST_NOT_FINAL_MASK) != 0);
if (not_final) {
if ((interest->_interest.flags & _Z_INTEREST_NOT_FINAL_MASK) != 0) {
_z_interest_process_interest(zn, interest->_interest._keyexpr, interest->_interest._id,
interest->_interest.flags);
} else {
Expand Down
Loading

0 comments on commit e5ec8b2

Please sign in to comment.