Skip to content

Commit

Permalink
fix!: remove @libp2p/components (#360)
Browse files Browse the repository at this point in the history
* chore!: update deps and configure dependabot

Updates all `@libp2p/*` deps to the latest versions and configures
dependabot - for some reason it wasn't turned on for dev deps.

BREAKING CHANGE: updates @libp2p/components to the latest major

* fix!: remove @libp2p/components

`@libp2p/components` is a choke-point for our dependency graph as it depends on every interface, meaning when one interface revs a major
`@libp2p/components` major has to change too which means every module depending on it also needs a major.

Switch instead to constructor injection of simple objects that let modules declare their dependencies on interfaces directly instead of
indirectly via `@libp2p/components`

Refs libp2p/js-libp2p-components#6

BREAKING CHANGE: modules no longer implement `Initializable` instead switching to constructor injection

* chore: update e2e test

Co-authored-by: Cayman <[email protected]>
  • Loading branch information
achingbrain and wemeetagain authored Oct 20, 2022
1 parent eb1a145 commit ac0ad41
Show file tree
Hide file tree
Showing 22 changed files with 1,337 additions and 948 deletions.
15 changes: 9 additions & 6 deletions .github/.dependabot.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
version: 2
updates:
- package-ecosystem: "npm"
allow:
# Allow both direct and indirect updates for all packages
- dependency-type: "production"
commit-message:
prefix: "chore: "
- package-ecosystem: npm
directory: "/"
schedule:
interval: daily
time: "10:00"
open-pull-requests-limit: 10
commit-message:
prefix: "deps"
prefix-development: "deps(dev)"
999 changes: 702 additions & 297 deletions package-lock.json

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,39 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/components": "^2.0.3",
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-connection-manager": "^1.3.0",
"@libp2p/interface-keys": "^1.0.3",
"@libp2p/interface-peer-id": "^1.0.4",
"@libp2p/interface-pubsub": "^2.0.1",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^1.1.15",
"@libp2p/peer-record": "^4.0.1",
"@libp2p/pubsub": "^3.1.2",
"@libp2p/pubsub": "^5.0.0",
"@libp2p/topology": "^3.0.0",
"abortable-iterator": "^4.0.2",
"denque": "^1.5.0",
"err-code": "^3.0.1",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4",
"it-pushable": "^3.1.0",
"multiformats": "^9.6.4",
"multiformats": "^10.0.0",
"protobufjs": "^6.11.2",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^3.0.0"
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@dapplion/benchmark": "^0.2.2",
"@libp2p/floodsub": "^3.0.5",
"@libp2p/interface-mocks": "^4.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^2.0.2",
"@libp2p/floodsub": "^5.0.0",
"@libp2p/interface-mocks": "^7.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-store": "^3.1.2",
"@libp2p/peer-store": "^5.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
Expand All @@ -115,6 +116,7 @@
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^2.0.2",
"lodash": "^4.17.15",
"mkdirp": "^1.0.4",
"os": "^0.1.1",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
Expand All @@ -127,7 +129,6 @@
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"mkdirp": "^1.0.4",
"util": "^0.12.3"
},
"engines": {
Expand Down
85 changes: 49 additions & 36 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import {
ToSendGroupCount
} from './metrics.js'
import {
MessageAcceptance,
MsgIdFn,
PublishConfig,
TopicStr,
Expand All @@ -51,7 +50,6 @@ import {
FastMsgIdFn,
AddrInfo,
DataTransform,
TopicValidatorFn,
rejectReasonFromAcceptance,
MsgIdToStrFn,
MessageId
Expand All @@ -61,7 +59,6 @@ import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
import type { GossipsubOptsSpec } from './config.js'
import { Components, Initializable } from '@libp2p/components'
import {
Message,
PublishResult,
Expand All @@ -70,14 +67,18 @@ import {
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerStore } from '@libp2p/interface-peer-store'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -209,7 +210,14 @@ interface AcceptFromWhitelistEntry {
acceptUntil: number
}

export class GossipSub extends EventEmitter<GossipsubEvents> implements Initializable, PubSub<GossipsubEvents> {
export interface GossipSubComponents {
peerId: PeerId
peerStore: PeerStore
registrar: Registrar
connectionManager: ConnectionManager
}

export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<GossipsubEvents> {
/**
* The signature policy to follow by default
*/
Expand Down Expand Up @@ -325,6 +333,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/** Peer score tracking */
public readonly score: PeerScore

/**
* Custom validator function per topic.
* Must return or resolve quickly (< 100ms) to prevent causing penalties for late messages.
* If you need to apply validation that may require longer times use `asyncValidation` option and callback the
* validation result through `Gossipsub.reportValidationResult`
*/
public readonly topicValidators = new Map<TopicStr, TopicValidatorFn>()

/**
Expand All @@ -338,7 +352,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*/
readonly gossipTracer: IWantTracer

private components = new Components()
private readonly components: GossipSubComponents

private directPeerInitial: ReturnType<typeof setTimeout> | null = null
private readonly log: Logger
Expand All @@ -361,7 +375,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
cancel: () => void
} | null = null

constructor(options: Partial<GossipsubOpts> = {}) {
constructor(components: GossipSubComponents, options: Partial<GossipsubOpts> = {}) {
super()

const opts = {
Expand Down Expand Up @@ -392,6 +406,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
}

this.components = components
this.decodeRpcLimits = opts.decodeRpcLimits ?? defaultDecodeRpcLimits

this.globalSignaturePolicy = opts.globalSignaturePolicy ?? StrictSign
Expand Down Expand Up @@ -473,7 +488,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* libp2p
*/
this.score = new PeerScore(this.opts.scoreParams, this.metrics, {
this.score = new PeerScore(components, this.opts.scoreParams, this.metrics, {
scoreCacheValidityMs: opts.heartbeatInterval
})

Expand All @@ -493,14 +508,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// LIFECYCLE METHODS

/**
* Pass libp2p components to interested system components
*/
async init(components: Components): Promise<void> {
this.components = components
this.score.init(components)
}

/**
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
Expand All @@ -513,7 +520,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

this.log('starting')

this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.getPeerId())
this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId)

// Create the outbound inflight queue
// This ensures that outbound stream creation happens sequentially
Expand All @@ -527,11 +534,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// set direct peer addresses in the address book
await Promise.all(
this.opts.directPeers.map(async (p) => {
await this.components.getPeerStore().addressBook.add(p.id, p.addrs)
await this.components.peerStore.addressBook.add(p.id, p.addrs)
})
)

const registrar = this.components.getRegistrar()
const registrar = this.components.registrar
// Incoming streams
// Called after a peer dials us
await Promise.all(
Expand Down Expand Up @@ -611,7 +618,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.status = { code: GossipStatusCode.stopped }

// unregister protocol and handlers
const registrar = this.components.getRegistrar()
const registrar = this.components.registrar
registrarTopologyIds.forEach((id) => registrar.unregister(id))

this.outboundInflightQueue.end()
Expand Down Expand Up @@ -1059,7 +1066,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// Dispatch the message to the user if we are subscribed to the topic
if (this.subscriptions.has(rpcMsg.topic)) {
const isFromSelf = this.components.getPeerId().equals(from)
const isFromSelf = this.components.peerId.equals(from)

if (!isFromSelf || this.opts.emitSelf) {
super.dispatchEvent(
Expand Down Expand Up @@ -1146,18 +1153,18 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// to not penalize peers for long validation times.
const topicValidator = this.topicValidators.get(rpcMsg.topic)
if (topicValidator != null) {
let acceptance: MessageAcceptance
let acceptance: TopicValidatorResult
// Use try {} catch {} in case topicValidator() is synchronous
try {
acceptance = await topicValidator(msg.topic, msg, propagationSource)
acceptance = await topicValidator(propagationSource, msg)
} catch (e) {
const errCode = (e as { code: string }).code
if (errCode === constants.ERR_TOPIC_VALIDATOR_IGNORE) acceptance = MessageAcceptance.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_REJECT) acceptance = MessageAcceptance.Reject
else acceptance = MessageAcceptance.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_IGNORE) acceptance = TopicValidatorResult.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_REJECT) acceptance = TopicValidatorResult.Reject
else acceptance = TopicValidatorResult.Ignore
}

if (acceptance !== MessageAcceptance.Accept) {
if (acceptance !== TopicValidatorResult.Accept) {
return { code: MessageStatus.invalid, reason: rejectReasonFromAcceptance(acceptance), msgIdStr }
}
}
Expand Down Expand Up @@ -1619,7 +1626,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.log("bogus peer record obtained through px: peer ID %p doesn't match expected peer %p", eid, p)
return
}
if (!(await this.components.getPeerStore().addressBook.consumePeerRecord(envelope))) {
if (!(await this.components.peerStore.addressBook.consumePeerRecord(envelope))) {
this.log('bogus peer record obtained through px: could not add peer record to address book')
return
}
Expand All @@ -1643,9 +1650,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
private async connect(id: PeerIdStr): Promise<void> {
this.log('Initiating connection with %s', id)
const peerId = peerIdFromString(id)
const connection = await this.components.getConnectionManager().openConnection(peerId)
const connection = await this.components.connectionManager.openConnection(peerId)
for (const multicodec of this.multicodecs) {
for (const topology of this.components.getRegistrar().getTopologies(multicodec)) {
for (const topology of this.components.registrar.getTopologies(multicodec)) {
topology.onConnect(peerId, connection)
}
}
Expand Down Expand Up @@ -2006,12 +2013,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// Dispatch the message to the user if we are subscribed to the topic
if (willSendToSelf) {
tosend.add(this.components.getPeerId().toString())
tosend.add(this.components.peerId.toString())

super.dispatchEvent(
new CustomEvent<GossipsubMessage>('gossipsub:message', {
detail: {
propagationSource: this.components.getPeerId(),
propagationSource: this.components.peerId,
msgId: msgIdStr,
msg
}
Expand Down Expand Up @@ -2047,8 +2054,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*
* This should only be called once per message.
*/
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: MessageAcceptance): void {
if (acceptance === MessageAcceptance.Accept) {
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: TopicValidatorResult): void {
if (acceptance === TopicValidatorResult.Accept) {
const cacheEntry = this.mcache.validate(msgId)
this.metrics?.onReportValidationMcacheHit(cacheEntry !== null)

Expand Down Expand Up @@ -2340,7 +2347,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

return {
peerID: id.toBytes(),
signedPeerRecord: await this.components.getPeerStore().addressBook.getRawEnvelope(id)
signedPeerRecord: await this.components.peerStore.addressBook.getRawEnvelope(id)
}
})
)
Expand Down Expand Up @@ -2821,3 +2828,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
metrics.registerScoreWeights(sw)
}
}

export function gossipsub(
init: Partial<GossipsubOpts> = {}
): (components: GossipSubComponents) => PubSub<GossipsubEvents> {
return (components: GossipSubComponents) => new GossipSub(components, init)
}
15 changes: 4 additions & 11 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import { TopicValidatorResult } from '@libp2p/interface-pubsub'
import type { IRPC } from './message/rpc.js'
import type { PeerScoreThresholds } from './score/peer-score-thresholds.js'
import {
MessageAcceptance,
MessageStatus,
PeerIdStr,
RejectReason,
RejectReasonObj,
TopicStr,
ValidateError
} from './types.js'
import { MessageStatus, PeerIdStr, RejectReason, RejectReasonObj, TopicStr, ValidateError } from './types.js'

/** Topic label as provided in `topicStrToLabel` */
export type TopicLabel = string
Expand Down Expand Up @@ -241,7 +234,7 @@ export function getMetrics(
/** Message validation results for each topic.
* Invalid == Reject?
* = rust-libp2p `invalid_messages`, `accepted_messages`, `ignored_messages`, `rejected_messages` */
asyncValidationResult: register.gauge<{ topic: TopicLabel; acceptance: MessageAcceptance }>({
asyncValidationResult: register.gauge<{ topic: TopicLabel; acceptance: TopicValidatorResult }>({
name: 'gossipsub_async_validation_result_total',
help: 'Message validation result for each topic',
labelNames: ['topic', 'acceptance']
Expand Down Expand Up @@ -543,7 +536,7 @@ export function getMetrics(
this.asyncValidationMcacheHit.inc({ hit: hit ? 'hit' : 'miss' })
},

onReportValidation(topicStr: TopicStr, acceptance: MessageAcceptance): void {
onReportValidation(topicStr: TopicStr, acceptance: TopicValidatorResult): void {
const topic = this.toTopic(topicStr)
this.asyncValidationResult.inc({ topic: topic, acceptance })
},
Expand Down
Loading

0 comments on commit ac0ad41

Please sign in to comment.