Skip to content

Commit

Permalink
feat: blocktemplate refresh
Browse files Browse the repository at this point in the history
blocktemplate refresh when the best block changes, OR 3 seconds has passed and there are more
transactions and uncles
  • Loading branch information
zhangsoledad committed Jan 21, 2019
1 parent 91ea1c4 commit 9c8340a
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 55 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ jsonrpc-types = { path = "../util/jsonrpc-types" }
hyper = "0.12"
futures = "0.1"
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

[dev-dependencies]
proptest = "0.8"
ckb-chain = { path = "../chain" }
ckb-chain-spec = { path = "../spec" }
numext-fixed-hash = { version = "0.1", features = ["support_rand", "support_heapsize", "support_serde"] }
numext-fixed-uint = { version = "0.1", features = ["support_rand", "support_heapsize", "support_serde"] }
ckb-db = { path = "../db" }
ckb-verification = { path = "../verification" }
ckb-pow = { path = "../pow" }
184 changes: 183 additions & 1 deletion miner/src/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use ckb_core::header::Header;
use ckb_core::service::{Request, DEFAULT_CHANNEL_SIZE};
use ckb_core::transaction::{CellInput, CellOutput, Transaction, TransactionBuilder};
use ckb_core::uncle::UncleBlock;
use ckb_core::BlockNumber;
use ckb_core::{Cycle, Version};
use ckb_notify::NotifyController;
use ckb_pool::txs_pool::TransactionPoolController;
use ckb_shared::error::SharedError;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::{ChainProvider, Shared};
use ckb_util::Mutex;
use crossbeam_channel::{self, select, Receiver, Sender};
use faketime::unix_time_as_millis;
use fnv::FnvHashSet;
Expand All @@ -23,15 +25,39 @@ use std::thread::{self, JoinHandle};
const MAX_CANDIDATE_UNCLES: usize = 42;
type BlockTemplateParams = (Option<Cycle>, Option<u64>, Option<Version>);
type BlockTemplateResult = Result<BlockTemplate, SharedError>;

const BLOCK_ASSEMBLER_SUBSCRIBER: &str = "block_assembler";
const BLOCK_TEMPLATE_TIMEOUT: u64 = 3000;

struct CurrentTemplate {
pub time: u64,
pub uncles_updated_at: u64,
pub txs_updated_at: u64,
pub template: BlockTemplate,
}

impl CurrentTemplate {
fn is_outdate(
&self,
last_uncles_updated_at: u64,
last_txs_updated_at: u64,
current_time: u64,
number: BlockNumber,
) -> bool {
last_uncles_updated_at != self.uncles_updated_at
|| last_txs_updated_at != self.txs_updated_at
|| number != self.template.number
|| current_time.saturating_sub(self.time) > BLOCK_TEMPLATE_TIMEOUT
}
}

pub struct BlockAssembler<CI> {
shared: Shared<CI>,
tx_pool: TransactionPoolController,
candidate_uncles: LruCache<H256, Arc<Block>>,
type_hash: H256,
work_id: AtomicUsize,
last_uncles_updated_at: AtomicUsize,
current_template: Mutex<Option<CurrentTemplate>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -79,6 +105,8 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
type_hash,
candidate_uncles: LruCache::new(MAX_CANDIDATE_UNCLES),
work_id: AtomicUsize::new(0),
last_uncles_updated_at: AtomicUsize::new(0),
current_template: Mutex::new(None),
}
}

Expand All @@ -102,6 +130,8 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
Ok(uncle_block) => {
let hash = uncle_block.header().hash().clone();
self.candidate_uncles.insert(hash, uncle_block);
self.last_uncles_updated_at
.store(unix_time_as_millis() as usize, Ordering::SeqCst);
}
_ => {
error!(target: "miner", "new_uncle_receiver closed");
Expand Down Expand Up @@ -190,10 +220,26 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
self.transform_params(cycles_limit, bytes_limit, max_version);
let uncles_count_limit = self.shared.consensus().max_uncles_len() as u32;

let last_uncles_updated_at = self.last_uncles_updated_at.load(Ordering::SeqCst) as u64;
let last_txs_updated_at = self.tx_pool.get_last_txs_updated_at();

let tip_header = self.shared.tip_header().read();
let header = tip_header.inner();
let number = tip_header.number() + 1;
let current_time = cmp::max(unix_time_as_millis(), header.timestamp() + 1);

let mut current_template = self.current_template.lock();
if let Some(ref current_template) = *current_template {
if !current_template.is_outdate(
last_uncles_updated_at,
last_txs_updated_at,
current_time,
number,
) {
return Ok(current_template.template.clone());
}
}

let difficulty = self
.shared
.calculate_difficulty(header)
Expand Down Expand Up @@ -232,6 +278,13 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
work_id: format!("{}", self.work_id.fetch_add(1, Ordering::SeqCst)),
};

*current_template = Some(CurrentTemplate {
time: current_time,
uncles_updated_at: last_uncles_updated_at,
txs_updated_at: last_txs_updated_at,
template: template.clone(),
});

Ok(template)
}

Expand Down Expand Up @@ -335,3 +388,132 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
(uncles, bad_uncles)
}
}

#[cfg(test)]
mod tests {
use crate::block_assembler::BlockAssembler;
use ckb_chain::chain::ChainBuilder;
use ckb_chain::chain::ChainController;
use ckb_chain_spec::consensus::Consensus;
use ckb_core::block::BlockBuilder;
use ckb_core::header::HeaderBuilder;
use ckb_db::memorydb::MemoryKeyValueDB;
use ckb_notify::{NotifyController, NotifyService};
use ckb_pool::txs_pool::{PoolConfig, TransactionPoolController, TransactionPoolService};
use ckb_pow::Pow;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::Shared;
use ckb_shared::shared::SharedBuilder;
use ckb_shared::store::ChainKVStore;
use ckb_verification::{BlockVerifier, HeaderResolverWrapper, HeaderVerifier, Verifier};
use jsonrpc_types::{BlockTemplate, CellbaseTemplate};
use numext_fixed_hash::H256;
use std::sync::{atomic::AtomicUsize, Arc};

fn start_chain(
consensus: Option<Consensus>,
notify: Option<NotifyController>,
) -> (
ChainController,
Shared<ChainKVStore<MemoryKeyValueDB>>,
NotifyController,
) {
let mut builder = SharedBuilder::<ChainKVStore<MemoryKeyValueDB>>::new_memory();
if let Some(consensus) = consensus {
builder = builder.consensus(consensus);
}
let shared = builder.build();

let notify = notify.unwrap_or_else(|| NotifyService::default().start::<&str>(None).1);
let (chain_controller, chain_receivers) = ChainController::build();
let chain_service = ChainBuilder::new(shared.clone())
.notify(notify.clone())
.build();
let _handle = chain_service.start::<&str>(None, chain_receivers);
(chain_controller, shared, notify)
}

fn setup_tx_pool<CI: ChainIndex + 'static>(
shared: Shared<CI>,
notify: NotifyController,
) -> TransactionPoolController {
let config = PoolConfig {
max_pool_size: 1000,
max_orphan_size: 1000,
max_proposal_size: 1000,
max_cache_size: 1000,
max_pending_size: 1000,
trace: Some(100),
};
let last_txs_updated_at = Arc::new(AtomicUsize::new(0));
let (tx_pool_controller, tx_pool_receivers) =
TransactionPoolController::build(Arc::clone(&last_txs_updated_at));
let tx_pool_service =
TransactionPoolService::new(config, shared, notify, last_txs_updated_at);
let _handle = tx_pool_service.start(Some("TransactionPoolService"), tx_pool_receivers);
tx_pool_controller
}

fn setup_block_assembler<CI: ChainIndex + 'static>(
tx_pool: TransactionPoolController,
shared: Shared<CI>,
type_hash: H256,
) -> BlockAssembler<CI> {
BlockAssembler::new(shared, tx_pool, type_hash)
}

#[test]
fn test_get_get_block_template() {
let (_chain_controller, shared, notify) = start_chain(None, None);
let tx_pool_controller = setup_tx_pool(shared.clone(), notify.clone());
let mut block_assembler =
setup_block_assembler(tx_pool_controller, shared.clone(), H256::zero());

let block_template = block_assembler
.get_block_template(None, None, None)
.unwrap();

let BlockTemplate {
version,
difficulty,
current_time,
number,
parent_hash,
uncles, // Vec<UncleTemplate>
commit_transactions, // Vec<TransactionTemplate>
proposal_transactions, // Vec<ProposalShortId>
cellbase, // CellbaseTemplate
..
// cycles_limit,
// bytes_limit,
// uncles_count_limit,
} = block_template;

let (cellbase_id, cellbase) = {
let CellbaseTemplate { hash, data, .. } = cellbase;
(hash, data)
};

let header_builder = HeaderBuilder::default()
.version(version)
.number(number)
.difficulty(difficulty)
.timestamp(current_time)
.parent_hash(parent_hash)
.cellbase_id(cellbase_id);

let block = BlockBuilder::default()
.uncles(uncles.into_iter().map(Into::into).collect())
.commit_transaction(cellbase.into())
.commit_transactions(commit_transactions.into_iter().map(Into::into).collect())
.proposal_transactions(proposal_transactions.into_iter().map(Into::into).collect())
.with_header_builder(header_builder);

let resolver = HeaderResolverWrapper::new(block.header(), shared.clone());
let header_verifier = HeaderVerifier::new(shared.clone(), Pow::Dummy.engine());
assert!(header_verifier.verify(&resolver).is_ok());

let block_verify = BlockVerifier::new(shared.clone());
assert!(block_verify.verify(&block).is_ok());
}
}
35 changes: 22 additions & 13 deletions miner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use hyper::Uri;
use hyper::{Body, Chunk, Client as HttpClient, Method, Request};
use jsonrpc_types::BlockTemplate;
use jsonrpc_types::{
id::Id, params::Params, request::MethodCall, version::Version, Block as JsonBlock,
error::Error as RpcFail, id::Id, params::Params, request::MethodCall, response::Output,
version::Version, Block as JsonBlock,
};
use log::debug;
use log::error;
use log::{debug, error};
use serde_json::error::Error as JsonError;
use serde_json::{self, json, Value};
use std::sync::Arc;
Expand All @@ -27,6 +27,7 @@ pub enum RpcError {
Http(HyperError),
Canceled, //oneshot canceled
Json(JsonError),
Fail(RpcFail),
}

#[derive(Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ impl Rpc {
let stream = receiver.for_each(move |(sender, call): RpcRequest| {
let req_url = url.clone();
let request_json = serde_json::to_vec(&call).expect("valid rpc call");

let mut req = Request::new(Body::from(request_json));
*req.method_mut() = Method::POST;
*req.uri_mut() = req_url;
Expand All @@ -78,9 +80,7 @@ impl Rpc {
.request(req)
.and_then(|res| res.into_body().concat2())
.then(|res| sender.send(res.map_err(RpcError::Http)))
.map_err(|err| {
error!(target: "miner", "rpc request error {:?}", err);
});
.map_err(|_| ());

rt::spawn(request);
Ok(())
Expand All @@ -101,7 +101,7 @@ impl Rpc {
&self,
method: String,
params: Vec<Value>,
) -> impl Future<Item = Chunk, Error = RpcError> {
) -> impl Future<Item = Output, Error = RpcError> {
let (tx, rev) = oneshot::channel();

let call = MethodCall {
Expand All @@ -114,7 +114,9 @@ impl Rpc {
let req = (tx, call);
let mut sender = self.inner.sender.clone();
let _ = sender.try_send(req);
rev.map_err(|_| RpcError::Canceled).flatten()
rev.map_err(|_| RpcError::Canceled)
.flatten()
.and_then(|chunk| serde_json::from_slice(&chunk).map_err(RpcError::Json))
}
}

Expand Down Expand Up @@ -153,10 +155,10 @@ impl Client {
&self,
work_id: &str,
block: &Block,
) -> impl Future<Item = Chunk, Error = RpcError> {
) -> impl Future<Item = Output, Error = RpcError> {
let block: JsonBlock = block.into();
let method = "submit_block".to_owned();
let params = vec![json!(block), json!(work_id)];
let params = vec![json!(work_id), json!(block)];

self.rpc.request(method, params)
}
Expand Down Expand Up @@ -189,8 +191,15 @@ impl Client {
json!(self.config.max_version),
];

self.rpc
.request(method, params)
.and_then(|body| serde_json::from_slice(&body).map_err(RpcError::Json))
self.rpc.request(method, params).and_then(parse_response)
}
}

fn parse_response<T: serde::de::DeserializeOwned>(output: Output) -> Result<T, RpcError> {
match output {
Output::Success(success) => {
serde_json::from_value::<T>(success.result).map_err(RpcError::Json)
}
Output::Failure(failure) => Err(RpcError::Fail(failure.error)),
}
}
6 changes: 1 addition & 5 deletions miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ impl Miner {
} = template;

let (cellbase_id, cellbase) = {
let CellbaseTemplate {
hash,
data,
..
} = cellbase;
let CellbaseTemplate { hash, data, .. } = cellbase;
(hash, data)
};

Expand Down
Loading

0 comments on commit 9c8340a

Please sign in to comment.