+
Skip to content

feat: async add new address to electrum #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/floresta-electrum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ categories = ["bitcoin", "blockchain", "node"]
floresta-common = { path = "../floresta-common" }
floresta-chain = { path = "../floresta-chain" }
floresta-watch-only = { path = "../floresta-watch-only" }
floresta-compact-filters = { path = "../floresta-compact-filters" }
floresta-wire = { path = "../floresta-wire" }

rustreexo = "0.1.0"
sha2 = "^0.10.6"
Expand Down
196 changes: 161 additions & 35 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use bitcoin::consensus::deserialize;
use bitcoin::consensus::encode::serialize_hex;
use bitcoin::hashes::hex::FromHex;
use bitcoin::hashes::sha256;
use bitcoin::hashes::Hash;
use bitcoin::ScriptBuf;
use bitcoin::Transaction;
use bitcoin::TxOut;
Expand All @@ -22,9 +23,13 @@ use floresta_chain::pruned_utreexo::BlockchainInterface;
use floresta_common::get_hash_from_u8;
use floresta_common::get_spk_hash;
use floresta_common::spsc::Channel;
use floresta_compact_filters::BlockFilterBackend;
use floresta_compact_filters::QueryType;
use floresta_watch_only::kv_database::KvDatabase;
use floresta_watch_only::AddressCache;
use floresta_watch_only::CachedTransaction;
use floresta_wire::node_interface::NodeInterface;
use floresta_wire::node_interface::NodeMethods;
use log::error;
use log::info;
use log::trace;
Expand Down Expand Up @@ -95,13 +100,28 @@ pub struct ElectrumServer<Blockchain: BlockchainInterface> {
/// We keep the script_hash and which client has it, so we can notify the
/// clients when a new transaction is received.
pub client_addresses: HashMap<sha256::Hash, Arc<Client>>,
/// A Arc-ed copy of the block filters backend that we can use to check if a
/// block contains a transaction that we are interested in.
pub block_filters: Option<Arc<BlockFilterBackend>>,
/// An interface to a running node, used to broadcast transactions and request
/// blocks.
pub node_interface: Arc<NodeInterface>,
/// A list of addresses that we've just learned about and need to rescan for
/// transactions.
///
/// We accumulate those addresses here and then periodically
/// scan, since a wallet will often send multiple addresses, but
/// in different requests.
pub addresses_to_scan: Vec<sha256::Hash>,
}

impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
pub async fn new(
address: &'static str,
address_cache: Arc<RwLock<AddressCache<KvDatabase>>>,
chain: Arc<Blockchain>,
block_filters: Option<Arc<BlockFilterBackend>>,
node_interface: Arc<NodeInterface>,
) -> Result<ElectrumServer<Blockchain>, Box<dyn std::error::Error>> {
let listener = Arc::new(TcpListener::bind(address).await?);
let (tx, rx) = unbounded();
Expand All @@ -112,11 +132,14 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
Ok(ElectrumServer {
chain,
address_cache,
block_filters,
node_interface,
tcp_listener: listener,
clients: HashMap::new(),
message_receiver: rx,
message_transmitter: tx,
client_addresses: HashMap::new(),
addresses_to_scan: Vec::new(),
})
}

Expand Down Expand Up @@ -199,37 +222,28 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
"blockchain.scripthash.get_history" => {
let script_hash = get_arg!(request, sha256::Hash, 0);
let transactions = self
.address_cache
self.address_cache
.read()
.await
.get_address_history(&script_hash);
let mut res = Vec::new();
for transaction in transactions {
let entry = if transaction.height == 0 {
json!({
"tx_hash": transaction.hash,
"height": transaction.height,
"fee": 2000
})
} else {
json!({
"tx_hash": transaction.hash,
"height": transaction.height,
})
};

res.push(entry);
}

json_rpc_res!(request, res)
.get_address_history(&script_hash)
.map(|transactions| {
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
})
.unwrap_or_else(|| {
Ok(json!({
"jsonrpc": "2.0",
"result": null,
"id": request.id
}))
})
}
"blockchain.scripthash.get_mempool" => json_rpc_res!(request, []),
"blockchain.scripthash.listunspent" => {
let hash = get_arg!(request, sha256::Hash, 0);
let utxos = self.address_cache.read().await.get_address_utxos(&hash);
if utxos.is_none() {
return Err(crate::error::Error::InvalidParams);
return json_rpc_res!(request, []);
}
let mut final_utxos = Vec::new();
for (utxo, prevout) in utxos.unwrap().into_iter() {
Expand Down Expand Up @@ -262,12 +276,19 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
self.client_addresses.insert(hash, client);

let history = self.address_cache.read().await.get_address_history(&hash);

if history.is_empty() {
return json_rpc_res!(request, null);
match history {
Some(transactions) if !transactions.is_empty() => {
let res = get_status(transactions);
json_rpc_res!(request, res)
}
Some(_) => {
json_rpc_res!(request, null)
}
None => {
self.addresses_to_scan.push(hash);
json_rpc_res!(request, null)
}
}
let status_hash = get_status(history);
json_rpc_res!(request, status_hash)
}
"blockchain.scripthash.unsubscribe" => {
let address = get_arg!(request, sha256::Hash, 0);
Expand Down Expand Up @@ -370,19 +391,118 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
for (block, height) in blocks.recv() {
self.handle_block(block, height).await;
}
let request = async_std::future::timeout(

// handles client requests
while let Ok(request) = async_std::future::timeout(
std::time::Duration::from_secs(1),
self.message_receiver.recv(),
)
.await;
.await
{
if let Ok(message) = request {
self.handle_message(message).await?;
}
}

if let Ok(Ok(message)) = request {
self.handle_message(message).await?;
// rescan for new addresses, if any
if !self.addresses_to_scan.is_empty() {
info!("Catching up with addresses {:?}", self.addresses_to_scan);
let addresses: Vec<sha256::Hash> = self.addresses_to_scan.drain(..).collect();
for address in addresses.iter().copied() {
self.address_cache.write().await.cache_address_hash(address);
}
self.rescan_for_addresses(addresses).await?;
}
}
}

async fn handle_block(&mut self, block: bitcoin::Block, height: u32) {
/// If a user adds a new address that we didn't have cached, this method
/// will look for historical transactions for it.
///
/// Usually, we'll relly on compact block filters to speed things up. If
/// we don't have compact block filters, we may rescan using the older,
/// more bandwidth-intensive method of actually downloading blocks.
async fn rescan_for_addresses(
&mut self,
addresses: Vec<sha256::Hash>,
) -> Result<(), super::error::Error> {
// If compact block filters are enabled, use them. Otherwise, fallback
// to the "old-school" rescaning.
match &self.block_filters {
Some(cfilters) => self.rescan_with_block_filters(cfilters, addresses).await,
None => self
.chain
.rescan(1)
.map_err(|e| super::error::Error::Blockchain(Box::new(e))),
}
}

/// If we have compact block filters enabled, this method will use them to
/// find blocks of interest and download for our wallet to learn about new
/// transactions, once a new address is added by subscription.
async fn rescan_with_block_filters(
&self,
cfilters: &BlockFilterBackend,
addresses: Vec<sha256::Hash>,
) -> Result<(), super::error::Error> {
// By default, we look from 1..tip
let height = self.chain.get_height().unwrap_or(0) as u64;

let addresses = addresses
.into_iter()
.map(|a| QueryType::ScriptHash(a.to_byte_array()))
.collect::<Vec<_>>();

// TODO (Davidson): Let users select what the starting and end height is
let blocks: Vec<_> = cfilters
.match_any(1, height, &addresses)
.unwrap_or_default()
.into_iter()
.flat_map(|height| {
self.chain
.get_block_hash(height as u32)
.into_iter()
.zip(Some(height))
})
.flat_map(|(hash, height)| {
self.node_interface
.get_block(hash)
.ok()
.flatten()
.map(|block| (block, height))
})
.collect();

// Tells users about the transactions we found
for (block, height) in blocks {
self.handle_block(block, height as u32).await;
}

Ok(())
}

fn process_history(transactions: &[CachedTransaction]) -> Vec<Value> {
let mut res = Vec::new();
for transaction in transactions {
let entry = if transaction.height == 0 {
json!({
"tx_hash": transaction.hash,
"height": transaction.height,
"fee": 2000
})
} else {
json!({
"tx_hash": transaction.hash,
"height": transaction.height,
})
};

res.push(entry);
}
res
}

async fn handle_block(&self, block: bitcoin::Block, height: u32) {
let result = json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
Expand All @@ -391,10 +511,14 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
"hex": serialize_hex(&block.header)
}]
});
if !self.chain.is_in_idb() || height % 1000 == 0 {

let current_height = self.address_cache.read().await.get_cache_height();

if (!self.chain.is_in_idb() || height % 1000 == 0) && (height > current_height) {
let lock = self.address_cache.write().await;
lock.bump_height(height);
}

if self.chain.get_height().unwrap() == height {
for client in &mut self.clients.values() {
let res = client
Expand All @@ -405,6 +529,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
}
}

let transactions = self
.address_cache
.write()
Expand Down Expand Up @@ -468,7 +593,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
if let Some(client) = self.client_addresses.get(&hash) {
let history = self.address_cache.read().await.get_address_history(&hash);

let status_hash = get_status(history);
let status_hash = get_status(history.unwrap());
let notify = json!({
"jsonrpc": "2.0",
"method": "blockchain.scripthash.subscribe",
Expand Down Expand Up @@ -584,6 +709,7 @@ macro_rules! json_rpc_res {
}))
}
}

#[macro_export]
/// Returns and parses a value from the request json or fails with [super::error::Error::InvalidParams].
macro_rules! get_arg {
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载