diff --git a/cmd/exporter/export.go b/cmd/exporter/export.go index 027b240..42b59bc 100644 --- a/cmd/exporter/export.go +++ b/cmd/exporter/export.go @@ -28,7 +28,7 @@ func run(c *cli.Context) error { go stats.RuntimeFromContext(ctx, stats.DefaultRuntimeInterval) s := &http.Client{ - Timeout: 2 * time.Second, + Timeout: 30 * time.Second, } db, err := getDb(c.String(flagDBUri)) @@ -59,7 +59,8 @@ func run(c *cli.Context) error { go func() { if err := anyError(errs); err != nil { - log.Fatal(ctx, err.Error()) + log.Error(ctx, err.Error()) + time.Sleep(c.Duration(flagImportPeriod)) } }() diff --git a/importer/csv.go b/importer/csv.go index cc6b331..93a5ed6 100644 --- a/importer/csv.go +++ b/importer/csv.go @@ -20,6 +20,8 @@ import ( "github.com/rafalmnich/exporter/sink" ) +const dataHourPart = "Data;Hour;" + // CsvImporter is a service for importing data from csv file that is online type CsvImporter struct { db *gorm.DB @@ -35,7 +37,22 @@ func NewCsvImporter(db *gorm.DB, doer sling.Doer, startOffset time.Duration, bas // Import imports data (inputs and outputs) from given mass func (c *CsvImporter) Import(ctx context.Context) ([]*sink.Reading, error) { - return c.getNewReadings(ctx, c.getLastSync(), sink.Input) + inputs, err := c.getNewReadings(ctx, c.getLastSync(), sink.Input) + if err != nil { + return nil, xerrors.Errorf("cannot get new input readings: %w", err) + } + + outputs, err := c.getNewReadings(ctx, c.getLastSync(), sink.Output) + if err != nil { + return nil, xerrors.Errorf("cannot get new output readings: %w", err) + } + + totalLen := len(inputs) + len(outputs) + data := make([]*sink.Reading, 0, totalLen) + data = append(data, inputs...) + data = append(data, outputs...) + + return data, nil } func (c *CsvImporter) getLastSync() *sink.Import { @@ -50,7 +67,7 @@ func (c *CsvImporter) getLastSync() *sink.Import { } func (c *CsvImporter) getNewReadings(ctx context.Context, reading *sink.Import, tp sink.Type) ([]*sink.Reading, error) { - uri := c.baseUri + c.fileName(reading) + uri := c.baseUri + c.fileName(reading, tp) var response *http.Response request, err := sling. @@ -59,12 +76,12 @@ func (c *CsvImporter) getNewReadings(ctx context.Context, reading *sink.Import, Request() if err != nil { - return nil, xerrors.Errorf(": %w", err) + return nil, xerrors.Errorf("error creating request: %w", err) } response, err = c.doer.Do(request) if err != nil { - return nil, xerrors.Errorf(": %w", err) + return nil, xerrors.Errorf("error requesting uri: %w", err) } if response.StatusCode != http.StatusOK { return nil, xerrors.New("Couldn't read from source: " + uri) @@ -73,57 +90,80 @@ func (c *CsvImporter) getNewReadings(ctx context.Context, reading *sink.Import, return c.prepareReading(ctx, response, tp) } -func (c *CsvImporter) fileName(lastImport *sink.Import) string { +func (c *CsvImporter) fileName(lastImport *sink.Import, tp sink.Type) string { nextImportDate := c.nextImportDate(lastImport) dir := nextImportDate.Format("200601") date := nextImportDate.Format("20060102") - return fmt.Sprintf("/logs/%s/i_%s.csv", dir, date) + if tp == sink.Input { + return fmt.Sprintf("/logs/%s/i_%s.csv", dir, date) + } + + return fmt.Sprintf("/logs/%s/o_%s.csv", dir, date) } func (c *CsvImporter) prepareReading(ctx context.Context, response *http.Response, tp sink.Type) ([]*sink.Reading, error) { body, err := ioutil.ReadAll(response.Body) if err != nil { - return nil, xerrors.Errorf(": %w", err) + return nil, xerrors.Errorf("cannot read response body: %w", err) } - reader := csv.NewReader(strings.NewReader(string(body))) - reader.Comma = ';' + parts := parseCSV(body) + readings := make([]*sink.Reading, 0, 100*len(parts)) - records, err := reader.ReadAll() - if err != nil { - return nil, xerrors.Errorf(": %w", err) - } - - if len(records) == 0 { - return nil, errors.New("empty or wrong reading") - } + for _, part := range parts { + reader := csv.NewReader(strings.NewReader(part)) + reader.Comma = ';' - names := records[0] - readings := make([]*sink.Reading, 0, len(names)*len(records)) - - for rowNumber, row := range records { - if rowNumber == 0 { - continue + records, err := reader.ReadAll() + if err != nil { + return nil, xerrors.Errorf("cannot read csv file: %w", err) } - rs, err := c.extract(row, ctx, names, tp) - if err == nil { - readings = append(readings, rs...) + if len(records) == 0 { + return nil, errors.New("empty or wrong reading") } + names := records[0] + + for rowNumber, row := range records { + if rowNumber == 0 { + continue + } + + rs, err := c.extract(row, ctx, names, tp) + if err == nil { + readings = append(readings, rs...) + } + } } return readings, nil } +func parseCSV(body []byte) []string { + parts := strings.Count(string(body), dataHourPart) + if parts == 1 { + return []string{string(body)} + } + + splitted := strings.Split(string(body), dataHourPart) + splitted = splitted[1:] + + for i := range splitted { + splitted[i] = dataHourPart + splitted[i] + } + + return splitted +} + func (c *CsvImporter) extract(row []string, ctx context.Context, names []string, tp sink.Type) ([]*sink.Reading, error) { dateTime := row[0] + " " + row[1] occurred, err := time.Parse("2006-01-02 15:04:05", dateTime) if err != nil { log.Error(ctx, "Cannot parse time: "+dateTime) - return nil, xerrors.Errorf(": %w", err) + return nil, xerrors.Errorf("cannot parse time: %w", err) } readings := make([]*sink.Reading, 0, len(row)) diff --git a/importer/csv_internal_test.go b/importer/csv_internal_test.go new file mode 100644 index 0000000..a72bfdb --- /dev/null +++ b/importer/csv_internal_test.go @@ -0,0 +1,141 @@ +package importer + +import ( + "context" + "errors" + "io/ioutil" + "net/http" + "strings" + "testing" + "time" + + "github.com/rafalmnich/exporter/sink" + "github.com/stretchr/testify/assert" +) + +func Test_prepareReading_readError(t *testing.T) { + c := NewCsvImporter(nil, nil, 0, "") + + resp := &http.Response{ + Body: ioutil.NopCloser(&erroredReaderMock{}), + } + + _, err := c.prepareReading(context.Background(), resp, sink.Input) + assert.Error(t, err) +} + +func Test_prepareReading_MultiCSV(t *testing.T) { + c := NewCsvImporter(nil, nil, 0, "") + + wrongCSV := `Data;Hour;In7;In8;In9; +2019-09-20;00:01:24;0;10;0; +2019-09-20;00:02:24;10;0;0; +Data;Hour;In7;In8;In9;In10; +2019-09-20;00:03:24;10;0;0;1; +2019-09-20;00:04:24;10;0;0;2;` + + resp := &http.Response{ + Body: ioutil.NopCloser(strings.NewReader(wrongCSV)), + } + + readings, err := c.prepareReading(context.Background(), resp, sink.Input) + assert.NoError(t, err) + expected := []*sink.Reading{ + { + Name: "In7", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In10", + Type: 0, + Value: 1, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In10", + Type: 0, + Value: 2, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + } + + assert.Equal(t, expected, readings) +} + +type erroredReaderMock struct { +} + +func (e *erroredReaderMock) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} diff --git a/importer/csv_test.go b/importer/csv_test.go index 7e667fa..cbb5cf8 100644 --- a/importer/csv_test.go +++ b/importer/csv_test.go @@ -170,23 +170,6 @@ func TestCsvImporter_Import_WithFetchError(t *testing.T) { assert.Error(t, err) } -func TestCsvImporter_Import_WithResponseError(t *testing.T) { - mock, db := tests.MockGormDB() - - sl := mockDoer([]byte("")) - c := importer.NewCsvImporter(db, sl, 0, "") - now := time.Date(2019, 9, 20, 10, 0, 0, 0, time.UTC) - - mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "iqc"."reading" ORDER BY "iqc"."reading"."id" DESC LIMIT 1`)). - WillReturnRows(sqlmock.NewRows([]string{"id", "name", "type", "value", "occurred"}). - AddRow(1, "In81", 0, 210, now)) - - ctx := context.Background() - ctx = log.WithLogger(ctx, new(mocks.Logger)) - _, err := c.Import(ctx) - assert.Error(t, err) -} - func TestCsvImporter_Import_WithTimeError(t *testing.T) { mock, db := tests.MockGormDB() response := []byte(`Data;Hour;In7;In8;In9; diff --git a/sink/db.go b/sink/db.go index c630bd3..053d07a 100644 --- a/sink/db.go +++ b/sink/db.go @@ -107,10 +107,6 @@ func prepareOccurredArray(readings []*Reading) string { return prepareArray(readings, occuredFunc, timestampGlue) } -func addTimestamp(s string) string { - return s + "::timestamp" -} - func (e *Exporter) updateImported(occurred time.Time) error { readingMorning := getMorning(occurred) todayMorning := getMorning(clock.Now()) diff --git a/sink/db_test.go b/sink/db_test.go index 01777fe..b6bebaa 100644 --- a/sink/db_test.go +++ b/sink/db_test.go @@ -60,6 +60,72 @@ func TestExporter_Export(t *testing.T) { assert.NoError(t, err) } +func TestExporter_ExportTwoBatches(t *testing.T) { + input := []*sink.Reading{ + reading1, + reading2, + reading3, + } + + mock, db := tests.MockGormDB() + e := sink.NewExporter(db, 2) + + _ = clock.Mock(time.Date(2019, 1, 1, 17, 2, 1, 0, time.UTC)) + defer clock.Restore() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name1', 'name2']), + UNNEST(ARRAY[0, 1]), + UNNEST(ARRAY[20, 150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp, '2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(2, 1)) + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name3']), + UNNEST(ARRAY[1]), + UNNEST(ARRAY[150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(3, 1)) + + err := e.Export(context.TODO(), input) + assert.NoError(t, err) +} + +func TestExporter_ExportTwoBatchesErrored(t *testing.T) { + input := []*sink.Reading{ + reading1, + reading2, + reading3, + } + + mock, db := tests.MockGormDB() + e := sink.NewExporter(db, 2) + + _ = clock.Mock(time.Date(2019, 1, 1, 17, 2, 1, 0, time.UTC)) + defer clock.Restore() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name1', 'name2']), + UNNEST(ARRAY[0, 1]), + UNNEST(ARRAY[20, 150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp, '2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(2, 1)) + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name3']), + UNNEST(ARRAY[1]), + UNNEST(ARRAY[150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnError(errors.New("test error")) + + err := e.Export(context.TODO(), input) + assert.Error(t, err) +} + func TestExporter_ExportWithSavingLastImport(t *testing.T) { input := []*sink.Reading{ reading1,