Skip to content

Commit 605b1a2

Browse files
committed
feat: add --wait flag to all indexing operations
For some commands, it was already implemented, but not for all commands. Having the option to wait until the (async) operation is done on Algolia's servers makes it possible to run automatic tests for the commands.
1 parent dd23514 commit 605b1a2

File tree

11 files changed

+213
-27
lines changed

11 files changed

+213
-27
lines changed

pkg/cmd/indices/clear/clear.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type ClearOptions struct {
2222

2323
Index string
2424
DoConfirm bool
25+
Wait bool
2526
}
2627

2728
// NewClearCmd creates and returns a clear command for indices
@@ -70,6 +71,7 @@ func NewClearCmd(f *cmdutil.Factory, runF func(*ClearOptions) error) *cobra.Comm
7071
}
7172

7273
cmd.Flags().BoolVarP(&confirm, "confirm", "y", false, "skip confirmation prompt")
74+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
7375

7476
return cmd
7577
}
@@ -94,11 +96,26 @@ func runClearCmd(opts *ClearOptions) error {
9496
return err
9597
}
9698

97-
_, err = client.ClearObjects(client.NewApiClearObjectsRequest(opts.Index))
99+
opts.IO.StartProgressIndicatorWithLabel(
100+
fmt.Sprintf("Deleting all records from index %s", opts.Index),
101+
)
102+
res, err := client.ClearObjects(client.NewApiClearObjectsRequest(opts.Index))
98103
if err != nil {
104+
opts.IO.StopProgressIndicator()
99105
return err
100106
}
101107

108+
if opts.Wait {
109+
opts.IO.UpdateProgressIndicatorLabel("Waiting for the task to complete")
110+
_, err := client.WaitForTask(opts.Index, res.TaskID)
111+
if err != nil {
112+
opts.IO.StopProgressIndicator()
113+
return err
114+
}
115+
}
116+
117+
opts.IO.StopProgressIndicator()
118+
102119
cs := opts.IO.ColorScheme()
103120
if opts.IO.IsStdoutTTY() {
104121
fmt.Fprintf(opts.IO.Out, "%s Cleared index %s\n", cs.SuccessIcon(), opts.Index)

pkg/cmd/indices/delete/delete.go

+25-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type DeleteOptions struct {
2424
Indices []string
2525
DoConfirm bool
2626
IncludeReplicas bool
27+
Wait bool
2728
}
2829

2930
// NewDeleteCmd creates and returns a delete command for indices
@@ -84,6 +85,7 @@ func NewDeleteCmd(f *cmdutil.Factory, runF func(*DeleteOptions) error) *cobra.Co
8485
cmd.Flags().BoolVarP(&confirm, "confirm", "y", false, "skip confirmation prompt")
8586
cmd.Flags().
8687
BoolVarP(&opts.IncludeReplicas, "include-replicas", "r", false, "delete replica indices too")
88+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
8789

8890
return cmd
8991
}
@@ -136,6 +138,7 @@ func runDeleteCmd(opts *DeleteOptions) error {
136138
)
137139
err = detachReplica(index, *settings.Primary, client)
138140
if err != nil {
141+
opts.IO.StopProgressIndicator()
139142
return fmt.Errorf("can't detach index %s: %w", index, err)
140143
}
141144
opts.IO.StopProgressIndicator()
@@ -146,14 +149,26 @@ func runDeleteCmd(opts *DeleteOptions) error {
146149
)
147150
res, err := client.DeleteIndex(client.NewApiDeleteIndexRequest(index))
148151
if err != nil {
152+
opts.IO.StopProgressIndicator()
149153
return fmt.Errorf("can't delete index %s: %w", index, err)
150154
}
151155

156+
if !opts.IncludeReplicas && opts.Wait {
157+
opts.IO.UpdateProgressIndicatorLabel("Waiting for the task to complete")
158+
_, err := client.WaitForTask(index, res.TaskID)
159+
if err != nil {
160+
opts.IO.StopProgressIndicator()
161+
return err
162+
}
163+
}
164+
152165
if opts.IncludeReplicas && len(settings.Replicas) > 0 {
153166
// Wait for primary to be deleted, otherwise deleting replicas might fail
167+
opts.IO.UpdateProgressIndicatorLabel("Waiting for the primary index to be deleted")
154168
_, err := client.WaitForTask(index, res.TaskID)
155169
if err != nil {
156-
return fmt.Errorf("error waiting for index %s to be deleted: %w", index, err)
170+
opts.IO.StopProgressIndicator()
171+
return fmt.Errorf("error while waiting for index %s to be deleted: %w", index, err)
157172
}
158173

159174
for _, replica := range settings.Replicas {
@@ -170,10 +185,18 @@ func runDeleteCmd(opts *DeleteOptions) error {
170185
opts.IO.UpdateProgressIndicatorLabel(
171186
fmt.Sprintf("Deleting replica %s", index),
172187
)
173-
_, err = client.DeleteIndex(client.NewApiDeleteIndexRequest(replica))
188+
res, err = client.DeleteIndex(client.NewApiDeleteIndexRequest(replica))
174189
if err != nil {
190+
opts.IO.StopProgressIndicator()
175191
return fmt.Errorf("can't delete replica %s: %w", replica, err)
176192
}
193+
if opts.Wait {
194+
_, err := client.WaitForTask(replica, res.TaskID)
195+
if err != nil {
196+
opts.IO.StopProgressIndicator()
197+
return err
198+
}
199+
}
177200
}
178201
}
179202
opts.IO.StopProgressIndicator()

pkg/cmd/objects/delete/delete.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,11 @@ func runDeleteCmd(opts *DeleteOptions) error {
204204

205205
// Wait for the tasks to complete
206206
if opts.Wait {
207-
opts.IO.StartProgressIndicatorWithLabel("Waiting for all of the deletion tasks to complete")
207+
opts.IO.StartProgressIndicatorWithLabel("Waiting for all deletion tasks to complete")
208208
for _, taskID := range taskIDs {
209209
_, err := client.WaitForTask(opts.Index, taskID)
210210
if err != nil {
211+
opts.IO.StopProgressIndicator()
211212
return err
212213
}
213214
}
@@ -221,6 +222,7 @@ func runDeleteCmd(opts *DeleteOptions) error {
221222
return nil
222223
}
223224

225+
// deleteByToSearchParams returns a new SearchParamsObject from a DeleteByParams struct
224226
func deleteByToSearchParams(input *search.DeleteByParams) *search.SearchParamsObject {
225227
return &search.SearchParamsObject{
226228
Filters: input.Filters,

pkg/cmd/objects/import/import.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type ImportOptions struct {
2424

2525
Scanner *bufio.Scanner
2626
BatchSize int
27+
Wait bool
2728
}
2829

2930
// NewImportCmd creates and returns an import command for indice object
@@ -75,6 +76,7 @@ func NewImportCmd(f *cmdutil.Factory) *cobra.Command {
7576
_ = cmd.MarkFlagRequired("file")
7677

7778
cmd.Flags().IntVarP(&opts.BatchSize, "batch-size", "b", 1000, "Specify the upload batch size")
79+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
7880
return cmd
7981
}
8082

@@ -102,10 +104,22 @@ func runImportCmd(opts *ImportOptions) error {
102104
count++
103105
}
104106

105-
_, err = client.SaveObjects(opts.Index, records, search.WithBatchSize(opts.BatchSize))
107+
responses, err := client.SaveObjects(opts.Index, records, search.WithBatchSize(opts.BatchSize))
106108
if err != nil {
107109
return err
108110
}
111+
112+
if opts.Wait {
113+
opts.IO.UpdateProgressIndicatorLabel("Waiting for the task to complete")
114+
for _, res := range responses {
115+
_, err := client.WaitForTask(opts.Index, res.TaskID)
116+
if err != nil {
117+
opts.IO.StopProgressIndicator()
118+
return err
119+
}
120+
}
121+
}
122+
109123
opts.IO.UpdateProgressIndicatorLabel(
110124
fmt.Sprintf("Imported %d objects in %v", len(records), time.Since(elapsed)),
111125
)

pkg/cmd/rules/delete/delete.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type DeleteOptions struct {
2525
Index string
2626
RuleIDs []string
2727
ForwardToReplicas bool
28+
Wait bool
2829

2930
DoConfirm bool
3031
}
@@ -82,6 +83,7 @@ func NewDeleteCmd(f *cmdutil.Factory, runF func(*DeleteOptions) error) *cobra.Co
8283
BoolVar(&opts.ForwardToReplicas, "forward-to-replicas", false, "Forward the delete request to the replicas")
8384

8485
cmd.Flags().BoolVarP(&confirm, "confirm", "y", false, "skip confirmation prompt")
86+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
8587

8688
return cmd
8789
}
@@ -121,14 +123,24 @@ func runDeleteCmd(opts *DeleteOptions) error {
121123
}
122124
}
123125

126+
var taskIDs []int64
127+
124128
for _, ruleID := range opts.RuleIDs {
125-
_, err = client.DeleteRule(
129+
res, err := client.DeleteRule(
126130
client.NewApiDeleteRuleRequest(opts.Index, ruleID).
127131
WithForwardToReplicas(opts.ForwardToReplicas),
128132
)
129133
if err != nil {
130-
err = fmt.Errorf("failed to delete rule %s: %w", ruleID, err)
131-
return err
134+
return fmt.Errorf("failed to delete rule %s: %w", ruleID, err)
135+
}
136+
if opts.Wait {
137+
taskIDs = append(taskIDs, res.TaskID)
138+
}
139+
}
140+
141+
if len(taskIDs) > 0 {
142+
for _, taskID := range taskIDs {
143+
_, err := client.WaitForTask(opts.Index, taskID)
132144
}
133145
}
134146

pkg/cmd/rules/import/import.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type ImportOptions struct {
2525
Index string
2626
ForwardToReplicas bool
2727
ClearExistingRules bool
28+
Wait bool
2829
Scanner *bufio.Scanner
2930

3031
DoConfirm bool
@@ -102,6 +103,7 @@ func NewImportCmd(f *cmdutil.Factory, runF func(*ImportOptions) error) *cobra.Co
102103
BoolVarP(&opts.ForwardToReplicas, "forward-to-replicas", "f", true, "Forward the rules to the index replicas")
103104
cmd.Flags().
104105
BoolVarP(&opts.ClearExistingRules, "clear-existing-rules", "c", false, "Clear existing rules before importing new ones")
106+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
105107

106108
return cmd
107109
}
@@ -147,42 +149,75 @@ func runImportCmd(opts *ImportOptions) error {
147149

148150
var rule search.Rule
149151
if err := json.Unmarshal([]byte(line), &rule); err != nil {
150-
err := fmt.Errorf("failed to parse JSON rule on line %d: %s", count, err)
151-
return err
152+
opts.IO.StopProgressIndicator()
153+
return fmt.Errorf("failed to parse JSON rule on line %d: %s", count, err)
152154
}
153155

154156
rules = append(rules, rule)
155157
count++
156158

157159
// If requested, only clear existing rules the first time
158160
if count == batchSize {
159-
_, err := client.SaveRules(
161+
res, err := client.SaveRules(
160162
client.NewApiSaveRulesRequest(opts.Index, rules).
161163
WithClearExistingRules(clearExistingRules).
162164
WithForwardToReplicas(opts.ForwardToReplicas),
163165
)
164166
if err != nil {
167+
opts.IO.StopProgressIndicator()
165168
return err
166169
}
167-
rules = make([]search.Rule, 0, batchSize)
170+
if opts.Wait {
171+
_, err := client.WaitForTask(opts.Index, res.TaskID)
172+
if err != nil {
173+
opts.IO.StopProgressIndicator()
174+
return err
175+
}
176+
}
168177
totalCount += count
169178
opts.IO.UpdateProgressIndicatorLabel(fmt.Sprintf("Imported %d rules", totalCount))
179+
180+
rules = make([]search.Rule, 0, batchSize)
170181
count = 0
171182
clearExistingRules = false
172183
}
173184
}
174185

175186
if count > 0 {
176187
totalCount += count
177-
if _, err := client.SaveRules(client.NewApiSaveRulesRequest(opts.Index, rules).WithForwardToReplicas(opts.ForwardToReplicas)); err != nil {
188+
res, err := client.SaveRules(
189+
client.NewApiSaveRulesRequest(opts.Index, rules).
190+
WithForwardToReplicas(opts.ForwardToReplicas),
191+
)
192+
if err != nil {
193+
opts.IO.StopProgressIndicator()
178194
return err
179195
}
196+
if opts.Wait {
197+
_, err := client.WaitForTask(opts.Index, res.TaskID)
198+
if err != nil {
199+
opts.IO.StopProgressIndicator()
200+
return err
201+
}
202+
}
180203
}
181204
// Clear rules if 0 rules are imported and the clear existing is set
182205
if totalCount == 0 && opts.ClearExistingRules {
183-
if _, err := client.ClearRules(client.NewApiClearRulesRequest(opts.Index).WithForwardToReplicas(opts.ForwardToReplicas)); err != nil {
206+
res, err := client.ClearRules(
207+
client.NewApiClearRulesRequest(opts.Index).
208+
WithForwardToReplicas(opts.ForwardToReplicas),
209+
)
210+
if err != nil {
211+
opts.IO.StopProgressIndicator()
184212
return err
185213
}
214+
if opts.Wait {
215+
_, err := client.WaitForTask(opts.Index, res.TaskID)
216+
if err != nil {
217+
opts.IO.StopProgressIndicator()
218+
return err
219+
}
220+
}
186221
}
187222

188223
opts.IO.StopProgressIndicator()

pkg/cmd/settings/import/import.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type ImportOptions struct {
2323
Index string
2424
Settings search.IndexSettings
2525
ForwardToReplicas bool
26+
Wait bool
2627
}
2728

2829
// NewImportCmd creates and returns an import command for settings
@@ -66,6 +67,7 @@ func NewImportCmd(f *cmdutil.Factory) *cobra.Command {
6667
_ = cmd.MarkFlagRequired("file")
6768
cmd.Flags().
6869
BoolVarP(&opts.ForwardToReplicas, "forward-to-replicas", "f", false, "Forward the settings to the replicas")
70+
cmd.Flags().BoolVarP(&opts.Wait, "wait", "w", false, "wait for the operation to complete")
6971

7072
return cmd
7173
}
@@ -77,15 +79,26 @@ func runImportCmd(opts *ImportOptions) error {
7779
}
7880

7981
opts.IO.StartProgressIndicatorWithLabel(fmt.Sprint("Importing settings to index ", opts.Index))
80-
_, err = client.SetSettings(
82+
res, err := client.SetSettings(
8183
client.NewApiSetSettingsRequest(opts.Index, &opts.Settings).
8284
WithForwardToReplicas(opts.ForwardToReplicas),
8385
)
84-
opts.IO.StopProgressIndicator()
8586
if err != nil {
87+
opts.IO.StopProgressIndicator()
8688
return err
8789
}
8890

91+
if opts.Wait {
92+
opts.IO.UpdateProgressIndicatorLabel("Waiting for the task to complete")
93+
_, err := client.WaitForTask(opts.Index, res.TaskID)
94+
if err != nil {
95+
opts.IO.StopProgressIndicator()
96+
return err
97+
}
98+
}
99+
100+
opts.IO.StopProgressIndicator()
101+
89102
cs := opts.IO.ColorScheme()
90103
if opts.IO.IsStdoutTTY() {
91104
fmt.Fprintf(opts.IO.Out, "%s Imported settings on %v\n", cs.SuccessIcon(), opts.Index)

0 commit comments

Comments
 (0)