Skip to content

Commit 049df10

Browse files
committed
Add blockchain.outpoint.subscribe RPC
1 parent cf0acfb commit 049df10

File tree

3 files changed

+200
-6
lines changed

3 files changed

+200
-6
lines changed

src/electrum.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
22
use bitcoin::{
33
consensus::{deserialize, serialize},
44
hashes::hex::{FromHex, ToHex},
5-
BlockHash, Txid,
5+
BlockHash, OutPoint, Txid,
66
};
77
use crossbeam_channel::Receiver;
88
use rayon::prelude::*;
@@ -19,7 +19,7 @@ use crate::{
1919
merkle::Proof,
2020
metrics::{self, Histogram},
2121
signals::Signal,
22-
status::ScriptHashStatus,
22+
status::{OutPointStatus, ScriptHashStatus},
2323
tracker::Tracker,
2424
types::ScriptHash,
2525
};
@@ -34,6 +34,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
3434
pub struct Client {
3535
tip: Option<BlockHash>,
3636
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
37+
outpoints: HashMap<OutPoint, OutPointStatus>,
3738
}
3839

3940
#[derive(Deserialize)]
@@ -178,7 +179,25 @@ impl Rpc {
178179
}
179180
})
180181
.collect::<Result<Vec<Value>>>()
181-
.context("failed to update status")?;
182+
.context("failed to update scripthash status")?;
183+
184+
notifications.extend(
185+
client
186+
.outpoints
187+
.par_iter_mut()
188+
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
189+
match self.tracker.update_outpoint_status(status, &self.daemon) {
190+
Ok(true) => Some(Ok(notification(
191+
"blockchain.outpoint.subscribe",
192+
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
193+
))),
194+
Ok(false) => None, // outpoint status is the same
195+
Err(e) => Some(Err(e)),
196+
}
197+
})
198+
.collect::<Result<Vec<Value>>>()
199+
.context("failed to update scripthash status")?,
200+
);
182201

183202
if let Some(old_tip) = client.tip {
184203
let new_tip = self.tracker.chain().tip();
@@ -306,6 +325,28 @@ impl Rpc {
306325
Ok(json!(result))
307326
}
308327

328+
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
329+
let outpoint = OutPoint::new(txid, vout);
330+
Ok(match client.outpoints.entry(outpoint) {
331+
Entry::Occupied(e) => json!(e.get()),
332+
Entry::Vacant(e) => {
333+
let outpoint = OutPoint::new(txid, vout);
334+
let mut status = OutPointStatus::new(outpoint);
335+
self.tracker
336+
.update_outpoint_status(&mut status, &self.daemon)?;
337+
json!(e.insert(status))
338+
}
339+
})
340+
}
341+
342+
fn outpoint_unsubscribe(
343+
&self,
344+
client: &mut Client,
345+
(txid, vout): (Txid, u32),
346+
) -> Result<Value> {
347+
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
348+
}
349+
309350
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
310351
let mut status = ScriptHashStatus::new(scripthash);
311352
self.tracker
@@ -440,6 +481,8 @@ impl Rpc {
440481
Call::HeadersSubscribe => self.headers_subscribe(client),
441482
Call::MempoolFeeHistogram => self.get_fee_histogram(),
442483
Call::PeersSubscribe => Ok(json!([])),
484+
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
485+
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
443486
Call::Ping => Ok(Value::Null),
444487
Call::RelayFee => self.relayfee(),
445488
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
@@ -473,19 +516,21 @@ enum Call {
473516
Banner,
474517
BlockHeader((usize,)),
475518
BlockHeaders((usize, usize)),
476-
TransactionBroadcast((String,)),
477519
Donation,
478520
EstimateFee((u16,)),
479521
Features,
480522
HeadersSubscribe,
481523
MempoolFeeHistogram,
524+
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
525+
OutPointUnsubscribe((Txid, u32)),
482526
PeersSubscribe,
483527
Ping,
484528
RelayFee,
485529
ScriptHashGetBalance((ScriptHash,)),
486530
ScriptHashGetHistory((ScriptHash,)),
487531
ScriptHashListUnspent((ScriptHash,)),
488532
ScriptHashSubscribe((ScriptHash,)),
533+
TransactionBroadcast((String,)),
489534
TransactionGet(TxGetArgs),
490535
TransactionGetMerkle((Txid, usize)),
491536
Version((String, Version)),
@@ -503,6 +548,8 @@ impl Call {
503548
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
504549
"blockchain.scripthash.listunspent" => Call::ScriptHashListUnspent(convert(params)?),
505550
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
551+
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
552+
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
506553
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
507554
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
508555
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),

src/status.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bitcoin::{
44
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
55
};
66
use rayon::prelude::*;
7-
use serde::ser::{Serialize, Serializer};
7+
use serde::ser::{Serialize, SerializeMap, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
@@ -49,12 +49,26 @@ impl TxEntry {
4949
// Confirmation height of a transaction or its mempool state:
5050
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
5151
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
52+
#[derive(Copy, Clone, Eq, PartialEq)]
5253
enum Height {
5354
Confirmed { height: usize },
5455
Unconfirmed { has_unconfirmed_inputs: bool },
5556
}
5657

5758
impl Height {
59+
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
60+
let height = chain
61+
.get_block_height(&blockhash)
62+
.expect("missing block in chain");
63+
Self::Confirmed { height }
64+
}
65+
66+
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
67+
Self::Unconfirmed {
68+
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
69+
}
70+
}
71+
5872
fn as_i64(&self) -> i64 {
5973
match self {
6074
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -511,6 +525,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
511525
Some(StatusHash::from_engine(engine))
512526
}
513527

528+
pub(crate) struct OutPointStatus {
529+
outpoint: OutPoint,
530+
funding: Option<Height>,
531+
spending: Option<(Txid, Height)>,
532+
tip: BlockHash,
533+
}
534+
535+
impl Serialize for OutPointStatus {
536+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
537+
where
538+
S: Serializer,
539+
{
540+
let mut map = serializer.serialize_map(None)?;
541+
if let Some(funding) = &self.funding {
542+
map.serialize_entry("height", &funding)?;
543+
}
544+
if let Some((txid, height)) = &self.spending {
545+
map.serialize_entry("spender_txhash", &txid)?;
546+
map.serialize_entry("spender_height", &height)?;
547+
}
548+
map.end()
549+
}
550+
}
551+
552+
impl OutPointStatus {
553+
pub(crate) fn new(outpoint: OutPoint) -> Self {
554+
Self {
555+
outpoint,
556+
funding: None,
557+
spending: None,
558+
tip: BlockHash::default(),
559+
}
560+
}
561+
562+
pub(crate) fn sync(
563+
&mut self,
564+
index: &Index,
565+
mempool: &Mempool,
566+
daemon: &Daemon,
567+
) -> Result<bool> {
568+
let funding = self.sync_funding(index, daemon, mempool)?;
569+
let spending = self.sync_spending(index, daemon, mempool)?;
570+
let same_status = (self.funding == funding) && (self.spending == spending);
571+
self.funding = funding;
572+
self.spending = spending;
573+
self.tip = index.chain().tip();
574+
Ok(!same_status)
575+
}
576+
577+
/// Return true iff current tip became unconfirmed
578+
fn is_reorg(&self, chain: &Chain) -> bool {
579+
chain.get_block_height(&self.tip).is_none()
580+
}
581+
582+
fn sync_funding(
583+
&self,
584+
index: &Index,
585+
daemon: &Daemon,
586+
mempool: &Mempool,
587+
) -> Result<Option<Height>> {
588+
let chain = index.chain();
589+
if !self.is_reorg(chain) {
590+
if let Some(Height::Confirmed { .. }) = &self.funding {
591+
return Ok(self.funding);
592+
}
593+
}
594+
let mut confirmed = None;
595+
daemon.for_blocks(
596+
index.filter_by_txid(self.outpoint.txid),
597+
|blockhash, block| {
598+
if confirmed.is_none() {
599+
for tx in block.txdata {
600+
let txid = tx.txid();
601+
let output_len = u32::try_from(tx.output.len()).unwrap();
602+
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
603+
confirmed = Some(Height::from_blockhash(blockhash, chain));
604+
return;
605+
}
606+
}
607+
}
608+
},
609+
)?;
610+
Ok(confirmed.or_else(|| {
611+
mempool
612+
.get(&self.outpoint.txid)
613+
.map(|entry| Height::unconfirmed(entry))
614+
}))
615+
}
616+
617+
fn sync_spending(
618+
&self,
619+
index: &Index,
620+
daemon: &Daemon,
621+
mempool: &Mempool,
622+
) -> Result<Option<(Txid, Height)>> {
623+
let chain = index.chain();
624+
if !self.is_reorg(chain) {
625+
if let Some((_, Height::Confirmed { .. })) = &self.spending {
626+
return Ok(self.spending);
627+
}
628+
}
629+
let spending_blockhashes = index.filter_by_spending(self.outpoint);
630+
let mut confirmed = None;
631+
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
632+
for tx in block.txdata {
633+
for txi in &tx.input {
634+
if txi.previous_output == self.outpoint {
635+
// TODO: there should be only one spending input
636+
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
637+
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
638+
return;
639+
}
640+
}
641+
}
642+
})?;
643+
Ok(confirmed.or_else(|| {
644+
let entries = mempool.filter_by_spending(&self.outpoint);
645+
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
646+
entries
647+
.first()
648+
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
649+
}))
650+
}
651+
}
652+
514653
#[cfg(test)]
515654
mod tests {
516655
use super::HistoryEntry;

src/tracker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
mempool::{FeeHistogram, Mempool},
1212
metrics::Metrics,
1313
signals::ExitFlag,
14-
status::{Balance, ScriptHashStatus, UnspentEntry},
14+
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
1515
};
1616

1717
/// Electrum protocol subscriptions' tracker
@@ -83,6 +83,14 @@ impl Tracker {
8383
status.get_balance(self.chain())
8484
}
8585

86+
pub(crate) fn update_outpoint_status(
87+
&self,
88+
status: &mut OutPointStatus,
89+
daemon: &Daemon,
90+
) -> Result<bool> {
91+
status.sync(&self.index, &self.mempool, daemon)
92+
}
93+
8694
pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
8795
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
8896
self.index.filter_by_txid(txid).next()

0 commit comments

Comments
 (0)