Skip to content

Commit f313cf0

Browse files
committed
feat: connect to InfluxDB and store telemetry + car data
1 parent d51ef0f commit f313cf0

File tree

13 files changed

+179
-20
lines changed

13 files changed

+179
-20
lines changed

GAPP/apps/gapp-server/src/app.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import swaggerUi from '@fastify/swagger-ui';
66
import { carsController } from './controllers/cars.controller';
77
import sondehubPlugin from './plugins/sondehub';
88
import { sondesController } from './controllers/sondes.controller';
9+
import carsServicePlugin from './plugins/cars-service';
10+
import telemetryServicePlugin from './plugins/telemetry-service';
911

1012
interface AppOptions extends FastifyPluginOptions {
1113
influxDbToken: string;
@@ -23,12 +25,12 @@ export const app = async (fastify: FastifyInstance, opts: AppOptions) => {
2325
token: opts.influxDbToken,
2426
org: opts.influxDbOrg,
2527
});
26-
await fastify.register(sondehubPlugin, { dev: true });
28+
await fastify.register(sondehubPlugin, { dev: false });
29+
await fastify.register(carsServicePlugin);
30+
await fastify.register(telemetryServicePlugin);
2731

2832
await fastify.register(swagger);
29-
await fastify.register(swaggerUi, {
30-
routePrefix: '/docs',
31-
});
33+
await fastify.register(swaggerUi, { routePrefix: '/docs' });
3234

3335
// ROUTES
3436
fastify.register(carsController, { prefix: '/cars' });

GAPP/apps/gapp-server/src/controllers/cars.controller.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ export const carsController: FastifyPluginAsyncTypebox = async (fastify) => {
1515
const { callsign } = req.query;
1616
const { latitude, longitude, altitude } = req.body;
1717

18+
req.server.carsService.writeCarStatus(callsign, {
19+
latitude,
20+
longitude,
21+
altitude,
22+
});
23+
1824
await req.server.sondehub.uploadStationPosition({
1925
uploader_callsign: callsign,
2026
uploader_position: [latitude, longitude, altitude],

GAPP/apps/gapp-server/src/controllers/sondes.controller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export const sondesController: FastifyPluginAsyncTypebox = async (fastify) => {
1515
async (req, rep) => {
1616
const telemetryPacket = ttnPacketDto(req.body);
1717

18+
req.server.telemetryService.writeTelemetry(telemetryPacket);
1819
req.server.sondehub.addTelemetry(telemetryPacket);
1920

2021
rep.code(200).send('OK');
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { FastifyPluginAsync } from 'fastify';
2+
import fp from 'fastify-plugin';
3+
import { Plugins } from './plugins';
4+
import { CarsService } from '../services/cars.service';
5+
6+
declare module 'fastify' {
7+
interface FastifyInstance {
8+
carsService: CarsService;
9+
}
10+
}
11+
12+
const carsServicePlugin: FastifyPluginAsync = async (fastify) => {
13+
const carsService = new CarsService(fastify.influxClient, fastify.influxOrg);
14+
15+
await carsService.init();
16+
17+
fastify.decorate('carsService', carsService);
18+
fastify.addHook('onClose', async () => {
19+
await carsService.deinit();
20+
});
21+
};
22+
23+
export default fp(carsServicePlugin, {
24+
name: Plugins.CARS_SERVICE,
25+
dependencies: [Plugins.INFLUXDB],
26+
});

GAPP/apps/gapp-server/src/plugins/influxdb.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { FastifyPluginAsync, FastifyPluginOptions } from 'fastify';
22
import fp from 'fastify-plugin';
3-
import { InfluxDB, QueryApi, WriteApi } from '@influxdata/influxdb-client';
3+
import { InfluxDB } from '@influxdata/influxdb-client';
44
import { Plugins } from './plugins';
5+
import { Organization, OrgsAPI } from '@influxdata/influxdb-client-apis';
56

67
interface InfluxdbPluginOptions extends FastifyPluginOptions {
78
host: string;
@@ -11,8 +12,8 @@ interface InfluxdbPluginOptions extends FastifyPluginOptions {
1112

1213
declare module 'fastify' {
1314
interface FastifyInstance {
14-
influxWriteApi: WriteApi;
15-
influxQueryApi: QueryApi;
15+
influxClient: InfluxDB;
16+
influxOrg: Organization;
1617
}
1718
}
1819

@@ -22,16 +23,21 @@ const influxDbPlugin: FastifyPluginAsync<InfluxdbPluginOptions> = async (fastify
2223
url: options.host,
2324
});
2425

25-
const writeApi = influxClient.getWriteApi(options.org, 'fik');
26-
const queryApi = influxClient.getQueryApi(options.org);
26+
const orgsApi = new OrgsAPI(influxClient);
27+
const orgs = await orgsApi.getOrgs();
28+
let org = orgs.orgs.find((org) => org.name === options.org);
29+
if (!org) {
30+
fastify.log.info(`Creating organization ${options.org}`);
31+
org = await orgsApi.postOrgs({
32+
body: {
33+
name: options.org,
34+
description: 'Organization for storing telemetry data',
35+
},
36+
});
37+
}
2738

28-
fastify.decorate('influxWriteApi', writeApi);
29-
fastify.decorate('influxQueryApi', queryApi);
30-
fastify.addHook('onClose', async () => {
31-
fastify.log.info('Closing influxdb write api...');
32-
await writeApi.close();
33-
fastify.log.info('Influxdb write api closed');
34-
});
39+
fastify.decorate('influxOrg', org);
40+
fastify.decorate('influxClient', influxClient);
3541
};
3642

3743
export default fp(influxDbPlugin, {
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export enum Plugins {
22
INFLUXDB = 'influxdb',
33
SONDEHUB = 'sondehub',
4-
MQTT = 'mqtt',
4+
CARS_SERVICE = 'carsService',
5+
TELEMETRY_SERVICE = 'telemetryService',
56
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { FastifyPluginAsync } from 'fastify';
2+
import fp from 'fastify-plugin';
3+
import { Plugins } from './plugins';
4+
import { TelemetryService } from '../services/telemetry.service';
5+
6+
declare module 'fastify' {
7+
interface FastifyInstance {
8+
telemetryService: TelemetryService;
9+
}
10+
}
11+
12+
const telemetryServicePlugin: FastifyPluginAsync = async (fastify) => {
13+
const telemetryService = new TelemetryService(fastify.influxClient, fastify.influxOrg);
14+
15+
await telemetryService.init();
16+
17+
fastify.decorate('telemetryService', telemetryService);
18+
fastify.addHook('onClose', async () => {
19+
await telemetryService.deinit();
20+
});
21+
};
22+
23+
export default fp(telemetryServicePlugin, {
24+
name: Plugins.TELEMETRY_SERVICE,
25+
dependencies: [Plugins.INFLUXDB],
26+
});

GAPP/apps/gapp-server/src/schemas/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Type as T } from '@sinclair/typebox';
1+
import { Static, Type as T } from '@sinclair/typebox';
22

33
export const Q_Callsign = T.Object({
44
callsign: T.String(),
@@ -11,6 +11,7 @@ export const B_CarStatus = T.Object({
1111
longitude: T.Number(),
1212
altitude: T.Number(),
1313
});
14+
export type CarStatus = Static<typeof B_CarStatus>;
1415

1516
export const B_SondeTelemetry = T.Object({
1617
end_device_ids: T.Object({
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { InfluxDB, Point, WriteApi } from '@influxdata/influxdb-client';
2+
import { InfluxDbServiceBase } from '../utils/influxdb-service-base';
3+
import { Organization } from '@influxdata/influxdb-client-apis';
4+
import { CarStatus } from '../schemas';
5+
6+
export class CarsService extends InfluxDbServiceBase {
7+
private writeAPi: WriteApi;
8+
9+
constructor(private client: InfluxDB, private org: Organization) {
10+
super(client, org.id);
11+
}
12+
13+
public async init() {
14+
await this.ensureBucket('cars');
15+
this.writeAPi = this.client.getWriteApi(this.org.id, 'cars', 'ms');
16+
}
17+
18+
public async deinit() {
19+
await this.writeAPi.close();
20+
}
21+
22+
public writeCarStatus(callsign: string, status: CarStatus) {
23+
const point = new Point('car_status')
24+
.timestamp(Date.now())
25+
.tag('callsign', callsign)
26+
.floatField('latitude', status.latitude)
27+
.floatField('longitude', status.longitude)
28+
.floatField('altitude', status.altitude);
29+
30+
this.writeAPi.writePoint(point);
31+
}
32+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { InfluxDB, Point, WriteApi } from '@influxdata/influxdb-client';
2+
import { Organization } from '@influxdata/influxdb-client-apis';
3+
import { InfluxDbServiceBase } from '../utils/influxdb-service-base';
4+
import { TelemetryPacket } from '@gapp/sondehub';
5+
6+
export class TelemetryService extends InfluxDbServiceBase {
7+
private writeAPi: WriteApi;
8+
9+
constructor(private client: InfluxDB, private org: Organization) {
10+
super(client, org.id);
11+
}
12+
13+
public async init() {
14+
await this.ensureBucket('telemetry');
15+
this.writeAPi = this.client.getWriteApi(this.org.id, 'telemetry', 'ms');
16+
}
17+
18+
public async deinit() {
19+
await this.writeAPi.close();
20+
}
21+
22+
public writeTelemetry(telemetry: TelemetryPacket) {
23+
const point = new Point('telemetry_packet')
24+
.timestamp(new Date(telemetry.time_received))
25+
.tag('callsign', telemetry.payload_callsign)
26+
.floatField('latitude', telemetry.lat)
27+
.floatField('longitude', telemetry.lon)
28+
.floatField('altitude', telemetry.alt);
29+
30+
this.writeAPi.writePoint(point);
31+
}
32+
}

0 commit comments

Comments
 (0)