From 380386d28a6055bb9bd6bdd3980111635f515d24 Mon Sep 17 00:00:00 2001 From: quake wang Date: Mon, 19 Nov 2018 15:23:28 +0900 Subject: [PATCH] feat: relay block to peers after compact block reconstruction --- .../src/relayer/block_transactions_process.rs | 16 +++- sync/src/relayer/compact_block_process.rs | 81 +++++++++---------- sync/src/relayer/mod.rs | 30 ++++--- sync/src/relayer/transaction_process.rs | 16 ++-- 4 files changed, 77 insertions(+), 66 deletions(-) diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index 534891cfce..eb56d0cc45 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -2,24 +2,33 @@ use bigint::H256; use ckb_protocol::{BlockTransactions, FlatbuffersVectorIterator}; use ckb_shared::index::ChainIndex; use core::transaction::Transaction; +use network::CKBProtocolContext; use network::PeerIndex; use relayer::Relayer; +use std::sync::Arc; pub struct BlockTransactionsProcess<'a, CI: ChainIndex + 'a> { message: &'a BlockTransactions<'a>, relayer: &'a Relayer, peer: PeerIndex, + nc: &'a CKBProtocolContext, } impl<'a, CI> BlockTransactionsProcess<'a, CI> where CI: ChainIndex + 'static, { - pub fn new(message: &'a BlockTransactions, relayer: &'a Relayer, peer: PeerIndex) -> Self { + pub fn new( + message: &'a BlockTransactions, + relayer: &'a Relayer, + peer: PeerIndex, + nc: &'a CKBProtocolContext, + ) -> Self { BlockTransactionsProcess { message, relayer, peer, + nc, } } @@ -29,7 +38,7 @@ where .relayer .state .pending_compact_blocks - .lock() + .write() .remove(&hash) { let transactions: Vec = @@ -38,7 +47,8 @@ where .collect(); if let (Some(block), _) = self.relayer.reconstruct_block(&compact_block, transactions) { - let _ = self.relayer.accept_block(self.peer, block); + self.relayer + .accept_block(self.nc, self.peer, &Arc::new(block)); } } } diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 23ee88d611..b10a3cfdfd 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -1,10 +1,13 @@ use super::compact_block::CompactBlock; use ckb_protocol::{CompactBlock as FbsCompactBlock, RelayMessage}; use ckb_shared::index::ChainIndex; +use ckb_shared::shared::ChainProvider; +use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier}; use flatbuffers::FlatBufferBuilder; use network::{CKBProtocolContext, PeerIndex}; use relayer::Relayer; -use std::collections::HashSet; +use std::sync::Arc; +use util::RwLockUpgradableReadGuard; pub struct CompactBlockProcess<'a, CI: ChainIndex + 'a> { message: &'a FbsCompactBlock<'a>, @@ -33,54 +36,44 @@ where pub fn execute(self) { let compact_block: CompactBlock = (*self.message).into(); - - if self - .relayer - .state - .received_blocks - .lock() - .insert(compact_block.header.hash()) + let block_hash = compact_block.header.hash(); + let pending_compact_blocks = self.relayer.state.pending_compact_blocks.upgradable_read(); + if pending_compact_blocks.get(&block_hash).is_none() + && self.relayer.get_block(&block_hash).is_none() { - self.relayer - .request_proposal_txs(self.nc, self.peer, &compact_block); + let resolver = + HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone()); + let header_verifier = + HeaderVerifier::new(Arc::clone(&self.relayer.shared.consensus().pow_engine())); - match self.relayer.reconstruct_block(&compact_block, Vec::new()) { - (Some(block), _) => { - if self.relayer.accept_block(self.peer, block.clone()).is_ok() { - let fbb = &mut FlatBufferBuilder::new(); - let message = - RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); - fbb.finish(message, None); + if header_verifier.verify(&resolver).is_ok() { + self.relayer + .request_proposal_txs(self.nc, self.peer, &compact_block); - for peer_id in self.nc.connected_peers() { - if peer_id != self.peer { - let _ = self.nc.send(peer_id, fbb.finished_data().to_vec()); - } - } + match self.relayer.reconstruct_block(&compact_block, Vec::new()) { + (Some(block), _) => { + self.relayer + .accept_block(self.nc, self.peer, &Arc::new(block)) } - } - (_, Some(missing_indexes)) => { - let hash = compact_block.header.hash(); - self.relayer - .state - .pending_compact_blocks - .lock() - .insert(hash, compact_block); + (None, missing_indexes) => { + { + let mut write_guard = + RwLockUpgradableReadGuard::upgrade(pending_compact_blocks); + write_guard.insert(block_hash, compact_block.clone()); + } - let fbb = &mut FlatBufferBuilder::new(); - let message = RelayMessage::build_get_block_transactions( - fbb, - &hash, - &missing_indexes - .into_iter() - .map(|i| i as u32) - .collect::>(), - ); - fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); - } - (None, None) => { - // TODO fail to reconstruct block, downgrade to header first? + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_get_block_transactions( + fbb, + &block_hash, + &missing_indexes + .into_iter() + .map(|i| i as u32) + .collect::>(), + ); + fbb.finish(message, None); + let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + } } } } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index db9fcc4879..5030a3cb19 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -17,7 +17,6 @@ use self::get_block_transactions_process::GetBlockTransactionsProcess; use self::transaction_process::TransactionProcess; use bigint::H256; use ckb_chain::chain::ChainController; -use ckb_chain::error::ProcessBlockError; use ckb_protocol::{short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload}; use ckb_shared::index::ChainIndex; use ckb_shared::shared::{ChainProvider, Shared}; @@ -27,9 +26,10 @@ use flatbuffers::{get_root, FlatBufferBuilder}; use fnv::{FnvHashMap, FnvHashSet}; use network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, TimerToken}; use pool::txs_pool::TransactionPoolController; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; -use util::Mutex; +use util::{Mutex, RwLock}; pub const TX_PROPOSAL_TOKEN: TimerToken = 0; @@ -90,6 +90,7 @@ where &message.payload_as_block_transactions().unwrap(), self, peer, + nc, ).execute(), RelayPayload::GetBlockProposal => GetBlockProposalProcess::new( &message.payload_as_get_block_proposal().unwrap(), @@ -132,15 +133,25 @@ where let _ = nc.send(peer, fbb.finished_data().to_vec()); } - pub fn accept_block(&self, _peer: PeerIndex, block: Block) -> Result<(), ProcessBlockError> { - self.chain.process_block(Arc::new(block)) + pub fn accept_block(&self, nc: &CKBProtocolContext, peer: PeerIndex, block: &Arc) { + if self.chain.process_block(Arc::clone(&block)).is_ok() { + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new()); + fbb.finish(message, None); + + for peer_id in nc.connected_peers() { + if peer_id != peer { + let _ = nc.send(peer_id, fbb.finished_data().to_vec()); + } + } + } } pub fn reconstruct_block( &self, compact_block: &CompactBlock, transactions: Vec, - ) -> (Option, Option>) { + ) -> (Option, Vec) { let (key0, key1) = short_transaction_id_keys(compact_block.header.nonce(), compact_block.nonce); @@ -187,9 +198,9 @@ where .proposal_transactions(compact_block.proposal_transactions.clone()) .build(); - (Some(block), None) + (Some(block), missing_indexes) } else { - (None, Some(missing_indexes)) + (None, missing_indexes) } } @@ -264,10 +275,7 @@ where #[derive(Default)] pub struct RelayState { - // TODO add size limit or use bloom filter - pub received_blocks: Mutex>, - pub received_transactions: Mutex>, - pub pending_compact_blocks: Mutex>, + pub pending_compact_blocks: RwLock>, pub inflight_proposals: Mutex>, pub pending_proposals_request: Mutex>>, } diff --git a/sync/src/relayer/transaction_process.rs b/sync/src/relayer/transaction_process.rs index d77e897b45..5fbfefa777 100644 --- a/sync/src/relayer/transaction_process.rs +++ b/sync/src/relayer/transaction_process.rs @@ -32,15 +32,15 @@ where pub fn execute(self) { let tx: Transaction = (*self.message).into(); - let _ = self.relayer.tx_pool.add_transaction(tx.clone()); + if self.relayer.tx_pool.add_transaction(tx.clone()).is_ok() { + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_transaction(fbb, &tx); + fbb.finish(message, None); - let fbb = &mut FlatBufferBuilder::new(); - let message = RelayMessage::build_transaction(fbb, &tx); - fbb.finish(message, None); - - for peer_id in self.nc.connected_peers() { - if peer_id != self.peer { - let _ = self.nc.send(peer_id, fbb.finished_data().to_vec()); + for peer_id in self.nc.connected_peers() { + if peer_id != self.peer { + let _ = self.nc.send(peer_id, fbb.finished_data().to_vec()); + } } } }