From 89507922ee603a337b2cf2601340bd5ab4c5e692 Mon Sep 17 00:00:00 2001 From: Star Date: Thu, 3 Nov 2022 12:43:47 +0800 Subject: [PATCH] fix(ext-driver-bq): Fix pr suggestion, including typo, document layout and testing --- packages/doc/docs/connectors.mdx | 2 +- packages/doc/docs/connectors/bigquery.mdx | 62 ++++++----- packages/extension-driver-bq/README.md | 4 +- packages/extension-driver-bq/package.json | 4 +- packages/extension-driver-bq/project.json | 25 ++++- .../src/lib/bqDataSource.ts | 20 ++-- .../test/bqDataSource.spec.ts | 102 ++---------------- packages/extension-driver-bq/test/bqServer.ts | 2 +- 8 files changed, 77 insertions(+), 144 deletions(-) diff --git a/packages/doc/docs/connectors.mdx b/packages/doc/docs/connectors.mdx index 766cc8a66..6ccb2e9b7 100644 --- a/packages/doc/docs/connectors.mdx +++ b/packages/doc/docs/connectors.mdx @@ -7,7 +7,7 @@ We support the following data warehouses to connect with, you can choose multipl | [PostgreSQL](./connectors/postgresql) | ✅ Yes | ✅ Yes | ❌ No | | [DuckDB](./connectors/duckdb) | ✅ Yes | ✅ Yes | ❌ No | | [Snowflake](./connectors/snowflake) | ✅ Yes | ✅ Yes | ❌ No | -| BigQuery | ✅ Yes | ✅ Yes | ❌ No | +| [BigQuery](./connectors/bigquery) | ✅ Yes | ✅ Yes | ❌ No | \* Fetching rows only when we need them, it has better performance with large query results. diff --git a/packages/doc/docs/connectors/bigquery.mdx b/packages/doc/docs/connectors/bigquery.mdx index 205b5c075..dcb9af389 100644 --- a/packages/doc/docs/connectors/bigquery.mdx +++ b/packages/doc/docs/connectors/bigquery.mdx @@ -27,34 +27,42 @@ Connect with your bigquery servers via the official [Node.js Driver](https://clo 3. Create a new profile in `profiles.yaml` or in your profile files. For example: :::info You can choose one from `keyFilename` or `credentials` to use.
- For details, please refer to [here](https://cloud.google.com/docs/authentication#service-accounts) + + Your service account must have the following permissions to successfully execute queries. + + - BigQuery Data Viewer + - BigQuery Job User + + > + + For details, please refer to [here](https://cloud.google.com/docs/authentication#service-accounts). ::: -wish keyFilename: - -```yaml -- name: bq # profile name - type: bq - connection: - location: '' - projectId: 'your-project-id' - keyFilename: '/path/to/keyfile.json' - allow: '*' -``` - -wish credential: - -```yaml -- name: bq # profile name - type: bq - connection: - location: US - projectId: 'your-project-id' - credential: - client_email: vulcan@projectId.iam.gserviceaccount.com - private_key: '-----BEGIN PRIVATE KEY----- XXXXX -----END PRIVATE KEY-----\n' - allow: '*' -``` + with `keyFilename`: + + ```yaml + - name: bq # profile name + type: bq + connection: + location: US + projectId: 'your-project-id' + keyFilename: '/path/to/keyfile.json' + allow: '*' + ``` + + with `credential`: + + ```yaml + - name: bq # profile name + type: bq + connection: + location: US + projectId: 'your-project-id' + credential: + client_email: vulcan@projectId.iam.gserviceaccount.com + private_key: '-----BEGIN PRIVATE KEY----- XXXXX -----END PRIVATE KEY-----\n' + allow: '*' + ``` ## Connection Configuration @@ -63,7 +71,7 @@ Please check [Interface BigQueryOptions](https://cloud.google.com/nodejs/docs/re | Name | Required | Default | Description | | ------------------------ | -------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | location | N | US | Location must match that of the 
dataset(s) referenced in the query. | -| projectId | N | | The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application Application Default Credentials), your project ID will be detected. | +| projectId | N | | The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application) Application Default Credentials, your project ID will be detected. | | keyFilename | N | | Full path to the a .json, .pem, or .p12 key downloaded from the Google Developers Console. If you provide a path to a JSON file, the `projectId` option above is not necessary. NOTE: .pem and .p12 require you to specify the `email` option as well. | | credentials | N | | Credentials object. | | credentials.client_email | N | | Your service account. | diff --git a/packages/extension-driver-bq/README.md b/packages/extension-driver-bq/README.md index c51f7cac4..7c680db4e 100644 --- a/packages/extension-driver-bq/README.md +++ b/packages/extension-driver-bq/README.md @@ -19,7 +19,7 @@ 3. Create a new profile in `profiles.yaml` or in your profiles' paths. -> ⚠️ Your service account must have the following permissions to successfully execute queries... +> ⚠️ Your service account must have the following permissions to successfully execute queries. > > - BigQuery Data Viewer > - BigQuery Job User @@ -32,7 +32,7 @@ location: US # Optional: The max rows we should fetch once. chunkSize: 100 - # The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. 
We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application) Application Default Credentials, your project ID will be detected. projectId: 'your-project-id' # Full path to the a .json, .pem, or .p12 key downloaded from the Google Developers Console. If you provide a path to a JSON file, the `projectId` option above is not necessary. NOTE: .pem and .p12 require you to specify the `email` option as well. keyFilename: '/path/to/keyfile.json' diff --git a/packages/extension-driver-bq/package.json b/packages/extension-driver-bq/package.json index f1827a641..e07ce0723 100644 --- a/packages/extension-driver-bq/package.json +++ b/packages/extension-driver-bq/package.json @@ -15,8 +15,8 @@ "data-warehouse", "data-lake", "api-builder", - "postgres", - "pg" + "bigquery", + "bq" ], "repository": { "type": "git", diff --git a/packages/extension-driver-bq/project.json b/packages/extension-driver-bq/project.json index 7c974c25e..07167b528 100644 --- a/packages/extension-driver-bq/project.json +++ b/packages/extension-driver-bq/project.json @@ -3,19 +3,38 @@ "sourceRoot": "packages/extension-driver-bq/src", "targets": { "build": { + "executor": "@nrwl/js:tsc", + "options": { + "command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-bq" + }, + "dependsOn": [ + { + "projects": "self", + "target": "tsc" + } + ] + }, + "tsc": { "executor": "@nrwl/js:tsc", "outputs": ["{options.outputPath}"], "options": {
"outputPath": "dist/packages/extension-driver-bq", "main": "packages/extension-driver-bq/src/index.ts", "tsConfig": "packages/extension-driver-bq/tsconfig.lib.json", - "assets": ["packages/extension-driver-bq/*.md"] - } + "assets": ["packages/extension-driver-bq/*.md"], + "buildableProjectDepsInPackageJsonType": "dependencies" + }, + "dependsOn": [ + { + "projects": "dependencies", + "target": "build" + } + ] }, "publish": { "executor": "@nrwl/workspace:run-commands", "options": { - "command": "node tools/scripts/publish.mjs extension-driver-bq {args.ver} {args.tag}", + "command": "node ../../../tools/scripts/publish.mjs {args.ver} {args.tag}", "cwd": "dist/packages/extension-driver-bq" }, "dependsOn": [ diff --git a/packages/extension-driver-bq/src/lib/bqDataSource.ts b/packages/extension-driver-bq/src/lib/bqDataSource.ts index 60ac4a41b..ca0e43c7c 100644 --- a/packages/extension-driver-bq/src/lib/bqDataSource.ts +++ b/packages/extension-driver-bq/src/lib/bqDataSource.ts @@ -26,13 +26,14 @@ export class BQDataSource extends DataSource { const profiles = this.getProfiles().values(); for (const profile of profiles) { this.logger.debug( - `Initializing profile: ${profile.name} using pg driver` + `Initializing profile: ${profile.name} using bq driver` ); const bigqueryClient = new BigQuery(profile.connection); // https://cloud.google.com/nodejs/docs/reference/bigquery/latest this.bqMapping.set(profile.name, { bq: bigqueryClient, + options: profile.connection, }); // Testing connection @@ -51,9 +52,7 @@ export class BQDataSource extends DataSource { throw new InternalError(`Profile instance ${profileName} not found`); } const { bq: client, options } = this.bqMapping.get(profileName)!; - this.logger.debug(`Acquiring connection from ${profileName}`); - origin; const params: Record = {}; bindParams.forEach((value, key) => { params[key.replace('@', '')] = value; @@ -70,7 +69,6 @@ export class BQDataSource extends DataSource { const [job] = await 
client.createQueryJob(queryOptions); - // All promises MUST fulfilled in this function or we are not able to release the connection when error occurred return await this.getResultFromQueryJob(job, options); } catch (e: any) { this.logger.debug( @@ -89,8 +87,8 @@ export class BQDataSource extends DataSource { options?: BQOptions ): Promise { const { chunkSize = 100 } = options || {}; - const jobDataRead = this.jobDataRead.bind(this); - const firstChunk = await jobDataRead(queryJob, chunkSize); + const fetchJobResult = this.fetchJobResult.bind(this); + const firstChunk = await fetchJobResult(queryJob, chunkSize); // save first chunk in buffer for incoming requests let bufferedRows = [...firstChunk.rows]; @@ -101,7 +99,7 @@ export class BQDataSource extends DataSource { if (bufferReadIndex >= bufferedRows.length) { if (nextQuery == null) return null; - const fetchData = await jobDataRead(queryJob, chunkSize, nextQuery); + const fetchData = await fetchJobResult(queryJob, chunkSize, nextQuery); bufferedRows = fetchData.rows; nextQuery = fetchData.nextQuery; bufferReadIndex = 0; @@ -122,10 +120,6 @@ export class BQDataSource extends DataSource { this.destroy(error); }); }, - destroy(error: Error | null, cb: (error: Error | null) => void) { - // Send done event to notify upstream to release the connection. - cb(error); - }, // automatically destroy() the stream when it emits 'finish' or errors. 
Node > 10.16 autoDestroy: true, }); @@ -141,7 +135,7 @@ export class BQDataSource extends DataSource { }; } - public async jobDataRead( + public async fetchJobResult( queryJob: Job, chunkSize: number, nextQuery?: Query | null | undefined @@ -151,7 +145,7 @@ export class BQDataSource extends DataSource { nextQuery: Query | null | undefined; apiResponse: bigquery.IGetQueryResultsResponse | null | undefined; }>((resolve, reject) => { - return queryJob.getQueryResults( + queryJob.getQueryResults( nextQuery || { maxResults: chunkSize }, (err, rows, nextQuery, apiResponse) => { if (err) { diff --git a/packages/extension-driver-bq/test/bqDataSource.spec.ts b/packages/extension-driver-bq/test/bqDataSource.spec.ts index d3942bacb..dca6bd5ef 100644 --- a/packages/extension-driver-bq/test/bqDataSource.spec.ts +++ b/packages/extension-driver-bq/test/bqDataSource.spec.ts @@ -1,13 +1,10 @@ import { BQDataSource } from '../src'; import { BQflakeServer } from './bqServer'; import { streamToArray } from '@vulcan-sql/core'; -import { Writable } from 'stream'; const bigQuery = new BQflakeServer(); let dataSource: BQDataSource; -const bqTable = `\`cannerflow-286003.bq_testing_tpch.orders\``; - it('Data source should be activate without any error when all profiles are valid', async () => { // Arrange dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); @@ -16,10 +13,9 @@ it('Data source should be activate without any error when all profiles are valid await expect(dataSource.activate()).resolves.not.toThrow(); }); -it('Data source should throw error when activating if any profile is invalid', async () => { +it('Data source should throw error when activating any profile which is invalid', async () => { // Arrange const invalidProfile = bigQuery.getProfile('profile1'); - // invalidProfile.connection.projectId = 'invalid'; invalidProfile.connection.credentials = {}; dataSource = new BQDataSource({}, '', [ bigQuery.getProfile('profile1'), @@ -36,7 +32,7 @@ it('Data 
source should return correct rows with 2 chunks', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 193`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 193)) AS num`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -52,7 +48,7 @@ it('Data source should return correct rows with 1 chunk', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 12`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 20)) AS num LIMIT 12`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -68,7 +64,7 @@ it('Data source should return empty data with no row', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 0`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 10)) AS num LIMIT 0`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -78,57 +74,6 @@ it('Data source should return empty data with no row', async () => { expect(rows.length).toBe(0); }, 30000); -it('Data source should release the connection when finished no matter success or not', async () => { - // Arrange - dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); - await dataSource.activate(); - - // Act - // send parallel queries to test pool leak - const result = await Promise.all( - [ - async () => { - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 1`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - return await streamToArray(getData()); - }, - async () => { - try { - const { getData } = await dataSource.execute({ - statement: 'wrong sql', - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - await streamToArray(getData()); - return [{}]; 
// fake data - } catch { - // ignore error - return []; - } - }, - async () => { - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 1`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - return await streamToArray(getData()); - }, - ].map((task) => task()) - ); - - // Assert - expect(result[0].length).toBe(1); - expect(result[1].length).toBe(0); - expect(result[2].length).toBe(1); -}, 30000); - it('Data source should work with prepare statements', async () => { // Arrange dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); @@ -167,7 +112,7 @@ it('Data source should return correct column types', async () => { await dataSource.activate(); // Act const { getColumns, getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 0`, + statement: `SELECT CAST(1 as bigint) as a, true as b`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -178,39 +123,6 @@ it('Data source should return correct column types', async () => { data.destroy(); // Assert - expect(column[0]).toEqual({ name: 'orderkey', type: 'number' }); - expect(column[2]).toEqual({ name: 'orderstatus', type: 'string' }); -}, 30000); - -it('Data source should release connection when readable stream is destroyed', async () => { - // Arrange - dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); - await dataSource.activate(); - // Act - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 100`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - const readStream = getData(); - const rows: any[] = []; - let resolve: any; - const waitForStream = () => new Promise((res) => (resolve = res)); - const writeStream = new Writable({ - write(chunk, _, cb) { - rows.push(chunk); - // After read 5 records, destroy the upstream - if (rows.length === 5) { - readStream.destroy(); - resolve(); - 
} else cb(); - }, - objectMode: true, - }); - readStream.pipe(writeStream); - await waitForStream(); - // Assert - expect(rows.length).toBe(5); - // afterEach hook will timeout if any leak occurred. + expect(column[0]).toEqual({ name: 'a', type: 'number' }); + expect(column[1]).toEqual({ name: 'b', type: 'boolean' }); }, 30000); diff --git a/packages/extension-driver-bq/test/bqServer.ts b/packages/extension-driver-bq/test/bqServer.ts index dfd178f67..b6efaafdc 100644 --- a/packages/extension-driver-bq/test/bqServer.ts +++ b/packages/extension-driver-bq/test/bqServer.ts @@ -11,8 +11,8 @@ export class BQflakeServer { return { name, type: 'bq', - location: process.env['BQ_LOCATION'], connection: { + location: process.env['BQ_LOCATION'], projectId: process.env['BQ_PROJECT_ID'], credentials: { client_email: process.env['BQ_CLIENT_EMAIL'],