
Commit 94e03c6

Merge pull request #164 from EYBlockchain/dattatray/bem-integration
Bem integration with timber
2 parents: 18ee46f + 6505a40

14 files changed

Lines changed: 1754 additions & 347 deletions

merkle-tree/package-lock.json

Lines changed: 1058 additions & 310 deletions
(Generated lockfile; diff not rendered.)

merkle-tree/package.json

Lines changed: 3 additions & 0 deletions
@@ -23,7 +23,10 @@
     "express-promise-router": "^3.0.3",
     "fs-extra": "^8.1.0",
     "hex-to-binary": "^1.0.1",
+    "ioredis": "^5.6.1",
     "jsonfile": "^6.1.0",
+    "jsonwebtoken": "^9.0.2",
+    "kafkajs": "^2.2.4",
     "keccak": "^2.1.0",
     "mongoose": "^6.13.6",
     "request": "^2.88.2",
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
import axios from "axios";
import logger from "../../logger";
import { BemException, BemConnectionError } from "./exceptions";
import { redisClient } from "../infra/redis/redis-client";

const CONTRACT_METADATA_STORE = 'contract-metadata';

const handleBemException = (exception) => {
  if (exception?.response?.data) {
    throw new BemException(exception?.response?.data);
  }
  throw new BemConnectionError(exception);
};

export const subscribeToBemEvents = async (contractAddress, eventSpecification) => {
  try {
    logger.info(`Subscribing to bem with ${contractAddress}`);

    const context = JSON.parse(await redisClient.hget(CONTRACT_METADATA_STORE, contractAddress) || '{}')?.context;
    if (!context) throw new Error(`Context not found in Redis for ${contractAddress}`);

    logger.debug('Fetched context details from Redis');

    const axiosConfig = {
      method: "post",
      url: `${process.env.BEM_ENDPOINT}/contract-event/subscribe`,
      headers: { context: JSON.stringify(context) },
      timeout: 3600000,
      data: {
        productIdentifier: 'ocm',
        contractAddress,
        eventSpecification,
        callbackTopicSuffix: process.env.CALLBACK_TOPIC_SUFFIX
      },
    };
    await axios(axiosConfig);
  } catch (error) {
    handleBemException(error);
  }
};
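For reference, a call site might look like the following. The import path and the eventSpecification shape are assumptions; this diff does not show how callers build the specification.

// Hypothetical call site for subscribeToBemEvents; paths and the
// eventSpecification shape are assumptions, not defined by this diff.
import { subscribeToBemEvents } from './bem-subscribe';
import { BemException, BemConnectionError } from './exceptions';

const subscribeShieldContract = async (contractAddress) => {
  const eventSpecification = [
    { name: 'NewLeaves', parameters: ['minLeafIndex', 'leafValues'] },
    { name: 'NewLeaf', parameters: ['leafIndex', 'leafValue'] },
  ];
  try {
    await subscribeToBemEvents(contractAddress, eventSpecification);
  } catch (error) {
    // handleBemException rethrows the HTTP error body as BemException,
    // or wraps connection failures in BemConnectionError.
    if (error instanceof BemException) {
      console.error('BEM rejected the subscription:', error.errorInformation);
    } else if (error instanceof BemConnectionError) {
      console.error('Could not reach BEM:', error.message);
    }
    throw error;
  }
};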
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
export class GeneralConnectionError extends Error {
  constructor(error) {
    let errorMessage;

    if (typeof error === "string") {
      errorMessage = error;
    } else if (error.message) {
      errorMessage = error.message;
    } else {
      errorMessage = "Something Went Wrong!";
    }

    super(errorMessage);
    this.name = 'GeneralConnectionError';
    this.errorCode = 500;
    this.errorType = {
      stage: "EXECUTION_ERROR"
    };
    this.errorInformation = error;
  }
}

export class BemException extends Error {
  constructor(errorInformation) {
    let errorMessage;

    if (typeof errorInformation === "string") {
      errorMessage = errorInformation;
    } else if (errorInformation.error?.message) {
      errorMessage = errorInformation.error?.message;
    } else {
      errorMessage = "Something Went Wrong!";
    }

    super(errorMessage);
    this.name = 'BemException';
    this.errorCode = 500;
    this.errorInformation = errorInformation;
    this.errorType = {
      stage: "EXECUTION_ERROR"
    };
  }
}

export class BemConnectionError extends GeneralConnectionError {
  constructor(errorInformation) {
    super(errorInformation);
    this.name = 'BemConnectionError';
  }
}
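For illustration only (not part of the diff), the constructor's fallbacks mean a typical BEM error body surfaces like this:

// Illustrative only: how BemException reads a typical error body.
const e = new BemException({ error: { message: 'subscription already exists' } });
// e.message           -> 'subscription already exists'
// e.errorCode         -> 500
// e.errorType.stage   -> 'EXECUTION_ERROR'
// e.errorInformation  -> the original body passed in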
Lines changed: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
import { KafkaConsumer } from "./kafka-consumer";
import { KafkaMessageProcessingError } from "../kafka-exceptions";
import logger from "../../../../logger";
import { LeafService, MetadataService } from '../../../../db/service';
import { redisClient } from "../../redis/redis-client";
import config from 'config';
import adminDbConnection from '../../../../db/common/adminDbConnection';
import DB from '../../../../db/mongodb/db';

const { admin } = config.get('mongo');

const CONTRACT_METADATA_STORE = 'contract-metadata';

export class BemConsumer extends KafkaConsumer {
  constructor() {
    super(
      `${process.env.BEM_KAFKA_TOPIC}${process.env.CALLBACK_TOPIC_SUFFIX}`,
      process.env.BEM_KAFKA_GROUP_ID,
      +process.env.BEM_KAFKA_SESSION_TIMEOUT
    );
  }

  async handleIncomingMessage(parsedMessage) {
    const { eventLog } = parsedMessage;
    const blockNumber = Number(parsedMessage.blockNumber || 0);
    const contractAddress = eventLog.address;
    const eventName = eventLog.name;

    logger.info(`Processing message for contract: ${contractAddress}`);

    try {
      const contractDetails = await this.getContractMetadataFromRedis(contractAddress);
      if (!contractDetails) {
        logger.error(`No contract details found in Redis for ${contractAddress}`);
        return;
      }

      const db = await this.getDbInstanceForContract(contractDetails);
      logger.info(`DB instance initialized for contract: ${contractDetails.contractName}`);

      switch (eventName) {
        case 'NewLeaves':
          await this.handleNewLeavesEvent(eventLog, blockNumber, db);
          break;
        case 'NewLeaf':
          await this.handleNewLeafEvent(eventLog, blockNumber, db);
          break;
        default:
          logger.warn(`Unhandled event name: ${eventName}`);
      }

      logger.info(`Finished processing event: ${eventName} for contract: ${contractAddress}`);
    } catch (error) {
      logger.error('Error in processing Kafka message', error);
      throw new KafkaMessageProcessingError(error);
    }
  }

  async handleNewLeavesEvent(eventLog, blockNumber, db) {
    try {
      logger.debug(`Handling 'NewLeaves': inserting leaves into Merkle tree`);
      const eventInstance = this.buildEventPayload(eventLog);
      const { minLeafIndex, leafValues } = eventInstance;

      const metadataService = new MetadataService(db);
      const { treeHeight } = await metadataService.getTreeHeight();

      const leaves = leafValues.map((leafValue, index) => ({
        value: leafValue.toString(),
        leafIndex: Number(minLeafIndex) + index,
        blockNumber
      }));

      const leafService = new LeafService(db);
      // await the DB write so failures surface in this try/catch
      await leafService.insertLeaves(treeHeight, leaves);
      logger.info(`Inserted ${leaves.length} leaves into DB`);
    } catch (error) {
      logger.error(`Error in handling 'NewLeaves': `, error);
      throw new Error(`Error in handling 'NewLeaves': ${error.message}`);
    }
  }

  async handleNewLeafEvent(eventLog, blockNumber, db) {
    try {
      logger.debug(`Handling 'NewLeaf': inserting leaf into Merkle tree`);
      const eventInstance = this.buildEventPayload(eventLog);
      const { leafValue } = eventInstance;

      const metadataService = new MetadataService(db);
      const { treeHeight } = await metadataService.getTreeHeight();

      const leafIndex = Number(eventInstance.leafIndex);
      const leaf = {
        value: leafValue.toString(),
        leafIndex,
        blockNumber
      };

      const leafService = new LeafService(db);
      // await the DB write so failures surface in this try/catch
      await leafService.insertLeaf(treeHeight, leaf);
      logger.info(`Inserted single leaf at index ${leafIndex} into DB`);
    } catch (error) {
      logger.error(`Error in handling 'NewLeaf': `, error);
      throw new Error(`Error handling 'NewLeaf': ${error.message}`);
    }
  }

  buildEventPayload(eventLog) {
    const eventName = eventLog.name;
    const eventConfig = config.contracts['default'].events[eventName];

    if (!eventConfig) {
      throw new Error(`No config found for event ${eventName}`);
    }

    const payload = {};
    eventConfig.parameters.forEach(param => {
      payload[param] = eventLog.returnValues[param];
    });

    logger.debug(`Built event payload for ${eventName}: ${JSON.stringify(payload)}`);
    return payload;
  }

  async getDbInstanceForContract({ contractId, contractName }) {
    try {
      logger.debug(`Creating DB instance for contractId: ${contractId}, contractName: ${contractName}`);
      const treeId = '';
      return new DB(adminDbConnection, admin, contractName, treeId, contractId);
    } catch (error) {
      logger.error(`Failed to create DB instance for contractId: ${contractId}. Error: ${error.message}`);
      throw new Error(`Failed to create DB instance for contractId: ${contractId}. Error: ${error.message}`);
    }
  }

  async getContractMetadataFromRedis(contractAddress) {
    try {
      logger.debug(`Fetching contract metadata from Redis for address: ${contractAddress}`);
      const data = await redisClient.hget(CONTRACT_METADATA_STORE, contractAddress);
      if (!data) return null;

      const parsed = typeof data === 'string' ? JSON.parse(data) : data;
      logger.info('Fetched contract metadata from Redis');
      return parsed;
    } catch (e) {
      logger.error(`Invalid Redis data for ${contractAddress}`);
      return null;
    }
  }
}
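A plausible wiring at service startup, not shown in this excerpt, would construct the consumer and kick off its retry loop; the environment variable names below come straight from the constructor above, while the import path is hypothetical.

// Hypothetical startup wiring; where this lives in the service is not shown here.
// Env vars read by BemConsumer: BEM_KAFKA_TOPIC, CALLBACK_TOPIC_SUFFIX,
// BEM_KAFKA_GROUP_ID, BEM_KAFKA_SESSION_TIMEOUT.
import { BemConsumer } from './bem-consumer';

const bemConsumer = new BemConsumer();
// start() loops until a connection succeeds, retrying every 5 seconds on failure.
bemConsumer.start();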
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
import logger from '../../../../logger';
import { KafkaMessageProcessingError } from '../kafka-exceptions';
import { createKafkaClient } from "../kafka-client";

export class KafkaConsumer {
  constructor(topic, groupId, sessionTimeout = 300000) {
    this.topic = topic;
    this.groupId = groupId;
    this.sessionTimeout = sessionTimeout;
    this.started = false;
  }

  async start() {
    while (!this.started) {
      logger.info('Starting Kafka consumer...');
      try {
        this.kafka = await createKafkaClient();
        this.consumer = this.kafka.consumer({
          groupId: this.groupId,
          isolationLevel: 'read_committed',
          sessionTimeout: this.sessionTimeout,
        });

        // kafkajs instrumentation events wrap their data in event.payload
        this.consumer.on('consumer.crash', async ({ payload }) => {
          logger.warn('Kafka Consumer crashed', payload);
          if (!payload.restart) {
            await this.disconnect();
            await this.run();
          }
        });

        await this.run();
      } catch (error) {
        logger.error('Failed to connect and consume from Kafka', JSON.stringify(error));
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }
  }

  async run() {
    try {
      await this.consumer.connect();
      logger.info('Connected to Kafka');

      await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });
      logger.info(`Subscribed to the topic ${this.topic}`);

      await this.consumer.run({
        eachBatchAutoResolve: false,
        autoCommit: false,
        eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale, commitOffsetsIfNecessary }) => {
          for (const message of batch.messages) {
            if (!isRunning() || isStale()) break;
            try {
              if (this.topic === batch.topic) {
                const parsedMessage = JSON.parse(message.value.toString());
                await this.handleIncomingMessage(parsedMessage);
                await commitOffsetsIfNecessary({
                  topics: [{
                    topic: batch.topic,
                    partitions: [{
                      partition: batch.partition,
                      offset: `${Number(message.offset) + 1}`
                    }]
                  }]
                });
                resolveOffset(message.offset);
                await heartbeat();
              }
            } catch (error) {
              logger.error(`Error processing message at offset ${message.offset} in partition ${batch.partition}`, error);
              throw new KafkaMessageProcessingError(error);
            }
          }
        }
      });

      this.started = true;
    } catch (error) {
      logger.error('Error running Kafka consumer', error);
    }
  }

  async disconnect() {
    try {
      if (this.consumer) {
        await this.consumer.disconnect();
        this.started = false;
        logger.info('Disconnected from Kafka');
      }
    } catch (error) {
      logger.error('Error disconnecting from Kafka', error);
    }
  }
}
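The kafka-client module imported at the top is not rendered in this excerpt; a minimal kafkajs-based sketch, with hypothetical env var names, could be:

// Hypothetical sketch of the imported kafka-client module; the real one is not
// shown in this excerpt and may handle authentication or SSL differently.
import { Kafka, logLevel } from 'kafkajs';

export const createKafkaClient = async () => {
  return new Kafka({
    clientId: process.env.KAFKA_CLIENT_ID || 'timber-merkle-tree',
    brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    logLevel: logLevel.ERROR,
  });
};

Note the design choice in run(): with eachBatchAutoResolve and autoCommit disabled, offsets are committed only after handleIncomingMessage succeeds, giving at-least-once processing of BEM callback events.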
