From 8dbcbd293c9e425e327fbd2a284a2b62171a256e Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 29 Jan 2025 16:38:48 +0100 Subject: [PATCH] Implement get --- src/api/api.c | 6 +-- src/net/matching.c | 3 -- tests/z_api_matching_test.c | 86 ++++++++++++++++++++++++++++++++++--- 3 files changed, 82 insertions(+), 13 deletions(-) diff --git a/src/api/api.c b/src/api/api.c index ae86c9aa4..c3750c5ab 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1135,8 +1135,6 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, z_matching_status_t *matching_status) { - // Ideally this should be implemented as a real request to the router, but this works much faster. - // And it works as long as filtering is enabled along with interest matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE; return _Z_RES_OK; } @@ -1378,9 +1376,7 @@ z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier } z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) { - // TODO(sashacmc): Implement - (void)querier; - (void)matching_status; + matching_status->matching = querier->_filter.ctx->state != WRITE_FILTER_ACTIVE; return _Z_RES_OK; } diff --git a/src/net/matching.c b/src/net/matching.c index eeaaa0a68..e47142253 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -29,9 +29,6 @@ #if Z_FEATURE_MATCHING == 1 static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) { _z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg; - // TODO(sashacmc): remove debug print - fprintf(stderr, "Matching listener callback: type=%d, msgid=%d, decl_id=%d\n", (int)msg->type, (int)msg->id, - (int)ctx->decl_id); switch (msg->type) { case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: { diff --git a/tests/z_api_matching_test.c b/tests/z_api_matching_test.c index 95119fa59..4824930b7 100644 --- a/tests/z_api_matching_test.c +++ b/tests/z_api_matching_test.c @@ -249,7 +249,7 @@ void test_matching_querier_sub(bool background) { _context_drop(&context); } -static void _check_status(z_owned_publisher_t* pub, bool expected) { +static void _check_publisher_status(z_owned_publisher_t* pub, bool expected) { z_matching_status_t status; status.matching = !expected; z_clock_t clock = z_clock_now(); @@ -264,6 +264,8 @@ static void _check_status(z_owned_publisher_t* pub, bool expected) { } void test_matching_publisher_get(void) { + printf("test_matching_publisher_get\n"); + z_owned_session_t s1, s2; z_owned_config_t c1, c2; z_config_default(&c1); @@ -285,7 +287,7 @@ void test_matching_publisher_get(void) { assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); z_sleep_s(1); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_owned_subscriber_t sub_wrong; z_owned_closure_sample_t callback_wrong; @@ -294,7 +296,7 @@ void test_matching_publisher_get(void) { z_closure_sample_move(&callback_wrong), NULL)); z_sleep_s(1); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_owned_subscriber_t sub; z_owned_closure_sample_t callback; @@ -302,11 +304,11 @@ void test_matching_publisher_get(void) { assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), z_closure_sample_move(&callback), NULL)); - _check_status(&pub, true); + _check_publisher_status(&pub, true); z_subscriber_drop(z_subscriber_move(&sub)); - _check_status(&pub, false); + _check_publisher_status(&pub, false); z_publisher_drop(z_publisher_move(&pub)); z_subscriber_drop(z_subscriber_move(&sub_wrong)); @@ -320,6 +322,79 @@ void test_matching_publisher_get(void) { z_session_drop(z_session_move(&s2)); } +static void _check_querier_status(z_owned_querier_t* querier, bool expected) { + z_matching_status_t status; + status.matching = !expected; + z_clock_t clock = z_clock_now(); + while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) { + assert_ok(z_querier_get_matching_status(z_querier_loan(querier), &status)); + z_sleep_ms(100); + } + if (status.matching != expected) { + fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching); + assert(false); + } +} + +void test_matching_querier_get(void) { + printf("test_matching_querier_get\n"); + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k_sub, k_querier, k_querier_wrong; + z_view_keyexpr_from_str(&k_sub, QUERIABLE_EXPR); + z_view_keyexpr_from_str(&k_querier, QUERY_EXPR); + z_view_keyexpr_from_str(&k_querier_wrong, SUB_EXPR_WRONG); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_querier_t querier; + assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL)); + z_sleep_s(1); + + _check_querier_status(&querier, false); + + z_owned_queryable_t queryable_wrong; + z_owned_closure_query_t callback_wrong; + z_closure_query(&callback_wrong, NULL, NULL, NULL); + assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable_wrong, z_view_keyexpr_loan(&k_querier_wrong), + z_closure_query_move(&callback_wrong), NULL)); + z_sleep_s(1); + + _check_querier_status(&querier, false); + + z_owned_queryable_t queryable; + z_owned_closure_query_t callback; + z_closure_query(&callback, NULL, NULL, NULL); + assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable, z_view_keyexpr_loan(&k_sub), + z_closure_query_move(&callback), NULL)); + + _check_querier_status(&querier, true); + + z_queryable_drop(z_queryable_move(&queryable)); + + _check_querier_status(&querier, false); + + z_querier_drop(z_querier_move(&querier)); + z_queryable_drop(z_queryable_move(&queryable_wrong)); + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); +} + int main(int argc, char** argv) { (void)argc; (void)argv; @@ -329,6 +404,7 @@ int main(int argc, char** argv) { test_matching_querier_sub(true); test_matching_querier_sub(false); + test_matching_querier_get(); } #else