-
Notifications
You must be signed in to change notification settings - Fork 879
/
Copy pathserverless.integration.test.ts
78 lines (64 loc) · 2.44 KB
/
serverless.integration.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Copyright 2016-2025, Pulumi Corporation. All rights reserved.
import * as athena from "athena-client";
import { resolve } from "path";
import { PulumiRunner } from "../testing/integration";
// AWS region used for the stack configuration and for the Athena client.
const region = "us-west-2";

// Runner for the Pulumi stack under test; assigned in beforeAll.
let runner: PulumiRunner;

// Raise Jest's timeout to six minutes (360000 ms).
jest.setTimeout(6 * 60 * 1000);
// Builds the PulumiRunner for the project in the current working directory
// and awaits its setup(); throws when setup reports failure.
beforeAll(async () => {
    const projectDir = resolve("./");
    const stackConfig: Record<string, string> = {
        "aws:region": region,
        "aws-ts-serverless-datawarehouse:dev": "true",
    };

    runner = new PulumiRunner(stackConfig, projectDir);

    const outcome = await runner.setup();
    if (outcome.success) {
        return;
    }
    throw new Error(`Pulumi setup failed, aborting: ${outcome.error}`);
});
// Awaits the runner's teardown() once all tests have finished and throws
// when it reports failure, so that a leaked test stack is surfaced.
afterAll(async () => {
    const outcome = await runner.teardown();
    if (outcome.success) {
        return;
    }
    throw new Error(`Pulumi teardown failed. Test stack has leaked: ${outcome.error}`);
});
// Checks the stack outputs, then verifies that both the click table and the
// impression table return at least one record when queried through Athena.
test("WithStreamingInput integration test", async () => {
    // The stack is expected to export exactly six outputs.
    expect(runner.getStackOutputKeys()).toHaveLength(6);

    const db = runner.getStackOutput("databaseName");
    const clickTable = runner.getStackOutput("clickTableName");
    const impressionTable = runner.getStackOutput("impressionTableName");
    const bucket = runner.getStackOutput("athenaResultsBucket");

    // Query both tables concurrently, since each check may wait between attempts.
    const [clickTableHasRecords, impressionTableHasRecords] = await Promise.all([
        verifyRecordsInTable(db, clickTable, bucket),
        verifyRecordsInTable(db, impressionTable, bucket),
    ]);

    expect(clickTableHasRecords).toBe(true);
    expect(impressionTableHasRecords).toBe(true);
});
/**
 * Queries an Athena table repeatedly until a query returns at least one record.
 *
 * Runs `select * from <db>.<table> limit 10;` through an athena-client created
 * with `s3://<bucket>` as its `bucketUri` and the module-level `region`.
 * The query and each raw result are logged to the console.
 *
 * @param db - Database name interpolated into the query.
 * @param table - Table name interpolated into the query.
 * @param bucket - Bucket name used to build the client's `bucketUri`.
 * @param maxAttempts - Maximum number of queries to issue (defaults to 5).
 * @param retryDelayMs - Pause between attempts, in milliseconds (defaults to 30000).
 * @returns `true` as soon as a query returns records; `false` if every attempt
 *          came back empty.
 */
const verifyRecordsInTable = async (
    db: string,
    table: string,
    bucket: string,
    maxAttempts = 5,
    retryDelayMs = 30000,
): Promise<boolean> => {
    const athenaClient = athena.createClient({ bucketUri: `s3://${bucket}` }, { region });

    const query = `select * from ${db}.${table} limit 10;`;
    console.log(query);

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        const result = await athenaClient.execute(query).toPromise();
        console.log(JSON.stringify(result));

        if (result.records.length > 0) {
            return true;
        }

        // Wait only when another attempt will follow; sleeping after the final
        // attempt would just delay the `false` result.
        if (attempt < maxAttempts) {
            await new Promise<void>(done => setTimeout(done, retryDelayMs));
        }
    }

    return false;
};