From 861be8f18504911aae36699c80c85826e79d0af5 Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Thu, 30 Nov 2023 03:10:55 +0300 Subject: [PATCH 1/3] Fixed broadcasting MicroBlockInv --- .../com/wavesplatform/mining/Miner.scala | 6 +-- .../microblocks/MicroBlockMinerImpl.scala | 39 +++++++++++++------ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index 239fdb8d06a..6f683deb9ff 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -309,12 +309,8 @@ class MinerImpl( }.uncancelable for { - elapsed <- waitBlockAppendedTask.timed.map(_._1) - newOffset = (offset - elapsed).max(Duration.Zero) - - _ <- Task(microBlockAttempt := SerialCancelable()).delayExecution(newOffset) + _ <- waitBlockAppendedTask result <- Task(forgeBlock(account)).executeOn(minerScheduler) - _ <- result match { case Right((block, totalConstraint)) => appendTask(block, totalConstraint) diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index de83f87765d..2933e5d573f 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -113,18 +113,15 @@ class MicroBlockMinerImpl( for { _ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")) _ <- Task.sleep(delay) - _ = log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") - blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash) - .leftWiden[Throwable] - .liftTo[Task] - (signedBlock, microBlock) = blocks - blockId <- appendMicroBlock(microBlock) - _ = BlockStats.mined(microBlock, blockId) - _ <- broadcastMicroBlock(account, microBlock, blockId) - } yield { - if (updatedTotalConstraint.isFull) Stop - else Success(signedBlock, updatedTotalConstraint) - } + r <- + if (blockchainUpdater.lastBlockId.forall(_ == accumulatedBlock.id())) { + log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") + appendAndBroadcastMicroBlock(account, accumulatedBlock, unconfirmed, updatedTotalConstraint, stateHash) + } else { + log.trace(s"Stopping generating microBlock for ${account.toAddress}, new key block was appended") + Task(Stop) + } + } yield r case (_, updatedTotalConstraint, _) => if (updatedTotalConstraint.isFull) { @@ -142,6 +139,24 @@ class MicroBlockMinerImpl( } } + private def appendAndBroadcastMicroBlock( + account: KeyPair, + block: Block, + transactions: Seq[Transaction], + updatedTotalConstraint: MiningConstraint, + stateHash: Option[BlockId] + ): Task[MicroBlockMiningResult] = + for { + (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash) + .leftWiden[Throwable] + .liftTo[Task] + blockId <- appendMicroBlock(microBlock) + _ = BlockStats.mined(microBlock, blockId) + _ <- broadcastMicroBlock(account, microBlock, blockId) + } yield + if (updatedTotalConstraint.isFull) Stop + else Success(signedBlock, updatedTotalConstraint) + private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))) From 51f585ab8e3932d8b77b5200adde30ceb906c9ad Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Thu, 30 Nov 2023 03:12:52 +0300 Subject: [PATCH 2/3] Better code --- .../mining/microblocks/MicroBlockMinerImpl.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index 2933e5d573f..871b27abd10 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -143,19 +143,17 @@ class MicroBlockMinerImpl( account: KeyPair, block: Block, transactions: Seq[Transaction], - updatedTotalConstraint: MiningConstraint, + constraint: MiningConstraint, stateHash: Option[BlockId] ): Task[MicroBlockMiningResult] = for { - (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash) - .leftWiden[Throwable] - .liftTo[Task] - blockId <- appendMicroBlock(microBlock) + (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash).leftWiden[Throwable].liftTo[Task] + blockId <- appendMicroBlock(microBlock) _ = BlockStats.mined(microBlock, blockId) _ <- broadcastMicroBlock(account, microBlock, blockId) } yield - if (updatedTotalConstraint.isFull) Stop - else Success(signedBlock, updatedTotalConstraint) + if (constraint.isFull) Stop + else Success(signedBlock, constraint) private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))) From 306dd67fe146b7f7dc694d887c504b64aba403e7 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Fri, 1 Dec 2023 17:47:46 +0400 Subject: [PATCH 3/3] wip --- .../com/wavesplatform/mining/Miner.scala | 6 +- .../mining/microblocks/MicroBlockMiner.scala | 2 +- .../microblocks/MicroBlockMinerImpl.scala | 89 ++++++++----------- .../com/wavesplatform/network/messages.scala | 6 +- 4 files changed, 48 insertions(+), 55 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index c02c414b391..5ef98f4f374 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -309,8 +309,12 @@ class MinerImpl( }.uncancelable for { - _ <- waitBlockAppendedTask + elapsed <- waitBlockAppendedTask.timed.map(_._1) + newOffset = (offset - elapsed).max(Duration.Zero) + + _ <- Task(microBlockAttempt := SerialCancelable()).delayExecution(newOffset) result <- Task(forgeBlock(account)).executeOn(minerScheduler) + _ <- result match { case Right((block, totalConstraint)) => appendTask(block, totalConstraint) diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala index d53d1cf88b1..5f895c14d66 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala @@ -17,7 +17,7 @@ trait MicroBlockMiner { account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] } diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index 871b27abd10..f6b0f764b5b 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -45,15 +45,15 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[Unit] = - generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, lastMicroBlock) + generateOneMicroBlockTask(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs) .flatMap { case res @ Success(newBlock, newConstraint) => Task.defer(generateMicroBlockSequence(account, newBlock, newConstraint, res.nanoTime)) case Retry => Task - .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, lastMicroBlock)) + .defer(generateMicroBlockSequence(account, accumulatedBlock, restTotalConstraint, prevMicroBlockTs)) .delayExecution(1 second) case Stop => setDebugState(MinerDebugInfo.MiningBlocks) @@ -65,7 +65,7 @@ class MicroBlockMinerImpl( account: KeyPair, accumulatedBlock: Block, restTotalConstraint: MiningConstraint, - lastMicroBlock: Long + prevMicroBlockTs: Long ): Task[MicroBlockMiningResult] = { val packTask = Task.cancelable[(Option[Seq[Transaction]], MiningConstraint, Option[ByteStr])] { cb => @volatile var cancelled = false @@ -93,8 +93,8 @@ class MicroBlockMinerImpl( ) ) ) - log.trace(s"Finished pack for ${accumulatedBlock.id()}") val updatedTotalConstraint = updatedMdConstraint.head + log.trace(s"Finished pack for ${accumulatedBlock.id()}, updated total constraint: $updatedTotalConstraint") cb.onSuccess((unconfirmed, updatedTotalConstraint, stateHash)) } Task.eval { @@ -104,24 +104,25 @@ class MicroBlockMinerImpl( packTask.flatMap { case (Some(unconfirmed), updatedTotalConstraint, stateHash) if unconfirmed.nonEmpty => - val delay = { - val delay = System.nanoTime() - lastMicroBlock - val requiredDelay = settings.microBlockInterval.toNanos - if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos - } - for { - _ <- Task.now(if (delay > Duration.Zero) log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock")) - _ <- Task.sleep(delay) - r <- - if (blockchainUpdater.lastBlockId.forall(_ == accumulatedBlock.id())) { - log.trace(s"Generating microBlock for ${account.toAddress}, constraints: $updatedTotalConstraint") - appendAndBroadcastMicroBlock(account, accumulatedBlock, unconfirmed, updatedTotalConstraint, stateHash) - } else { - log.trace(s"Stopping generating microBlock for ${account.toAddress}, new key block was appended") - Task(Stop) - } - } yield r + blocks <- forgeBlocks(account, accumulatedBlock, unconfirmed, stateHash) + .leftWiden[Throwable] + .liftTo[Task] + (signedBlock, microBlock) = blocks + delay = { + val delay = System.nanoTime() - prevMicroBlockTs + val requiredDelay = settings.microBlockInterval.toNanos + if (delay >= requiredDelay) Duration.Zero else (requiredDelay - delay).nanos + } + _ <- + if (delay > Duration.Zero) { + log.trace(s"Sleeping ${delay.toMillis} ms before applying microBlock") + Task.sleep(delay) + } else Task.unit + _ <- appendMicroBlock(microBlock, account) + } yield + if (updatedTotalConstraint.isFull) Stop + else Success(signedBlock, updatedTotalConstraint) case (_, updatedTotalConstraint, _) => if (updatedTotalConstraint.isFull) { @@ -139,39 +140,27 @@ class MicroBlockMinerImpl( } } - private def appendAndBroadcastMicroBlock( - account: KeyPair, - block: Block, - transactions: Seq[Transaction], - constraint: MiningConstraint, - stateHash: Option[BlockId] - ): Task[MicroBlockMiningResult] = - for { - (signedBlock, microBlock) <- forgeBlocks(account, block, transactions, stateHash).leftWiden[Throwable].liftTo[Task] - blockId <- appendMicroBlock(microBlock) - _ = BlockStats.mined(microBlock, blockId) - _ <- broadcastMicroBlock(account, microBlock, blockId) - } yield - if (constraint.isFull) Stop - else Success(signedBlock, constraint) - - private def broadcastMicroBlock(account: KeyPair, microBlock: MicroBlock, blockId: BlockId): Task[Unit] = - Task(if (allChannels != null) allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference))) - - private def appendMicroBlock(microBlock: MicroBlock): Task[BlockId] = - MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None) - .flatMap { - case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) - case Right(v) => Task.now(v) - } + private def appendMicroBlock(microBlock: MicroBlock, account: KeyPair): Task[BlockId] = + MicroblockAppender(blockchainUpdater, utx, appenderScheduler)(microBlock, None).flatMap { + case Left(err) => Task.raiseError(MicroBlockAppendError(microBlock, err)) + case Right(blockId) => + Task.evalAsync { + BlockStats.mined(microBlock, blockId) + if (allChannels != null) { + allChannels.broadcast(MicroBlockInv(account, blockId, microBlock.reference)) + } + blockId + } + }.uncancelable private def forgeBlocks( account: KeyPair, accumulatedBlock: Block, - unconfirmed: Seq[Transaction], + packedTxs: Seq[Transaction], stateHash: Option[ByteStr] ): Either[MicroBlockMiningError, (Block, MicroBlock)] = microBlockBuildTimeStats.measureSuccessful { + log.trace(s"Forging microBlock for ${account.toAddress}") for { signedBlock <- Block .buildAndSign( @@ -180,7 +169,7 @@ class MicroBlockMinerImpl( reference = accumulatedBlock.header.reference, baseTarget = accumulatedBlock.header.baseTarget, generationSignature = accumulatedBlock.header.generationSignature, - txs = accumulatedBlock.transactionData ++ unconfirmed, + txs = accumulatedBlock.transactionData ++ packedTxs, signer = account, featureVotes = accumulatedBlock.header.featureVotes, rewardVote = accumulatedBlock.header.rewardVote, @@ -189,7 +178,7 @@ class MicroBlockMinerImpl( ) .leftMap(BlockBuildError) microBlock <- MicroBlock - .buildAndSign(signedBlock.header.version, account, unconfirmed, accumulatedBlock.id(), signedBlock.signature, stateHash) + .buildAndSign(signedBlock.header.version, account, packedTxs, accumulatedBlock.id(), signedBlock.signature, stateHash) .leftMap(MicroBlockBuildError) } yield (signedBlock, microBlock) } diff --git a/node/src/main/scala/com/wavesplatform/network/messages.scala b/node/src/main/scala/com/wavesplatform/network/messages.scala index 896b9062468..f5b09c74133 100644 --- a/node/src/main/scala/com/wavesplatform/network/messages.scala +++ b/node/src/main/scala/com/wavesplatform/network/messages.scala @@ -80,9 +80,9 @@ case class MicroBlockInv(sender: PublicKey, totalBlockId: ByteStr, reference: By } object MicroBlockInv { - def apply(sender: KeyPair, totalBlockRef: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { - val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockRef.arr ++ prevBlockRef.arr) - new MicroBlockInv(sender.publicKey, totalBlockRef, prevBlockRef, signature) + def apply(sender: KeyPair, totalBlockId: ByteStr, prevBlockRef: ByteStr): MicroBlockInv = { + val signature = crypto.sign(sender.privateKey, sender.toAddress.bytes ++ totalBlockId.arr ++ prevBlockRef.arr) + new MicroBlockInv(sender.publicKey, totalBlockId, prevBlockRef, signature) } }