Skip to content

Commit eae0905

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent 1423031 commit eae0905

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use rayon::prelude::*;
88
use serde_derive::Deserialize;
@@ -17,7 +17,7 @@ use crate::{
1717
daemon::{self, extract_bitcoind_error, Daemon},
1818
merkle::Proof,
1919
metrics::Histogram,
20-
status::ScriptHashStatus,
20+
status::{OutPointStatus, ScriptHashStatus},
2121
tracker::Tracker,
2222
types::ScriptHash,
2323
};
@@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
3232
pub struct Client {
3333
tip: Option<BlockHash>,
3434
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
35+
outpoints: HashMap<OutPoint, OutPointStatus>,
3536
}
3637

3738
#[derive(Deserialize)]
@@ -158,7 +159,25 @@ impl Rpc {
158159
}
159160
})
160161
.collect::<Result<Vec<Value>>>()
161-
.context("failed to update status")?;
162+
.context("failed to update scripthash status")?;
163+
164+
notifications.extend(
165+
client
166+
.outpoints
167+
.par_iter_mut()
168+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
169+
match self.tracker.update_outpoint_status(status, &self.daemon) {
170+
Ok(true) => Some(Ok(notification(
171+
"blockchain.outpoint.subscribe",
172+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
173+
))),
174+
Ok(false) => None, // outpoint status is the same
175+
Err(e) => Some(Err(e)),
176+
}
177+
})
178+
.collect::<Result<Vec<Value>>>()
179+
.context("failed to update scripthash status")?,
180+
);
162181

163182
if let Some(old_tip) = client.tip {
164183
let new_tip = self.tracker.chain().tip();
@@ -271,6 +290,28 @@ impl Rpc {
271290
Ok(json!(result))
272291
}
273292

293+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
294+
let outpoint = OutPoint::new(txid, vout);
295+
Ok(match client.outpoints.entry(outpoint) {
296+
Entry::Occupied(e) => json!(e.get()),
297+
Entry::Vacant(e) => {
298+
let outpoint = OutPoint::new(txid, vout);
299+
let mut status = OutPointStatus::new(outpoint);
300+
self.tracker
301+
.update_outpoint_status(&mut status, &self.daemon)?;
302+
json!(e.insert(status))
303+
}
304+
})
305+
}
306+
307+
fn outpoint_unsubscribe(
308+
&self,
309+
client: &mut Client,
310+
(txid, vout): (Txid, u32),
311+
) -> Result<Value> {
312+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
313+
}
314+
274315
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
275316
let mut status = ScriptHashStatus::new(scripthash);
276317
self.tracker
@@ -405,6 +446,8 @@ impl Rpc {
405446
Call::HeadersSubscribe => self.headers_subscribe(client),
406447
Call::MempoolFeeHistogram => self.get_fee_histogram(),
407448
Call::PeersSubscribe => Ok(json!([])),
449+
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
450+
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
408451
Call::Ping => Ok(Value::Null),
409452
Call::RelayFee => self.relayfee(),
410453
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
@@ -437,18 +480,20 @@ enum Call {
437480
Banner,
438481
BlockHeader((usize,)),
439482
BlockHeaders((usize, usize)),
440-
TransactionBroadcast((String,)),
441483
Donation,
442484
EstimateFee((u16,)),
443485
Features,
444486
HeadersSubscribe,
445487
MempoolFeeHistogram,
488+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
489+
OutPointUnsubscribe((Txid, u32)),
446490
PeersSubscribe,
447491
Ping,
448492
RelayFee,
449493
ScriptHashGetBalance((ScriptHash,)),
450494
ScriptHashGetHistory((ScriptHash,)),
451495
ScriptHashSubscribe((ScriptHash,)),
496+
TransactionBroadcast((String,)),
452497
TransactionGet(TxGetArgs),
453498
TransactionGetMerkle((Txid, usize)),
454499
Version((String, Version)),
@@ -465,6 +510,8 @@ impl Call {
465510
"blockchain.scripthash.get_balance" => Call::ScriptHashGetBalance(convert(params)?),
466511
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
467512
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
513+
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
514+
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
468515
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
469516
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
470517
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -49,12 +49,26 @@ impl TxEntry {
4949
// Confirmation height of a transaction or its mempool state:
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5151
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
52+
#[derive(Copy, Clone, Eq, PartialEq)]
5253
enum Height {
5354
Confirmed { height: usize },
5455
Unconfirmed { has_unconfirmed_inputs: bool },
5556
}
5657

5758
impl Height {
59+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
60+
let height = chain
61+
.get_block_height(&blockhash)
62+
.expect("missing block in chain");
63+
Self::Confirmed { height }
64+
}
65+
66+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
67+
Self::Unconfirmed {
68+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
69+
}
70+
}
71+
5872
fn as_i64(&self) -> i64 {
5973
match self {
6074
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -467,6 +481,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
467481
Some(StatusHash::from_engine(engine))
468482
}
469483

484+
pub(crate) struct OutPointStatus {
485+
outpoint: OutPoint,
486+
funding: Option<Height>,
487+
spending: Option<(Txid, Height)>,
488+
tip: BlockHash,
489+
}
490+
491+
impl Serialize for OutPointStatus {
492+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
493+
where
494+
S: Serializer,
495+
{
496+
let mut map = serializer.serialize_map(None)?;
497+
if let Some(funding) = &self.funding {
498+
map.serialize_entry("height", &funding)?;
499+
}
500+
if let Some((txid, height)) = &self.spending {
501+
map.serialize_entry("spender_txhash", &txid)?;
502+
map.serialize_entry("spender_height", &height)?;
503+
}
504+
map.end()
505+
}
506+
}
507+
508+
impl OutPointStatus {
509+
pub(crate) fn new(outpoint: OutPoint) -> Self {
510+
Self {
511+
outpoint,
512+
funding: None,
513+
spending: None,
514+
tip: BlockHash::default(),
515+
}
516+
}
517+
518+
pub(crate) fn sync(
519+
&mut self,
520+
index: &Index,
521+
mempool: &Mempool,
522+
daemon: &Daemon,
523+
) -> Result<bool> {
524+
let funding = self.sync_funding(index, daemon, mempool)?;
525+
let spending = self.sync_spending(index, daemon, mempool)?;
526+
let same_status = (self.funding == funding) && (self.spending == spending);
527+
self.funding = funding;
528+
self.spending = spending;
529+
self.tip = index.chain().tip();
530+
Ok(!same_status)
531+
}
532+
533+
/// Return true iff current tip became unconfirmed
534+
fn is_reorg(&self, chain: &Chain) -> bool {
535+
chain.get_block_height(&self.tip).is_none()
536+
}
537+
538+
fn sync_funding(
539+
&self,
540+
index: &Index,
541+
daemon: &Daemon,
542+
mempool: &Mempool,
543+
) -> Result<Option<Height>> {
544+
let chain = index.chain();
545+
if !self.is_reorg(chain) {
546+
if let Some(Height::Confirmed { .. }) = &self.funding {
547+
return Ok(self.funding);
548+
}
549+
}
550+
let mut confirmed = None;
551+
daemon.for_blocks(
552+
index.filter_by_txid(self.outpoint.txid),
553+
|blockhash, block| {
554+
if confirmed.is_none() {
555+
for tx in block.txdata {
556+
let txid = tx.txid();
557+
let output_len = u32::try_from(tx.output.len()).unwrap();
558+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
559+
confirmed = Some(Height::from_blockhash(blockhash, chain));
560+
return;
561+
}
562+
}
563+
}
564+
},
565+
)?;
566+
Ok(confirmed.or_else(|| {
567+
mempool
568+
.get(&self.outpoint.txid)
569+
.map(|entry| Height::unconfirmed(entry))
570+
}))
571+
}
572+
573+
fn sync_spending(
574+
&self,
575+
index: &Index,
576+
daemon: &Daemon,
577+
mempool: &Mempool,
578+
) -> Result<Option<(Txid, Height)>> {
579+
let chain = index.chain();
580+
if !self.is_reorg(chain) {
581+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
582+
return Ok(self.spending);
583+
}
584+
}
585+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
586+
let mut confirmed = None;
587+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
588+
for tx in block.txdata {
589+
for txi in &tx.input {
590+
if txi.previous_output == self.outpoint {
591+
// TODO: there should be only one spending input
592+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
593+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
594+
return;
595+
}
596+
}
597+
}
598+
})?;
599+
Ok(confirmed.or_else(|| {
600+
let entries = mempool.filter_by_spending(&self.outpoint);
601+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
602+
entries
603+
.first()
604+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
605+
}))
606+
}
607+
}
608+
470609
#[cfg(test)]
471610
mod tests {
472611
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
index::Index,
1414
mempool::{Histogram, Mempool},
1515
metrics::Metrics,
16-
status::{Balance, HistoryEntry, ScriptHashStatus},
16+
status::{Balance, HistoryEntry, OutPointStatus, ScriptHashStatus},
1717
};
1818

1919
/// Electrum protocol subscriptions' tracker
@@ -76,6 +76,14 @@ impl Tracker {
7676
Ok(prev_statushash != status.statushash())
7777
}
7878

79+
pub(crate) fn update_outpoint_status(
80+
&self,
81+
status: &mut OutPointStatus,
82+
daemon: &Daemon,
83+
) -> Result<bool> {
84+
status.sync(&self.index, &self.mempool, daemon)
85+
}
86+
7987
pub(crate) fn get_balance(&self, status: &ScriptHashStatus, cache: &Cache) -> Balance {
8088
let get_amount_fn = |outpoint: OutPoint| {
8189
cache

0 commit comments

Comments
 (0)