Skip to content

Commit

Permalink
working storage and service
Browse files Browse the repository at this point in the history
  • Loading branch information
akhenakh committed Nov 21, 2019
1 parent 5cab161 commit 7441a48
Show file tree
Hide file tree
Showing 18 changed files with 1,867 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@

!/cmd/geottnd/geottnd
!/cmd/ttncli/ttncli
!/cmd/geottnd/geottnd
!/cmd/geottnd/geo.db/
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.EXPORT_ALL_VARIABLES:

ifndef VERSION
VERSION := $(shell git describe --always --tags)
endif

DATE := $(shell date -u +%Y%m%d.%H%M%S)

LDFLAGS = -trimpath -ldflags "-X=main.version=$(VERSION)-$(DATE)"
CGO_ENABLED=0

targets = geottnd

.PHONY: all lint test geottnd clean

all: test $(targets)

test: CGO_ENABLED=1
test: lint
go test -race ./...

lint:
golangci-lint run

geottnd:
cd cmd/geottnd && go build $(LDFLAGS)

clean:
rm -f cmd/geottnd/geottnd

generate:
go generate ./...
26 changes: 26 additions & 0 deletions cmd/createpayload/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"encoding/hex"
"flag"
"fmt"

"github.com/akhenakh/cayenne"
)

var (
lat = flag.Float64("lat", 48.8, "The Latitude")
lng = flag.Float64("lng", 2.2, "The Longitude")
channel = flag.Int("channel", 1, "The channel")
)

func main() {
flag.Parse()

e := cayenne.NewEncoder()
e.AddGPS(uint8(*channel), float32(*lat), float32(*lng), 0.0)

b := e.Bytes()
hexPayload := hex.EncodeToString(b)
fmt.Println("Data", hexPayload)
}
99 changes: 99 additions & 0 deletions cmd/geottncli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"bytes"
"context"
"flag"
"log"
"os"
"time"

"github.com/akhenakh/cayenne"
_ "github.com/mbobakov/grpc-consul-resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"

"github.com/akhenakh/geottn/geottnsvc"
)

var (
geoTTNURI = flag.String("geoTTNURI", "localhost:9200", "geoTTN grpc URI")
lat = flag.Float64("lat", 48.8, "Lat")
lng = flag.Float64("lng", 2.2, "Lng")
radius = flag.Float64("radius", 1000, "Radius in meters")
key = flag.String("key", "", "ask for a key, if empty perform radius search")
topic = flag.String("topic", "metar", "topic")
)

func main() {
flag.Parse()

conn, err := grpc.Dial(*geoTTNURI,
grpc.WithInsecure(),
grpc.WithBalancerName(roundrobin.Name), //nolint:staticcheck
)
if err != nil {
log.Fatal(err)
}

c := geottnsvc.NewGeoTTNClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

if *key != "" {
dp, err := c.Get(ctx, &geottnsvc.GetRequest{
Key: *key,
})
if err != nil {
log.Fatal(err)
}

dec := cayenne.NewDecoder(bytes.NewBuffer(dp.Payload))
msg, err := dec.DecodeUplink()
if err != nil {
log.Fatal(err)
}

response := make(map[string]interface{})
for k, v := range msg.Values() {
response[k] = v
}
response["latitude"] = dp.Latitude
response["longitude"] = dp.Longitude
response["device_id"] = dp.DeviceId
response["time"] = dp.Time

log.Println(response)

os.Exit(0)
}
rep, err := c.RadiusSearch(ctx, &geottnsvc.RadiusSearchRequest{
Lat: *lat,
Lng: *lng,
Radius: *radius,
})
if err != nil {
log.Fatal(err)
}

log.Println("query ok", len(rep.Points))

for _, dp := range rep.Points {
dec := cayenne.NewDecoder(bytes.NewBuffer(dp.Payload))
msg, err := dec.DecodeUplink()
if err != nil {
log.Fatal(err)
}

response := make(map[string]interface{})
for k, v := range msg.Values() {
response[k] = v
}
response["device_id"] = dp.DeviceId
response["time"] = dp.Time

log.Println(response)
}

}
105 changes: 87 additions & 18 deletions cmd/geottnd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/hex"
"fmt"
stdlog "log"
"net"
Expand All @@ -13,18 +12,28 @@ import (
"time"

ttnsdk "github.com/TheThingsNetwork/go-app-sdk"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
log "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/gorilla/mux"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_middleware "github.com/mwitkow/go-grpc-middleware"
grpc_opentracing "github.com/mwitkow/go-grpc-middleware/tracing/opentracing"
"github.com/namsral/flag"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"

"github.com/akhenakh/geottn"
"github.com/akhenakh/geottn/geottnsvc"
badgeridx "github.com/akhenakh/geottn/storage/badger"
)

const appName = "geottn"
const appName = "geottnd"

var (
version = "no version from LDFLAGS"
Expand All @@ -33,10 +42,21 @@ var (
appAccessKey = flag.String("appAccessKey", "", "The things network access key")
httpMetricsPort = flag.Int("httpMetricsPort", 8888, "http port")
httpAPIPort = flag.Int("httpAPIPort", 9201, "http API port")
grpcPort = flag.Int("grpcPort", 9200, "gRPC API port")
healthPort = flag.Int("healthPort", 6666, "grpc health port")
channel = flag.Int("channel", 1, "the Cayenne channel where to find gps messages")
dbPath = flag.String("dbPath", "geo.db", "DB path")

key = flag.String("key", "", "The key that will passed in the queries to the tiles server")
tilesURL = flag.String(
"tilesURL",
"http://127.0.0.1:8081",
"the URL where to point to get tiles",
)

httpAPIServer *http.Server
httpServer *http.Server
grpcHealthServer *grpc.Server
grpcServer *grpc.Server
httpMetricsServer *http.Server
)

Expand All @@ -62,6 +82,18 @@ func main() {

g, ctx := errgroup.WithContext(ctx)

// Badger
opts := badger.DefaultOptions(*dbPath)
opts.TableLoadingMode = options.FileIO

bdb, err := badger.Open(opts)
if err != nil {
level.Error(logger).Log("msg", "failed to open DB", "error", err, "path", *dbPath)
os.Exit(2)
}

idx := &badgeridx.Indexer{DB: bdb}

// gRPC Health Server
healthServer := health.NewServer()
g.Go(func() error {
Expand Down Expand Up @@ -98,19 +130,57 @@ func main() {
return nil
})

// web API server
// geottn server
cfg := geottn.Config{
Channel: *channel,
}
s := geottn.NewServer(appName, logger, cfg)
s.GeoDB = idx

// gRPC Server
g.Go(func() error {
mux := runtime.NewServeMux()
addr := fmt.Sprintf(":%d", *grpcPort)
ln, err := net.Listen("tcp", addr)
if err != nil {
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err)
os.Exit(2)
}

grpcServer = grpc.NewServer(
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing
// MaxConnectionAgeGrace will torn them, default to infinity
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_opentracing.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_opentracing.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
)),
)
geottnsvc.RegisterGeoTTNServer(grpcServer, s)
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr))

healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", appName), healthpb.HealthCheckResponse_SERVING)

return grpcServer.Serve(ln)
})

httpAPIServer = &http.Server{
// web server
g.Go(func() error {
r := mux.NewRouter()
r.HandleFunc("/api/data/{key}", s.DataQuery)
r.HandleFunc("/api/rect/{urlat}/{urlng}/{bllat}/{bllng}", s.RectQuery)
httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", *httpAPIPort),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: mux,
Handler: r,
}
level.Info(logger).Log("msg", fmt.Sprintf("HTTP API server serving at :%d", *httpAPIPort))

if err := httpAPIServer.ListenAndServe(); err != http.ErrServerClosed {
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
return err
}

Expand Down Expand Up @@ -172,12 +242,7 @@ func main() {
if msg == nil {
break
}
hexPayload := hex.EncodeToString(msg.PayloadRaw)
level.Info(logger).Log(
"devID", msg.DevID,
"msg", "received msg",
"payload", hexPayload)

s.HandleMessage(ctx, msg)
}

}
Expand All @@ -204,15 +269,19 @@ func main() {
_ = httpMetricsServer.Shutdown(shutdownCtx)
}

if httpAPIServer != nil {
_ = httpAPIServer.Shutdown(shutdownCtx)
if httpServer != nil {
_ = httpServer.Shutdown(shutdownCtx)
}

if grpcServer != nil {
grpcServer.GracefulStop()
}

if grpcHealthServer != nil {
grpcHealthServer.GracefulStop()
}

err := g.Wait()
err = g.Wait()
if err != nil {
level.Error(logger).Log("msg", "server returning an error", "error", err)
os.Exit(2)
Expand Down
3 changes: 3 additions & 0 deletions geottnsvc/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//go:generate protoc --proto_path=..:. -I .. --go_out=plugins=grpc:. geottnsvc.proto

package geottnsvc
Loading

0 comments on commit 7441a48

Please sign in to comment.