Skip to content

Commit

Permalink
Implement get
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 29, 2025
1 parent 5070ff3 commit 8dbcbd2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 13 deletions.
6 changes: 1 addition & 5 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1135,8 +1135,6 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub

z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
z_matching_status_t *matching_status) {
// Ideally this should be implemented as a real request to the router, but this works much faster.
// And it works as long as filtering is enabled along with interest
matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}
Expand Down Expand Up @@ -1378,9 +1376,7 @@ z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier
}

z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) {
// TODO(sashacmc): Implement
(void)querier;
(void)matching_status;
matching_status->matching = querier->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}

Expand Down
3 changes: 0 additions & 3 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
#if Z_FEATURE_MATCHING == 1
static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) {
_z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg;
// TODO(sashacmc): remove debug print
fprintf(stderr, "Matching listener callback: type=%d, msgid=%d, decl_id=%d\n", (int)msg->type, (int)msg->id,
(int)ctx->decl_id);
switch (msg->type) {
case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER:
case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: {
Expand Down
86 changes: 81 additions & 5 deletions tests/z_api_matching_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void test_matching_querier_sub(bool background) {
_context_drop(&context);
}

static void _check_status(z_owned_publisher_t* pub, bool expected) {
static void _check_publisher_status(z_owned_publisher_t* pub, bool expected) {
z_matching_status_t status;
status.matching = !expected;
z_clock_t clock = z_clock_now();
Expand All @@ -264,6 +264,8 @@ static void _check_status(z_owned_publisher_t* pub, bool expected) {
}

void test_matching_publisher_get(void) {
printf("test_matching_publisher_get\n");

z_owned_session_t s1, s2;
z_owned_config_t c1, c2;
z_config_default(&c1);
Expand All @@ -285,7 +287,7 @@ void test_matching_publisher_get(void) {
assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL));
z_sleep_s(1);

_check_status(&pub, false);
_check_publisher_status(&pub, false);

z_owned_subscriber_t sub_wrong;
z_owned_closure_sample_t callback_wrong;
Expand All @@ -294,19 +296,19 @@ void test_matching_publisher_get(void) {
z_closure_sample_move(&callback_wrong), NULL));
z_sleep_s(1);

_check_status(&pub, false);
_check_publisher_status(&pub, false);

z_owned_subscriber_t sub;
z_owned_closure_sample_t callback;
z_closure_sample(&callback, NULL, NULL, NULL);
assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub),
z_closure_sample_move(&callback), NULL));

_check_status(&pub, true);
_check_publisher_status(&pub, true);

z_subscriber_drop(z_subscriber_move(&sub));

_check_status(&pub, false);
_check_publisher_status(&pub, false);

z_publisher_drop(z_publisher_move(&pub));
z_subscriber_drop(z_subscriber_move(&sub_wrong));
Expand All @@ -320,6 +322,79 @@ void test_matching_publisher_get(void) {
z_session_drop(z_session_move(&s2));
}

static void _check_querier_status(z_owned_querier_t* querier, bool expected) {
z_matching_status_t status;
status.matching = !expected;
z_clock_t clock = z_clock_now();
while (status.matching != expected && z_clock_elapsed_s(&clock) < DEFAULT_TIMEOUT_S) {
assert_ok(z_querier_get_matching_status(z_querier_loan(querier), &status));
z_sleep_ms(100);
}
if (status.matching != expected) {
fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching);
assert(false);
}
}

void test_matching_querier_get(void) {
printf("test_matching_querier_get\n");

z_owned_session_t s1, s2;
z_owned_config_t c1, c2;
z_config_default(&c1);
z_config_default(&c2);
z_view_keyexpr_t k_sub, k_querier, k_querier_wrong;
z_view_keyexpr_from_str(&k_sub, QUERIABLE_EXPR);
z_view_keyexpr_from_str(&k_querier, QUERY_EXPR);
z_view_keyexpr_from_str(&k_querier_wrong, SUB_EXPR_WRONG);

assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

assert_ok(zp_start_read_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_read_task(z_loan_mut(s2), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));

z_owned_querier_t querier;
assert_ok(z_declare_querier(z_session_loan(&s1), &querier, z_view_keyexpr_loan(&k_querier), NULL));
z_sleep_s(1);

_check_querier_status(&querier, false);

z_owned_queryable_t queryable_wrong;
z_owned_closure_query_t callback_wrong;
z_closure_query(&callback_wrong, NULL, NULL, NULL);
assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable_wrong, z_view_keyexpr_loan(&k_querier_wrong),
z_closure_query_move(&callback_wrong), NULL));
z_sleep_s(1);

_check_querier_status(&querier, false);

z_owned_queryable_t queryable;
z_owned_closure_query_t callback;
z_closure_query(&callback, NULL, NULL, NULL);
assert_ok(z_declare_queryable(z_session_loan(&s2), &queryable, z_view_keyexpr_loan(&k_sub),
z_closure_query_move(&callback), NULL));

_check_querier_status(&querier, true);

z_queryable_drop(z_queryable_move(&queryable));

_check_querier_status(&querier, false);

z_querier_drop(z_querier_move(&querier));
z_queryable_drop(z_queryable_move(&queryable_wrong));

assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
assert_ok(zp_stop_lease_task(z_loan_mut(s1)));
assert_ok(zp_stop_lease_task(z_loan_mut(s2)));

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));
}

int main(int argc, char** argv) {
(void)argc;
(void)argv;
Expand All @@ -329,6 +404,7 @@ int main(int argc, char** argv) {

test_matching_querier_sub(true);
test_matching_querier_sub(false);
test_matching_querier_get();
}

#else
Expand Down

0 comments on commit 8dbcbd2

Please sign in to comment.