Skip to content

Commit 8b1ed9b

Browse files
authored
Merge pull request #11 from openaq/provider/clarity
Add Clarity provider
2 parents c81ae81 + 14cf010 commit 8b1ed9b

File tree

6 files changed

+466
-58
lines changed

6 files changed

+466
-58
lines changed

README.md

Lines changed: 57 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
## Deploy
66

7-
* `yarn cdk deploy` deploy this stack to your default AWS account/region
8-
* `yarn cdk diff` compare deployed stack with current state
9-
* `yarn cdk synth` emits the synthesized CloudFormation template
7+
- `yarn cdk deploy` deploy this stack to your default AWS account/region
8+
- `yarn cdk diff` compare deployed stack with current state
9+
- `yarn cdk synth` emits the synthesized CloudFormation template
1010

1111
## Development
1212

@@ -26,12 +26,12 @@ yarn test
2626

2727
Configuration for the ingestion is provided via environment variables.
2828

29-
* `BUCKET`: The bucket to which the ingested data should be written. **Required**
30-
* `SOURCE`: The [data source](#data-sources) to ingest. **Required**
31-
* `LCS_API`: The API used when fetching supported measurands. _Default: `'https://api.openaq.org'`_
32-
* `STACK`: The stack to which the ingested data should be associated. This is mainly used to apply a prefix to data uploaded to S3 in order to separate it from production data. _Default: `'local'`_
33-
* `SECRET_STACK`: The stack to which the used [Secrets](#provider-secrets) are associated. At times, a developer may want to use credentials relating to a different stack (e.g. a devloper is testing the script, they want output data uploaded to the `local` stack but want to use the production stack's secrets). _Default: the value from the `STACK` env variable_
34-
* `VERBOSE`: Enable verbose logging. _Default: disabled_
29+
- `BUCKET`: The bucket to which the ingested data should be written. **Required**
30+
- `SOURCE`: The [data source](#data-sources) to ingest. **Required**
31+
- `LCS_API`: The API used when fetching supported measurands. _Default: `'https://api.openaq.org'`_
32+
- `STACK`: The stack to which the ingested data should be associated. This is mainly used to apply a prefix to data uploaded to S3 in order to separate it from production data. _Default: `'local'`_
33+
- `SECRET_STACK`: The stack to which the used [Secrets](#provider-secrets) are associated. At times, a developer may want to use credentials relating to a different stack (e.g. a developer is testing the script, they want output data uploaded to the `local` stack but want to use the production stack's secrets). _Default: the value from the `STACK` env variable_
34+
- `VERBOSE`: Enable verbose logging. _Default: disabled_
3535

3636
### Running locally
3737

@@ -56,15 +56,12 @@ outline what is necessary to create and a new source.
5656

5757
The first step for a new source is to add a JSON config file to the `fetcher/sources` directory.
5858

59-
6059
```json
6160
{
62-
"schema": "v1",
63-
"provider": "example",
64-
"frequency": "hour",
65-
"meta": {
66-
67-
}
61+
"schema": "v1",
62+
"provider": "example",
63+
"frequency": "hour",
64+
"meta": {}
6865
}
6966
```
7067

@@ -73,7 +70,6 @@ The first step for a new source is to add JSON config file to the the `fetcher/s
7370
| `provider` | Unique provider name |
7471
| `frequency` | `day`, `hour`, or `minute` |
7572

76-
7773
The config file can contain any properties that should be configurable via the
7874
provider script. The above table however outlines the attributes that are required.
7975

@@ -86,32 +82,33 @@ The script here should expose a function named `processor`. This function should
8682

8783
The script below is a basic example of a new source:
8884

89-
9085
```js
91-
const Providers = require('../providers');
92-
const { Sensor, SensorNode, SensorSystem } = require('../station');
93-
const { Measures, FixedMeasure, MobileMeasure } = require('../measure');
86+
const Providers = require("../providers");
87+
const { Sensor, SensorNode, SensorSystem } = require("../station");
88+
const { Measures, FixedMeasure, MobileMeasure } = require("../measure");
9489

9590
async function processor(source_name, source) {
96-
// Get Locations/Sensor Systems via http/s3 etc.
97-
const locs = await get_locations()
91+
// Get Locations/Sensor Systems via http/s3 etc.
92+
const locs = await get_locations();
9893

99-
// Map locations into SensorNodes
100-
const station = new SensorNode();
94+
// Map locations into SensorNodes
95+
const station = new SensorNode();
10196

102-
await Providers.put_stations(source_name, [ station ]);
97+
await Providers.put_stations(source_name, [station]);
10398

104-
const fixed_measures = new Measures(FixedMeasure);
105-
// or
106-
const mobile_measures = new Measures(MobileMeasure);
99+
const fixed_measures = new Measures(FixedMeasure);
100+
// or
101+
const mobile_measures = new Measures(MobileMeasure);
107102

108-
fixed_measures.push(new FixedMeasure({
109-
sensor_id: 'PurpleAir-123',
110-
measure: 123,
111-
timestamp: Math.floor(new Date() / 1000) //UNIX Timestamp
112-
}));
103+
fixed_measures.push(
104+
new FixedMeasure({
105+
sensor_id: "PurpleAir-123",
106+
measure: 123,
107+
timestamp: Math.floor(new Date() / 1000), //UNIX Timestamp
108+
})
109+
);
113110

114-
await Providers.put_measures(source_name, fixed_measures);
111+
await Providers.put_measures(source_name, fixed_measures);
115112
}
116113

117114
module.exports = { processor };
@@ -120,3 +117,28 @@ module.exports = { processor };
120117
### Provider Secrets
121118

122119
For data providers that require credentials, credentials should be stored in AWS Secrets Manager with an ID composed of the stack name and provider name, such as `:stackName/:providerName`.
120+
121+
#### Google Keys
122+
123+
Some providers (e.g. CMU, Clarity) require us to read data from Google services (e.g. Drive, Sheets). To do this, the organization hosting the data should do the following:
124+
125+
1. [create a project & enable access to the required APIs](https://developers.google.com/workspace/guides/create-project)
126+
1. [create a service account](https://cloud.google.com/iam/docs/creating-managing-service-accounts)
127+
1. [generate service account keys](https://cloud.google.com/iam/docs/creating-managing-service-account-keys)
128+
129+
The generated key should look something like the following and be stored in its entirety within AWS Secrets Manager.
130+
131+
```json
132+
{
133+
"type": "service_account",
134+
"project_id": "project-id",
135+
"private_key_id": "key-id",
136+
"private_key": "-----BEGIN PRIVATE KEY-----\nprivate-key\n-----END PRIVATE KEY-----\n",
137+
"client_email": "service-account-email",
138+
"client_id": "client-id",
139+
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
140+
"token_uri": "https://accounts.google.com/o/oauth2/token",
141+
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
142+
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/service-account-email"
143+
}
144+
```

fetcher/lib/measurand.js

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ class Measurand {
1717
* @returns { Object } normalizer
1818
*/
1919
get _normalizer() {
20-
return {
21-
ppb: ['ppm', (val) => (val / 1000)],
22-
'ng/m³': ['µg/m³', (val) => (val / 1000)],
23-
'pp100ml': ['particles/cm³', (val) => (val / 100)]
24-
}[this.unit] || [this.unit, (val) => val];
20+
return (
21+
{
22+
ppb: ['ppm', (val) => val / 1000],
23+
'ng/m³': ['µg/m³', (val) => val / 1000],
24+
pp100ml: ['particles/cm³', (val) => val / 100]
25+
}[this.unit] || [this.unit, (val) => val]
26+
);
2527
}
2628

2729
get normalized_unit() {
@@ -35,7 +37,7 @@ class Measurand {
3537
/**
3638
* Given a map of lookups from an input parameter (i.e. how a data provider
3739
* identifies a measurand) to a tuple of a measurand parameter (i.e. how we
38-
* idenify a measurand internally) and a measurand unit, generate an array
40+
* identify a measurand internally) and a measurand unit, generate an array
3941
* of Measurand objects that are supported by the OpenAQ API.
4042
*
4143
* @param {*} lookups, e.g. {'CO': ['co', 'ppb'] }
@@ -47,9 +49,14 @@ class Measurand {
4749
let morePages;
4850
let page = 1;
4951
do {
50-
const url = new URL('/v2/parameters', process.env.LCS_API || 'https://api.openaq.org');
52+
const url = new URL(
53+
'/v2/parameters',
54+
process.env.LCS_API || 'https://api.openaq.org'
55+
);
5156
url.searchParams.append('page', page++);
52-
const { body: { meta, results } } = await request({
57+
const {
58+
body: { meta, results }
59+
} = await request({
5360
json: true,
5461
method: 'GET',
5562
url
@@ -59,24 +66,52 @@ class Measurand {
5966
}
6067
morePages = meta.found > meta.page * meta.limit;
6168
} while (morePages);
62-
if (VERBOSE) console.debug(`Fetched ${supportedMeasurandParameters.length} supported measurement parameters.`);
69+
if (VERBOSE)
70+
console.debug(
71+
`Fetched ${supportedMeasurandParameters.length} supported measurement parameters.`
72+
);
6373

6474
// Filter provided lookups
6575
const supportedLookups = Object.entries(lookups).filter(
6676
// eslint-disable-next-line no-unused-vars
67-
([input_param, [measurand_parameter, measurand_unit]]) => supportedMeasurandParameters.includes(measurand_parameter)
77+
([input_param, [measurand_parameter, measurand_unit]]) =>
78+
supportedMeasurandParameters.includes(measurand_parameter)
6879
);
6980
if (!supportedLookups.length) throw new Error('No measurands supported.');
7081
if (VERBOSE) {
7182
Object.values(lookups)
7283
.map(([measurand_parameter]) => measurand_parameter)
73-
.filter((measurand_parameter) => !supportedMeasurandParameters.includes(measurand_parameter))
74-
.map((measurand_parameter) => console.debug(`warning - ignoring unsupported parameters: ${measurand_parameter}`));
84+
.filter(
85+
(measurand_parameter) =>
86+
!supportedMeasurandParameters.includes(measurand_parameter)
87+
)
88+
.map((measurand_parameter) =>
89+
console.debug(
90+
`warning - ignoring unsupported parameters: ${measurand_parameter}`
91+
)
92+
);
7593
}
7694
return supportedLookups.map(
77-
([input_param, [parameter, unit]]) => (
95+
([input_param, [parameter, unit]]) =>
7896
new Measurand({ input_param, parameter, unit })
79-
)
97+
);
98+
}
99+
100+
/**
101+
* Given a map of lookups from an input parameter (i.e. how a data provider
102+
* identifies a measurand) to a tuple of a measurand parameter (i.e. how we
103+
* identify a measurand internally) and a measurand unit, generate an object
104+
* of Measurand objects that are supported by the OpenAQ API, indexed by their
105+
* input parameter.
106+
*
107+
* @param {*} lookups e.g. {'CO': ['co', 'ppb'] }
108+
* @returns {object}
109+
*/
110+
static async getIndexedSupportedMeasurands(lookups) {
111+
const measurands = await Measurand.getSupportedMeasurands(lookups);
112+
return Object.assign(
113+
{},
114+
...measurands.map((measurand) => ({ [measurand.input_param]: measurand }))
80115
);
81116
}
82117
}

fetcher/lib/providers.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
const fs = require('fs');
2-
const zlib = require('zlib');
3-
const { promisify } = require('util');
42
const path = require('path');
53
const AWS = require('aws-sdk');
6-
const { VERBOSE } = require('./utils');
4+
const { VERBOSE, gzip, unzip } = require('./utils');
75

86
const s3 = new AWS.S3({
97
maxRetries: 10
108
});
11-
const gzip = promisify(zlib.gzip);
12-
const unzip = promisify(zlib.unzip);
139

1410
/**
1511
* Runtime handler for each of the custom provider scripts, as well

fetcher/lib/utils.js

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const zlib = require('zlib');
12
const { promisify } = require('util');
23
const request = promisify(require('request'));
34
const AWS = require('aws-sdk');
@@ -17,19 +18,47 @@ async function fetchSecret(source_name) {
1718

1819
if (!process.env.STACK) throw new Error('STACK Env Var Required');
1920

20-
const SecretId = `${process.env.SECRET_STACK || process.env.STACK}/${source_name}`;
21+
const SecretId = `${
22+
process.env.SECRET_STACK || process.env.STACK
23+
}/${source_name}`;
2124

2225
if (VERBOSE) console.debug(`Fetching ${SecretId}...`);
2326

24-
const { SecretString } = await secretsManager.getSecretValue({
25-
SecretId
26-
}).promise();
27+
const { SecretString } = await secretsManager
28+
.getSecretValue({
29+
SecretId
30+
})
31+
.promise();
2732

2833
return JSON.parse(SecretString);
2934
}
3035

36+
/**
 * Transform a whitespace-separated phrase to camel case.
 * e.g. toCamelCase("API Key") === "apiKey"
 *
 * Leading, trailing and repeated whitespace is ignored, so
 * toCamelCase("  API   Key ") === "apiKey" and toCamelCase("") === "".
 *
 * @param {string} phrase
 * @returns {string}
 */
function toCamelCase(phrase) {
  return phrase
    .split(/\s+/)
    // Splitting can yield empty tokens (leading/trailing whitespace or an
    // empty phrase); indexing the first character of those would throw.
    .filter((word) => word.length > 0)
    .map((word, i) => {
      const lower = word.toLowerCase();
      // The first word stays fully lower-case; later words get a capital.
      if (i === 0) return lower;
      return `${lower[0].toUpperCase()}${lower.slice(1)}`;
    })
    .join('');
}
53+
54+
const gzip = promisify(zlib.gzip);
55+
const unzip = promisify(zlib.unzip);
56+
3157
module.exports = {
3258
fetchSecret,
3359
request,
60+
toCamelCase,
61+
gzip,
62+
unzip,
3463
VERBOSE
3564
};

0 commit comments

Comments
 (0)