forked from datastax/nodejs-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecution-profile.js
266 lines (248 loc) · 9.29 KB
/
execution-profile.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
const utils = require('./utils');
const types = require('./types');
const promiseUtils = require('./promise-utils');
/**
 * Creates a new instance of {@link ExecutionProfile}.
 * @classdesc
 * Represents a set of configurations to be used in a statement execution, scoped to a single {@link Client}
 * instance.
 * <p>
 * An {@link ExecutionProfile} instance should not be shared across different {@link Client} instances.
 * </p>
 * @param {String} name Name of the execution profile.
 * <p>
 * Use <code>'default'</code> to specify that the new instance should be the default {@link ExecutionProfile} if no
 * profile is specified in the execution.
 * </p>
 * @param {Object} [options] Profile options. When any of the options is not specified, the {@link Client} will use
 * the ones defined in the default profile.
 * @param {Number} [options.consistency] The consistency level to use for this profile.
 * @param {LoadBalancingPolicy} [options.loadBalancing] The load-balancing policy to use for this profile.
 * @param {Number} [options.readTimeout] The client per-host request timeout to use for this profile.
 * @param {RetryPolicy} [options.retry] The retry policy to use for this profile.
 * @param {Number} [options.serialConsistency] The serial consistency level to use for this profile.
 * @param {Object} [options.graphOptions]
 * @param {String} [options.graphOptions.language] The graph language to use for graph queries.
 * <p>
 * Note that this setting should normally be <code>undefined</code> or set by a utility method and it's not expected
 * to be defined manually by the user.
 * </p>
 * @param {String} [options.graphOptions.results] The protocol to use for serializing and deserializing graph results.
 * <p>
 * Note that this setting should normally be <code>undefined</code> or set by a utility method and it's not expected
 * to be defined manually by the user.
 * </p>
 * @param {String} [options.graphOptions.name] The graph name to use for graph queries.
 * @param {Number} [options.graphOptions.readConsistency] The consistency level to use for graph read queries.
 * @param {String} [options.graphOptions.source] The graph traversal source name to use for graph queries.
 * @param {Number} [options.graphOptions.writeConsistency] The consistency level to use for graph write queries.
 * @throws {TypeError} When <code>name</code> is not a string.
 * @example
 * const { Client, ExecutionProfile } = require('cassandra-driver');
 * const client = new Client({
 *   contactPoints: ['host1', 'host2'],
 *   profiles: [
 *     new ExecutionProfile('metrics-oltp', {
 *       consistency: consistency.localQuorum,
 *       retry: myRetryPolicy
 *     })
 *   ]
 * });
 *
 * client.execute(query, params, { executionProfile: 'metrics-oltp' }, callback);
 * @constructor
 */
function ExecutionProfile(name, options) {
  const hasValidName = typeof name === 'string';
  if (!hasValidName) {
    throw new TypeError('Execution profile name must be a string');
  }

  // Fall back to the shared empty object so the property reads below never hit null/undefined.
  const settings = options || utils.emptyObject;
  const graphSettings = settings.graphOptions || utils.emptyObject;

  /**
   * Name of the execution profile.
   * @type {String}
   */
  this.name = name;

  /**
   * Consistency level.
   * @type {Number}
   */
  this.consistency = settings.consistency;

  /**
   * Load-balancing policy.
   * @type {LoadBalancingPolicy}
   */
  this.loadBalancing = settings.loadBalancing;

  /**
   * Client read timeout.
   * @type {Number}
   */
  this.readTimeout = settings.readTimeout;

  /**
   * Retry policy.
   * @type {RetryPolicy}
   */
  this.retry = settings.retry;

  /**
   * Serial consistency level.
   * @type {Number}
   */
  this.serialConsistency = settings.serialConsistency;

  // Only the known graph settings are copied; any other key on the provided graph options is dropped.
  const {
    language,
    results,
    name: graphName,
    readConsistency,
    source,
    writeConsistency
  } = graphSettings;

  /**
   * The graph options for this profile.
   * @type {Object}
   * @property {String} language The graph language.
   * @property {String} results The protocol used for serializing and deserializing graph results.
   * @property {String} name The graph name.
   * @property {Number} readConsistency The consistency to use for graph read queries.
   * @property {String} source The graph traversal source.
   * @property {Number} writeConsistency The consistency to use for graph write queries.
   */
  this.graphOptions = {
    language,
    results,
    name: graphName,
    readConsistency,
    source,
    writeConsistency
  };
}
/**
 * Contains the logic to handle the different execution profiles of a {@link Client}.
 * @ignore
 */
class ProfileManager {
  /**
   * Indexes the configured profiles by name, makes sure a <code>'default'</code> profile exists and collects the
   * distinct load-balancing policies referenced by the profiles.
   * <p>
   * Note that the profiles array provided in the options (when defined) is used as-is: the default profile can be
   * appended to it and profiles without a load-balancing policy get the one from the default profile assigned.
   * </p>
   * @param {ClientOptions} options Client options. Reads <code>options.profiles</code> (optional),
   * <code>options.policies.loadBalancing</code> and <code>options.policies.retry</code>.
   */
  constructor(options) {
    this._profiles = options.profiles || [];
    this._defaultConfiguredRetryPolicy = undefined;
    this._setDefault(options);

    // An array of unique load balancing policies
    this._loadBalancingPolicies = [];

    // The dictionaries below are keyed by user-provided profile names. They are created without a prototype so
    // that names like 'toString', 'constructor' or '__proto__' are never resolved to inherited Object members.

    // A dictionary of name keys and profile values
    this._profilesMap = Object.create(null);
    // A dictionary of name keys and custom payload dictionaries as values
    this._customPayloadCache = Object.create(null);
    // A dictionary of name keys and graph options as values
    this._graphOptionsCache = Object.create(null);

    this._profiles.forEach(p => {
      this._profilesMap[p.name] = p;

      // Set required properties
      p.loadBalancing = p.loadBalancing || this._defaultProfile.loadBalancing;

      // A linear search is not very efficient (O(n)) but the amount of profiles should be limited
      // and a handful of load-balancing policies (no hashcode for load-balancing policies)
      if (!this._loadBalancingPolicies.includes(p.loadBalancing)) {
        this._loadBalancingPolicies.push(p.loadBalancing);
      }
    });
  }

  /**
   * Initializes each distinct load-balancing policy, one after the other.
   * @param {Client} client
   * @param {HostMap} hosts
   * @returns {Promise} Settles once every policy invoked its init callback.
   */
  async init(client, hosts) {
    for (const lbp of this._loadBalancingPolicies) {
      await promiseUtils.fromCallback(callback => lbp.init(client, hosts, callback));
    }
  }

  /**
   * Uses the load-balancing policies to get the relative distance to the host and return the closest one.
   * The resulting distance is also set on the host via <code>host.setDistance()</code>.
   * @param {Host} host
   * @returns {Number} The lowest distance reported by the policies, or <code>types.distance.ignored</code> when no
   * policy reports a lower one.
   */
  getDistance(host) {
    let distance = types.distance.ignored;
    // this is performance critical: we can't use any other language features than for-loop :(
    for (let i = 0; i < this._loadBalancingPolicies.length; i++) {
      const d = this._loadBalancingPolicies[i].getDistance(host);
      if (d < distance) {
        distance = d;
        if (distance === types.distance.local) {
          // Stop as soon as a policy reports the local distance
          break;
        }
      }
    }

    host.setDistance(distance);
    return distance;
  }

  /**
   * @param {String|ExecutionProfile} name
   * @returns {ExecutionProfile|undefined} It returns the execution profile by name or the default profile when name is
   * undefined. It returns undefined when the profile does not exist. When an {@link ExecutionProfile} instance is
   * provided, that same instance is returned.
   */
  getProfile(name) {
    if (name instanceof ExecutionProfile) {
      return name;
    }
    return this._profilesMap[name || 'default'];
  }

  /** @returns {ExecutionProfile} */
  getDefault() {
    return this._defaultProfile;
  }

  /** @returns {LoadBalancingPolicy} */
  getDefaultLoadBalancing() {
    return this._defaultProfile.loadBalancing;
  }

  /**
   * Gets the cached default graph options for a given profile. If it doesn't exist, it creates new options using the
   * handler and inserts it into the cache.
   * @param {ExecutionProfile} profile
   * @param {Function} createHandler Invoked without arguments, only when there is no cached value for the profile name.
   * @returns {Object} The cached or newly created graph options.
   */
  getOrCreateGraphOptions(profile, createHandler) {
    let graphOptions = this._graphOptionsCache[profile.name];
    if (!graphOptions) {
      graphOptions = (this._graphOptionsCache[profile.name] = createHandler());
    }
    return graphOptions;
  }

  /**
   * Resolves the default profile, creating and registering it when the user did not define one, and fills its
   * required properties from the client policies.
   * @private
   * @param {ClientOptions} options
   */
  _setDefault(options) {
    this._defaultProfile = this._profiles.find(p => p.name === 'default');
    if (!this._defaultProfile) {
      this._defaultProfile = new ExecutionProfile('default');
      this._profiles.push(this._defaultProfile);
    }

    // Store the default configured retry policy, before the fallback below is applied
    this._defaultConfiguredRetryPolicy = this._defaultProfile.retry;

    // Set the required properties
    this._defaultProfile.loadBalancing = this._defaultProfile.loadBalancing || options.policies.loadBalancing;
    this._defaultProfile.retry = this._defaultProfile.retry || options.policies.retry;
  }

  /**
   * Gets all the execution profiles currently defined.
   * @returns {Array.<ExecutionProfile>}
   */
  getAll() {
    return this._profiles;
  }

  /**
   * Gets the retry policy that was set on the default profile itself, before falling back to the client policies.
   * @returns {RetryPolicy|undefined}
   */
  getDefaultConfiguredRetryPolicy() {
    return this._defaultConfiguredRetryPolicy;
  }
}
module.exports = {
ProfileManager,
ExecutionProfile
};