Skip to content

Commit

Permalink
feat(ext-driver-bq): add bigquery datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
JSYOU committed Nov 1, 2022
1 parent fbef2ab commit 9f8963d
Show file tree
Hide file tree
Showing 19 changed files with 1,041 additions and 6 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
},
"private": true,
"dependencies": {
"@google-cloud/bigquery": "^6.0.3",
"@koa/cors": "^3.3.0",
"bcryptjs": "^2.4.3",
"bluebird": "^3.7.2",
Expand Down
18 changes: 18 additions & 0 deletions packages/extension-driver-bq/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
11 changes: 11 additions & 0 deletions packages/extension-driver-bq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# extension-driver-bq

This library was generated with [Nx](https://nx.dev).

## Building

Run `nx build extension-driver-bq` to build the library.

## Running unit tests

Run `nx test extension-driver-bq` to execute the unit tests via [Jest](https://jestjs.io).
14 changes: 14 additions & 0 deletions packages/extension-driver-bq/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module.exports = {
displayName: 'extension-driver-bq',
preset: '../../jest.preset.ts',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
transform: {
'^.+\\.[tj]s$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'js', 'html'],
coverageDirectory: '../../coverage/packages/extension-driver-bq',
};
29 changes: 29 additions & 0 deletions packages/extension-driver-bq/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"name": "@vulcan-sql/extension-driver-bq",
"description": "BigQuery driver for Vulcan SQL",
"version": "0.3.0",
"type": "commonjs",
"publishConfig": {
"access": "public"
},
"keywords": [
"vulcan",
"vulcan-sql",
"data",
"sql",
"database",
"data-warehouse",
"data-lake",
"api-builder",
"postgres",
"pg"
],
"repository": {
"type": "git",
"url": "https://github.com/Canner/vulcan.git"
},
"license": "MIT",
"peerDependencies": {
"@vulcan-sql/core": "~0.3.0-0"
}
}
45 changes: 45 additions & 0 deletions packages/extension-driver-bq/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"root": "packages/extension-driver-bq",
"sourceRoot": "packages/extension-driver-bq/src",
"targets": {
"build": {
"executor": "@nrwl/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/extension-driver-bq",
"main": "packages/extension-driver-bq/src/index.ts",
"tsConfig": "packages/extension-driver-bq/tsconfig.lib.json",
"assets": ["packages/extension-driver-bq/*.md"]
}
},
"publish": {
"executor": "@nrwl/workspace:run-commands",
"options": {
"command": "node tools/scripts/publish.mjs extension-driver-bq {args.ver} {args.tag}",
"cwd": "dist/packages/extension-driver-bq"
},
"dependsOn": [
{
"projects": "self",
"target": "build"
}
]
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["packages/extension-driver-bq/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/packages/extension-driver-bq"],
"options": {
"jestConfig": "packages/extension-driver-bq/jest.config.ts",
"passWithNoTests": true
}
}
},
"tags": []
}
3 changes: 3 additions & 0 deletions packages/extension-driver-bq/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './lib/bqDataSource';
import { BQDataSource } from './lib/bqDataSource';
export default [BQDataSource];
165 changes: 165 additions & 0 deletions packages/extension-driver-bq/src/lib/bqDataSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import {
DataResult,
DataSource,
ExecuteOptions,
InternalError,
RequestParameter,
VulcanExtensionId,
} from '@vulcan-sql/core';
import { Readable } from 'stream';
import { buildSQL } from './bqlSqlBuilder';
import { mapFromBQTypeId } from './typeMapper';
import { BigQuery, Query, Job, BigQueryOptions } from '@google-cloud/bigquery';
import bigquery from '@google-cloud/bigquery/build/src/types';

export interface BQOptions extends BigQueryOptions {
chunkSize?: number;
location?: string;
}

@VulcanExtensionId('bq')
export class BQDataSource extends DataSource<any, BQOptions> {
private logger = this.getLogger();
private bqMapping = new Map<string, { bq: BigQuery; options?: BQOptions }>();

public override async onActivate() {
const profiles = this.getProfiles().values();
for (const profile of profiles) {
this.logger.debug(
`Initializing profile: ${profile.name} using pg driver`
);
const bigqueryClient = new BigQuery(profile.connection);
// https://cloud.google.com/nodejs/docs/reference/bigquery/latest

this.bqMapping.set(profile.name, {
bq: bigqueryClient,
});

// Testing connection
await bigqueryClient.query('SELECT 1;');
this.logger.debug(`Profile ${profile.name} initialized`);
}
}

public async execute({
statement: sql,
bindParams,
profileName,
operations,
}: ExecuteOptions): Promise<DataResult> {
if (!this.bqMapping.has(profileName)) {
throw new InternalError(`Profile instance ${profileName} not found`);
}
const { bq: client, options } = this.bqMapping.get(profileName)!;
this.logger.debug(`Acquiring connection from ${profileName}`);

origin;
const params: Record<string, any> = {};
bindParams.forEach((value, key) => {
params[key.replace('@', '')] = value;
});

try {
const builtSQL = buildSQL(sql, operations);
const queryOptions = {
query: builtSQL,
location: options?.location || 'US',
params,
maxResults: options?.chunkSize || 100,
};

const [job] = await client.createQueryJob(queryOptions);

// All promises MUST fulfilled in this function or we are not able to release the connection when error occurred
return await this.getResultFromQueryJob(job, options);
} catch (e: any) {
this.logger.debug(
`Errors occurred, release connection from ${profileName}`
);
throw e;
}
}

public async prepare({ parameterIndex }: RequestParameter) {
return `@p${parameterIndex}`;
}

private async getResultFromQueryJob(
queryJob: Job,
options?: BQOptions
): Promise<DataResult> {
const { chunkSize = 100 } = options || {};
const jobDataRead = this.jobDataRead.bind(this);
const firstChunk = await jobDataRead(queryJob, chunkSize);

// save first chunk in buffer for incoming requests
let bufferedRows = [...firstChunk.rows];
let bufferReadIndex = 0;
let nextQuery = firstChunk.nextQuery;

const fetchNext = async () => {
if (bufferReadIndex >= bufferedRows.length) {
if (nextQuery == null) return null;

const fetchData = await jobDataRead(queryJob, chunkSize, nextQuery);
bufferedRows = fetchData.rows;
nextQuery = fetchData.nextQuery;
bufferReadIndex = 0;
}
const res = bufferedRows[bufferReadIndex] || null;
bufferReadIndex += 1;
return res;
};

const stream = new Readable({
objectMode: true,
read() {
fetchNext()
.then((row) => {
this.push(row);
})
.catch((error) => {
this.destroy(error);
});
},
destroy(error: Error | null, cb: (error: Error | null) => void) {
// Send done event to notify upstream to release the connection.
cb(error);
},
// automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
autoDestroy: true,
});
return {
getColumns: () => {
const fields = firstChunk.apiResponse?.schema?.fields || [];
return fields.map((field) => ({
name: field.name || '',
type: mapFromBQTypeId(field.type || ''),
}));
},
getData: () => stream,
};
}

public async jobDataRead(
queryJob: Job,
chunkSize: number,
nextQuery?: Query | null | undefined
) {
return new Promise<{
rows: any[];
nextQuery: Query | null | undefined;
apiResponse: bigquery.IGetQueryResultsResponse | null | undefined;
}>((resolve, reject) => {
return queryJob.getQueryResults(
nextQuery || { maxResults: chunkSize },
(err, rows, nextQuery, apiResponse) => {
if (err) {
return reject(err);
}
resolve({ rows: rows || [], nextQuery, apiResponse });
}
);
});
}
}
40 changes: 40 additions & 0 deletions packages/extension-driver-bq/src/lib/bqlSqlBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Parameterized, SQLClauseOperation } from '@vulcan-sql/core';
import { isNull, isUndefined } from 'lodash';

const isNullOrUndefine = (value: any) => isUndefined(value) || isNull(value);

export const removeEndingSemiColon = (sql: string) => {
return sql.replace(/;([ \n]+)?$/, '');
};

export const addLimit = (sql: string, limit?: string | null) => {
if (isNullOrUndefine(limit)) return sql;
return [sql, `LIMIT`, limit].join(' ');
};

export const addOffset = (sql: string, offset?: string | null) => {
if (isNullOrUndefine(offset)) return sql;
return [sql, `OFFSET`, offset].join(' ');
};

// Check if there is no operations
export const isNoOP = (
operations: Partial<Parameterized<SQLClauseOperation>>
): boolean => {
if (!isNullOrUndefine(operations.limit)) return false;
if (!isNullOrUndefine(operations.offset)) return false;
return true;
};

export const buildSQL = (
sql: string,
operations: Partial<Parameterized<SQLClauseOperation>>
): string => {
if (isNoOP(operations)) return sql;
let builtSQL = '';
builtSQL += `SELECT * FROM (${removeEndingSemiColon(sql)})`;
builtSQL = addLimit(builtSQL, operations.limit);
builtSQL = addOffset(builtSQL, operations.offset);
builtSQL += ';';
return builtSQL;
};
51 changes: 51 additions & 0 deletions packages/extension-driver-bq/src/lib/typeMapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const typeMapping = new Map<string, string>();

const register = (bqType: string, type: string) => {
typeMapping.set(bqType, type);
};

// Reference
// https://github.com/googleapis/nodejs-bigquery/blob/main/src/types.d.ts#L3598-L3601
/**
* [Required] The field data type. Possible values include
* STRING,
* BYTES,
* INTEGER,
* INT64 (same as INTEGER),
* FLOAT,
* FLOAT64 (same as FLOAT),
* NUMERIC,
* BIGNUMERIC,
* BOOLEAN,
* BOOL (same as BOOLEAN),
* TIMESTAMP,
* DATE,
* TIME,
* DATETIME,
* INTERVAL,
* RECORD (where RECORD indicates that the field contains a nested schema) or
* STRUCT (same as RECORD).
*/

register('STRING', 'string');
register('BYTES', 'string');
register('INTEGER', 'number');
register('INT64', 'number');
register('FLOAT', 'number');
register('FLOAT64', 'number');
register('NUMERIC', 'number');
register('BIGNUMERIC', 'number');
register('BOOLEAN', 'boolean');
register('BOOL', 'boolean');
register('TIMESTAMP', 'string');
register('DATE', 'string');
register('TIME', 'string');
register('DATETIME', 'string');
register('INTERVAL', 'string');
register('RECORD', 'string');
register('STRUCT', 'string');

export const mapFromBQTypeId = (bqType: string) => {
if (typeMapping.has(bqType)) return typeMapping.get(bqType)!;
return 'string';
};
Loading

0 comments on commit 9f8963d

Please sign in to comment.