Skip to content

Commit 917abea

Browse files
sarangan12xirzec
andauthored
Add Document Key Retriever to Buffered Sender (Azure#13478)
* Add Document Key Retriever to Buffered Sender * Fix Lint Errors * Update sdk/search/search-documents/src/searchIndexingBufferedSender.ts Co-authored-by: Jeff Fisher <[email protected]> * Minor changes * Remove ts-ignore comment Co-authored-by: Jeff Fisher <[email protected]>
1 parent e15c042 commit 917abea

File tree

10 files changed

+57
-16
lines changed

10 files changed

+57
-16
lines changed

sdk/search/search-documents/review/search-documents.api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1653,7 +1653,7 @@ export interface SearchIndexerWarning {
16531653

16541654
// @public
16551655
export class SearchIndexingBufferedSender<T> {
1656-
constructor(client: IndexDocumentsClient<T>, options?: SearchIndexingBufferedSenderOptions);
1656+
constructor(client: IndexDocumentsClient<T>, documentKeyRetriever: (document: T) => string, options?: SearchIndexingBufferedSenderOptions);
16571657
deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
16581658
dispose(): Promise<void>;
16591659
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;

sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
GeographyPoint,
66
SearchIndexClient
77
} from "@azure/search-documents";
8-
import { createIndex, WAIT_TIME } from "../../utils/setup";
8+
import { createIndex, documentKeyRetriever, WAIT_TIME } from "../../utils/setup";
99
import { Hotel } from "../../utils/interfaces";
1010
import { delay } from "@azure/core-http";
1111
import * as dotenv from "dotenv";
@@ -65,9 +65,13 @@ export async function main() {
6565
await createIndex(indexClient, TEST_INDEX_NAME);
6666
await delay(WAIT_TIME);
6767

68-
const bufferedClient = new SearchIndexingBufferedSender<Hotel>(searchClient, {
69-
autoFlush: true
70-
});
68+
const bufferedClient = new SearchIndexingBufferedSender<Hotel>(
69+
searchClient,
70+
documentKeyRetriever,
71+
{
72+
autoFlush: true
73+
}
74+
);
7175

7276
bufferedClient.on("batchAdded", (response: any) => {
7377
console.log(`Batch Added Event has been receieved: ${response}`);

sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
SearchIndexClient,
77
DEFAULT_FLUSH_WINDOW
88
} from "@azure/search-documents";
9-
import { createIndex, WAIT_TIME } from "../../utils/setup";
9+
import { createIndex, documentKeyRetriever, WAIT_TIME } from "../../utils/setup";
1010
import { Hotel } from "../../utils/interfaces";
1111
import { delay } from "@azure/core-http";
1212
import * as dotenv from "dotenv";
@@ -39,6 +39,7 @@ export async function main() {
3939

4040
const bufferedClient: SearchIndexingBufferedSender<Hotel> = new SearchIndexingBufferedSender(
4141
searchClient,
42+
documentKeyRetriever,
4243
{
4344
autoFlush: true
4445
}

sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
GeographyPoint,
66
SearchIndexClient
77
} from "@azure/search-documents";
8-
import { createIndex, WAIT_TIME } from "../../utils/setup";
8+
import { createIndex, documentKeyRetriever, WAIT_TIME } from "../../utils/setup";
99
import { Hotel } from "../../utils/interfaces";
1010
import { delay } from "@azure/core-http";
1111
import * as dotenv from "dotenv";
@@ -36,6 +36,7 @@ export async function main() {
3636

3737
const bufferedClient: SearchIndexingBufferedSender<Hotel> = new SearchIndexingBufferedSender(
3838
searchClient,
39+
documentKeyRetriever,
3940
{
4041
autoFlush: false
4142
}

sdk/search/search-documents/samples/typescript/src/utils/setup.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@
22
// Licensed under the MIT license.
33

44
import { SearchIndexClient, SearchIndex, KnownAnalyzerNames } from "@azure/search-documents";
5+
import { Hotel } from "./interfaces";
56

67
export const WAIT_TIME = 4000;
78

9+
export const documentKeyRetriever: (document: Hotel) => string = (document: Hotel): string => {
10+
return document.hotelId;
11+
};
12+
813
// eslint-disable-next-line @azure/azure-sdk/ts-use-interface-parameters
914
export async function createIndex(client: SearchIndexClient, name: string): Promise<void> {
1015
const hotelIndex: SearchIndex = {

sdk/search/search-documents/src/generated/data/searchClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { Documents } from "./operations";
1010
import { SearchClientContext } from "./searchClientContext";
1111
import { SearchClientOptionalParams, ApiVersion20200630 } from "./models";
1212

13-
/** @hidden */
13+
/** @internal */
1414
export class SearchClient extends SearchClientContext {
1515
/**
1616
* Initializes a new instance of the SearchClient class.

sdk/search/search-documents/src/generated/data/searchClientContext.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { ApiVersion20200630, SearchClientOptionalParams } from "./models";
1212
const packageName = "@azure/search-documents";
1313
const packageVersion = "11.1.0-beta.2";
1414

15-
/** @hidden */
15+
/** @internal */
1616
export class SearchClientContext extends coreHttp.ServiceClient {
1717
endpoint: string;
1818
indexName: string;

sdk/search/search-documents/src/generated/service/searchServiceClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
SearchServiceClientGetServiceStatisticsResponse
2525
} from "./models";
2626

27-
/** @hidden */
27+
/** @internal */
2828
export class SearchServiceClient extends SearchServiceClientContext {
2929
/**
3030
* Initializes a new instance of the SearchServiceClient class.

sdk/search/search-documents/src/generated/service/searchServiceClientContext.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
const packageName = "@azure/search-documents";
1616
const packageVersion = "11.1.0-beta.2";
1717

18-
/** @hidden */
18+
/** @internal */
1919
export class SearchServiceClientContext extends coreHttp.ServiceClient {
2020
endpoint: string;
2121
apiVersion: ApiVersion20200630;

sdk/search/search-documents/src/searchIndexingBufferedSender.ts

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ export class SearchIndexingBufferedSender<T> {
103103
* Event emitter/publisher used in the Buffered Sender
104104
*/
105105
private readonly emitter = new EventEmitter();
106+
/**
107+
* Method to retrieve the document key
108+
*/
109+
private documentKeyRetriever: (document: T) => string;
106110

107111
/**
108112
* Creates a new instance of SearchIndexingBufferedSender.
@@ -111,10 +115,15 @@ export class SearchIndexingBufferedSender<T> {
111115
* @param options - Options to modify auto flush.
112116
*
113117
*/
114-
constructor(client: IndexDocumentsClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
118+
constructor(
119+
client: IndexDocumentsClient<T>,
120+
documentKeyRetriever: (document: T) => string,
121+
options: SearchIndexingBufferedSenderOptions = {}
122+
) {
115123
this.client = client;
124+
this.documentKeyRetriever = documentKeyRetriever;
116125
// General Configuration properties
117-
this.autoFlush = options.autoFlush ?? false;
126+
this.autoFlush = options.autoFlush ?? true;
118127
this.initialBatchActionCount = options.initialBatchActionCount ?? DEFAULT_BATCH_SIZE;
119128
this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW;
120129
// Retry specific configuration properties
@@ -377,9 +386,30 @@ export class SearchIndexingBufferedSender<T> {
377386
this.batchObject = new IndexDocumentsBatch<T>();
378387
while (actions.length > 0) {
379388
const actionsToSend = actions.splice(0, this.initialBatchActionCount);
380-
await this.submitDocuments(actionsToSend, options);
389+
const { batchToSubmit, submitLater } = this.pruneActions(actionsToSend);
390+
actions.unshift(...submitLater);
391+
await this.submitDocuments(batchToSubmit, options);
392+
}
393+
}
394+
}
395+
396+
private pruneActions(
397+
batch: IndexDocumentsAction<T>[]
398+
): { batchToSubmit: IndexDocumentsAction<T>[]; submitLater: IndexDocumentsAction<T>[] } {
399+
const hashSet: Set<string> = new Set<string>();
400+
const resultBatch: IndexDocumentsAction<T>[] = [];
401+
const pruned: IndexDocumentsAction<T>[] = [];
402+
403+
for (const document of batch) {
404+
const key = this.documentKeyRetriever((document as unknown) as T);
405+
if (hashSet.has(key)) {
406+
pruned.push(document);
407+
} else {
408+
hashSet.add(key);
409+
resultBatch.push(document);
381410
}
382411
}
412+
return { batchToSubmit: resultBatch, submitLater: pruned };
383413
}
384414

385415
private async submitDocuments(
@@ -398,7 +428,7 @@ export class SearchIndexingBufferedSender<T> {
398428
// raise success event
399429
this.emitter.emit("batchSucceeded", result);
400430
} catch (e) {
401-
if (e.code && e.code === "413" && actionsToSend.length > 1) {
431+
if (e.statusCode && e.statusCode === 413 && actionsToSend.length > 1) {
402432
// Cut the payload size to half
403433
const splitActionsArray = [
404434
actionsToSend.slice(0, actionsToSend.length / 2),
@@ -427,6 +457,6 @@ export class SearchIndexingBufferedSender<T> {
427457
}
428458

429459
private isRetryAbleError(e: any): boolean {
430-
return e.code && (e.code === "422" || e.code === "409" || e.code === "503");
460+
return e.statusCode && (e.statusCode === 422 || e.statusCode === 409 || e.statusCode === 503);
431461
}
432462
}

0 commit comments

Comments
 (0)