mirror of
https://github.com/bitcoin/bips.git
synced 2026-03-09 15:53:54 +00:00
Introduce an optional "n_outputs" field as alternative to the detailed "outputs" objects (the field was already specified, but not used so far). Also update the documentation of the fields.
381 lines
18 KiB
Python
Executable File
381 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# For running the test vectors, run this script:
|
|
# ./reference.py send_and_receive_test_vectors.json
|
|
|
|
import hashlib
|
|
import json
|
|
from typing import List, Tuple, Dict, cast
|
|
from sys import argv, exit
|
|
from functools import reduce
|
|
|
|
# local files
|
|
from bech32m import convertbits, bech32_encode, decode, Encoding
|
|
from secp256k1 import ECKey, ECPubKey, TaggedHash, NUMS_H
|
|
from bitcoin_utils import (
|
|
deser_txid,
|
|
from_hex,
|
|
hash160,
|
|
is_p2pkh,
|
|
is_p2sh,
|
|
is_p2wpkh,
|
|
is_p2tr,
|
|
ser_uint32,
|
|
COutPoint,
|
|
CTxInWitness,
|
|
VinInfo,
|
|
)
|
|
|
|
|
|
K_max = 2323 # per-group recipient limit
|
|
|
|
|
|
def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
|
|
if is_p2pkh(vin.prevout):
|
|
# skip the first 3 op_codes and grab the 20 byte hash
|
|
# from the scriptPubKey
|
|
spk_hash = vin.prevout[3:3 + 20]
|
|
for i in range(len(vin.scriptSig), 0, -1):
|
|
if i - 33 >= 0:
|
|
# starting from the back, we move over the scriptSig with a 33 byte
|
|
# window (to match a compressed pubkey). we hash this and check if it matches
|
|
# the 20 byte hash from the scriptPubKey. for standard scriptSigs, this will match
|
|
# right away because the pubkey is the last item in the scriptSig.
|
|
# if its a non-standard (malleated) scriptSig, we will still find the pubkey if its
|
|
# a compressed pubkey.
|
|
#
|
|
# note: this is an incredibly inefficient implementation, for demonstration purposes only.
|
|
pubkey_bytes = vin.scriptSig[i - 33:i]
|
|
pubkey_hash = hash160(pubkey_bytes)
|
|
if pubkey_hash == spk_hash:
|
|
pubkey = ECPubKey().set(pubkey_bytes)
|
|
if (pubkey.valid) & (pubkey.compressed):
|
|
return pubkey
|
|
if is_p2sh(vin.prevout):
|
|
redeem_script = vin.scriptSig[1:]
|
|
if is_p2wpkh(redeem_script):
|
|
pubkey = ECPubKey().set(vin.txinwitness.scriptWitness.stack[-1])
|
|
if (pubkey.valid) & (pubkey.compressed):
|
|
return pubkey
|
|
if is_p2wpkh(vin.prevout):
|
|
txin = vin.txinwitness
|
|
pubkey = ECPubKey().set(txin.scriptWitness.stack[-1])
|
|
if (pubkey.valid) & (pubkey.compressed):
|
|
return pubkey
|
|
if is_p2tr(vin.prevout):
|
|
witnessStack = vin.txinwitness.scriptWitness.stack
|
|
if (len(witnessStack) >= 1):
|
|
if (len(witnessStack) > 1 and witnessStack[-1][0] == 0x50):
|
|
# Last item is annex
|
|
witnessStack.pop()
|
|
|
|
if (len(witnessStack) > 1):
|
|
# Script-path spend
|
|
control_block = witnessStack[-1]
|
|
# control block is <control byte> <32 byte internal key> and 0 or more <32 byte hash>
|
|
internal_key = control_block[1:33]
|
|
if (internal_key == NUMS_H.to_bytes(32, 'big')):
|
|
# Skip if NUMS_H
|
|
return ECPubKey()
|
|
|
|
pubkey = ECPubKey().set(vin.prevout[2:])
|
|
if (pubkey.valid) & (pubkey.compressed):
|
|
return pubkey
|
|
|
|
|
|
return ECPubKey()
|
|
|
|
|
|
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: ECPubKey) -> bytes:
|
|
lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0]
|
|
return TaggedHash("BIP0352/Inputs", lowest_outpoint.serialize() + cast(bytes, sum_input_pubkeys.get_bytes(False)))
|
|
|
|
|
|
|
|
def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "tsp", version: int = 0) -> str:
|
|
data = convertbits(cast(bytes, B_scan.get_bytes(False)) + cast(bytes, B_m.get_bytes(False)), 8, 5)
|
|
return bech32_encode(hrp, [version] + cast(List[int], data), Encoding.BECH32M)
|
|
|
|
|
|
def generate_label(b_scan: ECKey, m: int) -> bytes:
|
|
return TaggedHash("BIP0352/Label", b_scan.get_bytes() + ser_uint32(m))
|
|
|
|
|
|
def create_labeled_silent_payment_address(b_scan: ECKey, B_spend: ECPubKey, m: int, hrp: str = "tsp", version: int = 0) -> str:
|
|
G = ECKey().set(1).get_pubkey()
|
|
B_scan = b_scan.get_pubkey()
|
|
B_m = B_spend + generate_label(b_scan, m) * G
|
|
labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version)
|
|
|
|
return labeled_address
|
|
|
|
|
|
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[ECPubKey, ECPubKey]:
|
|
_, data = decode(hrp, address)
|
|
if data is None:
|
|
return ECPubKey(), ECPubKey()
|
|
B_scan = ECPubKey().set(data[:33])
|
|
B_spend = ECPubKey().set(data[33:])
|
|
|
|
return B_scan, B_spend
|
|
|
|
|
|
def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]:
|
|
G = ECKey().set(1).get_pubkey()
|
|
negated_keys = []
|
|
for key, is_xonly in input_priv_keys:
|
|
k = ECKey().set(key.get_bytes())
|
|
if is_xonly and k.get_pubkey().get_y() % 2 != 0:
|
|
k.negate()
|
|
negated_keys.append(k)
|
|
|
|
a_sum = sum(negated_keys)
|
|
if not a_sum.valid:
|
|
# Input privkeys sum is zero -> fail
|
|
return []
|
|
assert ECKey().set(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum"
|
|
input_hash = get_input_hash(outpoints, a_sum * G)
|
|
silent_payment_groups: Dict[ECPubKey, List[ECPubKey]] = {}
|
|
for recipient in recipients:
|
|
B_scan, B_m = decode_silent_payment_address(recipient["address"], hrp=hrp)
|
|
# Verify decoded intermediate keys for recipient
|
|
expected_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
|
|
expected_B_m = ECPubKey().set(bytes.fromhex(recipient["spend_pub_key"]))
|
|
assert expected_B_scan == B_scan, "B_scan did not match expected recipient.scan_pub_key"
|
|
assert expected_B_m == B_m, "B_m did not match expected recipient.spend_pub_key"
|
|
if B_scan in silent_payment_groups:
|
|
silent_payment_groups[B_scan].append(B_m)
|
|
else:
|
|
silent_payment_groups[B_scan] = [B_m]
|
|
|
|
# Fail if per-group recipient limit (K_max) is exceeded
|
|
if any([len(group) > K_max for group in silent_payment_groups.values()]):
|
|
return []
|
|
|
|
outputs = []
|
|
for B_scan, B_m_values in silent_payment_groups.items():
|
|
ecdh_shared_secret = input_hash * a_sum * B_scan
|
|
expected_shared_secrets = expected.get("shared_secrets", {})
|
|
# Find the recipient address that corresponds to this B_scan and get its index
|
|
for recipient_idx, recipient in enumerate(recipients):
|
|
recipient_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
|
|
if recipient_B_scan == B_scan:
|
|
expected_shared_secret_hex = expected_shared_secrets[recipient_idx]
|
|
assert ecdh_shared_secret.get_bytes(False).hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}"
|
|
break
|
|
k = 0
|
|
for B_m in B_m_values:
|
|
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
|
|
P_km = B_m + t_k * G
|
|
outputs.append(P_km.get_bytes().hex())
|
|
k += 1
|
|
|
|
return list(set(outputs))
|
|
|
|
|
|
def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]:
|
|
G = ECKey().set(1).get_pubkey()
|
|
input_hash_key = ECKey().set(input_hash)
|
|
computed_tweak_point = input_hash_key * A_sum
|
|
assert computed_tweak_point.get_bytes(False).hex() == expected.get("tweak"), "tweak did not match expected"
|
|
ecdh_shared_secret = input_hash * b_scan * A_sum
|
|
assert ecdh_shared_secret.get_bytes(False).hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret"
|
|
k = 0
|
|
wallet = []
|
|
while True:
|
|
if k == K_max: # Don't look further than the per-group recipient limit (K_max)
|
|
break
|
|
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
|
|
P_k = B_spend + t_k * G
|
|
for output in outputs_to_check:
|
|
if P_k == output:
|
|
wallet.append({"pub_key": P_k.get_bytes().hex(), "priv_key_tweak": t_k.hex()})
|
|
outputs_to_check.remove(output)
|
|
k += 1
|
|
break
|
|
elif labels:
|
|
m_G_sub = output - P_k
|
|
if m_G_sub.get_bytes(False).hex() in labels:
|
|
P_km = P_k + m_G_sub
|
|
wallet.append({
|
|
"pub_key": P_km.get_bytes().hex(),
|
|
"priv_key_tweak": (ECKey().set(t_k).add(
|
|
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
|
|
)).get_bytes().hex(),
|
|
})
|
|
outputs_to_check.remove(output)
|
|
k += 1
|
|
break
|
|
else:
|
|
output.negate()
|
|
m_G_sub = output - P_k
|
|
if m_G_sub.get_bytes(False).hex() in labels:
|
|
P_km = P_k + m_G_sub
|
|
wallet.append({
|
|
"pub_key": P_km.get_bytes().hex(),
|
|
"priv_key_tweak": (ECKey().set(t_k).add(
|
|
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
|
|
)).get_bytes().hex(),
|
|
})
|
|
outputs_to_check.remove(output)
|
|
k += 1
|
|
break
|
|
else:
|
|
break
|
|
return wallet
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(argv) != 2 or argv[1] in ('-h', '--help'):
|
|
print("Usage: ./reference.py send_and_receive_test_vectors.json")
|
|
exit(0)
|
|
|
|
with open(argv[1], "r") as f:
|
|
test_data = json.loads(f.read())
|
|
|
|
# G , needed for generating the labels "database"
|
|
G = ECKey().set(1).get_pubkey()
|
|
for case in test_data:
|
|
print(case["comment"])
|
|
# Test sending
|
|
for sending_test in case["sending"]:
|
|
given = sending_test["given"]
|
|
expected = sending_test["expected"]
|
|
|
|
vins = [
|
|
VinInfo(
|
|
outpoint=COutPoint(hash=deser_txid(input["txid"]), n=input["vout"]),
|
|
scriptSig=bytes.fromhex(input["scriptSig"]),
|
|
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
|
|
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
|
|
private_key=ECKey().set(bytes.fromhex(input["private_key"])),
|
|
)
|
|
for input in given["vin"]
|
|
]
|
|
# Convert the tuples to lists so they can be easily compared to the json list of lists from the given test vectors
|
|
input_priv_keys = []
|
|
input_pub_keys = []
|
|
for vin in vins:
|
|
pubkey = get_pubkey_from_input(vin)
|
|
if not pubkey.valid:
|
|
continue
|
|
input_priv_keys.append((
|
|
vin.private_key,
|
|
is_p2tr(vin.prevout),
|
|
))
|
|
input_pub_keys.append(pubkey)
|
|
assert [pk.get_bytes(False).hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected"
|
|
|
|
sending_outputs = []
|
|
if (len(input_pub_keys) > 0):
|
|
outpoints = [vin.outpoint for vin in vins]
|
|
recipients = [] # expand given recipient entries to full list
|
|
for recipient_entry in given["recipients"]:
|
|
count = recipient_entry.get("count", 1)
|
|
recipients.extend([recipient_entry] * count)
|
|
sending_outputs = create_outputs(input_priv_keys, outpoints, recipients, expected=expected, hrp="sp")
|
|
|
|
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses
|
|
# will produce different generated outputs if sending to multiple silent payment addresses belonging to the
|
|
# same sender but with different labels. Because of this, expected["outputs"] contains all possible valid output sets,
|
|
# based on all possible permutations of recipient address orderings. Must match exactly one of the possible output sets.
|
|
assert(any(set(sending_outputs) == set(lst) for lst in expected["outputs"])), "Sending test failed"
|
|
else:
|
|
assert(sending_outputs == expected["outputs"][0] == []), "Sending test failed"
|
|
|
|
# Test receiving
|
|
msg = hashlib.sha256(b"message").digest()
|
|
aux = hashlib.sha256(b"random auxiliary data").digest()
|
|
for receiving_test in case["receiving"]:
|
|
given = receiving_test["given"]
|
|
expected = receiving_test["expected"]
|
|
outputs_to_check = [
|
|
ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"]
|
|
]
|
|
vins = [
|
|
VinInfo(
|
|
outpoint=COutPoint(hash=deser_txid(input["txid"]), n=input["vout"]),
|
|
scriptSig=bytes.fromhex(input["scriptSig"]),
|
|
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
|
|
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
|
|
)
|
|
for input in given["vin"]
|
|
]
|
|
# Check that the given inputs for the receiving test match what was generated during the sending test
|
|
receiving_addresses = []
|
|
b_scan = ECKey().set(bytes.fromhex(given["key_material"]["scan_priv_key"]))
|
|
b_spend = ECKey().set(
|
|
bytes.fromhex(given["key_material"]["spend_priv_key"])
|
|
)
|
|
B_scan = b_scan.get_pubkey()
|
|
B_spend = b_spend.get_pubkey()
|
|
receiving_addresses.append(
|
|
encode_silent_payment_address(B_scan, B_spend, hrp="sp")
|
|
)
|
|
if given["labels"]:
|
|
for label in given["labels"]:
|
|
receiving_addresses.append(
|
|
create_labeled_silent_payment_address(
|
|
b_scan, B_spend, m=label, hrp="sp"
|
|
)
|
|
)
|
|
|
|
# Check that the silent payment addresses match for the given BIP32 seed and labels dictionary
|
|
assert (receiving_addresses == expected["addresses"]), "Receiving addresses don't match"
|
|
input_pub_keys = []
|
|
for vin in vins:
|
|
pubkey = get_pubkey_from_input(vin)
|
|
if not pubkey.valid:
|
|
continue
|
|
input_pub_keys.append(pubkey)
|
|
|
|
add_to_wallet = []
|
|
if (len(input_pub_keys) > 0):
|
|
A_sum = reduce(lambda x, y: x + y, input_pub_keys)
|
|
if A_sum.get_bytes() is None:
|
|
# Input pubkeys sum is point at infinity -> skip tx
|
|
assert expected["outputs"] == []
|
|
continue
|
|
assert A_sum.get_bytes(False).hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum"
|
|
input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum)
|
|
pre_computed_labels = {
|
|
(generate_label(b_scan, label) * G).get_bytes(False).hex(): generate_label(b_scan, label).hex()
|
|
for label in given["labels"]
|
|
}
|
|
add_to_wallet = scanning(
|
|
b_scan=b_scan,
|
|
B_spend=B_spend,
|
|
A_sum=A_sum,
|
|
input_hash=input_hash,
|
|
outputs_to_check=outputs_to_check,
|
|
labels=pre_computed_labels,
|
|
expected=expected,
|
|
)
|
|
|
|
# Check that the private key is correct for the found output public key
|
|
for output in add_to_wallet:
|
|
pub_key = ECPubKey().set(bytes.fromhex(output["pub_key"]))
|
|
full_private_key = b_spend.add(bytes.fromhex(output["priv_key_tweak"]))
|
|
if full_private_key.get_pubkey().get_y() % 2 != 0:
|
|
full_private_key.negate()
|
|
|
|
sig = full_private_key.sign_schnorr(msg, aux)
|
|
assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}"
|
|
output["signature"] = sig.hex()
|
|
|
|
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses
|
|
# will produce different generated outputs if sending to multiple silent payment addresses belonging to the
|
|
# same sender but with different labels. Because of this, expected["outputs"] contains all possible valid output sets,
|
|
# based on all possible permutations of recipient address orderings. Must match exactly one of the possible found output
|
|
# sets in expected["outputs"]
|
|
if "outputs" in expected: # detailed check against expected outputs
|
|
generated_set = {frozenset(d.items()) for d in add_to_wallet}
|
|
expected_set = {frozenset(d.items()) for d in expected["outputs"]}
|
|
assert generated_set == expected_set, "Receive test failed"
|
|
elif "n_outputs" in expected: # only check the number of found outputs
|
|
assert len(add_to_wallet) == expected["n_outputs"], "Receive test failed"
|
|
else:
|
|
assert False, "either 'outputs' or 'n_outputs' must be specified in 'expected' field of receiving test vector"
|
|
|
|
|
|
print("All tests passed")
|