-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TxPool v2 General architecture #2162
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WIP review. Included the big one for now to get convo going, but will continue reading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe I've found a bug, and I suggest breaking down the remove_node_and_dependent_sub_graph
method implementation into smaller methods.
I'll continue reviewing tomorrow as there's much of the code I haven't looked at yet, but so far I'd like to see at least a fix for the the cumulative sum miscalculation bug and a test case that exposes the bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WIP
} | ||
|
||
#[test] | ||
fn stability_test() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stability of what? What are we checking? What is success? This could all be captured in the test name. I take it this isn't exactly a "unit test" so maybe the G/W/T doesn't apply (maybe it does?) but jumping in with no context I'm unsure why this exists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed from old pool and in the new one it's explicit.
gas_limit_range: u64, | ||
} | ||
|
||
fn some_transaction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might give more meaning to the test if we can be clearer what this tx is for. It's also not clear why it takes a limits
and a tip
param.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this file, I just tested how fuzzer works and that it capable of finding issue in TxPoolV1 that we are aware of=)
|
||
pub trait CollisionManager { | ||
/// Storage type of the collision manager. | ||
type Storage; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this assoc type still needed? Before we were passing it through as a param to the methods, but I'm not sure what it's doing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, removed=)
pub trait BasicCollisionManagerStorage { | ||
type StorageIndex: Copy + Debug + Hash + PartialEq + Eq; | ||
} | ||
|
||
pub struct BasicCollisionManager<S: BasicCollisionManagerStorage> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this trait could just be a generic for StorageIndex
on the BasicCollisionManager
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right=)
} | ||
} | ||
|
||
impl<S: BasicCollisionManagerStorage> CollisionManager for BasicCollisionManager<S> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like these trait methods could all have unit tests. There is a bit of logic here and instead of having to check that all the logic is correct, we could just define the "correct" behavior with tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly we will test the internal implementation, since we need to craft transaction with right inputs and outputs, and write test based on that knowledge.
But yeah, we could test it as well. But maybe it is better to do in a separate PR to not bloat this one.
} | ||
|
||
#[tokio::test] | ||
async fn insert__already_known_tx() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn insert__already_known_tx() { | |
async fn insert__already_known_tx_returns_error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that the "__" was saying: "failing"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
#[tokio::test] | ||
async fn insert__unknown_utxo() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn insert__unknown_utxo() { | |
async fn insert__unknown_utxo_returns_error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
#[tokio::test] | ||
async fn insert_higher_priced_tx_removes_lower_priced_tx() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn insert_higher_priced_tx_removes_lower_priced_tx() { | |
async fn insert__higher_priced_tx_removes_lower_priced_tx() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
#[tokio::test] | ||
async fn insert__colliding_dependent_underpriced() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn insert__colliding_dependent_underpriced() { | |
async fn insert__colliding_dependent_and_underpriced_returns_error() { |
Does this name make more sense? I'm trying to make sure I understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, done.
|
||
// When | ||
let result1 = universe.verify_and_insert(tx1).await; | ||
let result2 = universe.verify_and_insert(tx2).await; | ||
|
||
// Then | ||
assert!(result1.is_ok()); | ||
assert!(result2.is_ok()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a bunch of places where you assert
the preliminary transactions' result.
I think it makes more sense to just unwrap the first result and move it into the // Given
section. It's part of the setup.
// When | |
let result1 = universe.verify_and_insert(tx1).await; | |
let result2 = universe.verify_and_insert(tx2).await; | |
// Then | |
assert!(result1.is_ok()); | |
assert!(result2.is_ok()); | |
universe.verify_and_insert(tx1).await.unwrap(); | |
// When | |
let result = universe.verify_and_insert(tx2).await; | |
// Then | |
assert!(result.is_ok()); |
There are a bunch of places like this. I'll avoid calling them all out to reduce noise. LMK if some places aren't obvious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok changed everywhere
gas_limit_range: u64, | ||
} | ||
|
||
fn some_transaction( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this file, I just tested how fuzzer works and that it capable of finding issue in TxPoolV1 that we are aware of=)
pub trait BasicCollisionManagerStorage { | ||
type StorageIndex: Copy + Debug + Hash + PartialEq + Eq; | ||
} | ||
|
||
pub struct BasicCollisionManager<S: BasicCollisionManagerStorage> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right=)
} | ||
} | ||
|
||
impl<S: BasicCollisionManagerStorage> CollisionManager for BasicCollisionManager<S> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly we will test the internal implementation, since we need to craft transaction with right inputs and outputs, and write test based on that knowledge.
But yeah, we could test it as well. But maybe it is better to do in a separate PR to not bloat this one.
|
||
pub trait CollisionManager { | ||
/// Storage type of the collision manager. | ||
type Storage; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, removed=)
WasmChecker: WasmCheckerTrait + Send + Sync + 'static, | ||
MemoryPool: MemoryPoolTrait + Send + Sync + 'static, | ||
{ | ||
async fn insert( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We insert transactions into the service, not TxPool=) You can communicate with the service only in a shared state. The internal pipeline of the service may be inserted into the TxPool if it fulfills all requirements.
But I agree that we don't need to have TxPool
and HeavyAsyncProcessor
I the shared state. I think we can have a queue of requests that should be processed by the TxPool service, like we did it for P2P.
The insert
method can return a stream around the mpsc
channel, where each async task from the HeavyAsyncProcessor
can submit a transaction when it is inserted or not. In this case, we will have built-in back pressure if the queue is overwhelmed with queries.
In the service run
loop, we will forward transactions to the HeavyAsyncProcessor
, which will return the result to the channel after insertion. Or return an error and close the stream(it will fix your TODO).
@@ -0,0 +1,1256 @@ | |||
#![allow(non_snake_case)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is hard to check what tests were migrated. @AurelienFT Could you hi light that was changed mainly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the pool tests were migrated, no one really had there logic changed it's just a refactoring for readibility. SOme have been added such as insert__tx_gas_limit
, insert__tx_bytes_limit
, insert__tx_upgrade_with_invalid_wasm
and insert__if_tx3_depends_and_collides_with_tx2
let err = result3.unwrap_err(); | ||
assert!(matches!(err, Error::Collided(CollisionReason::Utxo(id)) if id == utxo_id)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see test for old overpriced_tx_contract_input_not_inserted
test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed together that this behavior shouldn't exist anymore as it's not important that the price of the usage is bigger as it's a dependency of the creation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice stuff! I have a few questions, but nothing blocking and any suggestions can be tackled in follow-up tasks.
entry.push(CollisionReason::Utxo(*utxo_id)); | ||
} | ||
} | ||
Input::MessageCoinSigned(MessageCoinSigned { nonce, .. }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious question: Unrelated to this PR, but I noticed that the nonce here is declared as:
// Unique identifier of the message
pub nonce: Nonce,
why do we call this Nonce then and not MessageId?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @xgreenx as i don't have the answer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because MessageId
already is used in another context. The MessageId
is a hash over all fields of the Message
including the Nonce
. But only Nonce
makes this MessageId
unique, so we use it for the database and inputs instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay thanks for clarifying. Is Nonce
guaranteed to be unique or is it user-controlled? I.e. could a user forge a message that collides with another message and have it evicted from the txpool?
pub struct HeavyAsyncProcessor { | ||
rayon_thread_pool: rayon::ThreadPool, | ||
semaphore: Arc<Semaphore>, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand the need for this type. I only see one usage in txpool/src/service.rs
, where it's offloading the shared_state.new_peer_subscribed(peer_id).await;
call to the dedicated thread pool.
Why do we need an explicit thread pool for this task? I don't see any blocking operations there, so why not just spawn it within the existing tokio runtime instead of using a dedicated executor for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also seems like a pure copy-paste of the p2p/src/heavy_task_processor.rs
task processor. It's also copied in the txpool_v1
, is there a reason why we don't break this out to a shared crate an reuse it wherever needed? (Though I question the need for it here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to #2162 (comment) but a bit orthogonal since that comment still assumes we use the dedicated thread pool when consuming messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now it's also used by the insertion/verifications of transactions in this PR. The need is that we want to have a way to not block the current executor to do their thing but also to have a limit on the resources we dedicate to specific tasks. In this case we place arbitrary number of threads and queue length on tx verif/insert and some arbitrary on p2p sync. This two operations can take a lot of computational power/waiting a lot that's why this is those ones that are separated.
This file isn't an exact copy as it works with async code which is not the same as in p2p but I agree that it's very similar and I asked where I can place it to share the two and it seems that there is no good place for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, but are there any blocking calls? That seems like what tokio::spawn_blocking
is made for. When it comes to limiting the number of active futures I agree that it makes sense to use the Semaphore
. I am more skeptical about the rayon part.
crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fixes!
## Version v0.37.0 ### Added - [1609](#1609): Add DA compression support. Compressed blocks are stored in the offchain database when blocks are produced, and can be fetched using the GraphQL API. - [2290](#2290): Added a new CLI argument `--graphql-max-directives`. The default value is `10`. - [2195](#2195): Added enforcement of the limit on the size of the L2 transactions per block according to the `block_transaction_size_limit` parameter. - [2131](#2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool - [2182](#2151): Limit number of transactions that can be fetched via TxSource::next - [2189](#2151): Select next DA height to never include more than u16::MAX -1 transactions from L1. - [2162](#2162): Pool structure with dependencies, etc.. for the next transaction pool module. Also adds insertion/verification process in PoolV2 and tests refactoring - [2265](#2265): Integrate Block Committer API for DA Block Costs. - [2280](#2280): Allow comma separated relayer addresses in cli - [2299](#2299): Support blobs in the predicates. - [2300](#2300): Added new function to `fuel-core-client` for checking whether a blob exists. ### Changed #### Breaking - [2299](#2299): Anyone who wants to participate in the transaction broadcasting via p2p must upgrade to support new predicates on the TxPool level. - [2299](#2299): Upgraded `fuel-vm` to `0.58.0`. More information in the [release](https://github.com/FuelLabs/fuel-vm/releases/tag/v0.58.0). - [2276](#2276): Changed how complexity for blocks is calculated. The default complexity now is 80_000. All queries that somehow touch the block header now are more expensive. - [2290](#2290): Added a new GraphQL limit on number of `directives`. The default value is `10`. - [2206](#2206): Use timestamp of last block when dry running transactions. - [2153](#2153): Updated default gas costs for the local testnet configuration to match `fuel-core 0.35.0`. ## What's Changed * fix: use core-test.fuellabs.net for dnsaddr resolution by @rymnc in #2214 * Removed state transition bytecode from the local testnet by @xgreenx in #2215 * Send whole transaction pool upon subscription to gossip by @AurelienFT in #2131 * Update default gas costs based on 0.35.0 benchmarks by @xgreenx in #2153 * feat: Use timestamp of last block when dry running transactions by @netrome in #2206 * fix(dnsaddr_resolution): use fqdn separator to prevent suffixing by dns resolvers by @rymnc in #2222 * TransactionSource: specify maximum number of transactions to be fetched by @acerone85 in #2182 * Implement worst case scenario for price algorithm v1 by @rafal-ch in #2219 * chore(gas_price_service): define port for L2 data by @rymnc in #2224 * Block producer selects da height to never exceed u64::MAX - 1 transactions from L1 by @acerone85 in #2189 * Weekly `cargo update` by @github-actions in #2236 * Use fees to calculate DA reward and avoid issues with Gwei/Wei conversions by @MitchTurner in #2229 * Protect against passing `i128::MIN` to `abs()` which causes overflow by @rafal-ch in #2241 * Acquire `da_finalization_period` from the command line by @rafal-ch in #2240 * Executor: test Tx_count limit with incorrect tx source by @acerone85 in #2242 * Minor updates to docs + a few typos fixed by @rafal-ch in #2250 * chore(gas_price_service): move algorithm_updater to fuel-core-gas-price-service by @rymnc in #2246 * Use single heavy input in the `transaction_throughput.rs` benchmarks by @xgreenx in #2205 * Enforce the block size limit by @rafal-ch in #2195 * feat: build ARM and AMD in parallel by @mchristopher in #2130 * Weekly `cargo update` by @github-actions in #2268 * chore(gas_price_service): split into v0 and v1 and squash FuelGasPriceUpdater type into GasPriceService by @rymnc in #2256 * feat(gas_price_service): update block committer da source with established contract by @rymnc in #2265 * Use bytes from `unrecorded_blocks` rather from the block from DA by @MitchTurner in #2252 * TxPool v2 General architecture by @AurelienFT in #2162 * Add value delimiter and tests args by @AurelienFT in #2280 * fix(da_block_costs): remove Arc<Mutex<>> on shared_state and expose channel by @rymnc in #2278 * fix(combined_database): syncing auxiliary databases on startup with custom behaviour by @rymnc in #2272 * fix: Manually encode Authorization header for eventsource_client by @Br1ght0ne in #2284 * Address `async-graphql` vulnerability by @MitchTurner in #2290 * Update the WASM compatibility tests for `0.36` release by @rafal-ch in #2271 * DA compression by @Dentosal in #1609 * Use different port for every version compatibility test by @rafal-ch in #2301 * Fix block query complexity by @xgreenx in #2297 * Support blobs in predicates by @Voxelot in #2299 **Full Changelog**: v0.36.0...v0.37.0
Linked Issues/PRs
Closes #2160
Description
This PR contains a new way to organize and store transactions. This new architecture have multiple benefits :
dependencies
anddependents
transactions everywhere (not a mix with parents etc).dependencies
is for transaction that need to be included before us anddependents
is for transaction that need to be included after us.Pool
structureChanges in code logic from TxPool v1
The verifications are performed in a new order specified in Allow insertion of transactions in the TxPool v2 #2186. The goal is to avoid making the computation heavy work if the simple checks aren't valid. In this new version we also ensure that verifications are done in order by having wrapper type around each step to allow only one verification path.
The insertion is performed in a separate thread pool, the goal is to not block the pool on any verifications/insertions and to manage the ressources we allocate to these works
The insertion rules and conditions has change to the following :
A transaction with dependencies can collide only with one other transaction
A transaction without dependencies can collide with multiple transaction
Rules to free up space for new transaction
New limits on the size of the pool : max_pool_bytes_size and max_pool_gas
Some parts of this refactoring can still be improved :
Ideas that could be added but maybe overkill :
Checklist
Before requesting review