Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sending pad_server state over telemetry #47

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pad_server/src/actuator.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "state.h"
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

Expand Down Expand Up @@ -47,6 +48,7 @@ int actuator_on(actuator_t *act) {
return err;
}
atomic_store(&act->state, true);

return 0;
}

Expand All @@ -61,6 +63,7 @@ int actuator_off(actuator_t *act) {
return err;
}
atomic_store(&act->state, false);

return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions pad_server/src/arm.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,9 @@ int change_arm_level(padstate_t *state, arm_lvl_e new_arm) {
}
} while (!err);

pthread_mutex_lock(&state->update_mut);
state->update_recorded = true;
pthread_cond_signal(&state->update_cond);
pthread_mutex_unlock(&state->update_mut);
return ARM_OK;
}
20 changes: 13 additions & 7 deletions pad_server/src/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ void padstate_init(padstate_t *state) {
for (unsigned int i = 0; i < NUM_ACTUATORS; i++) {
actuator_init(&state->actuators[i], i, gpio_actuator_on, gpio_actuator_off, NULL);
}

pthread_mutex_init(&state->update_mut, NULL);
pthread_cond_init(&state->update_cond, NULL);
state->update_recorded = false;
}

/* TODO: docs
Expand Down Expand Up @@ -103,15 +107,17 @@ int pad_actuate(padstate_t *state, uint8_t id, uint8_t req_state) {
return -1;
}

if (new_state == current_state) {
return ACT_OK;
if (new_state != current_state) {
err = actuator_set(&state->actuators[id], new_state);
if (err) {
return -1;
}
}

err = actuator_set(&state->actuators[id], new_state);

if (err) {
return -1;
}
pthread_mutex_lock(&state->update_mut);
state->update_recorded = true;
pthread_cond_signal(&state->update_cond);
pthread_mutex_unlock(&state->update_mut);

return ACT_OK;
}
3 changes: 3 additions & 0 deletions pad_server/src/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ typedef struct {
actuator_t actuators[NUM_ACTUATORS];
_Atomic(arm_lvl_e) arm_level;
pthread_rwlock_t rw_lock;
pthread_mutex_t update_mut;
pthread_cond_t update_cond;
bool update_recorded;
} padstate_t;

void padstate_init(padstate_t *state);
Expand Down
146 changes: 104 additions & 42 deletions pad_server/src/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <time.h>
#include <unistd.h>

#include "state.h"
#include "telemetry.h"

/* Helper macro for dereferencing pointers */
Expand All @@ -21,13 +22,6 @@

#define thread_return(e) pthread_exit((void *)(unsigned long)((e)))

/* The main telemetry socket */

typedef struct {
int sock;
struct sockaddr_in addr;
} telemetry_sock_t;

/*
* Set up the telemetry socket for connection.
* @param sock The telemetry socket to initialize.
Expand Down Expand Up @@ -80,6 +74,14 @@ static int telemetry_publish(telemetry_sock_t *sock, struct msghdr *msg) {
*/
static void telemetry_cleanup(void *arg) { telemetry_close((telemetry_sock_t *)(arg)); }

static void telemetry_cancel_padstate_thread(void *arg) {
pthread_t telemetry_padstate_thread = *(pthread_t *)arg;

pthread_cancel(telemetry_padstate_thread);
pthread_join(telemetry_padstate_thread, NULL);
fprintf(stderr, "Telemetry pad state thread terminated\n");
}

/*
* Cleanup function to kill a thread.
* @param arg A pointer to the pthread_t thread handle.
Expand Down Expand Up @@ -143,17 +145,7 @@ static void telemetry_publish_data(telemetry_sock_t *sock, telem_subtype_e type,
/* A function to create random data if not put in any file to read from
* @params arg The argument to run the telemetry thread
*/
static void random_data(telemetry_args_t *args) {

/* Start telemetry socket */
telemetry_sock_t telem;
int err;
err = telemetry_init(&telem, args->port, args->addr);
if (err) {
fprintf(stderr, "Could not start telemetry socket: %s\n", strerror(err));
thread_return(err);
}
pthread_cleanup_push(telemetry_cleanup, &telem);
static void random_data(telemetry_sock_t *telem) {

uint32_t time = 0;
uint32_t pressure = 0;
Expand All @@ -165,34 +157,33 @@ static void random_data(telemetry_args_t *args) {

pressure = (pressure + 1) % 255;
uint32_t pressure_i = 100 + pressure * 10;
telemetry_publish_data(&telem, TELEM_PRESSURE, 1, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 1, time, &pressure_i);
pressure_i = 200 + pressure * 20;
telemetry_publish_data(&telem, TELEM_PRESSURE, 2, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 2, time, &pressure_i);
pressure_i = 300 + pressure * 30;
telemetry_publish_data(&telem, TELEM_PRESSURE, 3, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 3, time, &pressure_i);
pressure_i = 250 + pressure * 40;
telemetry_publish_data(&telem, TELEM_PRESSURE, 4, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 4, time, &pressure_i);

temperature = (temperature + 1) % 20000 + 20000;
int32_t temp_i = temperature - 1;
telemetry_publish_data(&telem, TELEM_TEMP, 1, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 1, time, &temp_i);
temp_i = temperature + 1;
telemetry_publish_data(&telem, TELEM_TEMP, 2, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 2, time, &temp_i);
temp_i = temperature - 2;
telemetry_publish_data(&telem, TELEM_TEMP, 3, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 3, time, &temp_i);
temp_i = temperature + 2;
telemetry_publish_data(&telem, TELEM_TEMP, 4, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 4, time, &temp_i);

mass = (mass + 10) % 4000 + 3900;
uint32_t mass_i = mass + 2;
telemetry_publish_data(&telem, TELEM_MASS, 1, time, &mass_i);
telemetry_publish_data(telem, TELEM_MASS, 1, time, &mass_i);

time = (time + 1) % 1000000;
usleep(100000);
}

thread_return(0);
pthread_cleanup_pop(1);
}

/*
Expand All @@ -205,12 +196,6 @@ void *telemetry_run(void *arg) {
char buffer[BUFSIZ];
int err;

/* Null telemetry file means nothing to do */
if (args->data_file == NULL) {
// printf("No telemetry data to send.\n");
random_data(args);
}

/* Start telemetry socket */
telemetry_sock_t telem;
err = telemetry_init(&telem, args->port, args->addr);
Expand All @@ -220,6 +205,20 @@ void *telemetry_run(void *arg) {
}
pthread_cleanup_push(telemetry_cleanup, &telem);

pthread_t telemetry_padstate_thread;
telemetry_padstate_args_t telemetry_padstate_args = {.sock = &telem, .state = args->state};
err = pthread_create(&telemetry_padstate_thread, NULL, telemetry_update_padstate, &telemetry_padstate_args);
if (err) {
fprintf(stderr, "Could not start telemetry padstate sending thread: %s\n", strerror(err));
thread_return(err);
}
pthread_cleanup_push(telemetry_cancel_padstate_thread, &telemetry_padstate_thread);

/* Null telemetry file means nothing to do */
if (args->data_file == NULL) {
random_data(&telem);
}

/* Open telemetry file */
FILE *data = fopen(args->data_file, "r");
if (data == NULL) {
Expand Down Expand Up @@ -259,26 +258,89 @@ void *telemetry_run(void *arg) {
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_PRESSURE};
pressure_p body = {.id = 1, .time = time, .pressure = pressure};

/* Send data to all clients. */
struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0,
};
telemetry_publish(&telem, &msg);

usleep(1000);
usleep(1000000);
}

thread_return(0); // Normal return

pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
}

/*
* Helper function to send the entire padstate over telemetry
* @param state the pad state
* @param sock the telemetry socket
*/
void telemetry_send_padstate(padstate_t *state, telemetry_sock_t *sock) {
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
uint32_t time_ms = time.tv_sec * 1000 + time.tv_nsec / 1000000;

// sending arming update
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_ARM};
arm_lvl_e arm_lvl = padstate_get_level(state);
arm_state_p body = {.time = time_ms, .state = arm_lvl};

struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
};
telemetry_publish(sock, &msg);

// sending actuator update
for (int i = 0; i < NUM_ACTUATORS; i++) {
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_ACT};
bool act_state;
padstate_get_actstate(state, i, &act_state);
act_state_p body = {.time = time_ms, .id = i, .state = act_state};

struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
};
telemetry_publish(sock, &msg);
}
}

void *telemetry_update_padstate(void *arg) {
telemetry_padstate_args_t *args = (telemetry_padstate_args_t *)arg;
padstate_t *state = args->state;

for (;;) {
struct timespec cond_timeout;
clock_gettime(CLOCK_REALTIME, &cond_timeout);
cond_timeout.tv_sec += PADSTATE_UPDATE_TIMEOUT_SEC;

int err = -1;
pthread_mutex_lock(&state->update_mut);
// waiting until either the cond times out or an update is received
// and we confirmed it was not a spurious wakeup
while (err != ETIMEDOUT && !state->update_recorded) {
pthread_cond_timedwait(&state->update_cond, &state->update_mut, &cond_timeout);
}

telemetry_send_padstate(state, args->sock);
state->update_recorded = false;
pthread_mutex_unlock(&state->update_mut);
}

thread_return(0);
}
14 changes: 14 additions & 0 deletions pad_server/src/telemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
#include <sys/socket.h>

#define MAX_TELEMETRY 5
#define PADSTATE_UPDATE_TIMEOUT_SEC 5

/* The main telemetry socket */
typedef struct {
int sock;
struct sockaddr_in addr;
} telemetry_sock_t;

typedef struct {
telemetry_sock_t *sock;
padstate_t *state;
} telemetry_padstate_args_t;

typedef struct {
padstate_t *state;
Expand All @@ -16,5 +28,7 @@ typedef struct {
} telemetry_args_t;

void *telemetry_run(void *arg);
void *telemetry_update_padstate(void *arg);
void telemetry_send_padstate(padstate_t *state, telemetry_sock_t *sock);

#endif // _TELEMETRY_H_
Loading