Skip to content

Commit

Permalink
feat: relay block to peers after compact block reconstruction
Browse files Browse the repository at this point in the history
  • Loading branch information
quake authored and doitian committed Nov 19, 2018
1 parent 3438cc8 commit 380386d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 66 deletions.
16 changes: 13 additions & 3 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,33 @@ use bigint::H256;
use ckb_protocol::{BlockTransactions, FlatbuffersVectorIterator};
use ckb_shared::index::ChainIndex;
use core::transaction::Transaction;
use network::CKBProtocolContext;
use network::PeerIndex;
use relayer::Relayer;
use std::sync::Arc;

pub struct BlockTransactionsProcess<'a, CI: ChainIndex + 'a> {
message: &'a BlockTransactions<'a>,
relayer: &'a Relayer<CI>,
peer: PeerIndex,
nc: &'a CKBProtocolContext,
}

impl<'a, CI> BlockTransactionsProcess<'a, CI>
where
CI: ChainIndex + 'static,
{
pub fn new(message: &'a BlockTransactions, relayer: &'a Relayer<CI>, peer: PeerIndex) -> Self {
pub fn new(
message: &'a BlockTransactions,
relayer: &'a Relayer<CI>,
peer: PeerIndex,
nc: &'a CKBProtocolContext,
) -> Self {
BlockTransactionsProcess {
message,
relayer,
peer,
nc,
}
}

Expand All @@ -29,7 +38,7 @@ where
.relayer
.state
.pending_compact_blocks
.lock()
.write()
.remove(&hash)
{
let transactions: Vec<Transaction> =
Expand All @@ -38,7 +47,8 @@ where
.collect();

if let (Some(block), _) = self.relayer.reconstruct_block(&compact_block, transactions) {
let _ = self.relayer.accept_block(self.peer, block);
self.relayer
.accept_block(self.nc, self.peer, &Arc::new(block));
}
}
}
Expand Down
81 changes: 37 additions & 44 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::compact_block::CompactBlock;
use ckb_protocol::{CompactBlock as FbsCompactBlock, RelayMessage};
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::ChainProvider;
use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier};
use flatbuffers::FlatBufferBuilder;
use network::{CKBProtocolContext, PeerIndex};
use relayer::Relayer;
use std::collections::HashSet;
use std::sync::Arc;
use util::RwLockUpgradableReadGuard;

pub struct CompactBlockProcess<'a, CI: ChainIndex + 'a> {
message: &'a FbsCompactBlock<'a>,
Expand Down Expand Up @@ -33,54 +36,44 @@ where

pub fn execute(self) {
let compact_block: CompactBlock = (*self.message).into();

if self
.relayer
.state
.received_blocks
.lock()
.insert(compact_block.header.hash())
let block_hash = compact_block.header.hash();
let pending_compact_blocks = self.relayer.state.pending_compact_blocks.upgradable_read();
if pending_compact_blocks.get(&block_hash).is_none()
&& self.relayer.get_block(&block_hash).is_none()
{
self.relayer
.request_proposal_txs(self.nc, self.peer, &compact_block);
let resolver =
HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone());
let header_verifier =
HeaderVerifier::new(Arc::clone(&self.relayer.shared.consensus().pow_engine()));

match self.relayer.reconstruct_block(&compact_block, Vec::new()) {
(Some(block), _) => {
if self.relayer.accept_block(self.peer, block.clone()).is_ok() {
let fbb = &mut FlatBufferBuilder::new();
let message =
RelayMessage::build_compact_block(fbb, &block, &HashSet::new());
fbb.finish(message, None);
if header_verifier.verify(&resolver).is_ok() {
self.relayer
.request_proposal_txs(self.nc, self.peer, &compact_block);

for peer_id in self.nc.connected_peers() {
if peer_id != self.peer {
let _ = self.nc.send(peer_id, fbb.finished_data().to_vec());
}
}
match self.relayer.reconstruct_block(&compact_block, Vec::new()) {
(Some(block), _) => {
self.relayer
.accept_block(self.nc, self.peer, &Arc::new(block))
}
}
(_, Some(missing_indexes)) => {
let hash = compact_block.header.hash();
self.relayer
.state
.pending_compact_blocks
.lock()
.insert(hash, compact_block);
(None, missing_indexes) => {
{
let mut write_guard =
RwLockUpgradableReadGuard::upgrade(pending_compact_blocks);
write_guard.insert(block_hash, compact_block.clone());
}

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_get_block_transactions(
fbb,
&hash,
&missing_indexes
.into_iter()
.map(|i| i as u32)
.collect::<Vec<_>>(),
);
fbb.finish(message, None);
let _ = self.nc.send(self.peer, fbb.finished_data().to_vec());
}
(None, None) => {
// TODO fail to reconstruct block, downgrade to header first?
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_get_block_transactions(
fbb,
&block_hash,
&missing_indexes
.into_iter()
.map(|i| i as u32)
.collect::<Vec<_>>(),
);
fbb.finish(message, None);
let _ = self.nc.send(self.peer, fbb.finished_data().to_vec());
}
}
}
}
Expand Down
30 changes: 19 additions & 11 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use self::get_block_transactions_process::GetBlockTransactionsProcess;
use self::transaction_process::TransactionProcess;
use bigint::H256;
use ckb_chain::chain::ChainController;
use ckb_chain::error::ProcessBlockError;
use ckb_protocol::{short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload};
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::{ChainProvider, Shared};
Expand All @@ -27,9 +26,10 @@ use flatbuffers::{get_root, FlatBufferBuilder};
use fnv::{FnvHashMap, FnvHashSet};
use network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, TimerToken};
use pool::txs_pool::TransactionPoolController;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use util::Mutex;
use util::{Mutex, RwLock};

pub const TX_PROPOSAL_TOKEN: TimerToken = 0;

Expand Down Expand Up @@ -90,6 +90,7 @@ where
&message.payload_as_block_transactions().unwrap(),
self,
peer,
nc,
).execute(),
RelayPayload::GetBlockProposal => GetBlockProposalProcess::new(
&message.payload_as_get_block_proposal().unwrap(),
Expand Down Expand Up @@ -132,15 +133,25 @@ where
let _ = nc.send(peer, fbb.finished_data().to_vec());
}

pub fn accept_block(&self, _peer: PeerIndex, block: Block) -> Result<(), ProcessBlockError> {
self.chain.process_block(Arc::new(block))
pub fn accept_block(&self, nc: &CKBProtocolContext, peer: PeerIndex, block: &Arc<Block>) {
if self.chain.process_block(Arc::clone(&block)).is_ok() {
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new());
fbb.finish(message, None);

for peer_id in nc.connected_peers() {
if peer_id != peer {
let _ = nc.send(peer_id, fbb.finished_data().to_vec());
}
}
}
}

pub fn reconstruct_block(
&self,
compact_block: &CompactBlock,
transactions: Vec<Transaction>,
) -> (Option<Block>, Option<Vec<usize>>) {
) -> (Option<Block>, Vec<usize>) {
let (key0, key1) =
short_transaction_id_keys(compact_block.header.nonce(), compact_block.nonce);

Expand Down Expand Up @@ -187,9 +198,9 @@ where
.proposal_transactions(compact_block.proposal_transactions.clone())
.build();

(Some(block), None)
(Some(block), missing_indexes)
} else {
(None, Some(missing_indexes))
(None, missing_indexes)
}
}

Expand Down Expand Up @@ -264,10 +275,7 @@ where

#[derive(Default)]
pub struct RelayState {
// TODO add size limit or use bloom filter
pub received_blocks: Mutex<FnvHashSet<H256>>,
pub received_transactions: Mutex<FnvHashSet<H256>>,
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub pending_compact_blocks: RwLock<FnvHashMap<H256, CompactBlock>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
}
16 changes: 8 additions & 8 deletions sync/src/relayer/transaction_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ where

pub fn execute(self) {
let tx: Transaction = (*self.message).into();
let _ = self.relayer.tx_pool.add_transaction(tx.clone());
if self.relayer.tx_pool.add_transaction(tx.clone()).is_ok() {
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
fbb.finish(message, None);

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_transaction(fbb, &tx);
fbb.finish(message, None);

for peer_id in self.nc.connected_peers() {
if peer_id != self.peer {
let _ = self.nc.send(peer_id, fbb.finished_data().to_vec());
for peer_id in self.nc.connected_peers() {
if peer_id != self.peer {
let _ = self.nc.send(peer_id, fbb.finished_data().to_vec());
}
}
}
}
Expand Down

0 comments on commit 380386d

Please sign in to comment.