Skip to content

Commit

Permalink
Merge pull request #785 from oceanprotocol/issue-780-rate-limmits
Browse files Browse the repository at this point in the history
Refactor connections rate, rate per minute instead of second
  • Loading branch information
paulo-ocean authored Jan 9, 2025
2 parents cb3bcb1 + 4da190c commit bc7edcc
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export INDEXER_INTERVAL=
export ALLOWED_ADMINS=
export DASHBOARD=true
export RATE_DENY_LIST=
export MAX_REQ_PER_SECOND=
export MAX_REQ_PER_MINUTE=
export MAX_CHECKSUM_LENGTH=
export LOG_LEVEL=
export HTTP_API_PORT=
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ jobs:
P2P_ENABLE_AUTONAT: 'false'
ALLOWED_ADMINS: '["0xe2DD09d719Da89e5a3D0F2549c7E24566e947260"]'
DB_TYPE: 'elasticsearch'
MAX_REQ_PER_MINUTE: 320
MAX_CONNECTIONS_PER_MINUTE: 320
- name: Check Ocean Node is running
run: |
for i in $(seq 1 90); do
Expand Down
2 changes: 1 addition & 1 deletion docs/dockerDeployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ services:
# INDEXER_INTERVAL: ''
DASHBOARD: 'true'
# RATE_DENY_LIST: ''
# MAX_REQ_PER_SECOND: ''
# MAX_REQ_PER_MINUTE: ''
# MAX_CHECKSUM_LENGTH: ''
# LOG_LEVEL: ''
HTTP_API_PORT: '8000'
Expand Down
3 changes: 2 additions & 1 deletion docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `ALLOWED_ADMINS`: Sets the public address of accounts which have access to admin endpoints e.g. shutting down the node. Example: `"[\"0x967da4048cD07aB37855c090aAF366e4ce1b9F48\",\"0x388C818CA8B9251b393131C08a736A67ccB19297\"]"`
- `DASHBOARD`: If `false` the dashboard will not run. If not set or `true` the dashboard will start with the node. Example: `false`
- `RATE_DENY_LIST`: Blocked list of IPs and peer IDs. Example: `"{ \"peers\": [\"16Uiu2HAkuYfgjXoGcSSLSpRPD6XtUgV71t5RqmTmcqdbmrWY9MJo\"], \"ips\": [\"127.0.0.1\"] }"`
- `MAX_REQ_PER_SECOND`: Number of requests per second allowed by the same client. Example: `3`
- `MAX_REQ_PER_MINUTE`: Number of requests per minute allowed by the same client (IP or Peer id). Example: `30`
- `MAX_CONNECTIONS_PER_MINUTE`: Max number of requests allowed per minute (all clients). Example: `120`
- `MAX_CHECKSUM_LENGTH`: Define the maximum length for a file if checksum is required (Mb). Example: `10`
- `IS_BOOTSTRAP`: Is this node to be used as bootstrap node or not. Default is `false`.

Expand Down
3 changes: 2 additions & 1 deletion scripts/ocean-node-quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ services:
# INDEXER_INTERVAL: ''
DASHBOARD: 'true'
# RATE_DENY_LIST: ''
# MAX_REQ_PER_SECOND: ''
# MAX_REQ_PER_MINUTE: ''
# MAX_CONNECTIONS_PER_MINUTE: ''
# MAX_CHECKSUM_LENGTH: ''
# LOG_LEVEL: ''
HTTP_API_PORT: '$HTTP_API_PORT'
Expand Down
3 changes: 2 additions & 1 deletion src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ export interface OceanNodeConfig {
assetPurgatoryUrl: string
allowedAdmins?: string[]
codeHash?: string
rateLimit?: number
rateLimit?: number // per request ip or peer
maxConnections?: number // global, regardless of client address(es)
denyList?: DenyList
unsafeURLs?: string[]
isBootstrap?: boolean
Expand Down
21 changes: 21 additions & 0 deletions src/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import { pipe } from 'it-pipe'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from './utils/logging/Logger.js'
import { Handler } from './components/core/handler/handler.js'
import { C2DEngines } from './components/c2d/compute_engines.js'

export interface RequestLimiter {
requester: string | string[] // IP address or peer ID
lastRequestTime: number // time of the last request done (in miliseconds)
numRequests: number // number of requests done in the specific time period
}

export interface RequestDataCheck {
valid: boolean
updatedRequestData: RequestLimiter
}
export class OceanNode {
// eslint-disable-next-line no-use-before-define
private static instance: OceanNode
Expand All @@ -20,6 +31,7 @@ export class OceanNode {
private c2dEngines: C2DEngines
// requester
private remoteCaller: string | string[]
private requestMap: Map<string, RequestLimiter>
// eslint-disable-next-line no-useless-constructor
private constructor(
private db?: Database,
Expand All @@ -28,6 +40,7 @@ export class OceanNode {
private indexer?: OceanIndexer
) {
this.coreHandlers = CoreHandlersRegistry.getInstance(this)
this.requestMap = new Map<string, RequestLimiter>()
if (node) {
node.setCoreHandlers(this.coreHandlers)
}
Expand Down Expand Up @@ -95,6 +108,14 @@ export class OceanNode {
return this.remoteCaller
}

public getRequestMapSize(): number {
return this.requestMap.size
}

public getRequestMap(): Map<string, RequestLimiter> {
return this.requestMap
}

/**
* Use this method to direct calls to the node as node cannot dial into itself
* @param message command message
Expand Down
49 changes: 47 additions & 2 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import StreamConcat from 'stream-concat'
import { Handler } from '../core/handler/handler.js'
import { getConfiguration } from '../../utils/index.js'
import { checkConnectionsRateLimit } from '../httpRoutes/requestValidator.js'
import { CONNECTIONS_RATE_INTERVAL } from '../../utils/constants.js'
import { RequestLimiter } from '../../OceanNode.js'

// hold data about last request made
const connectionsData: RequestLimiter = {
lastRequestTime: Date.now(),
requester: '',
numRequests: 0
}

export class ReadableString extends Readable {
private sent = false
Expand Down Expand Up @@ -60,10 +70,14 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
return status
}

const denyList = await (await getConfiguration()).denyList
const configuration = await getConfiguration()
// check deny list configs
const { denyList } = configuration
if (denyList.peers.length > 0) {
if (denyList.peers.includes(remotePeer.toString())) {
P2P_LOGGER.error(`Incoming request denied to peer: ${remotePeer}`)
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (peer its on deny list)`
)

if (connectionStatus === 'open') {
statusStream = new ReadableString(
Expand All @@ -79,6 +93,37 @@ export async function handleProtocolCommands(otherPeerConnection: any) {
return
}
}
// check connections rate limit
const requestTime = Date.now()
if (requestTime - connectionsData.lastRequestTime > CONNECTIONS_RATE_INTERVAL) {
// last one was more than 1 minute ago? reset counter
connectionsData.numRequests = 0
}
// always increment counter
connectionsData.numRequests += 1
// update time and requester information
connectionsData.lastRequestTime = requestTime
connectionsData.requester = remoteAddr

// check global rate limits (not ip related)
const requestRateValidation = checkConnectionsRateLimit(configuration, connectionsData)
if (!requestRateValidation.valid) {
P2P_LOGGER.warn(
`Incoming request denied to peer: ${remotePeer} (rate limit exceeded)`
)
if (connectionStatus === 'open') {
statusStream = new ReadableString(
JSON.stringify(buildWrongCommandStatus(403, 'Rate limit exceeded'))
)
try {
await pipe(statusStream, otherPeerConnection.stream.sink)
} catch (e) {
P2P_LOGGER.error(e)
}
}
await closeStreamConnection(otherPeerConnection.connection, remotePeer)
return
}

try {
// eslint-disable-next-line no-unreachable-loop
Expand Down
57 changes: 27 additions & 30 deletions src/components/core/handler/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import { OceanNode } from '../../../OceanNode.js'
import { OceanNode, RequestDataCheck, RequestLimiter } from '../../../OceanNode.js'
import { Command, ICommandHandler } from '../../../@types/commands.js'
import {
ValidateParams,
Expand All @@ -9,23 +9,12 @@ import {
import { getConfiguration } from '../../../utils/index.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'
import { ReadableString } from '../../P2P/handlers.js'
import { CONNECTION_HISTORY_DELETE_THRESHOLD } from '../../../utils/constants.js'

export interface RequestLimiter {
requester: string | string[] // IP address or peer ID
lastRequestTime: number // time of the last request done (in miliseconds)
numRequests: number // number of requests done in the specific time period
}

export interface RequestDataCheck {
valid: boolean
updatedRequestData: RequestLimiter
}
export abstract class Handler implements ICommandHandler {
private nodeInstance?: OceanNode
private requestMap: Map<string, RequestLimiter>
private nodeInstance: OceanNode
public constructor(oceanNode: OceanNode) {
this.nodeInstance = oceanNode
this.requestMap = new Map<string, RequestLimiter>()
}

abstract validate(command: Command): ValidateParams
Expand All @@ -38,62 +27,69 @@ export abstract class Handler implements ICommandHandler {

// TODO LOG, implement all handlers
async checkRateLimit(): Promise<boolean> {
const ratePerSecond = (await getConfiguration()).rateLimit
const requestMap = this.getOceanNode().getRequestMap()
const ratePerMinute = (await getConfiguration()).rateLimit
const caller: string | string[] = this.getOceanNode().getRemoteCaller()
const requestTime = new Date().getTime()
let isOK = true

// we have to clear this from time to time, so it does not grow forever
if (requestMap.size > CONNECTION_HISTORY_DELETE_THRESHOLD) {
CORE_LOGGER.info('Request history reached threeshold, cleaning cache...')
requestMap.clear()
}

const self = this
// common stuff
const updateRequestData = function (remoteCaller: string) {
const updatedRequestData = self.checkRequestData(
remoteCaller,
requestTime,
ratePerSecond
ratePerMinute
)
isOK = updatedRequestData.valid
self.requestMap.set(remoteCaller, updatedRequestData.updatedRequestData)
requestMap.set(remoteCaller, updatedRequestData.updatedRequestData)
}

let data: RequestLimiter = null
if (Array.isArray(caller)) {
for (const remote of caller) {
if (!this.requestMap.has(remote)) {
if (!requestMap.has(remote)) {
data = {
requester: remote,
lastRequestTime: requestTime,
numRequests: 1
}
this.requestMap.set(remote, data)
requestMap.set(remote, data)
} else {
updateRequestData(remote)
}
// do not proceed any further
if (!isOK) {
CORE_LOGGER.warn(
`Request denied (rate limit exceeded) for remote caller ${remote}. Current request map: ${JSON.stringify(
this.requestMap.get(remote)
requestMap.get(remote)
)}`
)
return false
}
}
} else {
if (!this.requestMap.has(caller)) {
if (!requestMap.has(caller)) {
data = {
requester: caller,
lastRequestTime: requestTime,
numRequests: 1
}
this.requestMap.set(caller, data)
requestMap.set(caller, data)
return true
} else {
updateRequestData(caller)
// log if request was denied
if (!isOK) {
CORE_LOGGER.warn(
`Request denied (rate limit exceeded) for remote caller ${caller}. Current request map: ${JSON.stringify(
this.requestMap.get(caller)
requestMap.get(caller)
)}`
)
}
Expand All @@ -105,18 +101,19 @@ export abstract class Handler implements ICommandHandler {
/**
* Checks if the request is within the rate limit defined
* @param remote remote endpoint (ip or peer identifier)
* @param ratePerSecond number of calls per second allowed
* @param ratePerMinute number of calls per minute allowed (per ip or peer identifier)
* @returns updated request data
*/
checkRequestData(
remote: string,
currentTime: number,
ratePerSecond: number
ratePerMinute: number
): RequestDataCheck {
const requestData: RequestLimiter = this.requestMap.get(remote)
const diffSeconds = (currentTime - requestData.lastRequestTime) / 1000
// more than 1 sec difference means no problem
if (diffSeconds >= 1) {
const requestMap = this.getOceanNode().getRequestMap()
const requestData: RequestLimiter = requestMap.get(remote)
const diffMinutes = ((currentTime - requestData.lastRequestTime) / 1000) * 60
// more than 1 minute difference means no problem
if (diffMinutes >= 1) {
// its fine
requestData.lastRequestTime = currentTime
requestData.numRequests = 1
Expand All @@ -128,7 +125,7 @@ export abstract class Handler implements ICommandHandler {
// requests in the same interval of 1 second
requestData.numRequests++
return {
valid: requestData.numRequests <= ratePerSecond,
valid: requestData.numRequests <= ratePerMinute,
updatedRequestData: requestData
}
}
Expand Down
Loading

0 comments on commit bc7edcc

Please sign in to comment.