-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
840 additions
and
106 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,4 +20,7 @@ | |
"[prisma]": { | ||
"editor.defaultFormatter": "Prisma.prisma" | ||
}, | ||
"cSpell.words": [ | ||
"govrn" | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"extends": ["../../.eslintrc.json"], | ||
"ignorePatterns": ["!**/*"], | ||
"overrides": [ | ||
{ | ||
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"], | ||
"rules": {} | ||
}, | ||
{ | ||
"files": ["*.ts", "*.tsx"], | ||
"rules": {} | ||
}, | ||
{ | ||
"files": ["*.js", "*.jsx"], | ||
"rules": {} | ||
} | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
## Guild Import Job | ||
|
||
## Summary | ||
|
||
This is a job that imports guilds by retrieving them from the NATS server stream. The job is responsible for fetching guild IDs from the stream and importing them into the database using [@guild.xyz/sdk](https://www.npmjs.com/package/@guildxyz/sdk). | ||
|
||
## Testing & Debugging | ||
|
||
To fetch a guild named `our-guild` from the NATS server stream named `guild-import-job`, follow these steps: | ||
|
||
1. Install `nats-server` locally [via a Package Manager](https://docs.nats.io/running-a-nats-service/introduction/installation#installing-via-a-package-manager). | ||
2. Install [NATS Command Line Interface](https://github.com/nats-io/natscli). _This step is optional but highly recommended for a smoother debugging experience_. | ||
3. Add env variables mentioned in the previous section. | ||
4. Run protocol API: `yarn nx run protocol-api:serve`. | ||
5. run nats server: `nats-server --jetstream` | ||
6. Publish messages to nats server with a stream name: `guild-import-job`. You can acheive that using one of two ways: | ||
- nats CLI: `nats -s nats://localhost:4222 publish "guild-import-job.row" "our-guild"`. _It is recommended to complete step 7 at least once before running this command_. | ||
- Alternatively, call the `writeMessages()` utility function from `/helpers/nats.ts` after you successfully establish a connection. | ||
7. To check if a message is successfully published: `nats stream report`. To view messages: `nats stream view` | ||
8. Run guild import job: `yarn nx run guild-import-job:serve`. | ||
|
||
Unfortunately, `nats-server` doesn't include a built-in logging or debugging mode. However, nats CLI provide a convenient ways to interact with the server's state using `nats stream help`, accessing various functionalities and insights about nats server. | ||
|
||
Here's a Loom screencast to prepare nats-server for testing: | ||
https://www.loom.com/share/18fad924307f4c14a87fdd0b0ef1458c |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/* eslint-disable */ | ||
export default { | ||
displayName: 'guild-import-job', | ||
preset: '../../jest.preset.js', | ||
globals: { | ||
'ts-jest': { | ||
tsconfig: '<rootDir>/tsconfig.spec.json', | ||
}, | ||
}, | ||
testEnvironment: 'node', | ||
transform: { | ||
'^.+\\.[tj]s$': 'ts-jest', | ||
}, | ||
moduleFileExtensions: ['ts', 'js', 'html'], | ||
coverageDirectory: '../../coverage/apps/guild-import-job', | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
{ | ||
"$schema": "../../node_modules/nx/schemas/project-schema.json", | ||
"sourceRoot": "/src", | ||
"projectType": "application", | ||
"targets": { | ||
"build": { | ||
"executor": "@nrwl/webpack:webpack", | ||
"outputs": ["{options.outputPath}"], | ||
"options": { | ||
"assets": [], | ||
"compiler": "tsc", | ||
"main": "apps/guild-import-job/src/main.ts", | ||
"outputPath": "dist/apps/guild-import-job", | ||
"target": "node", | ||
"tsConfig": "apps/guild-import-job/tsconfig.app.json" | ||
}, | ||
"configurations": { | ||
"production": { | ||
"optimization": true, | ||
"extractLicenses": true, | ||
"inspect": false | ||
} | ||
} | ||
}, | ||
"serve": { | ||
"executor": "@nrwl/node:node", | ||
"options": { | ||
"buildTarget": "guild-import-job:build" | ||
}, | ||
"configurations": { | ||
"production": { | ||
"buildTarget": "guild-import-job:build:production" | ||
} | ||
} | ||
}, | ||
"lint": { | ||
"executor": "@nrwl/linter:eslint", | ||
"outputs": ["{options.outputFile}"], | ||
"options": { | ||
"lintFilePatterns": ["apps/guild-import-job/**/*.ts"] | ||
} | ||
}, | ||
"test": { | ||
"executor": "@nrwl/jest:jest", | ||
"outputs": ["coverage/apps/guild-import-job"], | ||
"options": { | ||
"jestConfig": "apps/guild-import-job/jest.config.ts", | ||
"passWithNoTests": true | ||
} | ||
} | ||
}, | ||
"tags": [] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
import { GovrnProtocol } from '@govrn/protocol-client'; | ||
import { GetGuildByIdResponse } from '@guildxyz/sdk'; | ||
|
||
const PROTOCOL_URL = process.env.PROTOCOL_URL; | ||
const GUILD_IMPORT_TOKEN = process.env.GUILD_IMPORT_TOKEN; | ||
const CHAIN_TYPE_ID = Number(process.env.CHAIN_TYPE_ID); | ||
|
||
const INTEGRATION_TYPE = 'Guild'; | ||
|
||
export const govrn = new GovrnProtocol(PROTOCOL_URL, null, { | ||
Authorization: GUILD_IMPORT_TOKEN, | ||
}); | ||
|
||
export const getMembershipStatusId = async (name: 'Member') => { | ||
const membershipStatus = await govrn.user.guildMembershipStatus.get({ | ||
where: { name }, | ||
}); | ||
return membershipStatus.id; | ||
}; | ||
|
||
export const updateImportStatus = async ({ | ||
importId: id, | ||
status, | ||
}: { | ||
importId: number; | ||
status: 'Pending' | 'Complete' | 'Failed'; | ||
}) => { | ||
return await govrn.guild.import.update({ | ||
where: { id }, | ||
data: { | ||
import_status: { | ||
connectOrCreate: { where: { name: status }, create: { name: status } }, | ||
}, | ||
}, | ||
}); | ||
}; | ||
|
||
export const getOrCreateGuild = async ( | ||
discordId: string, | ||
getGuildResponse: GetGuildByIdResponse | null, | ||
) => { | ||
if (!getGuildResponse) { | ||
throw new Error("Get guild response can't be null"); | ||
} | ||
|
||
let dbGuild = await govrn.guild.get({ | ||
discord_id: discordId, | ||
}); | ||
|
||
if (!dbGuild) { | ||
dbGuild = await govrn.guild.create({ | ||
data: { | ||
discord_id: discordId, | ||
name: getGuildResponse.name, | ||
logo: getGuildResponse.imageUrl, | ||
}, | ||
}); | ||
} | ||
return dbGuild; | ||
}; | ||
|
||
export const createGuildImport = async (guildId: number) => | ||
await govrn.guild.import.create({ | ||
data: { | ||
guild: { | ||
connect: { id: guildId }, | ||
}, | ||
import_status: { | ||
connectOrCreate: { | ||
where: { name: 'Pending' }, | ||
create: { name: 'Pending' }, | ||
}, | ||
}, | ||
integration_type: { | ||
connectOrCreate: { | ||
where: { name: INTEGRATION_TYPE }, | ||
create: { name: INTEGRATION_TYPE }, | ||
}, | ||
}, | ||
authentication_token: '', | ||
}, | ||
}); | ||
|
||
export const listMatchingUsers = async (addresses: string[]) => { | ||
const users = await govrn.user.list({ | ||
where: { | ||
address: { in: addresses }, | ||
}, | ||
}); | ||
|
||
return users; | ||
}; | ||
|
||
export const createManyUsers = (members: string[]) => | ||
govrn.user.createMany({ | ||
data: members.map(add => ({ | ||
address: add, | ||
chain_type_id: CHAIN_TYPE_ID, | ||
})), | ||
skipDuplicates: true, | ||
}); | ||
|
||
export const connectManyGuildUsers = ( | ||
members: { | ||
guild_id: number; | ||
user_id: number; | ||
membership_status_id: number; | ||
}[], | ||
) => | ||
govrn.guild.user.bulkCreate({ | ||
data: members, | ||
skipDuplicates: true, | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import { | ||
AckPolicy, | ||
connect, | ||
JetStreamManager, | ||
JsMsg, | ||
nanos, | ||
NatsConnection, | ||
StringCodec, | ||
} from 'nats'; | ||
|
||
/** | ||
* A helper function to setup a connection to NATS and create a stream to upload messages, | ||
* trying to connect to each server in the `servers` array. | ||
* Once connected, it creates a stream with the specified `streamName`. | ||
* | ||
* @param servers - an array of servers to connect. | ||
* @param streamName - the name (subject) of the stream to create. | ||
* @param work - a callback function to perform further actions with the connection. | ||
*/ | ||
export const setupNats = async ( | ||
servers: { servers?: string; port?: number }[], | ||
streamName: string, | ||
work: (conn: NatsConnection) => Promise<void>, | ||
) => { | ||
for (const v of servers) { | ||
try { | ||
const nc = await connect(v); | ||
console.log(`connected to ${nc.getServer()}`); | ||
|
||
// create a stream to upload messages | ||
const jsm = await nc.jetstreamManager(); | ||
|
||
if (await streamExists(streamName, jsm)) { | ||
console.log(`stream ${streamName} already exists`); | ||
} else { | ||
const subj = `${streamName}.*`; | ||
const streamCfg = await jsm.streams.add({ | ||
name: streamName, | ||
subjects: [subj], | ||
}); | ||
console.log(`created stream ${streamCfg}`); | ||
} | ||
|
||
// this promise indicates the client closed | ||
const isClosed = nc.closed(); | ||
// do something with the connection | ||
await work(nc); | ||
|
||
await nc.close(); | ||
// check if the close was OK | ||
const err = await isClosed; | ||
if (err) { | ||
console.log(`error closing:`, err); | ||
} | ||
} catch (err) { | ||
console.log(err); | ||
console.log(`error connecting to ${JSON.stringify(v)}`); | ||
} | ||
} | ||
}; | ||
|
||
const streamExists = async (name: string, jsm: JetStreamManager) => { | ||
try { | ||
await jsm.streams.info(name); | ||
return true; | ||
} catch (err) { | ||
return false; | ||
} | ||
}; | ||
|
||
/** | ||
* A helper function to write messages to a stream, creating a `JetStreamClient` | ||
* and publishing given `messages` to it. | ||
* | ||
* @param nc - the connection to use. | ||
* @param streamName - the name of the stream to write to. | ||
* @param messages | ||
*/ | ||
export const writeMessages = async ( | ||
nc: NatsConnection, | ||
streamName: string, | ||
messages: string[], | ||
) => { | ||
const js = nc.jetstream(); | ||
const sc = StringCodec(); | ||
|
||
for (const m of messages) { | ||
const pubAck = await js.publish(`${streamName}.row`, sc.encode(m)); | ||
console.log( | ||
`Published message ${m} to ${pubAck.stream}, seq ${pubAck.seq}`, | ||
); | ||
} | ||
}; | ||
|
||
/** | ||
* A helper function to pull messages from a stream, creating a `JetStreamClient` | ||
* and fetching messages from it. It will call the `callback` function for each | ||
* message received. | ||
* | ||
* @param nc | ||
* @param stream | ||
* @param durable - A unique identifier that helps `JetStream` maintain the | ||
* consumer's state across sessions. {@link https://docs.nats.io/legacy/stan/intro/channels/subscriptions/durable Concept} | ||
* @param callback | ||
* @param expires | ||
* @param batch | ||
*/ | ||
export const pullMessages = async ( | ||
nc: NatsConnection, | ||
stream: string, | ||
durable: string, | ||
callback: (nc: NatsConnection, msg: JsMsg) => Promise<void>, | ||
expires = 5000, | ||
batch = 10, | ||
) => { | ||
console.log(`:: PULLING MESSAGES FROM ${stream} WITH DURABLE ${durable}`); | ||
const js = nc.jetstream(); | ||
|
||
// create a pull-based, durable subscription. server will remember the last message | ||
// it sent and will resume from there. It's more efficient than using `fetch`. | ||
const subscription = await js.pullSubscribe(`${stream}.row`, { | ||
mack: true, | ||
config: { | ||
durable_name: durable, | ||
ack_policy: AckPolicy.Explicit, | ||
ack_wait: nanos(4000), | ||
}, | ||
}); | ||
|
||
const done = (async () => { | ||
await subscription.pull({ no_wait: true, batch, expires }); | ||
for await (const m of subscription) { | ||
await callback(nc, m); | ||
m.ack(); | ||
} | ||
})(); | ||
|
||
await done; | ||
subscription.unsubscribe(); | ||
}; |
Oops, something went wrong.