Skip to content
25 changes: 24 additions & 1 deletion cloud_webserver_v2/internal/background/mcap_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ type PostProcessMCAPUploadJob struct{}
// It also saves all this information to the database and stores files on S3.
func (p *PostProcessMCAPUploadJob) Process(fp *FileProcessor, job *FileJob) error {
ctx := context.TODO()
recordId := primitive.NewObjectID()
fp.setCurrentlyProcessing(true)
fp.updateJobStatus(job, StatusProcessing)
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "job_started", nil)

genericFileName := strings.Split(job.Filename, ".")[0]
mcapResults, err := p.readMCAPMessages(ctx, job, genericFileName)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "job_failed", &errMsg)
return err
}

Expand Down Expand Up @@ -61,22 +65,29 @@ func (p *PostProcessMCAPUploadJob) Process(fp *FileProcessor, job *FileJob) erro
// Uploading MCAP file to S3
mcapFileS3Reader, err := os.Open(job.FilePath)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "mcap_upload_failed", &errMsg)
log.Fatalf("could not open mcap file %v", job.FilePath)
}
defer mcapFileS3Reader.Close()

recordId := primitive.NewObjectID()
// recordId := primitive.NewObjectID() moved to top
mcapFileName := job.Filename
mcapObjectFilePath := fmt.Sprintf("%s/%s", recordId.Hex(), mcapFileName)
err = fp.s3Repository.WriteObjectReader(ctx, mcapFileS3Reader, mcapObjectFilePath)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "mcap_upload_failed", &errMsg)
log.Fatal(err)
}
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "mcap_uploaded", nil)
log.Printf("uploaded mcap file %v to s3", mcapFileName)

// Uploading HDF5 file to S3
hdf5File, err := os.Open(hdf5Location)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "hdf5_failed", &errMsg)
log.Fatalf("could not open mat matFile: %v", err)
}
defer hdf5File.Close()
Expand All @@ -85,26 +96,35 @@ func (p *PostProcessMCAPUploadJob) Process(fp *FileProcessor, job *FileJob) erro
matObjectFilePath := fmt.Sprintf("%s/%s", recordId.Hex(), hdf5FileName)
err = fp.s3Repository.WriteObjectReader(ctx, hdf5File, matObjectFilePath)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "hdf5_failed", &errMsg)
log.Fatal(err)
}
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "hdf5_uploaded", nil)
log.Printf("uploaded hdf5 file %v to s3", hdf5FileName)

// Uploading Lat-Lon file to S3
vnLatLonPlotName := fmt.Sprintf("%v_LatLon.png", genericFileName)
vnLatLonPlotFileObjectPath := fmt.Sprintf("%s/%s", recordId.Hex(), vnLatLonPlotName)
err = fp.s3Repository.WriteObjectWriterTo(ctx, vnLatLonPlotWriter, vnLatLonPlotFileObjectPath)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "plots_failed", &errMsg)
log.Fatal(err)
}
// _, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "plots_uploaded", nil)
log.Printf("uploaded vn lat lon plot %v to s3", vnLatLonPlotName)

// Uploading Time-Vel file to S3
vnTimeVelPlotName := fmt.Sprintf("%v_Velocity.png", genericFileName)
vnTimeVelPlotFileObjectPath := fmt.Sprintf("%s/%s", recordId.Hex(), vnTimeVelPlotName)
err = fp.s3Repository.WriteObjectWriterTo(ctx, vnTimeVelPlotWriter, vnTimeVelPlotFileObjectPath)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "plots_failed", &errMsg)
log.Fatal(err)
}
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "plots_uploaded", nil)
log.Printf("uploaded vn time vel plot %v to s3", vnTimeVelPlotName)

// After successful processing, if we are in PRODUCTION, save the mcap and h5 file to our docker volume
Expand Down Expand Up @@ -193,6 +213,8 @@ func (p *PostProcessMCAPUploadJob) Process(fp *FileProcessor, job *FileJob) erro

_, err = fp.dbClient.VehicleRunUseCase().CreateVehicleRun(ctx, vehicleRunModel)
if err != nil {
errMsg := err.Error()
_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "job_failed", &errMsg)
log.Fatal(err)
}

Expand All @@ -202,6 +224,7 @@ func (p *PostProcessMCAPUploadJob) Process(fp *FileProcessor, job *FileJob) erro
fp.updateJobStatus(job, StatusCompleted)
fp.setCurrentlyProcessing(false)

_, _ = fp.dbClient.EventsUseCase().LogEvent(ctx, recordId, job.Filename, "job_completed", nil)
log.Printf("Completed job %v", job.ID)
return nil
}
Expand Down
11 changes: 11 additions & 0 deletions cloud_webserver_v2/internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type DatabaseClient struct {
databaseClient *mongo.Client
vehicleRunRepository repository.VehicleRunRepository
carMetricsRepository repository.CarMetricsRepository
eventsRepository repository.EventsRepository
}

const VehicleDataDatabase = "vehicle_data_db"
Expand Down Expand Up @@ -55,6 +56,12 @@ func NewDatabaseClient(ctx context.Context, uri string) (*DatabaseClient, error)
}
databaseClient.carMetricsRepository = carMetricsRepository

eventsRepository, err := repository.NewMongoEventsRepository(client, vehicleDataDatabase)
if err != nil {
return nil, fmt.Errorf("could not create eventsRepository: %v", err)
}
databaseClient.eventsRepository = eventsRepository

return databaseClient, nil
}

Expand All @@ -66,6 +73,10 @@ func (client *DatabaseClient) CarMetricsUseCase() *usecase.CarMetricsUseCase {
return usecase.NewCarMetricsUseCase(client.carMetricsRepository)
}

func (client *DatabaseClient) EventsUseCase() *usecase.EventsUseCase {
return usecase.NewEventsUseCase(client.eventsRepository)
}

func (client *DatabaseClient) Disonnect(ctx context.Context) error {
err := client.databaseClient.Disconnect(ctx)
if err != nil {
Expand Down
109 changes: 109 additions & 0 deletions cloud_webserver_v2/internal/database/repository/events_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package repository

import (
"context"
"fmt"

"github.com/hytech-racing/cloud-webserver-v2/internal/models"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

const EventsCollection string = "events"

type EventsRepository interface {
Save(ctx context.Context, events *models.EventsModel) (*models.EventsModel, error)
GetWithEventsFilters(ctx context.Context, filters *bson.M) ([]models.EventsModel, error)
GetEventsFromId(ctx context.Context, id primitive.ObjectID) (*models.EventsModel, error)
DeleteEventsFromId(ctx context.Context, id primitive.ObjectID) error
UpdateEventsFromId(ctx context.Context, id primitive.ObjectID, events *models.EventsModel) error
}

type MongoEventsRepository struct {
dbClient *mongo.Client
db *mongo.Database
collection *mongo.Collection
}

func NewMongoEventsRepository(dbClient *mongo.Client, database *mongo.Database) (*MongoEventsRepository, error) {
collection := database.Collection(EventsCollection)
if collection == nil {
return nil, fmt.Errorf("could not get collection %s", EventsCollection)
}

return &MongoEventsRepository{
dbClient: dbClient,
db: database,
collection: collection,
}, nil
}

// Inserts a EventsModel into the MongoDB database
func (repo *MongoEventsRepository) Save(ctx context.Context, events *models.EventsModel) (*models.EventsModel, error) {
res, err := repo.collection.InsertOne(ctx, events)
if err != nil {
return nil, fmt.Errorf("could not insert events data: %v, received error: %v", events, err)
}

events.ID = res.InsertedID.(primitive.ObjectID)
return events, nil
}

// Get a EventsModel from the MongoDB database with filters
func (repo *MongoEventsRepository) GetWithEventsFilters(ctx context.Context, filters *bson.M) ([]models.EventsModel, error) {
cursor, err := repo.collection.Find(ctx, filters)
if err != nil {
return nil, fmt.Errorf("could not find in events data with filters %v, received error: %v", filters, err)
}

var modelResults []models.EventsModel

if err = cursor.All(ctx, &modelResults); err != nil {
return nil, err
}

if modelResults == nil {
modelResults = make([]models.EventsModel, 0)
}

return modelResults, nil
}

// Get a EventsModel from the MongoDB database from a Events ID
func (repo *MongoEventsRepository) GetEventsFromId(ctx context.Context, id primitive.ObjectID) (*models.EventsModel, error) {
filter := bson.M{"_id": id}
result := repo.collection.FindOne(ctx, filter)
if result.Err() != nil {
return nil, result.Err()
}

var model models.EventsModel
err := result.Decode(&model)
if err != nil {
return nil, fmt.Errorf("could not decode result into model: %v", err)
}

return &model, nil
}

// Delete a EventsModel from the MongoDB database from a Events ID
func (repo *MongoEventsRepository) DeleteEventsFromId(ctx context.Context, id primitive.ObjectID) error {
filter := bson.M{"_id": id}
_, err := repo.collection.DeleteOne(ctx, filter)
if err != nil {
return err
}

return nil
}

// Updates a EventsModel from the MongoDB database from a Events ID and given Events
func (repo *MongoEventsRepository) UpdateEventsFromId(ctx context.Context, id primitive.ObjectID, events *models.EventsModel) error {
filter := bson.M{"_id": id}
resp := repo.collection.FindOneAndReplace(ctx, filter, events)
if resp.Err() != nil {
return resp.Err()
}
return nil
}
90 changes: 90 additions & 0 deletions cloud_webserver_v2/internal/database/usecase/events_usecase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package usecase

import (
"context"

"github.com/hytech-racing/cloud-webserver-v2/internal/database/repository"
"github.com/hytech-racing/cloud-webserver-v2/internal/models"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)

type EventsUseCase struct {
EventsRepo repository.EventsRepository
}

func NewEventsUseCase(eventsRepo repository.EventsRepository) *EventsUseCase {
return &EventsUseCase{
EventsRepo: eventsRepo,
}
}

func (uc *EventsUseCase) CreateEvents(ctx context.Context, model *models.EventsModel) (*models.EventsModel, error) {
model, err := uc.EventsRepo.Save(ctx, model)
if err != nil {
return nil, err
}
return model, nil
}

func (uc *EventsUseCase) GetEventsByFilters(ctx context.Context, filters *models.EventsModelFilters) ([]models.EventsModel, error) {
bson_filters_m := bson.M{}
bson_or := bson.A{}

if filters.ID != nil {
id, err := primitive.ObjectIDFromHex(filters.ID.Hex())
if err != nil {
return nil, err
}
bson_filters_m["id"] = id
}

if filters.BeforeDate != nil || filters.AfterDate != nil {
dateFilter := bson.M{}
if filters.BeforeDate != nil {
dateFilter["$gte"] = *filters.BeforeDate
}
if filters.AfterDate != nil {
dateFilter["$lte"] = *filters.AfterDate
}
bson_filters_m["date"] = dateFilter
}

if filters.EventType != nil {
bson_filters_m["event_type"] = bson.M{"$regex": primitive.Regex{Pattern: *filters.EventType, Options: "i"}}
}

if len(bson_or) != 0 {
bson_filters_m["$or"] = bson_or
}

result, err := uc.EventsRepo.GetWithEventsFilters(context.TODO(), &bson_filters_m)
if err != nil {
return nil, err
}

return result, nil
}

func (uc *EventsUseCase) GetEventsById(ctx context.Context, id primitive.ObjectID) (*models.EventsModel, error) {
return uc.EventsRepo.GetEventsFromId(ctx, id)
}

func (uc *EventsUseCase) DeleteEventsById(ctx context.Context, id primitive.ObjectID) error {
return uc.EventsRepo.DeleteEventsFromId(ctx, id)
}

func (uc *EventsUseCase) UpdateEventsRun(ctx context.Context, id primitive.ObjectID, model *models.EventsModel) error {
return uc.EventsRepo.UpdateEventsFromId(ctx, id, model)
}
func (uc *EventsUseCase) LogEvent(ctx context.Context, mcapID primitive.ObjectID, filename string, event string, errMsg *string) (*models.EventsModel, error) {
now := time.Now()
model := &models.EventsModel{
McapID: &mcapID,
Event: &event,
Error: errMsg,
CreatedAt: &now,
}
return uc.CreateEvents(ctx, model)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,39 @@ func (uc *VehicleRunUseCase) DeleteVehicleRunById(ctx context.Context, id primit
func (uc *VehicleRunUseCase) UpdateVehicleRun(ctx context.Context, id primitive.ObjectID, model *models.VehicleRunModel) error {
return uc.vechicleRunRepo.UpdateVehicleRunFromId(ctx, id, model)
}

func (uc *VehicleRunUseCase) AddMiscFile(ctx context.Context, vehicleRunID primitive.ObjectID, awsBucket string, fileName string, filePath string) (*models.VehicleRunModel, error) {
vehicleRun, err := uc.vechicleRunRepo.GetVehicleRunFromId(ctx, vehicleRunID)
if err != nil {
return nil, err
}
miscFile := models.FileModel{
AwsBucket: awsBucket,
FilePath: filePath,
FileName: fileName,
}
if vehicleRun.ContentFiles["misc_files"] == nil {
vehicleRun.ContentFiles["misc_files"] = []models.FileModel{}
}
vehicleRun.ContentFiles["misc_files"] = append(vehicleRun.ContentFiles["misc_files"], miscFile)
err = uc.vechicleRunRepo.UpdateVehicleRunFromId(ctx, vehicleRunID, vehicleRun)
if err != nil {
return nil, err
}
return vehicleRun, nil
}

func (uc *VehicleRunUseCase) FileNameExists(ctx context.Context, vehicleRunID primitive.ObjectID, fileName string) (bool, error) {
vehicleRun, err := uc.vechicleRunRepo.GetVehicleRunFromId(ctx, vehicleRunID)
if err != nil {
return true, err
}
if vehicleRun.ContentFiles["misc_files"] != nil {
for _, f := range vehicleRun.ContentFiles["misc_files"] {
if f.FileName == fileName {
return true, nil
}
}
}
return false, nil
}
Loading