Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sending pad_server state over telemetry #47

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pad_server/src/actuator.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "state.h"
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

Expand Down Expand Up @@ -47,6 +48,7 @@ int actuator_on(actuator_t *act) {
return err;
}
atomic_store(&act->state, true);

return 0;
}

Expand All @@ -61,6 +63,7 @@ int actuator_off(actuator_t *act) {
return err;
}
atomic_store(&act->state, false);

return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions pad_server/src/arm.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,9 @@ int change_arm_level(padstate_t *state, arm_lvl_e new_arm) {
}
} while (!err);

pthread_mutex_lock(&state->update_mut);
state->update_recorded = true;
pthread_cond_signal(&state->update_cond);
pthread_mutex_unlock(&state->update_mut);
return ARM_OK;
}
20 changes: 13 additions & 7 deletions pad_server/src/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ void padstate_init(padstate_t *state) {
for (unsigned int i = 0; i < NUM_ACTUATORS; i++) {
actuator_init(&state->actuators[i], i, gpio_actuator_on, gpio_actuator_off, NULL);
}

pthread_mutex_init(&state->update_mut, NULL);
pthread_cond_init(&state->update_cond, NULL);
state->update_recorded = false;
}

/* TODO: docs
Expand Down Expand Up @@ -103,15 +107,17 @@ int pad_actuate(padstate_t *state, uint8_t id, uint8_t req_state) {
return -1;
}

if (new_state == current_state) {
return ACT_OK;
if (new_state != current_state) {
err = actuator_set(&state->actuators[id], new_state);
if (err) {
return -1;
}
}

err = actuator_set(&state->actuators[id], new_state);

if (err) {
return -1;
}
pthread_mutex_lock(&state->update_mut);
state->update_recorded = true;
pthread_cond_signal(&state->update_cond);
pthread_mutex_unlock(&state->update_mut);

return ACT_OK;
}
10 changes: 10 additions & 0 deletions pad_server/src/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@
/* Number of actuators in the system: 12 solenoid valves, 1 fire valve, 1 quick disconnect, 1 igniter */
#define NUM_ACTUATORS (12 + 1 + 1 + 1)

typedef struct {
enum { ACT, ARM } target;
bskdany marked this conversation as resolved.
Show resolved Hide resolved
act_id_e act_id;
bool act_val;
arm_lvl_e arm_lvl;
bskdany marked this conversation as resolved.
Show resolved Hide resolved
} padstate_last_update_t;

/* State of the entire pad control system */
typedef struct {
actuator_t actuators[NUM_ACTUATORS];
_Atomic(arm_lvl_e) arm_level;
pthread_rwlock_t rw_lock;
pthread_mutex_t update_mut;
pthread_cond_t update_cond;
bool update_recorded;
} padstate_t;

void padstate_init(padstate_t *state);
Expand Down
149 changes: 107 additions & 42 deletions pad_server/src/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,20 @@
#include <time.h>
#include <unistd.h>

#include "state.h"
#include "telemetry.h"

/* Helper macro for dereferencing pointers */

#define deref(type, data) *((type *)(data))

/* Helper function for returning an error code from a thread */
/* Helper macro for dereferencing pointers */

#define thread_return(e) pthread_exit((void *)(unsigned long)((e)))
#define deref(type, data) *((type *)(data))
bskdany marked this conversation as resolved.
Show resolved Hide resolved

/* The main telemetry socket */
/* Helper function for returning an error code from a thread */

typedef struct {
int sock;
struct sockaddr_in addr;
} telemetry_sock_t;
#define thread_return(e) pthread_exit((void *)(unsigned long)((e)))

/*
* Set up the telemetry socket for connection.
Expand Down Expand Up @@ -80,6 +78,14 @@ static int telemetry_publish(telemetry_sock_t *sock, struct msghdr *msg) {
*/
static void telemetry_cleanup(void *arg) { telemetry_close((telemetry_sock_t *)(arg)); }

static void telemetry_cancel_padstate_thread(void *arg) {
pthread_t telemetry_padstate_thread = *(pthread_t *)arg;

pthread_cancel(telemetry_padstate_thread);
pthread_join(telemetry_padstate_thread, NULL);
fprintf(stderr, "Telemetry pad state thread terminated\n");
}

/*
* Cleanup function to kill a thread.
* @param arg A pointer to the pthread_t thread handle.
Expand Down Expand Up @@ -143,17 +149,7 @@ static void telemetry_publish_data(telemetry_sock_t *sock, telem_subtype_e type,
/* A function to create random data if not put in any file to read from
* @params arg The argument to run the telemetry thread
*/
static void random_data(telemetry_args_t *args) {

/* Start telemetry socket */
telemetry_sock_t telem;
int err;
err = telemetry_init(&telem, args->port, args->addr);
if (err) {
fprintf(stderr, "Could not start telemetry socket: %s\n", strerror(err));
thread_return(err);
}
pthread_cleanup_push(telemetry_cleanup, &telem);
static void random_data(telemetry_sock_t *telem) {

uint32_t time = 0;
uint32_t pressure = 0;
Expand All @@ -165,34 +161,33 @@ static void random_data(telemetry_args_t *args) {

pressure = (pressure + 1) % 255;
uint32_t pressure_i = 100 + pressure * 10;
telemetry_publish_data(&telem, TELEM_PRESSURE, 1, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 1, time, &pressure_i);
pressure_i = 200 + pressure * 20;
telemetry_publish_data(&telem, TELEM_PRESSURE, 2, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 2, time, &pressure_i);
pressure_i = 300 + pressure * 30;
telemetry_publish_data(&telem, TELEM_PRESSURE, 3, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 3, time, &pressure_i);
pressure_i = 250 + pressure * 40;
telemetry_publish_data(&telem, TELEM_PRESSURE, 4, time, &pressure_i);
telemetry_publish_data(telem, TELEM_PRESSURE, 4, time, &pressure_i);

temperature = (temperature + 1) % 20000 + 20000;
int32_t temp_i = temperature - 1;
telemetry_publish_data(&telem, TELEM_TEMP, 1, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 1, time, &temp_i);
temp_i = temperature + 1;
telemetry_publish_data(&telem, TELEM_TEMP, 2, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 2, time, &temp_i);
temp_i = temperature - 2;
telemetry_publish_data(&telem, TELEM_TEMP, 3, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 3, time, &temp_i);
temp_i = temperature + 2;
telemetry_publish_data(&telem, TELEM_TEMP, 4, time, &temp_i);
telemetry_publish_data(telem, TELEM_TEMP, 4, time, &temp_i);

mass = (mass + 10) % 4000 + 3900;
uint32_t mass_i = mass + 2;
telemetry_publish_data(&telem, TELEM_MASS, 1, time, &mass_i);
telemetry_publish_data(telem, TELEM_MASS, 1, time, &mass_i);

time = (time + 1) % 1000000;
usleep(100000);
}

thread_return(0);
pthread_cleanup_pop(1);
}

/*
Expand All @@ -205,12 +200,6 @@ void *telemetry_run(void *arg) {
char buffer[BUFSIZ];
int err;

/* Null telemetry file means nothing to do */
if (args->data_file == NULL) {
// printf("No telemetry data to send.\n");
random_data(args);
}

/* Start telemetry socket */
telemetry_sock_t telem;
err = telemetry_init(&telem, args->port, args->addr);
Expand All @@ -220,6 +209,20 @@ void *telemetry_run(void *arg) {
}
pthread_cleanup_push(telemetry_cleanup, &telem);

pthread_t telemetry_padstate_thread;
telemetry_padstate_args_t telemetry_padstate_args = {.sock = &telem, .state = args->state};
err = pthread_create(&telemetry_padstate_thread, NULL, telemetry_update_padstate, &telemetry_padstate_args);
if (err) {
fprintf(stderr, "Could not start telemetry padstate sending thread: %s\n", strerror(err));
thread_return(err);
}
pthread_cleanup_push(telemetry_cancel_padstate_thread, &telemetry_padstate_thread);

/* Null telemetry file means nothing to do */
if (args->data_file == NULL) {
random_data(&telem);
}

/* Open telemetry file */
FILE *data = fopen(args->data_file, "r");
if (data == NULL) {
Expand Down Expand Up @@ -259,26 +262,88 @@ void *telemetry_run(void *arg) {
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_PRESSURE};
pressure_p body = {.id = 1, .time = time, .pressure = pressure};

/* Send data to all clients. */
struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0,
};
telemetry_publish(&telem, &msg);

usleep(1000);
usleep(1000000);
}

thread_return(0); // Normal return

pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
}

/*
* Helper function to send the entire padstate over telemetry
* @param state the pad state
* @param sock the telemetry socket
*/
void telemetry_send_padstate(padstate_t *state, telemetry_sock_t *sock) {
struct timespec time;
clock_gettime(CLOCK_MONOTONIC, &time);
uint32_t time_ms = time.tv_sec * 1000 + time.tv_nsec / 1000000;

// sending arming update
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_ARM};
arm_lvl_e arm_lvl = padstate_get_level(state);
arm_state_p body = {.time = time_ms, .state = arm_lvl};

struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
};
telemetry_publish(sock, &msg);

// sending actuator update
for (int i = 0; i < NUM_ACTUATORS; i++) {
header_p hdr = {.type = TYPE_TELEM, .subtype = TELEM_ACT};
bool act_state;
padstate_get_actstate(state, i, &act_state);
act_state_p body = {.time = time_ms, .id = i, .state = act_state};

struct iovec pkt[2] = {
{.iov_base = &hdr, .iov_len = sizeof(hdr)},
{.iov_base = &body, .iov_len = sizeof(body)},
};
struct msghdr msg = {
.msg_iov = pkt,
.msg_iovlen = (sizeof(pkt) / sizeof(struct iovec)),
};
telemetry_publish(sock, &msg);
}
}

void *telemetry_update_padstate(void *arg) {
telemetry_padstate_args_t *args = (telemetry_padstate_args_t *)arg;
padstate_t *state = args->state;

for (;;) {
telemetry_send_padstate(state, args->sock);

struct timespec cond_timeout;
clock_gettime(CLOCK_REALTIME, &cond_timeout);
cond_timeout.tv_sec += PADSTATE_UPDATE_TIMEOUT_SEC;

// waiting until cond_timedwait times out
pthread_mutex_lock(&state->update_mut);
while (pthread_cond_timedwait(&state->update_cond, &state->update_mut, &cond_timeout) == 0 &&
bskdany marked this conversation as resolved.
Show resolved Hide resolved
state->update_recorded == true) {
telemetry_send_padstate(state, args->sock);
state->update_recorded = false;
}
pthread_mutex_unlock(&state->update_mut);
}

thread_return(0);
}
14 changes: 14 additions & 0 deletions pad_server/src/telemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
#include <sys/socket.h>

#define MAX_TELEMETRY 5
#define PADSTATE_UPDATE_TIMEOUT_SEC 5

/* The main telemetry socket */
typedef struct {
int sock;
struct sockaddr_in addr;
} telemetry_sock_t;

typedef struct {
telemetry_sock_t *sock;
padstate_t *state;
} telemetry_padstate_args_t;

typedef struct {
padstate_t *state;
Expand All @@ -16,5 +28,7 @@ typedef struct {
} telemetry_args_t;

void *telemetry_run(void *arg);
void *telemetry_update_padstate(void *arg);
void telemetry_send_padstate(padstate_t *state, telemetry_sock_t *sock);

#endif // _TELEMETRY_H_
Loading