Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ func (c *MongoConnection) Client() (*mongo.Client, error) {
}

// insert writes every document in batch to collection using a single
// InsertMany call and returns the driver's error, if any.
//
// An empty batch is a no-op that returns nil. The emptiness check has to run
// before the driver call: placed after an unconditional InsertMany/return it
// is unreachable, and the driver is then asked to insert zero documents.
func insert(collection *mongo.Collection, batch []interface{}) error {
	if len(batch) == 0 {
		return nil
	}
	// Only the error is reported; the insert result is deliberately discarded.
	_, err := collection.InsertMany(context.Background(), batch)
	return err
}

func emptyCollection(collection *mongo.Collection) error {
Expand Down
85 changes: 46 additions & 39 deletions examples/multi/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"os"
"path/filepath"

opt "github.com/romnnn/configo"
"github.com/romnnn/mongoimport"
Expand Down Expand Up @@ -31,62 +30,70 @@ func main() {
}
defer mongoC.Terminate(context.Background())

isCSVWalkerFunc := func(path string, info os.FileInfo, err error) bool {
return !info.IsDir() && filepath.Ext(path) == ".csv"
}
/*
isCSVWalkerFunc := func(path string, info os.FileInfo, err error) bool {
return !info.IsDir() && filepath.Ext(path) == ".csv"
}

xmlLoader := loaders.DefaultXMLLoader()
xmlLoader := loaders.DefaultXMLLoader()
*/
csvLoader := loaders.DefaultCSVLoader()
csvLoader.Excel = false
datasources := []*mongoimport.Datasource{
{
Description: "Ford Escort Data",
FileProvider: &files.List{Files: []string{
filepath.Join(dir, "examples/data/ford_escort.csv"),
filepath.Join(dir, "examples/data/ford_escort2.csv"),
}},
Options: mongoimport.Options{
Collection: "ford_escorts",
/*
{
Description: "Ford Escort Data",
FileProvider: &files.List{Files: []string{
filepath.Join(dir, "examples/data/ford_escort.csv"),
filepath.Join(dir, "examples/data/ford_escort2.csv"),
}},
Options: mongoimport.Options{
Collection: "ford_escorts",
},
},
},
{
FileProvider: &files.List{Files: []string{
filepath.Join(dir, "examples/data/hurricanes.csv"),
}},
Options: mongoimport.Options{
Collection: "hurricanes",
{
FileProvider: &files.List{Files: []string{
filepath.Join(dir, "examples/data/hurricanes.csv"),
}},
Options: mongoimport.Options{
Collection: "hurricanes",
},
},
},
*/
{
FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*/*nested*.csv")},
// FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*/*nested*.csv")},
FileProvider: &files.Glob{Pattern: "/home/roman/dev/bpdata/planet/data/abschluss/*.csv"},
Options: mongoimport.Options{
Collection: "globed",
IndividualProgress: opt.SetFlag(false),
},
},
{
Description: "XML Data",
FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*.xml")},
Options: mongoimport.Options{
Collection: "xmldata",
Loader: loaders.Loader{SpecificLoader: xmlLoader},
IndividualProgress: opt.SetFlag(false),
/*
{
Description: "XML Data",
FileProvider: &files.Glob{Pattern: filepath.Join(dir, "examples/data/*.xml")},
Options: mongoimport.Options{
Collection: "xmldata",
Loader: loaders.Loader{SpecificLoader: xmlLoader},
IndividualProgress: opt.SetFlag(false),
},
},
},
{
Description: "Walk Data",
FileProvider: &files.Walker{Directory: filepath.Join(dir, "examples/data"), Handler: isCSVWalkerFunc},
Options: mongoimport.Options{
Collection: "walked",
IndividualProgress: opt.SetFlag(false),
{
Description: "Walk Data",
FileProvider: &files.Walker{Directory: filepath.Join(dir, "examples/data"), Handler: isCSVWalkerFunc},
Options: mongoimport.Options{
Collection: "walked",
IndividualProgress: opt.SetFlag(false),
},
},
},
*/
}

i := mongoimport.Import{
// Allow concurrent processing of at most 2 files with 2 threads
Sources: datasources,
Connection: conn,
MaxParallelism: 200,
Sources: datasources,
Connection: conn,
// Global options
Options: mongoimport.Options{
IndividualProgress: opt.SetFlag(true),
Expand Down
52 changes: 31 additions & 21 deletions files/glob.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package files

import (
"errors"
"io"
"os"
"path/filepath"
Expand All @@ -11,13 +12,14 @@ import (

// Glob is a file provider that yields the files matching Pattern.
//
// Each field is declared exactly once; repeating a field name inside a struct
// is a compile error.
type Glob struct {
	// Pattern is the glob expression passed to the matcher.
	Pattern string

	// BatchSize is used as the buffer capacity of the match channels.
	// NOTE(review): FetchDirMetadata appears to substitute a default when
	// this is unset ("Do not allow zero batches") — confirm the exact condition.
	BatchSize int

	// NOTE(review): the uses of matchCount and batchIndex are not visible in
	// this view — confirm before relying on them.
	matchCount int
	batchIndex int

	// matchesChan carries matched paths.
	// NOTE(review): presumably the channel NextFile reads from — confirm.
	matchesChan chan string

	// fetchMatchesChan is the channel FetchDirMetadata creates, fills from a
	// goroutine running the glob, and drains until it is closed. Keeping it
	// separate from matchesChan means the metadata pass does not consume that
	// channel.
	fetchMatchesChan chan string

	// NOTE(review): file, dir and pattern are internal state whose uses are
	// not visible in this view.
	file         *os.File
	dir, pattern string
}

// hasMeta reports whether path contains any of the magic characters
Expand Down Expand Up @@ -137,6 +139,7 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string)
if _, err := os.Lstat(pattern); err != nil {
return nil, nil
}
matchesChan <- pattern
return []string{pattern}, nil
}

Expand All @@ -149,7 +152,11 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string)
}

if !hasMeta(dir[volumeLen:]) {
return provider.globDir(dir, file, depth, nil, matchesChan)
matches, errs := provider.globDir(dir, file, depth, nil, matchesChan)
if len(errs) > 0 {
return matches, errors.New("There were errors")
}
return matches, nil
}

// Prevent infinite recursion. See issue 15879.
Expand All @@ -163,25 +170,26 @@ func (provider *Glob) glob(pattern string, depth int, matchesChan chan<- string)
}
var matches []string
for _, d := range m {
matches, err := provider.globDir(d, file, depth, matches, matchesChan)
if err != nil {
return matches, err
matches, errs := provider.globDir(d, file, depth, matches, matchesChan)
if len(errs) > 0 {
return matches, errors.New("there were errors")
}
}
return matches, nil
}

func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, matchesChan chan<- string) ([]string, error) {
func (provider *Glob) globDir(dir, pattern string, depth int, matches []string, matchesChan chan<- string) ([]string, []error) {
var errs []error
fi, err := os.Stat(dir)
if err != nil {
return matches, err
return matches, []error{err}
}
if !fi.IsDir() {
return matches, err
return matches, []error{err}
}
d, err := os.Open(dir)
if err != nil {
return matches, err
return matches, []error{err}
}
defer d.Close()

Expand All @@ -196,7 +204,8 @@ func (provider *Glob) globDir(dir, pattern string, depth int, matches []string,
for _, n := range names {
matched, err := filepath.Match(pattern, n)
if err != nil {
return matches, err
errs = append(errs, err)
continue
}
if matched {
match := filepath.Join(dir, n)
Expand All @@ -207,7 +216,7 @@ func (provider *Glob) globDir(dir, pattern string, depth int, matches []string,
}
}
}
return matches, nil
return matches, errs
}

// FetchDirMetadata ...
Expand All @@ -220,13 +229,13 @@ func (provider *Glob) FetchDirMetadata(updateHandler MetadataUpdateHandler) {
// Do not allow zero batches
provider.BatchSize = defaulBatchSize
}
provider.matchesChan = make(chan string, provider.BatchSize)
provider.fetchMatchesChan = make(chan string, provider.BatchSize)
go func() {
provider.glob(provider.Pattern, 0, provider.matchesChan)
close(provider.matchesChan)
provider.glob(provider.Pattern, 0, provider.fetchMatchesChan)
close(provider.fetchMatchesChan)
}()
for {
file, ok := <-provider.matchesChan
file, ok := <-provider.fetchMatchesChan
if !ok {
break
}
Expand All @@ -250,4 +259,5 @@ func (provider *Glob) NextFile() (string, error) {
return "", io.EOF
}
return file, nil

}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/romnnn/configo v0.1.0 h1:lMP77t48SHy0hckvMHLZNYRMJDWmTSCK8gzacA8qJCU=
github.com/romnnn/configo v0.1.0/go.mod h1:qeVW5OHMEK7tT8z/rSMD9phOR8pCwvp+XR86F78azlY=
github.com/romnnn/configo v0.1.1 h1:PYghmDdwdGyBuW6/0SeBFhEoM9r5srpyGU+pD2uxN3s=
github.com/romnnn/configo v0.1.1/go.mod h1:qeVW5OHMEK7tT8z/rSMD9phOR8pCwvp+XR86F78azlY=
github.com/romnnn/deepequal v0.0.0-20200304153056-ea9233e420e6 h1:7v41wDR0Ntr347K9vYzBRf5T6wLUPcGdHxWoO2rPRQA=
Expand Down
7 changes: 4 additions & 3 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (i *Import) produceJobs(jobChan chan ImportJob) error {
} else if err != nil {
partialResult.Errors = append(partialResult.Errors, err)
s.result.PartialResults = append(s.result.PartialResults, partialResult)
log.Warn(err)
} else {
dbName, err := i.sourceDatabaseName(s)
if err != nil {
Expand Down Expand Up @@ -137,11 +138,11 @@ func (s *Datasource) process(job ImportJob) PartialResult {
result.Failed++
result.Errors = append(result.Errors, err)
if opt.Enabled(s.Options.FailOnErrors) {
log.Warnf(err.Error())
continue
} else {
log.Errorf(err.Error())
break
} else {
log.Warnf(err.Error())
continue
}
}
}
Expand Down