Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/csv uploader #358

Draft
wants to merge 15 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions apps/jobs/member-csv-upload-job/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
17 changes: 17 additions & 0 deletions apps/jobs/member-csv-upload-job/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* eslint-disable */
export default {
displayName: 'jobs-member-csv-upload-job',
verbose: true,
preset: '../../../jest.preset.js',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
testEnvironment: 'node',
transform: {
'^.+\\.[tj]s$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'js', 'html'],
coverageDirectory: '../../../coverage/apps/jobs/member-csv-upload-job',
};
55 changes: 55 additions & 0 deletions apps/jobs/member-csv-upload-job/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"$schema": "../../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "apps/jobs/member-csv-upload-job/src",
"projectType": "application",
"targets": {
"build": {
"executor": "@nrwl/webpack:webpack",
"outputs": ["{options.outputPath}"],
"options": {
"target": "node",
"compiler": "tsc",
"outputPath": "dist/apps/jobs/member-csv-upload-job",
"main": "apps/jobs/member-csv-upload-job/src/main.ts",
"tsConfig": "apps/jobs/member-csv-upload-job/tsconfig.app.json",
"assets": ["apps/jobs/member-csv-upload-job/src/assets"]
},
"configurations": {
"production": {
"optimization": true,
"extractLicenses": true,
"inspect": false,
"fileReplacements": []
}
}
},
"serve": {
"executor": "@nrwl/js:node",
"options": {
"buildTarget": "jobs-member-csv-upload-job:build"
},
"configurations": {
"production": {
"buildTarget": "jobs-member-csv-upload-job:build:production"
}
}
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["apps/jobs/member-csv-upload-job/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/apps/jobs/member-csv-upload-job"],
"options": {
"jestConfig": "apps/jobs/member-csv-upload-job/jest.config.ts",
"passWithNoTests": true,
"verbose": true
}
}
},
"tags": []
}
Empty file.
Empty file.
117 changes: 117 additions & 0 deletions apps/jobs/member-csv-upload-job/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { NatsConnection, JsMsg, StringCodec } from 'nats';
import { setupNats, pullMessages, writeMessages } from '@govrn/govrn-nats';
import { GovrnProtocol } from '@govrn/protocol-client';
import { ethers } from 'ethers';

console.log('Hello World!');
const protcolUrl = process.env.PROTOCOL_URL;
const protocolApiToken = process.env.PROTOCOL_API_TOKEN;
const streamName = 'dao-membership-csv';
// 1. Receives form data with a file and input for dao name
// https://stackoverflow.com/questions/74927686/how-to-upload-a-file-from-client-to-server-and-then-server-to-server
// 2. Verify File is a CSV file
// 3. Verify columns are correct
// 4. Verify data is correct
//
//
//
//
const servers = [
// { servers: ["demo.nats.io:4442", "demo.nats.io:4222"] },
// { servers: "demo.nats.io:4443" },
// { port: 4222 },
{ servers: 'localhost' },
];
let govrn: GovrnProtocol = null;
const logic = async (conn: NatsConnection) => {
console.log(conn);
console.log('Main');
// pull
// transform
// enqueue
// etc
const pullTransform = async (conn: NatsConnection, msg: JsMsg) => {
const sc = StringCodec();
const data = sc.decode(msg.data);
console.log('processing message...');
// DAO ID, address, discord name/username (optional), dicord_id (optional ), admin
const [daoId, address, discordName, discordId, admin] = data.split(',');
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was probably wrong in the ticket but we should probably remove id


// verify row
const guild = await govrn.guild.get({
id: +daoId,
});
if (guild == null) {
console.log('No guild exists for id: ' + daoId);
return;
}

if (!ethers.utils.isAddress(address)) {
console.log('Invalid wallet address: ' + address);
return;
}

// TODO: validate username/discord id?

const user = await govrn.user.createEx({
address: address,
display_name: discordName,
name: discordName,
chain_type: {
connectOrCreate: {
create: {
name: 'ethereum_mainnet',
},
where: {
name: 'ethereum_mainnet',
},
},
},
discord_users: {
create: [
{
discord_id: discordId,
display_name: discordName,
},
],
},
});
console.log(
`created user: ${user.id} ${user.address} ${user.discord_users}`,
);
const guildUser = await govrn.guildUser.create({
data: {
guildId: guild.id,
guildName: guild.name,
userAddress: address,
userId: user.id,
},
});
console.log(`created guildUser: ${guildUser.guild_id} ${guildUser.id}`);

// ack is handled by pull messages
return;
};
await writeMessages(conn, streamName, [
// DAO ID, address, discord name/username (optional), dicord_id (optional ), admin
'15,0x292c4cE0EEFbCA990F319BEfac1c032cCcA6dE57,Flip,447315691226398733,False',
'15,0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990,Flip2,447315691226398739,False',
]);
await await pullMessages(
conn,
streamName,
`${streamName}-durable`,
pullTransform,
);
};

const main = async () => {
govrn = new GovrnProtocol(protcolUrl, undefined, {
Authorization: protocolApiToken,
});

await setupNats(servers, streamName, logic);
// TODO: Add schema validation
};

main();
62 changes: 62 additions & 0 deletions apps/jobs/member-csv-upload-job/src/nats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { connect, NatsConnection, JsMsg } from 'nats';

// TODO: How are streams created
// TODO: How is pulling filtered
export const setupNats = async (
servers: { servers?: string; port?: number }[],
work: (conn: NatsConnection) => Promise<void>,
) => {
for (const v of servers) {
try {
const nc = await connect(v);
console.log(`connected to ${nc.getServer()}`);
// this promise indicates the client closed
const done = nc.closed();
// do something with the connection
await work(nc);

// close the connection
await nc.close();
// check if the close was OK
const err = await done;
if (err) {
console.log(`error closing:`, err);
}
console.log('Done');
} catch (err) {
console.log(`error connecting to ${JSON.stringify(v)}`);
}
}
};

// Subscription membership.import.csv
// subscription is a durable queue group
export const pullMessages = async (
nc: NatsConnection,
stream: string,
durable: string,
callback: (nc: NatsConnection, msg: JsMsg) => void,
expires = 5000,
batch = 10,
) => {
// create a jetstream client:
const js = nc.jetstream();
// To get multiple messages in one request you can:
const msgs = await js.fetch(stream, durable, {
batch: batch,
expires: expires,
});
// the request returns an iterator that will get at most 10 messages or wait
// for 5000ms for messages to arrive.

const done = (async () => {
for await (const m of msgs) {
// do something with the message
// and if the consumer is not set to auto-ack, ack!
await callback(nc, m);
m.ack();
}
})();
// The iterator completed,
await done;
};
113 changes: 113 additions & 0 deletions apps/jobs/member-csv-upload-job/src/tests/csv.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { describe, expect, test } from '@jest/globals';
import { NatsConnection, connect, AckPolicy, JsMsg } from 'nats';
import { setupNats, pullMessages } from '../nats';

describe('connect to nats', () => {
test('test logic runs', async () => {
let count = 0;
const logic = async (nc: NatsConnection) => {
count += 1;
return;
};
await setupNats([{ servers: 'localhost' }], logic);
expect(count).toBe(1);
});
});

// 1. Test jeststream messages are fetched
// 2. Test jeststrem messages are acknowledged
// 3. Test jestream messages are run in the callback
describe('test pull message', () => {
test('test pulled', async () => {
const nc = await connect({ servers: 'localhost' });
const stream = 'stream';
const durable = 'me';

const jsm = await nc.jetstreamManager();
// TODO: What does this do
await jsm.streams.add({ name: stream, subjects: ['hello.>'] });
// TODO: What does this do
await jsm.consumers.add(stream, {
durable_name: 'me',
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();

const increment = 2;
await js.publish(
'hello.world',
Buffer.from(JSON.stringify({ count: increment })),
{},
);

let count = 0;
const logic = async (nc: NatsConnection, m: JsMsg) => {
const msg = JSON.parse(m.data.toString()) as { count: number };
count += msg.count;
return;
};
await pullMessages(nc, stream, durable, logic, 500);
expect(count).toBe(increment);
});
test('test didAck', async () => {
const nc = await connect({ servers: 'localhost' });
const stream = 'stream';
const durable = 'me';

const jsm = await nc.jetstreamManager();
// TODO: What does this do
await jsm.streams.add({ name: stream, subjects: ['hello.>'] });
// TODO: What does this do
await jsm.consumers.add(stream, {
durable_name: 'me',
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();

await js.publish(
'hello.world',
Buffer.from(JSON.stringify({ count: 0 })),
{},
);

let msg = null;
const logic = async (nc: NatsConnection, m: JsMsg) => {
msg = m;
return;
};
await pullMessages(nc, stream, durable, logic, 500);
expect(msg.didAck).toBe(true);
});
test('pull batch', async () => {
const nc = await connect({ servers: 'localhost' });
const stream = 'stream';
const durable = 'me';

const jsm = await nc.jetstreamManager();
// TODO: What does this do
await jsm.streams.add({ name: stream, subjects: ['hello.>'] });
// TODO: What does this do
await jsm.consumers.add(stream, {
durable_name: 'me',
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();

for (let i = 0; i < 5; i++) {
await js.publish(
'hello.world',
Buffer.from(JSON.stringify({ count: 1 })),
{},
);
}

let count = 0;
const logic = async (nc: NatsConnection, m: JsMsg) => {
const msg = JSON.parse(m.data.toString()) as { count: number };
count += msg.count;
return;
};
await pullMessages(nc, stream, durable, logic, 5000, 5);
expect(count).toBe(5);
});
});
Loading