Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions local-network/deploy/src/waves-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export type LibraryWavesApi = ReturnType<typeof waves.create>;
export type ExtendedWavesApi = LibraryWavesApi & { base: string };
export type WavesSignedTransaction = SignedTransaction<any> & { id: string };

export interface EcBlockContractInfo {
export interface BlockContractInfo {
chainHeight: number,
epochNumber: number
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export async function signAndBroadcast(wavesApi: ExtendedWavesApi, name: string,
if (options.wait) await waitForTxn(wavesApi, id).then(x => logger.debug(`Sent %O result: %O`, unsignedTxJson, x));
}

function parseBlockMeta(response: object): EcBlockContractInfo {
function parseBlockMeta(response: object): BlockContractInfo {
// @ts-ignore: Property 'value' does not exist on type 'object'.
const rawMeta = response.result.value;
return {
Expand All @@ -135,7 +135,7 @@ function parseBlockMeta(response: object): EcBlockContractInfo {
};
}

export async function waitForEcBlock(wavesApi: LibraryWavesApi, chainContractAddress: string, blockHash: string): Promise<EcBlockContractInfo> {
export async function waitForBlock(wavesApi: LibraryWavesApi, chainContractAddress: string, blockHash: string): Promise<BlockContractInfo> {
const getBlockData = async () => {
try {
return parseBlockMeta(await wavesApi.utils.fetchEvaluate(chainContractAddress, `blockMeta("${blockHash.slice(2)}")`));
Expand Down Expand Up @@ -168,7 +168,7 @@ export function prepareE2CWithdrawTxnJson(
};
}

export async function chainContractCurrFinalizedBlock(wavesApi: LibraryWavesApi, chainContractAddress: string): Promise<EcBlockContractInfo> {
export async function chainContractCurrFinalizedBlock(wavesApi: LibraryWavesApi, chainContractAddress: string): Promise<BlockContractInfo> {
// @ts-ignore: Property 'value' does not exist on type 'object'.
return parseBlockMeta(await wavesApi.utils.fetchEvaluate(chainContractAddress, `blockMeta(getStringValue("finalizedBlock"))`));
}
23 changes: 0 additions & 23 deletions local-network/deploy/test/miner_big-join.ts

This file was deleted.

2 changes: 1 addition & 1 deletion local-network/deploy/test/transfer-e2c.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ if (rawLogsInBlock.length == 0) throw new Error(`Can't find logs in ${blockHash}
const logsInBlock = rawLogsInBlock as Exclude<typeof rawLogsInBlock[number], string>[];

logger.info(`Waiting EL block ${blockHash} confirmation on CL`);
const withdrawBlockMeta = await waves.utils.waitForEcBlock(waves.wavesApi1, waves.chainContractAddress, blockHash);
const withdrawBlockMeta = await waves.utils.waitForBlock(waves.wavesApi1, waves.chainContractAddress, blockHash);
logger.info(`Withdraw block meta: %O`, withdrawBlockMeta);

let rawData: string[] = [];
Expand Down
8 changes: 6 additions & 2 deletions src/main/scala/units/BlockHash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import play.api.libs.json.{Format, Reads, Writes}
import supertagged.TaggedType

object BlockHash extends TaggedType[String] {

val BytesSize: Int = 32
val HexSize: Int = 66

def apply(hex: String): BlockHash = {
require(hex.startsWith("0x"), "Expected hash to start with 0x")
require(hex.length == 66, s"Expected hash size of 66, got: ${hex.length}. Hex: $hex") // "0x" + 32 bytes
require(hex.length == HexSize, s"Expected hash size of $HexSize, got: ${hex.length}. Hex: $hex") // "0x" + 32 bytes
BlockHash @@ hex
}

def apply(xs: ByteStr): BlockHash = BlockHash @@ HexBytesConverter.toHex(xs)

def apply(xs: Array[Byte]): BlockHash = {
require(xs.length == 32, "Block hash size must be 32 bytes")
require(xs.length == BytesSize, s"Block hash size must be $BytesSize bytes")
BlockHash @@ HexBytesConverter.toHex(xs)
}

Expand Down
1 change: 1 addition & 0 deletions src/main/scala/units/ClientError.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
package units

// TODO: maybe remove?
case class ClientError(message: String)
35 changes: 21 additions & 14 deletions src/main/scala/units/ConsensusClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import net.ceedubs.ficus.Ficus.*
import org.slf4j.LoggerFactory
import sttp.client3.HttpClientSyncBackend
import units.client.JwtAuthenticationBackend
import units.client.contract.{ChainContractClient, ChainContractStateClient}
import units.client.engine.{EngineApiClient, HttpEngineApiClient, LoggedEngineApiClient}
import units.network.*

Expand All @@ -27,8 +28,8 @@ class ConsensusClient(
config: ClientConfig,
context: ExtensionContext,
engineApiClient: EngineApiClient,
blockObserver: BlocksObserver,
allChannels: DefaultChannelGroup,
chainContractClient: ChainContractClient,
payloadObserver: PayloadObserver,
globalScheduler: Scheduler,
eluScheduler: Scheduler,
ownedResources: AutoCloseable
Expand All @@ -40,8 +41,8 @@ class ConsensusClient(
deps.config,
context,
deps.engineApiClient,
deps.blockObserver,
deps.allChannels,
deps.chainContractClient,
deps.payloadObserver,
deps.globalScheduler,
deps.eluScheduler,
deps
Expand All @@ -52,25 +53,25 @@ class ConsensusClient(
private[units] val elu =
new ELUpdater(
engineApiClient,
chainContractClient,
context.blockchain,
context.utx,
allChannels,
payloadObserver,
config,
context.time,
context.wallet,
blockObserver.loadBlock,
context.broadcastTransaction,
eluScheduler,
globalScheduler
)

private val blocksStreamCancelable: CancelableFuture[Unit] =
blockObserver.getBlockStream.foreach { case (ch, block) => elu.executionBlockReceived(block, ch) }(globalScheduler)
private val payloadsStreamCancelable: CancelableFuture[Unit] =
payloadObserver.getPayloadStream.foreach(elu.executionPayloadReceived)(globalScheduler)

override def start(): Unit = {}

def shutdown(): Future[Unit] = Future {
blocksStreamCancelable.cancel()
payloadsStreamCancelable.cancel()
ownedResources.close()
}(globalScheduler)

Expand Down Expand Up @@ -101,9 +102,9 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab

val config: ClientConfig = context.settings.config.as[ClientConfig]("waves.l2")

private val blockObserverScheduler = Schedulers.singleThread("block-observer-l2", reporter = { e => log.warn("Error in BlockObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService = Scheduler.singleThread("el-updater", reporter = { e => log.warn("Exception in ELUpdater", e) })
private val payloadObserverScheduler = Schedulers.singleThread("payload-observer-l2", reporter = { e => log.warn("Error in PayloadObserver", e) })
val globalScheduler: Scheduler = monix.execution.Scheduler.global
val eluScheduler: SchedulerService = Scheduler.singleThread("el-updater", reporter = { e => log.warn("Exception in ELUpdater", e) })

private val httpClientBackend = HttpClientSyncBackend()
private val maybeAuthenticatedBackend = config.jwtSecretFile match {
Expand All @@ -118,6 +119,9 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab

val engineApiClient = new LoggedEngineApiClient(new HttpEngineApiClient(config, maybeAuthenticatedBackend))

private val contractAddress = config.chainContractAddress
val chainContractClient = new ChainContractStateClient(contractAddress, context.blockchain)

val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
val peerDatabase = new PeerDatabaseImpl(config.network)
val messageObserver = new MessageObserver()
Expand All @@ -130,7 +134,10 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab
new ConcurrentHashMap[Channel, PeerInfo]
)

val blockObserver = new BlocksObserverImpl(allChannels, messageObserver.blocks, config.blockSyncRequestTimeout)(blockObserverScheduler)
val payloadObserver =
new PayloadObserverImpl(allChannels, messageObserver.payloads, chainContractClient.getMinersPks, config.blockSyncRequestTimeout)(
payloadObserverScheduler
)

override def close(): Unit = {
log.info("Closing HTTP/Engine API")
Expand All @@ -144,7 +151,7 @@ class ConsensusClientDependencies(context: ExtensionContext) extends AutoCloseab
messageObserver.shutdown()

log.info("Closing schedulers")
blockObserverScheduler.shutdown()
payloadObserverScheduler.shutdown()
eluScheduler.shutdown()
}
}
Loading