1
0
mirror of https://github.com/bitcoin/bips.git synced 2026-03-16 15:55:37 +00:00

BIP-352: take use of vendored secp256k1lab for reference implementation

This allows to remove secp256k1.py and replace the secp256k1-specific
parts in the reference implementation. Replacement guide:

    * ECKey -> Scalar
    * ECKey.set(seckey_bytes) -> Scalar.from_bytes_checked(seckey_bytes)
    * seckey.get_pubkey() -> seckey * G
    * seckey.get_bytes() -> seckey.to_bytes()
    * seckey.add(tweak_bytes) -> seckey + Scalar.from_bytes_checked(tweak_bytes)
    * seckey.negate() -> seckey = -seckey
    * seckey.sign_schnorr -> schnorr_sign(..., seckey.to_bytes(), ...)

    * ECPubKey -> GE
    * ECPubKey.set(pubkey_bytes) -> GE.from_bytes_{xonly,compressed}(pubkey_bytes)
    * pubkey.get_y() % 2 == 0 -> pubkey.has_even_y()
    * pubkey.get_bytes(False) -> pubkey.to_bytes_compressed()
    * pubkey.get_bytes() -> pubkey.to_bytes_xonly()
    * not pubkey.valid -> pubkey.infinity
    * pubkey.verify_schnorr -> schnorr_verify(..., pubkey.to_bytes_xonly(), ...)

    * TaggedHash -> tagged_hash
    * hashlib.sha256(preimage).digest() -> hash_sha256(preimage)
This commit is contained in:
Sebastian Falbesoner
2026-01-16 16:26:23 +01:00
parent 511bb99dc4
commit f2ffa99a4a
3 changed files with 106 additions and 802 deletions

View File

@@ -2,7 +2,7 @@ import hashlib
import struct
from io import BytesIO
from ripemd160 import ripemd160
from secp256k1 import ECKey
from secp256k1lab.secp256k1 import Scalar
from typing import Union
@@ -93,7 +93,7 @@ class VinInfo:
else:
self.txinwitness = txinwitness
if private_key is None:
self.private_key = ECKey()
self.private_key = Scalar()
else:
self.private_key = private_key
self.scriptSig = scriptSig

View File

@@ -2,15 +2,19 @@
# For running the test vectors, run this script:
# ./reference.py send_and_receive_test_vectors.json
import hashlib
import json
from pathlib import Path
import sys
from typing import List, Tuple, Dict, cast
from sys import argv, exit
from functools import reduce
# local files
# import the vendored copy of secp256k1lab
sys.path.insert(0, str(Path(__file__).parent / "secp256k1lab/src"))
from secp256k1lab.bip340 import schnorr_sign, schnorr_verify
from secp256k1lab.secp256k1 import G, GE, Scalar
from secp256k1lab.util import tagged_hash, hash_sha256
from bech32m import convertbits, bech32_encode, decode, Encoding
from secp256k1 import ECKey, ECPubKey, TaggedHash, NUMS_H
from bitcoin_utils import (
deser_txid,
from_hex,
@@ -27,9 +31,10 @@ from bitcoin_utils import (
K_max = 2323 # per-group recipient limit
NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0
def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
def get_pubkey_from_input(vin: VinInfo) -> GE:
if is_p2pkh(vin.prevout):
# skip the first 3 op_codes and grab the 20 byte hash
# from the scriptPubKey
@@ -47,20 +52,22 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
pubkey_bytes = vin.scriptSig[i - 33:i]
pubkey_hash = hash160(pubkey_bytes)
if pubkey_hash == spk_hash:
pubkey = ECPubKey().set(pubkey_bytes)
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(pubkey_bytes)
except ValueError:
pass
if is_p2sh(vin.prevout):
redeem_script = vin.scriptSig[1:]
if is_p2wpkh(redeem_script):
pubkey = ECPubKey().set(vin.txinwitness.scriptWitness.stack[-1])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1])
except (ValueError, AssertionError):
pass
if is_p2wpkh(vin.prevout):
txin = vin.txinwitness
pubkey = ECPubKey().set(txin.scriptWitness.stack[-1])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1])
except (ValueError, AssertionError):
pass
if is_p2tr(vin.prevout):
witnessStack = vin.txinwitness.scriptWitness.stack
if (len(witnessStack) >= 1):
@@ -75,71 +82,69 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
internal_key = control_block[1:33]
if (internal_key == NUMS_H.to_bytes(32, 'big')):
# Skip if NUMS_H
return ECPubKey()
return GE()
pubkey = ECPubKey().set(vin.prevout[2:])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_xonly(vin.prevout[2:])
except ValueError:
pass
return GE()
return ECPubKey()
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: ECPubKey) -> bytes:
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: GE) -> bytes:
lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0]
return TaggedHash("BIP0352/Inputs", lowest_outpoint.serialize() + cast(bytes, sum_input_pubkeys.get_bytes(False)))
return tagged_hash("BIP0352/Inputs", lowest_outpoint.serialize() + sum_input_pubkeys.to_bytes_compressed())
def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "tsp", version: int = 0) -> str:
data = convertbits(cast(bytes, B_scan.get_bytes(False)) + cast(bytes, B_m.get_bytes(False)), 8, 5)
def encode_silent_payment_address(B_scan: GE, B_m: GE, hrp: str = "tsp", version: int = 0) -> str:
data = convertbits(B_scan.to_bytes_compressed() + B_m.to_bytes_compressed(), 8, 5)
return bech32_encode(hrp, [version] + cast(List[int], data), Encoding.BECH32M)
def generate_label(b_scan: ECKey, m: int) -> bytes:
return TaggedHash("BIP0352/Label", b_scan.get_bytes() + ser_uint32(m))
def generate_label(b_scan: Scalar, m: int) -> Scalar:
return Scalar.from_bytes_checked(tagged_hash("BIP0352/Label", b_scan.to_bytes() + ser_uint32(m)))
def create_labeled_silent_payment_address(b_scan: ECKey, B_spend: ECPubKey, m: int, hrp: str = "tsp", version: int = 0) -> str:
G = ECKey().set(1).get_pubkey()
B_scan = b_scan.get_pubkey()
def create_labeled_silent_payment_address(b_scan: Scalar, B_spend: GE, m: int, hrp: str = "tsp", version: int = 0) -> str:
B_scan = b_scan * G
B_m = B_spend + generate_label(b_scan, m) * G
labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version)
return labeled_address
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[ECPubKey, ECPubKey]:
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[GE, GE]:
_, data = decode(hrp, address)
if data is None:
return ECPubKey(), ECPubKey()
B_scan = ECPubKey().set(data[:33])
B_spend = ECPubKey().set(data[33:])
return GE(), GE()
B_scan = GE.from_bytes_compressed(data[:33])
B_spend = GE.from_bytes_compressed(data[33:])
return B_scan, B_spend
def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]:
G = ECKey().set(1).get_pubkey()
def create_outputs(input_priv_keys: List[Tuple[Scalar, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]:
negated_keys = []
for key, is_xonly in input_priv_keys:
k = ECKey().set(key.get_bytes())
if is_xonly and k.get_pubkey().get_y() % 2 != 0:
k.negate()
k = Scalar.from_bytes_checked(key.to_bytes())
if is_xonly and not (k * G).has_even_y():
k = -k
negated_keys.append(k)
a_sum = sum(negated_keys)
if not a_sum.valid:
a_sum = Scalar.sum(*negated_keys)
if a_sum == 0:
# Input privkeys sum is zero -> fail
return []
assert ECKey().set(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum"
input_hash = get_input_hash(outpoints, a_sum * G)
silent_payment_groups: Dict[ECPubKey, List[ECPubKey]] = {}
assert Scalar.from_bytes_checked(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum"
input_hash_scalar = Scalar.from_bytes_checked(get_input_hash(outpoints, a_sum * G))
silent_payment_groups: Dict[GE, List[GE]] = {}
for recipient in recipients:
B_scan, B_m = decode_silent_payment_address(recipient["address"], hrp=hrp)
# Verify decoded intermediate keys for recipient
expected_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
expected_B_m = ECPubKey().set(bytes.fromhex(recipient["spend_pub_key"]))
expected_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"]))
expected_B_m = GE.from_bytes_compressed(bytes.fromhex(recipient["spend_pub_key"]))
assert expected_B_scan == B_scan, "B_scan did not match expected recipient.scan_pub_key"
assert expected_B_m == B_m, "B_m did not match expected recipient.spend_pub_key"
if B_scan in silent_payment_groups:
@@ -153,68 +158,67 @@ def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[CO
outputs = []
for B_scan, B_m_values in silent_payment_groups.items():
ecdh_shared_secret = input_hash * a_sum * B_scan
ecdh_shared_secret = input_hash_scalar * a_sum * B_scan
expected_shared_secrets = expected.get("shared_secrets", {})
# Find the recipient address that corresponds to this B_scan and get its index
for recipient_idx, recipient in enumerate(recipients):
recipient_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
recipient_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"]))
if recipient_B_scan == B_scan:
expected_shared_secret_hex = expected_shared_secrets[recipient_idx]
assert ecdh_shared_secret.get_bytes(False).hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}"
assert ecdh_shared_secret.to_bytes_compressed().hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}"
break
k = 0
for B_m in B_m_values:
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k)))
P_km = B_m + t_k * G
outputs.append(P_km.get_bytes().hex())
outputs.append(P_km.to_bytes_xonly().hex())
k += 1
return list(set(outputs))
def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]:
G = ECKey().set(1).get_pubkey()
input_hash_key = ECKey().set(input_hash)
computed_tweak_point = input_hash_key * A_sum
assert computed_tweak_point.get_bytes(False).hex() == expected.get("tweak"), "tweak did not match expected"
ecdh_shared_secret = input_hash * b_scan * A_sum
assert ecdh_shared_secret.get_bytes(False).hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret"
def scanning(b_scan: Scalar, B_spend: GE, A_sum: GE, input_hash: bytes, outputs_to_check: List[bytes], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]:
input_hash_scalar = Scalar.from_bytes_checked(input_hash)
computed_tweak_point = input_hash_scalar * A_sum
assert computed_tweak_point.to_bytes_compressed().hex() == expected.get("tweak"), "tweak did not match expected"
ecdh_shared_secret = input_hash_scalar * b_scan * A_sum
assert ecdh_shared_secret.to_bytes_compressed().hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret"
k = 0
wallet = []
while True:
if k == K_max: # Don't look further than the per-group recipient limit (K_max)
break
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k)))
P_k = B_spend + t_k * G
for output in outputs_to_check:
if P_k == output:
wallet.append({"pub_key": P_k.get_bytes().hex(), "priv_key_tweak": t_k.hex()})
output_ge = GE.from_bytes_xonly(output)
if P_k.to_bytes_xonly() == output:
wallet.append({"pub_key": P_k.to_bytes_xonly().hex(), "priv_key_tweak": t_k.to_bytes().hex()})
outputs_to_check.remove(output)
k += 1
break
elif labels:
m_G_sub = output - P_k
if m_G_sub.get_bytes(False).hex() in labels:
m_G_sub = output_ge - P_k
if m_G_sub.to_bytes_compressed().hex() in labels:
P_km = P_k + m_G_sub
wallet.append({
"pub_key": P_km.get_bytes().hex(),
"priv_key_tweak": (ECKey().set(t_k).add(
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
)).get_bytes().hex(),
"pub_key": P_km.to_bytes_xonly().hex(),
"priv_key_tweak": (t_k + Scalar.from_bytes_checked(
bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()])
)).to_bytes().hex(),
})
outputs_to_check.remove(output)
k += 1
break
else:
output.negate()
m_G_sub = output - P_k
if m_G_sub.get_bytes(False).hex() in labels:
m_G_sub = -output_ge - P_k
if m_G_sub.to_bytes_compressed().hex() in labels:
P_km = P_k + m_G_sub
wallet.append({
"pub_key": P_km.get_bytes().hex(),
"priv_key_tweak": (ECKey().set(t_k).add(
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
)).get_bytes().hex(),
"pub_key": P_km.to_bytes_xonly().hex(),
"priv_key_tweak": (t_k + Scalar.from_bytes_checked(
bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()])
)).to_bytes().hex(),
})
outputs_to_check.remove(output)
k += 1
@@ -225,15 +229,13 @@ def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: byte
if __name__ == "__main__":
if len(argv) != 2 or argv[1] in ('-h', '--help'):
if len(sys.argv) != 2 or sys.argv[1] in ('-h', '--help'):
print("Usage: ./reference.py send_and_receive_test_vectors.json")
exit(0)
sys.exit(0)
with open(argv[1], "r") as f:
with open(sys.argv[1], "r") as f:
test_data = json.loads(f.read())
# G , needed for generating the labels "database"
G = ECKey().set(1).get_pubkey()
for case in test_data:
print(case["comment"])
# Test sending
@@ -247,7 +249,7 @@ if __name__ == "__main__":
scriptSig=bytes.fromhex(input["scriptSig"]),
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
private_key=ECKey().set(bytes.fromhex(input["private_key"])),
private_key=Scalar.from_bytes_checked(bytes.fromhex(input["private_key"])),
)
for input in given["vin"]
]
@@ -256,14 +258,14 @@ if __name__ == "__main__":
input_pub_keys = []
for vin in vins:
pubkey = get_pubkey_from_input(vin)
if not pubkey.valid:
if pubkey.infinity:
continue
input_priv_keys.append((
vin.private_key,
is_p2tr(vin.prevout),
))
input_pub_keys.append(pubkey)
assert [pk.get_bytes(False).hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected"
assert [pk.to_bytes_compressed().hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected"
sending_outputs = []
if (len(input_pub_keys) > 0):
@@ -283,13 +285,13 @@ if __name__ == "__main__":
assert(sending_outputs == expected["outputs"][0] == []), "Sending test failed"
# Test receiving
msg = hashlib.sha256(b"message").digest()
aux = hashlib.sha256(b"random auxiliary data").digest()
msg = hash_sha256(b"message")
aux = hash_sha256(b"random auxiliary data")
for receiving_test in case["receiving"]:
given = receiving_test["given"]
expected = receiving_test["expected"]
outputs_to_check = [
ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"]
bytes.fromhex(p) for p in given["outputs"]
]
vins = [
VinInfo(
@@ -302,12 +304,10 @@ if __name__ == "__main__":
]
# Check that the given inputs for the receiving test match what was generated during the sending test
receiving_addresses = []
b_scan = ECKey().set(bytes.fromhex(given["key_material"]["scan_priv_key"]))
b_spend = ECKey().set(
bytes.fromhex(given["key_material"]["spend_priv_key"])
)
B_scan = b_scan.get_pubkey()
B_spend = b_spend.get_pubkey()
b_scan = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["scan_priv_key"]))
b_spend = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["spend_priv_key"]))
B_scan = b_scan * G
B_spend = b_spend * G
receiving_addresses.append(
encode_silent_payment_address(B_scan, B_spend, hrp="sp")
)
@@ -324,21 +324,21 @@ if __name__ == "__main__":
input_pub_keys = []
for vin in vins:
pubkey = get_pubkey_from_input(vin)
if not pubkey.valid:
if pubkey.infinity:
continue
input_pub_keys.append(pubkey)
add_to_wallet = []
if (len(input_pub_keys) > 0):
A_sum = reduce(lambda x, y: x + y, input_pub_keys)
if A_sum.get_bytes() is None:
A_sum = GE.sum(*input_pub_keys)
if A_sum.infinity:
# Input pubkeys sum is point at infinity -> skip tx
assert expected["outputs"] == []
continue
assert A_sum.get_bytes(False).hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum"
assert A_sum.to_bytes_compressed().hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum"
input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum)
pre_computed_labels = {
(generate_label(b_scan, label) * G).get_bytes(False).hex(): generate_label(b_scan, label).hex()
(generate_label(b_scan, label) * G).to_bytes_compressed().hex(): generate_label(b_scan, label).to_bytes().hex()
for label in given["labels"]
}
add_to_wallet = scanning(
@@ -353,13 +353,13 @@ if __name__ == "__main__":
# Check that the private key is correct for the found output public key
for output in add_to_wallet:
pub_key = ECPubKey().set(bytes.fromhex(output["pub_key"]))
full_private_key = b_spend.add(bytes.fromhex(output["priv_key_tweak"]))
if full_private_key.get_pubkey().get_y() % 2 != 0:
full_private_key.negate()
pub_key = GE.from_bytes_xonly(bytes.fromhex(output["pub_key"]))
full_private_key = b_spend + Scalar.from_bytes_checked(bytes.fromhex(output["priv_key_tweak"]))
if not (full_private_key * G).has_even_y():
full_private_key = -full_private_key
sig = full_private_key.sign_schnorr(msg, aux)
assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}"
sig = schnorr_sign(msg, full_private_key.to_bytes(), aux)
assert schnorr_verify(msg, pub_key.to_bytes_xonly(), sig), f"Invalid signature for {pub_key}"
output["signature"] = sig.hex()
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses

View File

@@ -1,696 +0,0 @@
# Copyright (c) 2019 Pieter Wuille
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test-only secp256k1 elliptic curve implementation
WARNING: This code is slow, uses bad randomness, does not properly protect
keys, and is trivially vulnerable to side channel attacks. Do not use for
anything but tests."""
import random
import hashlib
import hmac
def TaggedHash(tag, data):
ss = hashlib.sha256(tag.encode('utf-8')).digest()
ss += ss
ss += data
return hashlib.sha256(ss).digest()
def modinv(a, n):
"""Compute the modular inverse of a modulo n
See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers.
"""
t1, t2 = 0, 1
r1, r2 = n, a
while r2 != 0:
q = r1 // r2
t1, t2 = t2, t1 - q * t2
r1, r2 = r2, r1 - q * r2
if r1 > 1:
return None
if t1 < 0:
t1 += n
return t1
def jacobi_symbol(n, k):
"""Compute the Jacobi symbol of n modulo k
See http://en.wikipedia.org/wiki/Jacobi_symbol
For our application k is always prime, so this is the same as the Legendre symbol."""
assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k"
n %= k
t = 0
while n != 0:
while n & 1 == 0:
n >>= 1
r = k & 7
t ^= (r == 3 or r == 5)
n, k = k, n
t ^= (n & k & 3 == 3)
n = n % k
if k == 1:
return -1 if t else 1
return 0
def modsqrt(a, p):
"""Compute the square root of a modulo p when p % 4 = 3.
The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm
Limiting this function to only work for p % 4 = 3 means we don't need to
iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd
is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4)
secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4.
"""
if p % 4 != 3:
raise NotImplementedError("modsqrt only implemented for p % 4 = 3")
sqrt = pow(a, (p + 1)//4, p)
if pow(sqrt, 2, p) == a % p:
return sqrt
return None
def int_or_bytes(s):
"Convert 32-bytes to int while accepting also int and returning it as is."
if isinstance(s, bytes):
assert(len(s) == 32)
s = int.from_bytes(s, 'big')
elif not isinstance(s, int):
raise TypeError
return s
class EllipticCurve:
def __init__(self, p, a, b):
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
self.p = p
self.a = a % p
self.b = b % p
def affine(self, p1):
"""Convert a Jacobian point tuple p1 to affine form, or None if at infinity.
An affine point is represented as the Jacobian (x, y, 1)"""
x1, y1, z1 = p1
if z1 == 0:
return None
inv = modinv(z1, self.p)
inv_2 = (inv**2) % self.p
inv_3 = (inv_2 * inv) % self.p
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
def has_even_y(self, p1):
"""Whether the point p1 has an even Y coordinate when expressed in affine coordinates."""
return not (p1[2] == 0 or self.affine(p1)[1] & 1)
def negate(self, p1):
"""Negate a Jacobian point tuple p1."""
x1, y1, z1 = p1
return (x1, (self.p - y1) % self.p, z1)
def on_curve(self, p1):
"""Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
x1, y1, z1 = p1
z2 = pow(z1, 2, self.p)
z4 = pow(z2, 2, self.p)
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
def is_x_coord(self, x):
"""Test whether x is a valid X coordinate on the curve."""
x_3 = pow(x, 3, self.p)
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
def lift_x(self, x):
"""Given an X coordinate on the curve, return a corresponding affine point."""
x_3 = pow(x, 3, self.p)
v = x_3 + self.a * x + self.b
y = modsqrt(v, self.p)
if y is None:
return None
return (x, y, 1)
def double(self, p1):
"""Double a Jacobian tuple p1
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling"""
x1, y1, z1 = p1
if z1 == 0:
return (0, 1, 0)
y1_2 = (y1**2) % self.p
y1_4 = (y1_2**2) % self.p
x1_2 = (x1**2) % self.p
s = (4*x1*y1_2) % self.p
m = 3*x1_2
if self.a:
m += self.a * pow(z1, 4, self.p)
m = m % self.p
x2 = (m**2 - 2*s) % self.p
y2 = (m*(s - x2) - 8*y1_4) % self.p
z2 = (2*y1*z1) % self.p
return (x2, y2, z2)
def add_mixed(self, p1, p2):
"""Add a Jacobian tuple p1 and an affine tuple p2
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
x1, y1, z1 = p1
x2, y2, z2 = p2
assert(z2 == 1)
# Adding to the point at infinity is a no-op
if z1 == 0:
return p2
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
u2 = (x2 * z1_2) % self.p
s2 = (y2 * z1_3) % self.p
if x1 == u2:
if (y1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - x1
r = s2 - y1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (x1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
z3 = (h*z1) % self.p
return (x3, y3, z3)
def add(self, p1, p2):
"""Add two Jacobian tuples p1 and p2
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition"""
x1, y1, z1 = p1
x2, y2, z2 = p2
# Adding the point at infinity is a no-op
if z1 == 0:
return p2
if z2 == 0:
return p1
# Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1
if z1 == 1:
return self.add_mixed(p2, p1)
if z2 == 1:
return self.add_mixed(p1, p2)
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
z2_2 = (z2**2) % self.p
z2_3 = (z2_2 * z2) % self.p
u1 = (x1 * z2_2) % self.p
u2 = (x2 * z1_2) % self.p
s1 = (y1 * z2_3) % self.p
s2 = (y2 * z1_3) % self.p
if u1 == u2:
if (s1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - u1
r = s2 - s1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (u1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
z3 = (h*z1*z2) % self.p
return (x3, y3, z3)
def mul(self, ps):
"""Compute a (multi) point multiplication
ps is a list of (Jacobian tuple, scalar) pairs.
"""
r = (0, 1, 0)
for i in range(255, -1, -1):
r = self.double(r)
for (p, n) in ps:
if ((n >> i) & 1):
r = self.add(r, p)
return r
SECP256K1_FIELD_SIZE = 2**256 - 2**32 - 977
SECP256K1 = EllipticCurve(SECP256K1_FIELD_SIZE, 0, 7)
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0
class ECPubKey():
"""A secp256k1 public key"""
def __init__(self):
"""Construct an uninitialized public key"""
self.valid = False
def __repr__(self):
return self.get_bytes().hex()
def __eq__(self, other):
assert isinstance(other, ECPubKey)
return self.get_bytes() == other.get_bytes()
def __hash__(self):
return hash(self.get_bytes())
def set(self, data):
"""Construct a public key from a serialization in compressed or uncompressed DER format or BIP340 format"""
if (len(data) == 65 and data[0] == 0x04):
p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
self.valid = SECP256K1.on_curve(p)
if self.valid:
self.p = p
self.compressed = False
elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
x = int.from_bytes(data[1:33], 'big')
if SECP256K1.is_x_coord(x):
p = SECP256K1.lift_x(x)
# if the oddness of the y co-ord isn't correct, find the other
# valid y
if (p[1] & 1) != (data[0] & 1):
p = SECP256K1.negate(p)
self.p = p
self.valid = True
self.compressed = True
else:
self.valid = False
elif (len(data) == 32):
x = int.from_bytes(data[0:32], 'big')
if SECP256K1.is_x_coord(x):
p = SECP256K1.lift_x(x)
# if the oddness of the y co-ord isn't correct, find the other
# valid y
if p[1]%2 != 0:
p = SECP256K1.negate(p)
self.p = p
self.valid = True
self.compressed = True
else:
self.valid = False
else:
self.valid = False
return self
@property
def is_compressed(self):
return self.compressed
@property
def is_valid(self):
return self.valid
def get_y(self):
return SECP256K1.affine(self.p)[1]
def get_x(self):
return SECP256K1.affine(self.p)[0]
def get_bytes(self, bip340=True):
assert(self.valid)
p = SECP256K1.affine(self.p)
if p is None:
return None
if bip340:
return bytes(p[0].to_bytes(32, 'big'))
elif self.compressed:
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
else:
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
def verify_ecdsa(self, sig, msg, low_s=True):
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA verifier algorithm"""
assert(self.valid)
# Extract r and s from the DER formatted signature. Return false for
# any DER encoding errors.
if (sig[1] + 2 != len(sig)):
return False
if (len(sig) < 4):
return False
if (sig[0] != 0x30):
return False
if (sig[2] != 0x02):
return False
rlen = sig[3]
if (len(sig) < 6 + rlen):
return False
if rlen < 1 or rlen > 33:
return False
if sig[4] >= 0x80:
return False
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
return False
r = int.from_bytes(sig[4:4+rlen], 'big')
if (sig[4+rlen] != 0x02):
return False
slen = sig[5+rlen]
if slen < 1 or slen > 33:
return False
if (len(sig) != 6 + rlen + slen):
return False
if sig[6+rlen] >= 0x80:
return False
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
return False
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
# Verify that r and s are within the group order
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
return False
if low_s and s >= SECP256K1_ORDER_HALF:
return False
z = int.from_bytes(msg, 'big')
# Run verifier algorithm on r, s
w = modinv(s, SECP256K1_ORDER)
u1 = z*w % SECP256K1_ORDER
u2 = r*w % SECP256K1_ORDER
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
if R is None or R[0] != r:
return False
return True
def verify_schnorr(self, sig, msg):
assert(len(msg) == 32)
assert(len(sig) == 64)
assert(self.valid)
r = int.from_bytes(sig[0:32], 'big')
if r >= SECP256K1_FIELD_SIZE:
return False
s = int.from_bytes(sig[32:64], 'big')
if s >= SECP256K1_ORDER:
return False
e = int.from_bytes(TaggedHash("BIP0340/challenge", sig[0:32] + self.get_bytes() + msg), 'big') % SECP256K1_ORDER
R = SECP256K1.mul([(SECP256K1_G, s), (self.p, SECP256K1_ORDER - e)])
if not SECP256K1.has_even_y(R):
return False
if ((r * R[2] * R[2]) % SECP256K1_FIELD_SIZE) != R[0]:
return False
return True
def __add__(self, other):
"""Adds two ECPubKey points."""
assert isinstance(other, ECPubKey)
assert self.valid
assert other.valid
ret = ECPubKey()
ret.p = SECP256K1.add(other.p, self.p)
ret.valid = True
ret.compressed = self.compressed
return ret
def __radd__(self, other):
"""Allows this ECPubKey to be added to 0 for sum()"""
if other == 0:
return self
else:
return self + other
def __mul__(self, other):
"""Multiplies ECPubKey point with a scalar(int/32bytes/ECKey)."""
if isinstance(other, ECKey):
assert self.valid
assert other.secret is not None
multiplier = other.secret
else:
# int_or_bytes checks that other is `int` or `bytes`
multiplier = int_or_bytes(other)
assert multiplier < SECP256K1_ORDER
multiplier = multiplier % SECP256K1_ORDER
ret = ECPubKey()
ret.p = SECP256K1.mul([(self.p, multiplier)])
ret.valid = True
ret.compressed = self.compressed
return ret
def __rmul__(self, other):
"""Multiplies a scalar(int/32bytes/ECKey) with an ECPubKey point"""
return self * other
def __sub__(self, other):
"""Subtract one point from another"""
assert isinstance(other, ECPubKey)
assert self.valid
assert other.valid
ret = ECPubKey()
ret.p = SECP256K1.add(self.p, SECP256K1.negate(other.p))
ret.valid = True
ret.compressed = self.compressed
return ret
def tweak_add(self, tweak):
assert(self.valid)
t = int_or_bytes(tweak)
if t >= SECP256K1_ORDER:
return None
tweaked = SECP256K1.affine(SECP256K1.mul([(self.p, 1), (SECP256K1_G, t)]))
if tweaked is None:
return None
ret = ECPubKey()
ret.p = tweaked
ret.valid = True
ret.compressed = self.compressed
return ret
def mul(self, data):
"""Multiplies ECPubKey point with scalar data."""
assert self.valid
other = ECKey()
other.set(data, True)
return self * other
def negate(self):
self.p = SECP256K1.affine(SECP256K1.negate(self.p))
def rfc6979_nonce(key):
"""Compute signing nonce using RFC6979."""
v = bytes([1] * 32)
k = bytes([0] * 32)
k = hmac.new(k, v + b"\x00" + key, 'sha256').digest()
v = hmac.new(k, v, 'sha256').digest()
k = hmac.new(k, v + b"\x01" + key, 'sha256').digest()
v = hmac.new(k, v, 'sha256').digest()
return hmac.new(k, v, 'sha256').digest()
class ECKey():
"""A secp256k1 private key"""
def __init__(self):
self.valid = False
def __repr__(self):
return str(self.secret)
def __eq__(self, other):
assert isinstance(other, ECKey)
return self.secret == other.secret
def __hash__(self):
return hash(self.secret)
def set(self, secret, compressed=True):
"""Construct a private key object from either 32-bytes or an int secret and a compressed flag."""
secret = int_or_bytes(secret)
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
return self
def generate(self, compressed=True):
"""Generate a random private key (compressed or uncompressed)."""
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
return self
def get_bytes(self):
"""Retrieve the 32-byte representation of this key."""
assert(self.valid)
return self.secret.to_bytes(32, 'big')
def as_int(self):
return self.secret
def from_int(self, secret, compressed=True):
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
def __add__(self, other):
"""Add key secrets. Returns compressed key."""
assert isinstance(other, ECKey)
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret + other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
def __radd__(self, other):
"""Allows this ECKey to be added to 0 for sum()"""
if other == 0:
return self
else:
return self + other
def __sub__(self, other):
"""Subtract key secrets. Returns compressed key."""
assert isinstance(other, ECKey)
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret - other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
def __mul__(self, other):
"""Multiply a private key by another private key or multiply a public key by a private key. Returns compressed key."""
if isinstance(other, ECKey):
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret * other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
elif isinstance(other, ECPubKey):
return other * self
else:
# ECKey().set() checks that other is an `int` or `bytes`
assert self.valid
second = ECKey().set(other, self.compressed)
return self * second
def __rmul__(self, other):
return self * other
def add(self, data):
"""Add key to scalar data. Returns compressed key."""
other = ECKey()
other.set(data, True)
return self + other
def mul(self, data):
"""Multiply key secret with scalar data. Returns compressed key."""
other = ECKey()
other.set(data, True)
return self * other
def negate(self):
"""Negate a private key."""
assert self.valid
self.secret = SECP256K1_ORDER - self.secret
@property
def is_valid(self):
return self.valid
@property
def is_compressed(self):
return self.compressed
def get_pubkey(self):
"""Compute an ECPubKey object for this secret key."""
assert(self.valid)
ret = ECPubKey()
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
ret.p = p
ret.valid = True
ret.compressed = self.compressed
return ret
def sign_ecdsa(self, msg, low_s=True, rfc6979=False):
"""Construct a DER-encoded ECDSA signature with this key.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA signer algorithm."""
assert(self.valid)
z = int.from_bytes(msg, 'big')
# Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation)
if rfc6979:
k = int.from_bytes(rfc6979_nonce(self.secret.to_bytes(32, 'big') + msg), 'big')
else:
k = random.randrange(1, SECP256K1_ORDER)
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
r = R[0] % SECP256K1_ORDER
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
if low_s and s > SECP256K1_ORDER_HALF:
s = SECP256K1_ORDER - s
# Represent in DER format. The byte representations of r and s have
# length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33
# bytes).
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb
def sign_schnorr(self, msg, aux=None):
"""Create a Schnorr signature (see BIP340)."""
if aux is None:
aux = bytes(32)
assert self.valid
assert len(msg) == 32
assert len(aux) == 32
t = (self.secret ^ int.from_bytes(TaggedHash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big')
kp = int.from_bytes(TaggedHash("BIP0340/nonce", t + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
assert kp != 0
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
k = kp if SECP256K1.has_even_y(R) else SECP256K1_ORDER - kp
e = int.from_bytes(TaggedHash("BIP0340/challenge", R[0].to_bytes(32, 'big') + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
return R[0].to_bytes(32, 'big') + ((k + e * self.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
def tweak_add(self, tweak):
"""Return a tweaked version of this private key."""
assert(self.valid)
t = int_or_bytes(tweak)
if t >= SECP256K1_ORDER:
return None
tweaked = (self.secret + t) % SECP256K1_ORDER
if tweaked == 0:
return None
ret = ECKey()
ret.set(tweaked.to_bytes(32, 'big'), self.compressed)
return ret
def generate_key_pair(secret=None, compressed=True):
"""Convenience function to generate a private-public key pair."""
d = ECKey()
if secret:
d.set(secret, compressed)
else:
d.generate(compressed)
P = d.get_pubkey()
return d, P
def generate_bip340_key_pair():
"""Convenience function to generate a BIP0340 private-public key pair."""
d = ECKey()
d.generate()
P = d.get_pubkey()
if P.get_y()%2 != 0:
d.negate()
P.negate()
return d, P
def generate_schnorr_nonce():
"""Generate a random valid BIP340 nonce.
See https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki.
This implementation ensures the y-coordinate of the nonce point is even."""
kp = random.randrange(1, SECP256K1_ORDER)
assert kp != 0
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
k = kp if R[1] % 2 == 0 else SECP256K1_ORDER - kp
k_key = ECKey()
k_key.set(k.to_bytes(32, 'big'), True)
return k_key