diff --git a/importer/csv.go b/importer/csv.go index 1eeedf2..3fe1548 100644 --- a/importer/csv.go +++ b/importer/csv.go @@ -20,6 +20,8 @@ import ( "github.com/rafalmnich/exporter/sink" ) +const dataHourPart = "Data;Hour;" + // CsvImporter is a service for importing data from csv file that is online type CsvImporter struct { db *gorm.DB @@ -107,35 +109,55 @@ func (c *CsvImporter) prepareReading(ctx context.Context, response *http.Respons return nil, xerrors.Errorf("cannot read response body: %w", err) } - reader := csv.NewReader(strings.NewReader(string(body))) - reader.Comma = ';' + parts := splitCSV(body) + readings := make([]*sink.Reading, 0, 100*len(parts)) - records, err := reader.ReadAll() - if err != nil { - return nil, xerrors.Errorf("cannot read csv file: %w", err) - } - - if len(records) == 0 { - return nil, errors.New("empty or wrong reading") - } + for _, part := range parts { + reader := csv.NewReader(strings.NewReader(part)) + reader.Comma = ';' - names := records[0] - readings := make([]*sink.Reading, 0, len(names)*len(records)) + records, err := reader.ReadAll() + if err != nil { + return nil, xerrors.Errorf("cannot read csv file: %w", err) + } - for rowNumber, row := range records { - if rowNumber == 0 { - continue + if len(records) == 0 { + return nil, errors.New("empty or wrong reading") } - rs, err := c.extract(row, ctx, names, tp) - if err == nil { - readings = append(readings, rs...) + names := records[0] + + for rowNumber, row := range records { + if rowNumber == 0 { + continue + } + + rs, err := c.extract(row, ctx, names, tp) + if err == nil { + readings = append(readings, rs...) + } } } return readings, nil } +func splitCSV(body []byte) []string { + parts := strings.Count(string(body), dataHourPart) + if parts == 1 { + return []string{string(body)} + } + + splitted := strings.Split(string(body), dataHourPart) + splitted = splitted[1:] + + for i := range splitted { + splitted[i] = dataHourPart + splitted[i] + } + + return splitted +} + func (c *CsvImporter) extract(row []string, ctx context.Context, names []string, tp sink.Type) ([]*sink.Reading, error) { dateTime := row[0] + " " + row[1] occurred, err := time.Parse("2006-01-02 15:04:05", dateTime) diff --git a/importer/csv_internal_test.go b/importer/csv_internal_test.go new file mode 100644 index 0000000..a72bfdb --- /dev/null +++ b/importer/csv_internal_test.go @@ -0,0 +1,141 @@ +package importer + +import ( + "context" + "errors" + "io/ioutil" + "net/http" + "strings" + "testing" + "time" + + "github.com/rafalmnich/exporter/sink" + "github.com/stretchr/testify/assert" +) + +func Test_prepareReading_readError(t *testing.T) { + c := NewCsvImporter(nil, nil, 0, "") + + resp := &http.Response{ + Body: ioutil.NopCloser(&erroredReaderMock{}), + } + + _, err := c.prepareReading(context.Background(), resp, sink.Input) + assert.Error(t, err) +} + +func Test_prepareReading_MultiCSV(t *testing.T) { + c := NewCsvImporter(nil, nil, 0, "") + + wrongCSV := `Data;Hour;In7;In8;In9; +2019-09-20;00:01:24;0;10;0; +2019-09-20;00:02:24;10;0;0; +Data;Hour;In7;In8;In9;In10; +2019-09-20;00:03:24;10;0;0;1; +2019-09-20;00:04:24;10;0;0;2;` + + resp := &http.Response{ + Body: ioutil.NopCloser(strings.NewReader(wrongCSV)), + } + + readings, err := c.prepareReading(context.Background(), resp, sink.Input) + assert.NoError(t, err) + expected := []*sink.Reading{ + { + Name: "In7", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 1, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 2, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + { + Name: "In10", + Type: 0, + Value: 1, + Occurred: time.Date(2019, 9, 20, 0, 3, 24, 0, time.UTC), + }, + + { + Name: "In7", + Type: 0, + Value: 10, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In8", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In9", + Type: 0, + Value: 0, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + { + Name: "In10", + Type: 0, + Value: 2, + Occurred: time.Date(2019, 9, 20, 0, 4, 24, 0, time.UTC), + }, + } + + assert.Equal(t, expected, readings) +} + +type erroredReaderMock struct { +} + +func (e *erroredReaderMock) Read(p []byte) (n int, err error) { + return 0, errors.New("test error") +} diff --git a/importer/csv_test.go b/importer/csv_test.go index 7e667fa..cbb5cf8 100644 --- a/importer/csv_test.go +++ b/importer/csv_test.go @@ -170,23 +170,6 @@ func TestCsvImporter_Import_WithFetchError(t *testing.T) { assert.Error(t, err) } -func TestCsvImporter_Import_WithResponseError(t *testing.T) { - mock, db := tests.MockGormDB() - - sl := mockDoer([]byte("")) - c := importer.NewCsvImporter(db, sl, 0, "") - now := time.Date(2019, 9, 20, 10, 0, 0, 0, time.UTC) - - mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "iqc"."reading" ORDER BY "iqc"."reading"."id" DESC LIMIT 1`)). - WillReturnRows(sqlmock.NewRows([]string{"id", "name", "type", "value", "occurred"}). - AddRow(1, "In81", 0, 210, now)) - - ctx := context.Background() - ctx = log.WithLogger(ctx, new(mocks.Logger)) - _, err := c.Import(ctx) - assert.Error(t, err) -} - func TestCsvImporter_Import_WithTimeError(t *testing.T) { mock, db := tests.MockGormDB() response := []byte(`Data;Hour;In7;In8;In9; diff --git a/sink/db_test.go b/sink/db_test.go index 01777fe..b6bebaa 100644 --- a/sink/db_test.go +++ b/sink/db_test.go @@ -60,6 +60,72 @@ func TestExporter_Export(t *testing.T) { assert.NoError(t, err) } +func TestExporter_ExportTwoBatches(t *testing.T) { + input := []*sink.Reading{ + reading1, + reading2, + reading3, + } + + mock, db := tests.MockGormDB() + e := sink.NewExporter(db, 2) + + _ = clock.Mock(time.Date(2019, 1, 1, 17, 2, 1, 0, time.UTC)) + defer clock.Restore() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name1', 'name2']), + UNNEST(ARRAY[0, 1]), + UNNEST(ARRAY[20, 150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp, '2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(2, 1)) + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name3']), + UNNEST(ARRAY[1]), + UNNEST(ARRAY[150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(3, 1)) + + err := e.Export(context.TODO(), input) + assert.NoError(t, err) +} + +func TestExporter_ExportTwoBatchesErrored(t *testing.T) { + input := []*sink.Reading{ + reading1, + reading2, + reading3, + } + + mock, db := tests.MockGormDB() + e := sink.NewExporter(db, 2) + + _ = clock.Mock(time.Date(2019, 1, 1, 17, 2, 1, 0, time.UTC)) + defer clock.Restore() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name1', 'name2']), + UNNEST(ARRAY[0, 1]), + UNNEST(ARRAY[20, 150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp, '2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnResult(sqlmock.NewResult(2, 1)) + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO "iqc"."reading" ("name","type","value","occurred") VALUES ( + UNNEST(ARRAY['name3']), + UNNEST(ARRAY[1]), + UNNEST(ARRAY[150]), + UNNEST(ARRAY['2019-01-01 03:02:01'::timestamp]) + ) ON CONFLICT DO NOTHING`)). + WillReturnError(errors.New("test error")) + + err := e.Export(context.TODO(), input) + assert.Error(t, err) +} + func TestExporter_ExportWithSavingLastImport(t *testing.T) { input := []*sink.Reading{ reading1,