diff --git a/bip-0352/bitcoin_utils.py b/bip-0352/bitcoin_utils.py index 9e279e45..7e9bd9b2 100644 --- a/bip-0352/bitcoin_utils.py +++ b/bip-0352/bitcoin_utils.py @@ -2,7 +2,7 @@ import hashlib import struct from io import BytesIO from ripemd160 import ripemd160 -from secp256k1 import ECKey +from secp256k1lab.secp256k1 import Scalar from typing import Union @@ -93,7 +93,7 @@ class VinInfo: else: self.txinwitness = txinwitness if private_key is None: - self.private_key = ECKey() + self.private_key = Scalar() else: self.private_key = private_key self.scriptSig = scriptSig diff --git a/bip-0352/reference.py b/bip-0352/reference.py index 394fcc4d..714050e4 100755 --- a/bip-0352/reference.py +++ b/bip-0352/reference.py @@ -2,15 +2,19 @@ # For running the test vectors, run this script: # ./reference.py send_and_receive_test_vectors.json -import hashlib import json +from pathlib import Path +import sys from typing import List, Tuple, Dict, cast -from sys import argv, exit -from functools import reduce -# local files +# import the vendored copy of secp256k1lab +sys.path.insert(0, str(Path(__file__).parent / "secp256k1lab/src")) +from secp256k1lab.bip340 import schnorr_sign, schnorr_verify +from secp256k1lab.secp256k1 import G, GE, Scalar +from secp256k1lab.util import tagged_hash, hash_sha256 + + from bech32m import convertbits, bech32_encode, decode, Encoding -from secp256k1 import ECKey, ECPubKey, TaggedHash, NUMS_H from bitcoin_utils import ( deser_txid, from_hex, @@ -27,9 +31,10 @@ from bitcoin_utils import ( K_max = 2323 # per-group recipient limit +NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0 -def get_pubkey_from_input(vin: VinInfo) -> ECPubKey: +def get_pubkey_from_input(vin: VinInfo) -> GE: if is_p2pkh(vin.prevout): # skip the first 3 op_codes and grab the 20 byte hash # from the scriptPubKey @@ -47,20 +52,22 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey: pubkey_bytes = vin.scriptSig[i - 33:i] pubkey_hash = hash160(pubkey_bytes) if pubkey_hash == spk_hash: - pubkey = ECPubKey().set(pubkey_bytes) - if (pubkey.valid) & (pubkey.compressed): - return pubkey + try: + return GE.from_bytes_compressed(pubkey_bytes) + except ValueError: + pass if is_p2sh(vin.prevout): redeem_script = vin.scriptSig[1:] if is_p2wpkh(redeem_script): - pubkey = ECPubKey().set(vin.txinwitness.scriptWitness.stack[-1]) - if (pubkey.valid) & (pubkey.compressed): - return pubkey + try: + return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1]) + except (ValueError, AssertionError): + pass if is_p2wpkh(vin.prevout): - txin = vin.txinwitness - pubkey = ECPubKey().set(txin.scriptWitness.stack[-1]) - if (pubkey.valid) & (pubkey.compressed): - return pubkey + try: + return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1]) + except (ValueError, AssertionError): + pass if is_p2tr(vin.prevout): witnessStack = vin.txinwitness.scriptWitness.stack if (len(witnessStack) >= 1): @@ -75,71 +82,69 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey: internal_key = control_block[1:33] if (internal_key == NUMS_H.to_bytes(32, 'big')): # Skip if NUMS_H - return ECPubKey() + return GE() - pubkey = ECPubKey().set(vin.prevout[2:]) - if (pubkey.valid) & (pubkey.compressed): - return pubkey + try: + return GE.from_bytes_xonly(vin.prevout[2:]) + except ValueError: + pass + + return GE() - return ECPubKey() - - -def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: ECPubKey) -> bytes: +def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: GE) -> bytes: lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0] - return TaggedHash("BIP0352/Inputs", lowest_outpoint.serialize() + cast(bytes, sum_input_pubkeys.get_bytes(False))) + return tagged_hash("BIP0352/Inputs", lowest_outpoint.serialize() + sum_input_pubkeys.to_bytes_compressed()) -def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "tsp", version: int = 0) -> str: - data = convertbits(cast(bytes, B_scan.get_bytes(False)) + cast(bytes, B_m.get_bytes(False)), 8, 5) +def encode_silent_payment_address(B_scan: GE, B_m: GE, hrp: str = "tsp", version: int = 0) -> str: + data = convertbits(B_scan.to_bytes_compressed() + B_m.to_bytes_compressed(), 8, 5) return bech32_encode(hrp, [version] + cast(List[int], data), Encoding.BECH32M) -def generate_label(b_scan: ECKey, m: int) -> bytes: - return TaggedHash("BIP0352/Label", b_scan.get_bytes() + ser_uint32(m)) +def generate_label(b_scan: Scalar, m: int) -> Scalar: + return Scalar.from_bytes_checked(tagged_hash("BIP0352/Label", b_scan.to_bytes() + ser_uint32(m))) -def create_labeled_silent_payment_address(b_scan: ECKey, B_spend: ECPubKey, m: int, hrp: str = "tsp", version: int = 0) -> str: - G = ECKey().set(1).get_pubkey() - B_scan = b_scan.get_pubkey() +def create_labeled_silent_payment_address(b_scan: Scalar, B_spend: GE, m: int, hrp: str = "tsp", version: int = 0) -> str: + B_scan = b_scan * G B_m = B_spend + generate_label(b_scan, m) * G labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version) return labeled_address -def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[ECPubKey, ECPubKey]: +def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[GE, GE]: _, data = decode(hrp, address) if data is None: - return ECPubKey(), ECPubKey() - B_scan = ECPubKey().set(data[:33]) - B_spend = ECPubKey().set(data[33:]) + return GE(), GE() + B_scan = GE.from_bytes_compressed(data[:33]) + B_spend = GE.from_bytes_compressed(data[33:]) return B_scan, B_spend -def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]: - G = ECKey().set(1).get_pubkey() +def create_outputs(input_priv_keys: List[Tuple[Scalar, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]: negated_keys = [] for key, is_xonly in input_priv_keys: - k = ECKey().set(key.get_bytes()) - if is_xonly and k.get_pubkey().get_y() % 2 != 0: - k.negate() + k = Scalar.from_bytes_checked(key.to_bytes()) + if is_xonly and not (k * G).has_even_y(): + k = -k negated_keys.append(k) - a_sum = sum(negated_keys) - if not a_sum.valid: + a_sum = Scalar.sum(*negated_keys) + if a_sum == 0: # Input privkeys sum is zero -> fail return [] - assert ECKey().set(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum" - input_hash = get_input_hash(outpoints, a_sum * G) - silent_payment_groups: Dict[ECPubKey, List[ECPubKey]] = {} + assert Scalar.from_bytes_checked(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum" + input_hash_scalar = Scalar.from_bytes_checked(get_input_hash(outpoints, a_sum * G)) + silent_payment_groups: Dict[GE, List[GE]] = {} for recipient in recipients: B_scan, B_m = decode_silent_payment_address(recipient["address"], hrp=hrp) # Verify decoded intermediate keys for recipient - expected_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"])) - expected_B_m = ECPubKey().set(bytes.fromhex(recipient["spend_pub_key"])) + expected_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"])) + expected_B_m = GE.from_bytes_compressed(bytes.fromhex(recipient["spend_pub_key"])) assert expected_B_scan == B_scan, "B_scan did not match expected recipient.scan_pub_key" assert expected_B_m == B_m, "B_m did not match expected recipient.spend_pub_key" if B_scan in silent_payment_groups: @@ -153,68 +158,67 @@ def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[CO outputs = [] for B_scan, B_m_values in silent_payment_groups.items(): - ecdh_shared_secret = input_hash * a_sum * B_scan + ecdh_shared_secret = input_hash_scalar * a_sum * B_scan expected_shared_secrets = expected.get("shared_secrets", {}) # Find the recipient address that corresponds to this B_scan and get its index for recipient_idx, recipient in enumerate(recipients): - recipient_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"])) + recipient_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"])) if recipient_B_scan == B_scan: expected_shared_secret_hex = expected_shared_secrets[recipient_idx] - assert ecdh_shared_secret.get_bytes(False).hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}" + assert ecdh_shared_secret.to_bytes_compressed().hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}" break k = 0 for B_m in B_m_values: - t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k)) + t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k))) P_km = B_m + t_k * G - outputs.append(P_km.get_bytes().hex()) + outputs.append(P_km.to_bytes_xonly().hex()) k += 1 return list(set(outputs)) -def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]: - G = ECKey().set(1).get_pubkey() - input_hash_key = ECKey().set(input_hash) - computed_tweak_point = input_hash_key * A_sum - assert computed_tweak_point.get_bytes(False).hex() == expected.get("tweak"), "tweak did not match expected" - ecdh_shared_secret = input_hash * b_scan * A_sum - assert ecdh_shared_secret.get_bytes(False).hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret" +def scanning(b_scan: Scalar, B_spend: GE, A_sum: GE, input_hash: bytes, outputs_to_check: List[bytes], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]: + input_hash_scalar = Scalar.from_bytes_checked(input_hash) + computed_tweak_point = input_hash_scalar * A_sum + assert computed_tweak_point.to_bytes_compressed().hex() == expected.get("tweak"), "tweak did not match expected" + ecdh_shared_secret = input_hash_scalar * b_scan * A_sum + assert ecdh_shared_secret.to_bytes_compressed().hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret" k = 0 wallet = [] while True: if k == K_max: # Don't look further than the per-group recipient limit (K_max) break - t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k)) + t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k))) P_k = B_spend + t_k * G for output in outputs_to_check: - if P_k == output: - wallet.append({"pub_key": P_k.get_bytes().hex(), "priv_key_tweak": t_k.hex()}) + output_ge = GE.from_bytes_xonly(output) + if P_k.to_bytes_xonly() == output: + wallet.append({"pub_key": P_k.to_bytes_xonly().hex(), "priv_key_tweak": t_k.to_bytes().hex()}) outputs_to_check.remove(output) k += 1 break elif labels: - m_G_sub = output - P_k - if m_G_sub.get_bytes(False).hex() in labels: + m_G_sub = output_ge - P_k + if m_G_sub.to_bytes_compressed().hex() in labels: P_km = P_k + m_G_sub wallet.append({ - "pub_key": P_km.get_bytes().hex(), - "priv_key_tweak": (ECKey().set(t_k).add( - bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()]) - )).get_bytes().hex(), + "pub_key": P_km.to_bytes_xonly().hex(), + "priv_key_tweak": (t_k + Scalar.from_bytes_checked( + bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()]) + )).to_bytes().hex(), }) outputs_to_check.remove(output) k += 1 break else: - output.negate() - m_G_sub = output - P_k - if m_G_sub.get_bytes(False).hex() in labels: + m_G_sub = -output_ge - P_k + if m_G_sub.to_bytes_compressed().hex() in labels: P_km = P_k + m_G_sub wallet.append({ - "pub_key": P_km.get_bytes().hex(), - "priv_key_tweak": (ECKey().set(t_k).add( - bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()]) - )).get_bytes().hex(), + "pub_key": P_km.to_bytes_xonly().hex(), + "priv_key_tweak": (t_k + Scalar.from_bytes_checked( + bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()]) + )).to_bytes().hex(), }) outputs_to_check.remove(output) k += 1 @@ -225,15 +229,13 @@ def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: byte if __name__ == "__main__": - if len(argv) != 2 or argv[1] in ('-h', '--help'): + if len(sys.argv) != 2 or sys.argv[1] in ('-h', '--help'): print("Usage: ./reference.py send_and_receive_test_vectors.json") - exit(0) + sys.exit(0) - with open(argv[1], "r") as f: + with open(sys.argv[1], "r") as f: test_data = json.loads(f.read()) - # G , needed for generating the labels "database" - G = ECKey().set(1).get_pubkey() for case in test_data: print(case["comment"]) # Test sending @@ -247,7 +249,7 @@ if __name__ == "__main__": scriptSig=bytes.fromhex(input["scriptSig"]), txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])), prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]), - private_key=ECKey().set(bytes.fromhex(input["private_key"])), + private_key=Scalar.from_bytes_checked(bytes.fromhex(input["private_key"])), ) for input in given["vin"] ] @@ -256,14 +258,14 @@ if __name__ == "__main__": input_pub_keys = [] for vin in vins: pubkey = get_pubkey_from_input(vin) - if not pubkey.valid: + if pubkey.infinity: continue input_priv_keys.append(( vin.private_key, is_p2tr(vin.prevout), )) input_pub_keys.append(pubkey) - assert [pk.get_bytes(False).hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected" + assert [pk.to_bytes_compressed().hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected" sending_outputs = [] if (len(input_pub_keys) > 0): @@ -283,13 +285,13 @@ if __name__ == "__main__": assert(sending_outputs == expected["outputs"][0] == []), "Sending test failed" # Test receiving - msg = hashlib.sha256(b"message").digest() - aux = hashlib.sha256(b"random auxiliary data").digest() + msg = hash_sha256(b"message") + aux = hash_sha256(b"random auxiliary data") for receiving_test in case["receiving"]: given = receiving_test["given"] expected = receiving_test["expected"] outputs_to_check = [ - ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"] + bytes.fromhex(p) for p in given["outputs"] ] vins = [ VinInfo( @@ -302,12 +304,10 @@ if __name__ == "__main__": ] # Check that the given inputs for the receiving test match what was generated during the sending test receiving_addresses = [] - b_scan = ECKey().set(bytes.fromhex(given["key_material"]["scan_priv_key"])) - b_spend = ECKey().set( - bytes.fromhex(given["key_material"]["spend_priv_key"]) - ) - B_scan = b_scan.get_pubkey() - B_spend = b_spend.get_pubkey() + b_scan = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["scan_priv_key"])) + b_spend = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["spend_priv_key"])) + B_scan = b_scan * G + B_spend = b_spend * G receiving_addresses.append( encode_silent_payment_address(B_scan, B_spend, hrp="sp") ) @@ -324,21 +324,21 @@ if __name__ == "__main__": input_pub_keys = [] for vin in vins: pubkey = get_pubkey_from_input(vin) - if not pubkey.valid: + if pubkey.infinity: continue input_pub_keys.append(pubkey) add_to_wallet = [] if (len(input_pub_keys) > 0): - A_sum = reduce(lambda x, y: x + y, input_pub_keys) - if A_sum.get_bytes() is None: + A_sum = GE.sum(*input_pub_keys) + if A_sum.infinity: # Input pubkeys sum is point at infinity -> skip tx assert expected["outputs"] == [] continue - assert A_sum.get_bytes(False).hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum" + assert A_sum.to_bytes_compressed().hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum" input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum) pre_computed_labels = { - (generate_label(b_scan, label) * G).get_bytes(False).hex(): generate_label(b_scan, label).hex() + (generate_label(b_scan, label) * G).to_bytes_compressed().hex(): generate_label(b_scan, label).to_bytes().hex() for label in given["labels"] } add_to_wallet = scanning( @@ -353,13 +353,13 @@ if __name__ == "__main__": # Check that the private key is correct for the found output public key for output in add_to_wallet: - pub_key = ECPubKey().set(bytes.fromhex(output["pub_key"])) - full_private_key = b_spend.add(bytes.fromhex(output["priv_key_tweak"])) - if full_private_key.get_pubkey().get_y() % 2 != 0: - full_private_key.negate() + pub_key = GE.from_bytes_xonly(bytes.fromhex(output["pub_key"])) + full_private_key = b_spend + Scalar.from_bytes_checked(bytes.fromhex(output["priv_key_tweak"])) + if not (full_private_key * G).has_even_y(): + full_private_key = -full_private_key - sig = full_private_key.sign_schnorr(msg, aux) - assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}" + sig = schnorr_sign(msg, full_private_key.to_bytes(), aux) + assert schnorr_verify(msg, pub_key.to_bytes_xonly(), sig), f"Invalid signature for {pub_key}" output["signature"] = sig.hex() # Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses diff --git a/bip-0352/secp256k1.py b/bip-0352/secp256k1.py deleted file mode 100644 index 0ccbc4e6..00000000 --- a/bip-0352/secp256k1.py +++ /dev/null @@ -1,696 +0,0 @@ -# Copyright (c) 2019 Pieter Wuille -# Distributed under the MIT software license, see the accompanying -# file COPYING or http://www.opensource.org/licenses/mit-license.php. -"""Test-only secp256k1 elliptic curve implementation - -WARNING: This code is slow, uses bad randomness, does not properly protect -keys, and is trivially vulnerable to side channel attacks. Do not use for -anything but tests.""" -import random -import hashlib -import hmac - -def TaggedHash(tag, data): - ss = hashlib.sha256(tag.encode('utf-8')).digest() - ss += ss - ss += data - return hashlib.sha256(ss).digest() - -def modinv(a, n): - """Compute the modular inverse of a modulo n - - See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers. - """ - t1, t2 = 0, 1 - r1, r2 = n, a - while r2 != 0: - q = r1 // r2 - t1, t2 = t2, t1 - q * t2 - r1, r2 = r2, r1 - q * r2 - if r1 > 1: - return None - if t1 < 0: - t1 += n - return t1 - -def jacobi_symbol(n, k): - """Compute the Jacobi symbol of n modulo k - - See http://en.wikipedia.org/wiki/Jacobi_symbol - - For our application k is always prime, so this is the same as the Legendre symbol.""" - assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k" - n %= k - t = 0 - while n != 0: - while n & 1 == 0: - n >>= 1 - r = k & 7 - t ^= (r == 3 or r == 5) - n, k = k, n - t ^= (n & k & 3 == 3) - n = n % k - if k == 1: - return -1 if t else 1 - return 0 - -def modsqrt(a, p): - """Compute the square root of a modulo p when p % 4 = 3. - - The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm - - Limiting this function to only work for p % 4 = 3 means we don't need to - iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd - is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4) - - secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4. - """ - if p % 4 != 3: - raise NotImplementedError("modsqrt only implemented for p % 4 = 3") - sqrt = pow(a, (p + 1)//4, p) - if pow(sqrt, 2, p) == a % p: - return sqrt - return None - -def int_or_bytes(s): - "Convert 32-bytes to int while accepting also int and returning it as is." - if isinstance(s, bytes): - assert(len(s) == 32) - s = int.from_bytes(s, 'big') - elif not isinstance(s, int): - raise TypeError - return s - -class EllipticCurve: - def __init__(self, p, a, b): - """Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p).""" - self.p = p - self.a = a % p - self.b = b % p - - def affine(self, p1): - """Convert a Jacobian point tuple p1 to affine form, or None if at infinity. - - An affine point is represented as the Jacobian (x, y, 1)""" - x1, y1, z1 = p1 - if z1 == 0: - return None - inv = modinv(z1, self.p) - inv_2 = (inv**2) % self.p - inv_3 = (inv_2 * inv) % self.p - return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1) - - def has_even_y(self, p1): - """Whether the point p1 has an even Y coordinate when expressed in affine coordinates.""" - return not (p1[2] == 0 or self.affine(p1)[1] & 1) - - def negate(self, p1): - """Negate a Jacobian point tuple p1.""" - x1, y1, z1 = p1 - return (x1, (self.p - y1) % self.p, z1) - - def on_curve(self, p1): - """Determine whether a Jacobian tuple p is on the curve (and not infinity)""" - x1, y1, z1 = p1 - z2 = pow(z1, 2, self.p) - z4 = pow(z2, 2, self.p) - return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0 - - def is_x_coord(self, x): - """Test whether x is a valid X coordinate on the curve.""" - x_3 = pow(x, 3, self.p) - return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1 - - def lift_x(self, x): - """Given an X coordinate on the curve, return a corresponding affine point.""" - x_3 = pow(x, 3, self.p) - v = x_3 + self.a * x + self.b - y = modsqrt(v, self.p) - if y is None: - return None - return (x, y, 1) - - def double(self, p1): - """Double a Jacobian tuple p1 - - See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling""" - x1, y1, z1 = p1 - if z1 == 0: - return (0, 1, 0) - y1_2 = (y1**2) % self.p - y1_4 = (y1_2**2) % self.p - x1_2 = (x1**2) % self.p - s = (4*x1*y1_2) % self.p - m = 3*x1_2 - if self.a: - m += self.a * pow(z1, 4, self.p) - m = m % self.p - x2 = (m**2 - 2*s) % self.p - y2 = (m*(s - x2) - 8*y1_4) % self.p - z2 = (2*y1*z1) % self.p - return (x2, y2, z2) - - def add_mixed(self, p1, p2): - """Add a Jacobian tuple p1 and an affine tuple p2 - - See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)""" - x1, y1, z1 = p1 - x2, y2, z2 = p2 - assert(z2 == 1) - # Adding to the point at infinity is a no-op - if z1 == 0: - return p2 - z1_2 = (z1**2) % self.p - z1_3 = (z1_2 * z1) % self.p - u2 = (x2 * z1_2) % self.p - s2 = (y2 * z1_3) % self.p - if x1 == u2: - if (y1 != s2): - # p1 and p2 are inverses. Return the point at infinity. - return (0, 1, 0) - # p1 == p2. The formulas below fail when the two points are equal. - return self.double(p1) - h = u2 - x1 - r = s2 - y1 - h_2 = (h**2) % self.p - h_3 = (h_2 * h) % self.p - u1_h_2 = (x1 * h_2) % self.p - x3 = (r**2 - h_3 - 2*u1_h_2) % self.p - y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p - z3 = (h*z1) % self.p - return (x3, y3, z3) - - def add(self, p1, p2): - """Add two Jacobian tuples p1 and p2 - - See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition""" - x1, y1, z1 = p1 - x2, y2, z2 = p2 - # Adding the point at infinity is a no-op - if z1 == 0: - return p2 - if z2 == 0: - return p1 - # Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1 - if z1 == 1: - return self.add_mixed(p2, p1) - if z2 == 1: - return self.add_mixed(p1, p2) - z1_2 = (z1**2) % self.p - z1_3 = (z1_2 * z1) % self.p - z2_2 = (z2**2) % self.p - z2_3 = (z2_2 * z2) % self.p - u1 = (x1 * z2_2) % self.p - u2 = (x2 * z1_2) % self.p - s1 = (y1 * z2_3) % self.p - s2 = (y2 * z1_3) % self.p - if u1 == u2: - if (s1 != s2): - # p1 and p2 are inverses. Return the point at infinity. - return (0, 1, 0) - # p1 == p2. The formulas below fail when the two points are equal. - return self.double(p1) - h = u2 - u1 - r = s2 - s1 - h_2 = (h**2) % self.p - h_3 = (h_2 * h) % self.p - u1_h_2 = (u1 * h_2) % self.p - x3 = (r**2 - h_3 - 2*u1_h_2) % self.p - y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p - z3 = (h*z1*z2) % self.p - return (x3, y3, z3) - - def mul(self, ps): - """Compute a (multi) point multiplication - - ps is a list of (Jacobian tuple, scalar) pairs. - """ - r = (0, 1, 0) - for i in range(255, -1, -1): - r = self.double(r) - for (p, n) in ps: - if ((n >> i) & 1): - r = self.add(r, p) - return r - -SECP256K1_FIELD_SIZE = 2**256 - 2**32 - 977 -SECP256K1 = EllipticCurve(SECP256K1_FIELD_SIZE, 0, 7) -SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1) -SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 -SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2 -NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0 - -class ECPubKey(): - """A secp256k1 public key""" - - def __init__(self): - """Construct an uninitialized public key""" - self.valid = False - - def __repr__(self): - return self.get_bytes().hex() - - def __eq__(self, other): - assert isinstance(other, ECPubKey) - return self.get_bytes() == other.get_bytes() - - def __hash__(self): - return hash(self.get_bytes()) - - def set(self, data): - """Construct a public key from a serialization in compressed or uncompressed DER format or BIP340 format""" - if (len(data) == 65 and data[0] == 0x04): - p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1) - self.valid = SECP256K1.on_curve(p) - if self.valid: - self.p = p - self.compressed = False - elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)): - x = int.from_bytes(data[1:33], 'big') - if SECP256K1.is_x_coord(x): - p = SECP256K1.lift_x(x) - # if the oddness of the y co-ord isn't correct, find the other - # valid y - if (p[1] & 1) != (data[0] & 1): - p = SECP256K1.negate(p) - self.p = p - self.valid = True - self.compressed = True - else: - self.valid = False - elif (len(data) == 32): - x = int.from_bytes(data[0:32], 'big') - if SECP256K1.is_x_coord(x): - p = SECP256K1.lift_x(x) - # if the oddness of the y co-ord isn't correct, find the other - # valid y - if p[1]%2 != 0: - p = SECP256K1.negate(p) - self.p = p - self.valid = True - self.compressed = True - else: - self.valid = False - else: - self.valid = False - return self - - @property - def is_compressed(self): - return self.compressed - - @property - def is_valid(self): - return self.valid - - def get_y(self): - return SECP256K1.affine(self.p)[1] - - def get_x(self): - return SECP256K1.affine(self.p)[0] - - def get_bytes(self, bip340=True): - assert(self.valid) - p = SECP256K1.affine(self.p) - if p is None: - return None - if bip340: - return bytes(p[0].to_bytes(32, 'big')) - elif self.compressed: - return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big') - else: - return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big') - - def verify_ecdsa(self, sig, msg, low_s=True): - """Verify a strictly DER-encoded ECDSA signature against this pubkey. - - See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the - ECDSA verifier algorithm""" - assert(self.valid) - - # Extract r and s from the DER formatted signature. Return false for - # any DER encoding errors. - if (sig[1] + 2 != len(sig)): - return False - if (len(sig) < 4): - return False - if (sig[0] != 0x30): - return False - if (sig[2] != 0x02): - return False - rlen = sig[3] - if (len(sig) < 6 + rlen): - return False - if rlen < 1 or rlen > 33: - return False - if sig[4] >= 0x80: - return False - if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)): - return False - r = int.from_bytes(sig[4:4+rlen], 'big') - if (sig[4+rlen] != 0x02): - return False - slen = sig[5+rlen] - if slen < 1 or slen > 33: - return False - if (len(sig) != 6 + rlen + slen): - return False - if sig[6+rlen] >= 0x80: - return False - if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)): - return False - s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big') - - # Verify that r and s are within the group order - if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER: - return False - if low_s and s >= SECP256K1_ORDER_HALF: - return False - z = int.from_bytes(msg, 'big') - - # Run verifier algorithm on r, s - w = modinv(s, SECP256K1_ORDER) - u1 = z*w % SECP256K1_ORDER - u2 = r*w % SECP256K1_ORDER - R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)])) - if R is None or R[0] != r: - return False - return True - - def verify_schnorr(self, sig, msg): - assert(len(msg) == 32) - assert(len(sig) == 64) - assert(self.valid) - r = int.from_bytes(sig[0:32], 'big') - if r >= SECP256K1_FIELD_SIZE: - return False - s = int.from_bytes(sig[32:64], 'big') - if s >= SECP256K1_ORDER: - return False - e = int.from_bytes(TaggedHash("BIP0340/challenge", sig[0:32] + self.get_bytes() + msg), 'big') % SECP256K1_ORDER - R = SECP256K1.mul([(SECP256K1_G, s), (self.p, SECP256K1_ORDER - e)]) - if not SECP256K1.has_even_y(R): - return False - if ((r * R[2] * R[2]) % SECP256K1_FIELD_SIZE) != R[0]: - return False - return True - - def __add__(self, other): - """Adds two ECPubKey points.""" - assert isinstance(other, ECPubKey) - assert self.valid - assert other.valid - ret = ECPubKey() - ret.p = SECP256K1.add(other.p, self.p) - ret.valid = True - ret.compressed = self.compressed - return ret - - def __radd__(self, other): - """Allows this ECPubKey to be added to 0 for sum()""" - if other == 0: - return self - else: - return self + other - - def __mul__(self, other): - """Multiplies ECPubKey point with a scalar(int/32bytes/ECKey).""" - if isinstance(other, ECKey): - assert self.valid - assert other.secret is not None - multiplier = other.secret - else: - # int_or_bytes checks that other is `int` or `bytes` - multiplier = int_or_bytes(other) - - assert multiplier < SECP256K1_ORDER - multiplier = multiplier % SECP256K1_ORDER - ret = ECPubKey() - ret.p = SECP256K1.mul([(self.p, multiplier)]) - ret.valid = True - ret.compressed = self.compressed - return ret - - def __rmul__(self, other): - """Multiplies a scalar(int/32bytes/ECKey) with an ECPubKey point""" - return self * other - - def __sub__(self, other): - """Subtract one point from another""" - assert isinstance(other, ECPubKey) - assert self.valid - assert other.valid - ret = ECPubKey() - ret.p = SECP256K1.add(self.p, SECP256K1.negate(other.p)) - ret.valid = True - ret.compressed = self.compressed - return ret - - def tweak_add(self, tweak): - assert(self.valid) - t = int_or_bytes(tweak) - if t >= SECP256K1_ORDER: - return None - tweaked = SECP256K1.affine(SECP256K1.mul([(self.p, 1), (SECP256K1_G, t)])) - if tweaked is None: - return None - ret = ECPubKey() - ret.p = tweaked - ret.valid = True - ret.compressed = self.compressed - return ret - - def mul(self, data): - """Multiplies ECPubKey point with scalar data.""" - assert self.valid - other = ECKey() - other.set(data, True) - return self * other - - def negate(self): - self.p = SECP256K1.affine(SECP256K1.negate(self.p)) - -def rfc6979_nonce(key): - """Compute signing nonce using RFC6979.""" - v = bytes([1] * 32) - k = bytes([0] * 32) - k = hmac.new(k, v + b"\x00" + key, 'sha256').digest() - v = hmac.new(k, v, 'sha256').digest() - k = hmac.new(k, v + b"\x01" + key, 'sha256').digest() - v = hmac.new(k, v, 'sha256').digest() - return hmac.new(k, v, 'sha256').digest() - -class ECKey(): - """A secp256k1 private key""" - - def __init__(self): - self.valid = False - - def __repr__(self): - return str(self.secret) - - def __eq__(self, other): - assert isinstance(other, ECKey) - return self.secret == other.secret - - def __hash__(self): - return hash(self.secret) - - def set(self, secret, compressed=True): - """Construct a private key object from either 32-bytes or an int secret and a compressed flag.""" - secret = int_or_bytes(secret) - - self.valid = (secret > 0 and secret < SECP256K1_ORDER) - if self.valid: - self.secret = secret - self.compressed = compressed - return self - - def generate(self, compressed=True): - """Generate a random private key (compressed or uncompressed).""" - self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed) - return self - - def get_bytes(self): - """Retrieve the 32-byte representation of this key.""" - assert(self.valid) - return self.secret.to_bytes(32, 'big') - - def as_int(self): - return self.secret - - def from_int(self, secret, compressed=True): - self.valid = (secret > 0 and secret < SECP256K1_ORDER) - if self.valid: - self.secret = secret - self.compressed = compressed - - def __add__(self, other): - """Add key secrets. Returns compressed key.""" - assert isinstance(other, ECKey) - assert other.secret > 0 and other.secret < SECP256K1_ORDER - assert self.valid is True - ret_data = ((self.secret + other.secret) % SECP256K1_ORDER).to_bytes(32, 'big') - ret = ECKey() - ret.set(ret_data, True) - return ret - - def __radd__(self, other): - """Allows this ECKey to be added to 0 for sum()""" - if other == 0: - return self - else: - return self + other - - def __sub__(self, other): - """Subtract key secrets. Returns compressed key.""" - assert isinstance(other, ECKey) - assert other.secret > 0 and other.secret < SECP256K1_ORDER - assert self.valid is True - ret_data = ((self.secret - other.secret) % SECP256K1_ORDER).to_bytes(32, 'big') - ret = ECKey() - ret.set(ret_data, True) - return ret - - def __mul__(self, other): - """Multiply a private key by another private key or multiply a public key by a private key. Returns compressed key.""" - if isinstance(other, ECKey): - assert other.secret > 0 and other.secret < SECP256K1_ORDER - assert self.valid is True - ret_data = ((self.secret * other.secret) % SECP256K1_ORDER).to_bytes(32, 'big') - ret = ECKey() - ret.set(ret_data, True) - return ret - elif isinstance(other, ECPubKey): - return other * self - else: - # ECKey().set() checks that other is an `int` or `bytes` - assert self.valid - second = ECKey().set(other, self.compressed) - return self * second - - def __rmul__(self, other): - return self * other - - def add(self, data): - """Add key to scalar data. Returns compressed key.""" - other = ECKey() - other.set(data, True) - return self + other - - def mul(self, data): - """Multiply key secret with scalar data. Returns compressed key.""" - other = ECKey() - other.set(data, True) - return self * other - - def negate(self): - """Negate a private key.""" - assert self.valid - self.secret = SECP256K1_ORDER - self.secret - - @property - def is_valid(self): - return self.valid - - @property - def is_compressed(self): - return self.compressed - - def get_pubkey(self): - """Compute an ECPubKey object for this secret key.""" - assert(self.valid) - ret = ECPubKey() - p = SECP256K1.mul([(SECP256K1_G, self.secret)]) - ret.p = p - ret.valid = True - ret.compressed = self.compressed - return ret - - def sign_ecdsa(self, msg, low_s=True, rfc6979=False): - """Construct a DER-encoded ECDSA signature with this key. - - See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the - ECDSA signer algorithm.""" - assert(self.valid) - z = int.from_bytes(msg, 'big') - # Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation) - if rfc6979: - k = int.from_bytes(rfc6979_nonce(self.secret.to_bytes(32, 'big') + msg), 'big') - else: - k = random.randrange(1, SECP256K1_ORDER) - R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) - r = R[0] % SECP256K1_ORDER - s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER - if low_s and s > SECP256K1_ORDER_HALF: - s = SECP256K1_ORDER - s - # Represent in DER format. The byte representations of r and s have - # length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33 - # bytes). - rb = r.to_bytes((r.bit_length() + 8) // 8, 'big') - sb = s.to_bytes((s.bit_length() + 8) // 8, 'big') - return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb - - def sign_schnorr(self, msg, aux=None): - """Create a Schnorr signature (see BIP340).""" - if aux is None: - aux = bytes(32) - - assert self.valid - assert len(msg) == 32 - assert len(aux) == 32 - - t = (self.secret ^ int.from_bytes(TaggedHash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big') - kp = int.from_bytes(TaggedHash("BIP0340/nonce", t + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER - assert kp != 0 - R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)])) - k = kp if SECP256K1.has_even_y(R) else SECP256K1_ORDER - kp - e = int.from_bytes(TaggedHash("BIP0340/challenge", R[0].to_bytes(32, 'big') + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER - return R[0].to_bytes(32, 'big') + ((k + e * self.secret) % SECP256K1_ORDER).to_bytes(32, 'big') - - def tweak_add(self, tweak): - """Return a tweaked version of this private key.""" - assert(self.valid) - t = int_or_bytes(tweak) - if t >= SECP256K1_ORDER: - return None - tweaked = (self.secret + t) % SECP256K1_ORDER - if tweaked == 0: - return None - ret = ECKey() - ret.set(tweaked.to_bytes(32, 'big'), self.compressed) - return ret - -def generate_key_pair(secret=None, compressed=True): - """Convenience function to generate a private-public key pair.""" - d = ECKey() - if secret: - d.set(secret, compressed) - else: - d.generate(compressed) - - P = d.get_pubkey() - return d, P - -def generate_bip340_key_pair(): - """Convenience function to generate a BIP0340 private-public key pair.""" - d = ECKey() - d.generate() - P = d.get_pubkey() - if P.get_y()%2 != 0: - d.negate() - P.negate() - return d, P - -def generate_schnorr_nonce(): - """Generate a random valid BIP340 nonce. - - See https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki. - This implementation ensures the y-coordinate of the nonce point is even.""" - kp = random.randrange(1, SECP256K1_ORDER) - assert kp != 0 - R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)])) - k = kp if R[1] % 2 == 0 else SECP256K1_ORDER - kp - k_key = ECKey() - k_key.set(k.to_bytes(32, 'big'), True) - return k_key