From f51a45544c95bbdbfb0c1fb6e48c57ae38df150e Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 13:18:30 +0200 Subject: [PATCH 1/6] panic errors for debugging --- files/walk.go | 1 + workers.go | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/files/walk.go b/files/walk.go index f07c3b7..5eeb20b 100644 --- a/files/walk.go +++ b/files/walk.go @@ -153,6 +153,7 @@ func (provider *Walker) nextBatch(currentFile *os.File) ([]string, error) { for _, f := range files { fileInfo, err := os.Lstat(f) if err != nil { + panic(err) log.Warn(err) continue } diff --git a/workers.go b/workers.go index 7f687c2..a6c23cb 100644 --- a/workers.go +++ b/workers.go @@ -137,11 +137,13 @@ func (s *Datasource) process(job ImportJob) PartialResult { result.Failed++ result.Errors = append(result.Errors, err) if opt.Enabled(s.Options.FailOnErrors) { - log.Warnf(err.Error()) - continue - } else { + panic(err) log.Errorf(err.Error()) break + } else { + panic(err) + log.Warnf(err.Error()) + continue } } } @@ -150,6 +152,7 @@ func (s *Datasource) process(job ImportJob) PartialResult { // Insert remaining err := insert(job.Collection, batch[:batched]) if err != nil { + panic(err) log.Warn(err) result.Errors = append(result.Errors, err) } @@ -196,6 +199,7 @@ func (s *Datasource) process(job ImportJob) PartialResult { // log.Infof("insert into %s:%s", databaseName, collection) err := insert(job.Collection, minibatch) if err != nil { + panic(err) log.Warn(err) result.Errors = append(result.Errors, err) break From a7d1ed8df7728114c5fabde43270917711e6733a Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 13:36:51 +0200 Subject: [PATCH 2/6] Check batch size before insertion --- database.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/database.go b/database.go index 74fdf6a..23cf287 100644 --- a/database.go +++ b/database.go @@ -38,8 +38,11 @@ func (c *MongoConnection) Client() (*mongo.Client, error) { } func insert(collection *mongo.Collection, batch []interface{}) error { - _, err := collection.InsertMany(context.Background(), batch) - return err + if len(batch) > 0 { + _, err := collection.InsertMany(context.Background(), batch) + return err + } + return nil } func emptyCollection(collection *mongo.Collection) error { From 33e9632a669ac2bc4c6507c5e75d88597b4786a2 Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 13:41:26 +0200 Subject: [PATCH 3/6] Add more halts for remote debugging --- workers.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/workers.go b/workers.go index a6c23cb..18a5cb0 100644 --- a/workers.go +++ b/workers.go @@ -38,11 +38,14 @@ func (i *Import) produceJobs(jobChan chan ImportJob) error { Collection: s.Collection, } if err == io.EOF { + panic(err) // No-op (produced all files for this source) break } else if err != nil { partialResult.Errors = append(partialResult.Errors, err) s.result.PartialResults = append(s.result.PartialResults, partialResult) + panic(err) + log.Warn(err) } else { dbName, err := i.sourceDatabaseName(s) if err != nil { From 97bd33796a637fc1fd6e707eae83ad46e2bba5aa Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 14:28:41 +0200 Subject: [PATCH 4/6] Log glob output --- examples/multi/multi.go | 81 ++++++++++++++++++++++------------------- files/glob.go | 46 ++++++++++++++++------- go.sum | 2 - 3 files changed, 77 insertions(+), 52 deletions(-) diff --git a/examples/multi/multi.go b/examples/multi/multi.go index e1fcc83..42030be 100644 --- a/examples/multi/multi.go +++ b/examples/multi/multi.go @@ -31,32 +31,36 @@ func main() { } defer mongoC.Terminate(context.Background()) - isCSVWalkerFunc := func(path string, info os.FileInfo, err error) bool { - return !info.IsDir() && filepath.Ext(path) == ".csv" - } + /* + isCSVWalkerFunc := func(path string, info os.FileInfo, err error) bool { + return !info.IsDir() && filepath.Ext(path) == ".csv" + } - xmlLoader := loaders.DefaultXMLLoader() + xmlLoader := loaders.DefaultXMLLoader() + */ csvLoader := loaders.DefaultCSVLoader() csvLoader.Excel = false datasources := []*mongoimport.Datasource{ - { - Description: "Ford Escort Data", - FileProvider: &files.List{Files: []string{ - filepath.Join(dir, "examples/data/ford_escort.csv"), - filepath.Join(dir, "examples/data/ford_escort2.csv"), - }}, - Options: mongoimport.Options{ - Collection: "ford_escorts", + /* + { + Description: "Ford Escort Data", + FileProvider: &files.List{Files: []string{ + filepath.Join(dir, "examples/data/ford_escort.csv"), + filepath.Join(dir, "examples/data/ford_escort2.csv"), + }}, + Options: mongoimport.Options{ + Collection: "ford_escorts", + }, }, - }, - { - FileProvider: &files.List{Files: []string{ - filepath.Join(dir, "examples/data/hurricanes.csv"), - }}, - Options: mongoimport.Options{ - Collection: "hurricanes", + { + FileProvider: &files.List{Files: []string{ + filepath.Join(dir, "examples/data/hurricanes.csv"), + }}, + Options: mongoimport.Options{ + Collection: "hurricanes", + }, }, - }, + */ { FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*/*nested*.csv")}, Options: mongoimport.Options{ @@ -64,29 +68,32 @@ func main() { IndividualProgress: opt.SetFlag(false), }, }, - { - Description: "XML Data", - FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*.xml")}, - Options: mongoimport.Options{ - Collection: "xmldata", - Loader: loaders.Loader{SpecificLoader: xmlLoader}, - IndividualProgress: opt.SetFlag(false), + /* + { + Description: "XML Data", + FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*.xml")}, + Options: mongoimport.Options{ + Collection: "xmldata", + Loader: loaders.Loader{SpecificLoader: xmlLoader}, + IndividualProgress: opt.SetFlag(false), + }, }, - }, - { - Description: "Walk Data", - FileProvider: &files.Walker{Directory: filepath.Join(dir, "examples/data"), Handler: isCSVWalkerFunc}, - Options: mongoimport.Options{ - Collection: "walked", - IndividualProgress: opt.SetFlag(false), + { + Description: "Walk Data", + FileProvider: &files.Walker{Directory: filepath.Join(dir, "examples/data"), Handler: isCSVWalkerFunc}, + Options: mongoimport.Options{ + Collection: "walked", + IndividualProgress: opt.SetFlag(false), + }, }, - }, + */ } i := mongoimport.Import{ // Allow concurrent processing of at most 2 files with 2 threads - Sources: datasources, - Connection: conn, + MaxParallelism: 1, + Sources: datasources, + Connection: conn, // Global options Options: mongoimport.Options{ IndividualProgress: opt.SetFlag(true), diff --git a/files/glob.go b/files/glob.go index e7434e1..2d74465 100644 --- a/files/glob.go +++ b/files/glob.go @@ -1,6 +1,7 @@ package files import ( + "fmt" "io" "os" "path/filepath" @@ -11,13 +12,14 @@ import ( // Glob ... type Glob struct { - Pattern string - matchCount int - BatchSize int - batchIndex int - matchesChan chan string - file *os.File - dir, pattern string + Pattern string + matchCount int + BatchSize int + batchIndex int + matchesChan chan string + fetchMatchesChan chan string + file *os.File + dir, pattern string } // hasMeta reports whether path contains any of the magic characters @@ -126,7 +128,18 @@ func (provider *Glob) Prepare() error { } provider.matchesChan = make(chan string, provider.BatchSize) go func() { - provider.glob(provider.Pattern, 0, provider.matchesChan) + _, err := provider.glob(provider.Pattern, 0, provider.matchesChan) + if err != nil { + panic(err) + } + /* + for { + x, ok := <-provider.matchesChan + fmt.Println(ok, x) + } + */ + // panic(fmt.Sprintf("Channel content=%s", provider.matchesChan)) + // panic(fmt.Sprintf("Closing with m=%s", m)) close(provider.matchesChan) }() return nil @@ -137,6 +150,7 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string) if _, err := os.Lstat(pattern); err != nil { return nil, nil } + matchesChan <- pattern return []string{pattern}, nil } @@ -149,22 +163,27 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string) } if !hasMeta(dir[volumeLen:]) { - return provider.globDir(dir, file, depth, nil, matchesChan) + matches, err := provider.globDir(dir, file, depth, nil, matchesChan) + return matches, err + // panic(fmt.Sprintf("hasMeta m=%s", matches)) } // Prevent infinite recursion. See issue 15879. if dir == pattern { + panic(pattern) return nil, filepath.ErrBadPattern } m, err := provider.glob(dir, depth+1, matchesChan) if err != nil { + panic(dir) return nil, err } var matches []string for _, d := range m { matches, err := provider.globDir(d, file, depth, matches, matchesChan) if err != nil { + panic(file) return matches, err } } @@ -220,13 +239,13 @@ func (provider *Glob) FetchDirMetadata(updateHandler MetadataUpdateHandler) { // Do not allow zero batches provider.BatchSize = defaulBatchSize } - provider.matchesChan = make(chan string, provider.BatchSize) + provider.fetchMatchesChan = make(chan string, provider.BatchSize) go func() { - provider.glob(provider.Pattern, 0, provider.matchesChan) - close(provider.matchesChan) + provider.glob(provider.Pattern, 0, provider.fetchMatchesChan) + close(provider.fetchMatchesChan) }() for { - file, ok := <-provider.matchesChan + file, ok := <-provider.fetchMatchesChan if !ok { break } @@ -246,6 +265,7 @@ func (provider *Glob) FetchDirMetadata(updateHandler MetadataUpdateHandler) { // NextFile ... func (provider *Glob) NextFile() (string, error) { file, ok := <-provider.matchesChan + fmt.Println(ok, file) if !ok { return "", io.EOF } diff --git a/go.sum b/go.sum index dee9a60..8a63793 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/romnnn/configo v0.1.0 h1:lMP77t48SHy0hckvMHLZNYRMJDWmTSCK8gzacA8qJCU= -github.com/romnnn/configo v0.1.0/go.mod h1:qeVW5OHMEK7tT8z/rSMD9phOR8pCwvp+XR86F78azlY= github.com/romnnn/configo v0.1.1 h1:PYghmDdwdGyBuW6/0SeBFhEoM9r5srpyGU+pD2uxN3s= github.com/romnnn/configo v0.1.1/go.mod h1:qeVW5OHMEK7tT8z/rSMD9phOR8pCwvp+XR86F78azlY= github.com/romnnn/deepequal v0.0.0-20200304153056-ea9233e420e6 h1:7v41wDR0Ntr347K9vYzBRf5T6wLUPcGdHxWoO2rPRQA= From bfa7ff73000eff11b99d953ed290e46cc7d9d450 Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 16:44:48 +0200 Subject: [PATCH 5/6] Debug logs --- examples/multi/multi.go | 6 +++--- files/glob.go | 46 +++++++++++++++++++++++++++++------------ workers.go | 6 ------ 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/examples/multi/multi.go b/examples/multi/multi.go index 42030be..0dd2f12 100644 --- a/examples/multi/multi.go +++ b/examples/multi/multi.go @@ -3,7 +3,6 @@ package main import ( "context" "os" - "path/filepath" opt "github.com/romnnn/configo" "github.com/romnnn/mongoimport" @@ -62,7 +61,8 @@ func main() { }, */ { - FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*/*nested*.csv")}, + // FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*/*nested*.csv")}, + FileProvider: &files.Glob{Pattern: "/home/roman/dev/bpdata/planet/data/abschluss/*.csv"}, Options: mongoimport.Options{ Collection: "globed", IndividualProgress: opt.SetFlag(false), @@ -91,7 +91,7 @@ func main() { i := mongoimport.Import{ // Allow concurrent processing of at most 2 files with 2 threads - MaxParallelism: 1, + MaxParallelism: 200, Sources: datasources, Connection: conn, // Global options diff --git a/files/glob.go b/files/glob.go index 2d74465..950de9c 100644 --- a/files/glob.go +++ b/files/glob.go @@ -1,6 +1,7 @@ package files import ( + "errors" "fmt" "io" "os" @@ -16,6 +17,7 @@ type Glob struct { matchCount int BatchSize int batchIndex int + doneChan chan bool matchesChan chan string fetchMatchesChan chan string file *os.File @@ -128,10 +130,12 @@ func (provider *Glob) Prepare() error { } provider.matchesChan = make(chan string, provider.BatchSize) go func() { - _, err := provider.glob(provider.Pattern, 0, provider.matchesChan) + matches, err := provider.glob(provider.Pattern, 0, provider.matchesChan) if err != nil { panic(err) } + fmt.Printf("Closing with m=%s (length %d)\n", matches, len(matches)) + // panic(fmt.Sprintf("Closing with m=%s (length %d)", matches, len(matches))) /* for { x, ok := <-provider.matchesChan @@ -140,6 +144,7 @@ func (provider *Glob) Prepare() error { */ // panic(fmt.Sprintf("Channel content=%s", provider.matchesChan)) // panic(fmt.Sprintf("Closing with m=%s", m)) + // doneChan close(provider.matchesChan) }() return nil @@ -163,8 +168,11 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string) } if !hasMeta(dir[volumeLen:]) { - matches, err := provider.globDir(dir, file, depth, nil, matchesChan) - return matches, err + matches, errs := provider.globDir(dir, file, depth, nil, matchesChan) + if len(errs) > 0 { + return matches, errors.New("There were errors") + } + return matches, nil // panic(fmt.Sprintf("hasMeta m=%s", matches)) } @@ -181,26 +189,27 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string) } var matches []string for _, d := range m { - matches, err := provider.globDir(d, file, depth, matches, matchesChan) - if err != nil { + matches, errs := provider.globDir(d, file, depth, matches, matchesChan) + if len(errs) > 0 { panic(file) - return matches, err + return matches, errors.New("there were errors") } } return matches, nil } -func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, matchesChan chan<- string) ([]string, error) { +func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, matchesChan chan<- string) ([]string, []error) { + var errs []error fi, err := os.Stat(dir) if err != nil { - return matches, err + return matches, []error{err} } if !fi.IsDir() { - return matches, err + return matches, []error{err} } d, err := os.Open(dir) if err != nil { - return matches, err + return matches, []error{err} } defer d.Close() @@ -215,7 +224,8 @@ func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, for _, n := range names { matched, err := filepath.Match(pattern, n) if err != nil { - return matches, err + errs = append(errs, err) + continue } if matched { match := filepath.Join(dir, n) @@ -226,7 +236,7 @@ func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, } } } - return matches, nil + return matches, errs } // FetchDirMetadata ... @@ -264,10 +274,20 @@ func (provider *Glob) FetchDirMetadata(updateHandler MetadataUpdateHandler) { // NextFile ... func (provider *Glob) NextFile() (string, error) { + /* + select { + case file := <-provider.matchesChan: + return file, nil + case <-provider.done: + return "", io.EOF + } + */ + file, ok := <-provider.matchesChan - fmt.Println(ok, file) + fmt.Println(ok, file, len(provider.matchesChan)) if !ok { return "", io.EOF } return file, nil + } diff --git a/workers.go b/workers.go index 18a5cb0..ddbb5f0 100644 --- a/workers.go +++ b/workers.go @@ -38,13 +38,11 @@ func (i *Import) produceJobs(jobChan chan ImportJob) error { Collection: s.Collection, } if err == io.EOF { - panic(err) // No-op (produced all files for this source) break } else if err != nil { partialResult.Errors = append(partialResult.Errors, err) s.result.PartialResults = append(s.result.PartialResults, partialResult) - panic(err) log.Warn(err) } else { dbName, err := i.sourceDatabaseName(s) @@ -140,11 +138,9 @@ func (s *Datasource) process(job ImportJob) PartialResult { result.Failed++ result.Errors = append(result.Errors, err) if opt.Enabled(s.Options.FailOnErrors) { - panic(err) log.Errorf(err.Error()) break } else { - panic(err) log.Warnf(err.Error()) continue } @@ -155,7 +151,6 @@ func (s *Datasource) process(job ImportJob) PartialResult { // Insert remaining err := insert(job.Collection, batch[:batched]) if err != nil { - panic(err) log.Warn(err) result.Errors = append(result.Errors, err) } @@ -202,7 +197,6 @@ func (s *Datasource) process(job ImportJob) PartialResult { // log.Infof("insert into %s:%s", databaseName, collection) err := insert(job.Collection, minibatch) if err != nil { - panic(err) log.Warn(err) result.Errors = append(result.Errors, err) break From 98343598301027b6e9dcc4697097fc817f16b155 Mon Sep 17 00:00:00 2001 From: romnnn Date: Fri, 3 Apr 2020 19:49:13 +0200 Subject: [PATCH 6/6] Remove panics --- files/glob.go | 32 +------------------------------- files/walk.go | 1 - 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/files/glob.go b/files/glob.go index 950de9c..3dd98cd 100644 --- a/files/glob.go +++ b/files/glob.go @@ -2,7 +2,6 @@ package files import ( "errors" - "fmt" "io" "os" "path/filepath" @@ -17,7 +16,6 @@ type Glob struct { matchCount int BatchSize int batchIndex int - doneChan chan bool matchesChan chan string fetchMatchesChan chan string file *os.File @@ -130,21 +128,7 @@ func (provider *Glob) Prepare() error { } provider.matchesChan = make(chan string, provider.BatchSize) go func() { - matches, err := provider.glob(provider.Pattern, 0, provider.matchesChan) - if err != nil { - panic(err) - } - fmt.Printf("Closing with m=%s (length %d)\n", matches, len(matches)) - // panic(fmt.Sprintf("Closing with m=%s (length %d)", matches, len(matches))) - /* - for { - x, ok := <-provider.matchesChan - fmt.Println(ok, x) - } - */ - // panic(fmt.Sprintf("Channel content=%s", provider.matchesChan)) - // panic(fmt.Sprintf("Closing with m=%s", m)) - // doneChan + provider.glob(provider.Pattern, 0, provider.matchesChan) close(provider.matchesChan) }() return nil @@ -173,25 +157,21 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string) return matches, errors.New("There were errors") } return matches, nil - // panic(fmt.Sprintf("hasMeta m=%s", matches)) } // Prevent infinite recursion. See issue 15879. if dir == pattern { - panic(pattern) return nil, filepath.ErrBadPattern } m, err := provider.glob(dir, depth+1, matchesChan) if err != nil { - panic(dir) return nil, err } var matches []string for _, d := range m { matches, errs := provider.globDir(d, file, depth, matches, matchesChan) if len(errs) > 0 { - panic(file) return matches, errors.New("there were errors") } } @@ -274,17 +254,7 @@ func (provider *Glob) FetchDirMetadata(updateHandler MetadataUpdateHandler) { // NextFile ... func (provider *Glob) NextFile() (string, error) { - /* - select { - case file := <-provider.matchesChan: - return file, nil - case <-provider.done: - return "", io.EOF - } - */ - file, ok := <-provider.matchesChan - fmt.Println(ok, file, len(provider.matchesChan)) if !ok { return "", io.EOF } diff --git a/files/walk.go b/files/walk.go index 5eeb20b..f07c3b7 100644 --- a/files/walk.go +++ b/files/walk.go @@ -153,7 +153,6 @@ func (provider *Walker) nextBatch(currentFile *os.File) ([]string, error) { for _, f := range files { fileInfo, err := os.Lstat(f) if err != nil { - panic(err) log.Warn(err) continue }