1
0
mirror of https://github.com/bitcoin/bips.git synced 2026-03-09 15:53:54 +00:00

Merge pull request #2087 from theStack/bip352-vendor-secp256k1lab

BIP-352: vendor secp256k1lab and use it for reference implementation
This commit is contained in:
Mark "Murch" Erhardt
2026-03-06 14:39:56 -05:00
committed by GitHub
17 changed files with 790 additions and 803 deletions

View File

@@ -378,7 +378,10 @@ Silent payments introduces a new address format and protocol for sending and as
== Test Vectors ==
A [[bip-0352/send_and_receive_test_vectors.json|collection of test vectors in JSON format]] are provided, along with a [[bip-0352/reference.py|python reference implementation]]. Each test vector consists of a sending test case and corresponding receiving test case. This is to allow sending and receiving to be implemented separately. To ensure determinism while testing, sort the array of ''B<sub>m</sub>'' by amount (see the [[bip-0352/reference.py|reference implementation]]). Test cases use the following schema:
A [[bip-0352/send_and_receive_test_vectors.json|collection of test vectors in JSON format]] is provided, along with a [[bip-0352/reference.py|python reference implementation]]. It uses a vendored copy of the [https://github.com/secp256k1lab/secp256k1lab/ secp256k1lab] library at version 1.0.0
(commit [https://github.com/secp256k1lab/secp256k1lab/commit/44dc4bd893b8f03e621585e3bf255253e0e0fbfb 44dc4bd893b8f03e621585e3bf255253e0e0fbfb]).
Each test vector consists of a sending test case and corresponding receiving test case. This is to allow sending and receiving to be implemented separately. To ensure determinism while testing, sort the array of ''B<sub>m</sub>'' by amount (see the [[bip-0352/reference.py|reference implementation]]). Test cases use the following schema:
''' test_case '''

View File

@@ -2,7 +2,7 @@ import hashlib
import struct
from io import BytesIO
from ripemd160 import ripemd160
from secp256k1 import ECKey
from secp256k1lab.secp256k1 import Scalar
from typing import Union
@@ -93,7 +93,7 @@ class VinInfo:
else:
self.txinwitness = txinwitness
if private_key is None:
self.private_key = ECKey()
self.private_key = Scalar()
else:
self.private_key = private_key
self.scriptSig = scriptSig

View File

@@ -2,15 +2,19 @@
# For running the test vectors, run this script:
# ./reference.py send_and_receive_test_vectors.json
import hashlib
import json
from pathlib import Path
import sys
from typing import List, Tuple, Dict, cast
from sys import argv, exit
from functools import reduce
# local files
# import the vendored copy of secp256k1lab
sys.path.insert(0, str(Path(__file__).parent / "secp256k1lab/src"))
from secp256k1lab.bip340 import schnorr_sign, schnorr_verify
from secp256k1lab.secp256k1 import G, GE, Scalar
from secp256k1lab.util import tagged_hash, hash_sha256
from bech32m import convertbits, bech32_encode, decode, Encoding
from secp256k1 import ECKey, ECPubKey, TaggedHash, NUMS_H
from bitcoin_utils import (
deser_txid,
from_hex,
@@ -27,9 +31,10 @@ from bitcoin_utils import (
K_max = 2323 # per-group recipient limit
NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0
def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
def get_pubkey_from_input(vin: VinInfo) -> GE:
if is_p2pkh(vin.prevout):
# skip the first 3 op_codes and grab the 20 byte hash
# from the scriptPubKey
@@ -47,20 +52,22 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
pubkey_bytes = vin.scriptSig[i - 33:i]
pubkey_hash = hash160(pubkey_bytes)
if pubkey_hash == spk_hash:
pubkey = ECPubKey().set(pubkey_bytes)
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(pubkey_bytes)
except ValueError:
pass
if is_p2sh(vin.prevout):
redeem_script = vin.scriptSig[1:]
if is_p2wpkh(redeem_script):
pubkey = ECPubKey().set(vin.txinwitness.scriptWitness.stack[-1])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1])
except (ValueError, AssertionError):
pass
if is_p2wpkh(vin.prevout):
txin = vin.txinwitness
pubkey = ECPubKey().set(txin.scriptWitness.stack[-1])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_compressed(vin.txinwitness.scriptWitness.stack[-1])
except (ValueError, AssertionError):
pass
if is_p2tr(vin.prevout):
witnessStack = vin.txinwitness.scriptWitness.stack
if (len(witnessStack) >= 1):
@@ -75,71 +82,69 @@ def get_pubkey_from_input(vin: VinInfo) -> ECPubKey:
internal_key = control_block[1:33]
if (internal_key == NUMS_H.to_bytes(32, 'big')):
# Skip if NUMS_H
return ECPubKey()
return GE()
pubkey = ECPubKey().set(vin.prevout[2:])
if (pubkey.valid) & (pubkey.compressed):
return pubkey
try:
return GE.from_bytes_xonly(vin.prevout[2:])
except ValueError:
pass
return GE()
return ECPubKey()
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: ECPubKey) -> bytes:
def get_input_hash(outpoints: List[COutPoint], sum_input_pubkeys: GE) -> bytes:
lowest_outpoint = sorted(outpoints, key=lambda outpoint: outpoint.serialize())[0]
return TaggedHash("BIP0352/Inputs", lowest_outpoint.serialize() + cast(bytes, sum_input_pubkeys.get_bytes(False)))
return tagged_hash("BIP0352/Inputs", lowest_outpoint.serialize() + sum_input_pubkeys.to_bytes_compressed())
def encode_silent_payment_address(B_scan: ECPubKey, B_m: ECPubKey, hrp: str = "tsp", version: int = 0) -> str:
data = convertbits(cast(bytes, B_scan.get_bytes(False)) + cast(bytes, B_m.get_bytes(False)), 8, 5)
def encode_silent_payment_address(B_scan: GE, B_m: GE, hrp: str = "tsp", version: int = 0) -> str:
data = convertbits(B_scan.to_bytes_compressed() + B_m.to_bytes_compressed(), 8, 5)
return bech32_encode(hrp, [version] + cast(List[int], data), Encoding.BECH32M)
def generate_label(b_scan: ECKey, m: int) -> bytes:
return TaggedHash("BIP0352/Label", b_scan.get_bytes() + ser_uint32(m))
def generate_label(b_scan: Scalar, m: int) -> Scalar:
return Scalar.from_bytes_checked(tagged_hash("BIP0352/Label", b_scan.to_bytes() + ser_uint32(m)))
def create_labeled_silent_payment_address(b_scan: ECKey, B_spend: ECPubKey, m: int, hrp: str = "tsp", version: int = 0) -> str:
G = ECKey().set(1).get_pubkey()
B_scan = b_scan.get_pubkey()
def create_labeled_silent_payment_address(b_scan: Scalar, B_spend: GE, m: int, hrp: str = "tsp", version: int = 0) -> str:
B_scan = b_scan * G
B_m = B_spend + generate_label(b_scan, m) * G
labeled_address = encode_silent_payment_address(B_scan, B_m, hrp, version)
return labeled_address
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[ECPubKey, ECPubKey]:
def decode_silent_payment_address(address: str, hrp: str = "tsp") -> Tuple[GE, GE]:
_, data = decode(hrp, address)
if data is None:
return ECPubKey(), ECPubKey()
B_scan = ECPubKey().set(data[:33])
B_spend = ECPubKey().set(data[33:])
return GE(), GE()
B_scan = GE.from_bytes_compressed(data[:33])
B_spend = GE.from_bytes_compressed(data[33:])
return B_scan, B_spend
def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]:
G = ECKey().set(1).get_pubkey()
def create_outputs(input_priv_keys: List[Tuple[Scalar, bool]], outpoints: List[COutPoint], recipients: List[str], expected: Dict[str, any] = None, hrp="tsp") -> List[str]:
negated_keys = []
for key, is_xonly in input_priv_keys:
k = ECKey().set(key.get_bytes())
if is_xonly and k.get_pubkey().get_y() % 2 != 0:
k.negate()
k = Scalar.from_bytes_checked(key.to_bytes())
if is_xonly and not (k * G).has_even_y():
k = -k
negated_keys.append(k)
a_sum = sum(negated_keys)
if not a_sum.valid:
a_sum = Scalar.sum(*negated_keys)
if a_sum == 0:
# Input privkeys sum is zero -> fail
return []
assert ECKey().set(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum"
input_hash = get_input_hash(outpoints, a_sum * G)
silent_payment_groups: Dict[ECPubKey, List[ECPubKey]] = {}
assert Scalar.from_bytes_checked(bytes.fromhex(expected.get("input_private_key_sum"))) == a_sum, "a_sum did not match expected input_private_key_sum"
input_hash_scalar = Scalar.from_bytes_checked(get_input_hash(outpoints, a_sum * G))
silent_payment_groups: Dict[GE, List[GE]] = {}
for recipient in recipients:
B_scan, B_m = decode_silent_payment_address(recipient["address"], hrp=hrp)
# Verify decoded intermediate keys for recipient
expected_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
expected_B_m = ECPubKey().set(bytes.fromhex(recipient["spend_pub_key"]))
expected_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"]))
expected_B_m = GE.from_bytes_compressed(bytes.fromhex(recipient["spend_pub_key"]))
assert expected_B_scan == B_scan, "B_scan did not match expected recipient.scan_pub_key"
assert expected_B_m == B_m, "B_m did not match expected recipient.spend_pub_key"
if B_scan in silent_payment_groups:
@@ -153,68 +158,67 @@ def create_outputs(input_priv_keys: List[Tuple[ECKey, bool]], outpoints: List[CO
outputs = []
for B_scan, B_m_values in silent_payment_groups.items():
ecdh_shared_secret = input_hash * a_sum * B_scan
ecdh_shared_secret = input_hash_scalar * a_sum * B_scan
expected_shared_secrets = expected.get("shared_secrets", {})
# Find the recipient address that corresponds to this B_scan and get its index
for recipient_idx, recipient in enumerate(recipients):
recipient_B_scan = ECPubKey().set(bytes.fromhex(recipient["scan_pub_key"]))
recipient_B_scan = GE.from_bytes_compressed(bytes.fromhex(recipient["scan_pub_key"]))
if recipient_B_scan == B_scan:
expected_shared_secret_hex = expected_shared_secrets[recipient_idx]
assert ecdh_shared_secret.get_bytes(False).hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}"
assert ecdh_shared_secret.to_bytes_compressed().hex() == expected_shared_secret_hex, f"ecdh_shared_secret did not match expected, recipient {recipient_idx} ({recipient['address']}): expected={expected_shared_secret_hex}"
break
k = 0
for B_m in B_m_values:
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k)))
P_km = B_m + t_k * G
outputs.append(P_km.get_bytes().hex())
outputs.append(P_km.to_bytes_xonly().hex())
k += 1
return list(set(outputs))
def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: bytes, outputs_to_check: List[ECPubKey], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]:
G = ECKey().set(1).get_pubkey()
input_hash_key = ECKey().set(input_hash)
computed_tweak_point = input_hash_key * A_sum
assert computed_tweak_point.get_bytes(False).hex() == expected.get("tweak"), "tweak did not match expected"
ecdh_shared_secret = input_hash * b_scan * A_sum
assert ecdh_shared_secret.get_bytes(False).hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret"
def scanning(b_scan: Scalar, B_spend: GE, A_sum: GE, input_hash: bytes, outputs_to_check: List[bytes], labels: Dict[str, str] = None, expected: Dict[str, any] = None) -> List[Dict[str, str]]:
input_hash_scalar = Scalar.from_bytes_checked(input_hash)
computed_tweak_point = input_hash_scalar * A_sum
assert computed_tweak_point.to_bytes_compressed().hex() == expected.get("tweak"), "tweak did not match expected"
ecdh_shared_secret = input_hash_scalar * b_scan * A_sum
assert ecdh_shared_secret.to_bytes_compressed().hex() == expected.get("shared_secret"), "ecdh_shared_secret did not match expected shared_secret"
k = 0
wallet = []
while True:
if k == K_max: # Don't look further than the per-group recipient limit (K_max)
break
t_k = TaggedHash("BIP0352/SharedSecret", ecdh_shared_secret.get_bytes(False) + ser_uint32(k))
t_k = Scalar.from_bytes_checked(tagged_hash("BIP0352/SharedSecret", ecdh_shared_secret.to_bytes_compressed() + ser_uint32(k)))
P_k = B_spend + t_k * G
for output in outputs_to_check:
if P_k == output:
wallet.append({"pub_key": P_k.get_bytes().hex(), "priv_key_tweak": t_k.hex()})
output_ge = GE.from_bytes_xonly(output)
if P_k.to_bytes_xonly() == output:
wallet.append({"pub_key": P_k.to_bytes_xonly().hex(), "priv_key_tweak": t_k.to_bytes().hex()})
outputs_to_check.remove(output)
k += 1
break
elif labels:
m_G_sub = output - P_k
if m_G_sub.get_bytes(False).hex() in labels:
m_G_sub = output_ge - P_k
if m_G_sub.to_bytes_compressed().hex() in labels:
P_km = P_k + m_G_sub
wallet.append({
"pub_key": P_km.get_bytes().hex(),
"priv_key_tweak": (ECKey().set(t_k).add(
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
)).get_bytes().hex(),
"pub_key": P_km.to_bytes_xonly().hex(),
"priv_key_tweak": (t_k + Scalar.from_bytes_checked(
bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()])
)).to_bytes().hex(),
})
outputs_to_check.remove(output)
k += 1
break
else:
output.negate()
m_G_sub = output - P_k
if m_G_sub.get_bytes(False).hex() in labels:
m_G_sub = -output_ge - P_k
if m_G_sub.to_bytes_compressed().hex() in labels:
P_km = P_k + m_G_sub
wallet.append({
"pub_key": P_km.get_bytes().hex(),
"priv_key_tweak": (ECKey().set(t_k).add(
bytes.fromhex(labels[m_G_sub.get_bytes(False).hex()])
)).get_bytes().hex(),
"pub_key": P_km.to_bytes_xonly().hex(),
"priv_key_tweak": (t_k + Scalar.from_bytes_checked(
bytes.fromhex(labels[m_G_sub.to_bytes_compressed().hex()])
)).to_bytes().hex(),
})
outputs_to_check.remove(output)
k += 1
@@ -225,15 +229,13 @@ def scanning(b_scan: ECKey, B_spend: ECPubKey, A_sum: ECPubKey, input_hash: byte
if __name__ == "__main__":
if len(argv) != 2 or argv[1] in ('-h', '--help'):
if len(sys.argv) != 2 or sys.argv[1] in ('-h', '--help'):
print("Usage: ./reference.py send_and_receive_test_vectors.json")
exit(0)
sys.exit(0)
with open(argv[1], "r") as f:
with open(sys.argv[1], "r") as f:
test_data = json.loads(f.read())
# G , needed for generating the labels "database"
G = ECKey().set(1).get_pubkey()
for case in test_data:
print(case["comment"])
# Test sending
@@ -247,7 +249,7 @@ if __name__ == "__main__":
scriptSig=bytes.fromhex(input["scriptSig"]),
txinwitness=CTxInWitness().deserialize(from_hex(input["txinwitness"])),
prevout=bytes.fromhex(input["prevout"]["scriptPubKey"]["hex"]),
private_key=ECKey().set(bytes.fromhex(input["private_key"])),
private_key=Scalar.from_bytes_checked(bytes.fromhex(input["private_key"])),
)
for input in given["vin"]
]
@@ -256,14 +258,14 @@ if __name__ == "__main__":
input_pub_keys = []
for vin in vins:
pubkey = get_pubkey_from_input(vin)
if not pubkey.valid:
if pubkey.infinity:
continue
input_priv_keys.append((
vin.private_key,
is_p2tr(vin.prevout),
))
input_pub_keys.append(pubkey)
assert [pk.get_bytes(False).hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected"
assert [pk.to_bytes_compressed().hex() for pk in input_pub_keys] == expected.get("input_pub_keys"), "input_pub_keys did not match expected"
sending_outputs = []
if (len(input_pub_keys) > 0):
@@ -283,13 +285,13 @@ if __name__ == "__main__":
assert(sending_outputs == expected["outputs"][0] == []), "Sending test failed"
# Test receiving
msg = hashlib.sha256(b"message").digest()
aux = hashlib.sha256(b"random auxiliary data").digest()
msg = hash_sha256(b"message")
aux = hash_sha256(b"random auxiliary data")
for receiving_test in case["receiving"]:
given = receiving_test["given"]
expected = receiving_test["expected"]
outputs_to_check = [
ECPubKey().set(bytes.fromhex(p)) for p in given["outputs"]
bytes.fromhex(p) for p in given["outputs"]
]
vins = [
VinInfo(
@@ -302,12 +304,10 @@ if __name__ == "__main__":
]
# Check that the given inputs for the receiving test match what was generated during the sending test
receiving_addresses = []
b_scan = ECKey().set(bytes.fromhex(given["key_material"]["scan_priv_key"]))
b_spend = ECKey().set(
bytes.fromhex(given["key_material"]["spend_priv_key"])
)
B_scan = b_scan.get_pubkey()
B_spend = b_spend.get_pubkey()
b_scan = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["scan_priv_key"]))
b_spend = Scalar.from_bytes_checked(bytes.fromhex(given["key_material"]["spend_priv_key"]))
B_scan = b_scan * G
B_spend = b_spend * G
receiving_addresses.append(
encode_silent_payment_address(B_scan, B_spend, hrp="sp")
)
@@ -324,21 +324,21 @@ if __name__ == "__main__":
input_pub_keys = []
for vin in vins:
pubkey = get_pubkey_from_input(vin)
if not pubkey.valid:
if pubkey.infinity:
continue
input_pub_keys.append(pubkey)
add_to_wallet = []
if (len(input_pub_keys) > 0):
A_sum = reduce(lambda x, y: x + y, input_pub_keys)
if A_sum.get_bytes() is None:
A_sum = GE.sum(*input_pub_keys)
if A_sum.infinity:
# Input pubkeys sum is point at infinity -> skip tx
assert expected["outputs"] == []
continue
assert A_sum.get_bytes(False).hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum"
assert A_sum.to_bytes_compressed().hex() == expected.get("input_pub_key_sum"), "A_sum did not match expected input_pub_key_sum"
input_hash = get_input_hash([vin.outpoint for vin in vins], A_sum)
pre_computed_labels = {
(generate_label(b_scan, label) * G).get_bytes(False).hex(): generate_label(b_scan, label).hex()
(generate_label(b_scan, label) * G).to_bytes_compressed().hex(): generate_label(b_scan, label).to_bytes().hex()
for label in given["labels"]
}
add_to_wallet = scanning(
@@ -353,13 +353,13 @@ if __name__ == "__main__":
# Check that the private key is correct for the found output public key
for output in add_to_wallet:
pub_key = ECPubKey().set(bytes.fromhex(output["pub_key"]))
full_private_key = b_spend.add(bytes.fromhex(output["priv_key_tweak"]))
if full_private_key.get_pubkey().get_y() % 2 != 0:
full_private_key.negate()
pub_key = GE.from_bytes_xonly(bytes.fromhex(output["pub_key"]))
full_private_key = b_spend + Scalar.from_bytes_checked(bytes.fromhex(output["priv_key_tweak"]))
if not (full_private_key * G).has_even_y():
full_private_key = -full_private_key
sig = full_private_key.sign_schnorr(msg, aux)
assert pub_key.verify_schnorr(sig, msg), f"Invalid signature for {pub_key}"
sig = schnorr_sign(msg, full_private_key.to_bytes(), aux)
assert schnorr_verify(msg, pub_key.to_bytes_xonly(), sig), f"Invalid signature for {pub_key}"
output["signature"] = sig.hex()
# Note: order doesn't matter for creating/finding the outputs. However, different orderings of the recipient addresses

View File

@@ -1,696 +0,0 @@
# Copyright (c) 2019 Pieter Wuille
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test-only secp256k1 elliptic curve implementation
WARNING: This code is slow, uses bad randomness, does not properly protect
keys, and is trivially vulnerable to side channel attacks. Do not use for
anything but tests."""
import random
import hashlib
import hmac
def TaggedHash(tag, data):
ss = hashlib.sha256(tag.encode('utf-8')).digest()
ss += ss
ss += data
return hashlib.sha256(ss).digest()
def modinv(a, n):
"""Compute the modular inverse of a modulo n
See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers.
"""
t1, t2 = 0, 1
r1, r2 = n, a
while r2 != 0:
q = r1 // r2
t1, t2 = t2, t1 - q * t2
r1, r2 = r2, r1 - q * r2
if r1 > 1:
return None
if t1 < 0:
t1 += n
return t1
def jacobi_symbol(n, k):
"""Compute the Jacobi symbol of n modulo k
See http://en.wikipedia.org/wiki/Jacobi_symbol
For our application k is always prime, so this is the same as the Legendre symbol."""
assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k"
n %= k
t = 0
while n != 0:
while n & 1 == 0:
n >>= 1
r = k & 7
t ^= (r == 3 or r == 5)
n, k = k, n
t ^= (n & k & 3 == 3)
n = n % k
if k == 1:
return -1 if t else 1
return 0
def modsqrt(a, p):
"""Compute the square root of a modulo p when p % 4 = 3.
The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm
Limiting this function to only work for p % 4 = 3 means we don't need to
iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd
is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4)
secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4.
"""
if p % 4 != 3:
raise NotImplementedError("modsqrt only implemented for p % 4 = 3")
sqrt = pow(a, (p + 1)//4, p)
if pow(sqrt, 2, p) == a % p:
return sqrt
return None
def int_or_bytes(s):
"Convert 32-bytes to int while accepting also int and returning it as is."
if isinstance(s, bytes):
assert(len(s) == 32)
s = int.from_bytes(s, 'big')
elif not isinstance(s, int):
raise TypeError
return s
class EllipticCurve:
def __init__(self, p, a, b):
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
self.p = p
self.a = a % p
self.b = b % p
def affine(self, p1):
"""Convert a Jacobian point tuple p1 to affine form, or None if at infinity.
An affine point is represented as the Jacobian (x, y, 1)"""
x1, y1, z1 = p1
if z1 == 0:
return None
inv = modinv(z1, self.p)
inv_2 = (inv**2) % self.p
inv_3 = (inv_2 * inv) % self.p
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
def has_even_y(self, p1):
"""Whether the point p1 has an even Y coordinate when expressed in affine coordinates."""
return not (p1[2] == 0 or self.affine(p1)[1] & 1)
def negate(self, p1):
"""Negate a Jacobian point tuple p1."""
x1, y1, z1 = p1
return (x1, (self.p - y1) % self.p, z1)
def on_curve(self, p1):
"""Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
x1, y1, z1 = p1
z2 = pow(z1, 2, self.p)
z4 = pow(z2, 2, self.p)
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
def is_x_coord(self, x):
"""Test whether x is a valid X coordinate on the curve."""
x_3 = pow(x, 3, self.p)
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
def lift_x(self, x):
"""Given an X coordinate on the curve, return a corresponding affine point."""
x_3 = pow(x, 3, self.p)
v = x_3 + self.a * x + self.b
y = modsqrt(v, self.p)
if y is None:
return None
return (x, y, 1)
def double(self, p1):
"""Double a Jacobian tuple p1
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling"""
x1, y1, z1 = p1
if z1 == 0:
return (0, 1, 0)
y1_2 = (y1**2) % self.p
y1_4 = (y1_2**2) % self.p
x1_2 = (x1**2) % self.p
s = (4*x1*y1_2) % self.p
m = 3*x1_2
if self.a:
m += self.a * pow(z1, 4, self.p)
m = m % self.p
x2 = (m**2 - 2*s) % self.p
y2 = (m*(s - x2) - 8*y1_4) % self.p
z2 = (2*y1*z1) % self.p
return (x2, y2, z2)
def add_mixed(self, p1, p2):
"""Add a Jacobian tuple p1 and an affine tuple p2
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
x1, y1, z1 = p1
x2, y2, z2 = p2
assert(z2 == 1)
# Adding to the point at infinity is a no-op
if z1 == 0:
return p2
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
u2 = (x2 * z1_2) % self.p
s2 = (y2 * z1_3) % self.p
if x1 == u2:
if (y1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - x1
r = s2 - y1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (x1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
z3 = (h*z1) % self.p
return (x3, y3, z3)
def add(self, p1, p2):
"""Add two Jacobian tuples p1 and p2
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition"""
x1, y1, z1 = p1
x2, y2, z2 = p2
# Adding the point at infinity is a no-op
if z1 == 0:
return p2
if z2 == 0:
return p1
# Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1
if z1 == 1:
return self.add_mixed(p2, p1)
if z2 == 1:
return self.add_mixed(p1, p2)
z1_2 = (z1**2) % self.p
z1_3 = (z1_2 * z1) % self.p
z2_2 = (z2**2) % self.p
z2_3 = (z2_2 * z2) % self.p
u1 = (x1 * z2_2) % self.p
u2 = (x2 * z1_2) % self.p
s1 = (y1 * z2_3) % self.p
s2 = (y2 * z1_3) % self.p
if u1 == u2:
if (s1 != s2):
# p1 and p2 are inverses. Return the point at infinity.
return (0, 1, 0)
# p1 == p2. The formulas below fail when the two points are equal.
return self.double(p1)
h = u2 - u1
r = s2 - s1
h_2 = (h**2) % self.p
h_3 = (h_2 * h) % self.p
u1_h_2 = (u1 * h_2) % self.p
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
z3 = (h*z1*z2) % self.p
return (x3, y3, z3)
def mul(self, ps):
"""Compute a (multi) point multiplication
ps is a list of (Jacobian tuple, scalar) pairs.
"""
r = (0, 1, 0)
for i in range(255, -1, -1):
r = self.double(r)
for (p, n) in ps:
if ((n >> i) & 1):
r = self.add(r, p)
return r
SECP256K1_FIELD_SIZE = 2**256 - 2**32 - 977
SECP256K1 = EllipticCurve(SECP256K1_FIELD_SIZE, 0, 7)
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
NUMS_H = 0x50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0
class ECPubKey():
"""A secp256k1 public key"""
def __init__(self):
"""Construct an uninitialized public key"""
self.valid = False
def __repr__(self):
return self.get_bytes().hex()
def __eq__(self, other):
assert isinstance(other, ECPubKey)
return self.get_bytes() == other.get_bytes()
def __hash__(self):
return hash(self.get_bytes())
def set(self, data):
"""Construct a public key from a serialization in compressed or uncompressed DER format or BIP340 format"""
if (len(data) == 65 and data[0] == 0x04):
p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
self.valid = SECP256K1.on_curve(p)
if self.valid:
self.p = p
self.compressed = False
elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
x = int.from_bytes(data[1:33], 'big')
if SECP256K1.is_x_coord(x):
p = SECP256K1.lift_x(x)
# if the oddness of the y co-ord isn't correct, find the other
# valid y
if (p[1] & 1) != (data[0] & 1):
p = SECP256K1.negate(p)
self.p = p
self.valid = True
self.compressed = True
else:
self.valid = False
elif (len(data) == 32):
x = int.from_bytes(data[0:32], 'big')
if SECP256K1.is_x_coord(x):
p = SECP256K1.lift_x(x)
# if the oddness of the y co-ord isn't correct, find the other
# valid y
if p[1]%2 != 0:
p = SECP256K1.negate(p)
self.p = p
self.valid = True
self.compressed = True
else:
self.valid = False
else:
self.valid = False
return self
@property
def is_compressed(self):
return self.compressed
@property
def is_valid(self):
return self.valid
def get_y(self):
return SECP256K1.affine(self.p)[1]
def get_x(self):
return SECP256K1.affine(self.p)[0]
def get_bytes(self, bip340=True):
assert(self.valid)
p = SECP256K1.affine(self.p)
if p is None:
return None
if bip340:
return bytes(p[0].to_bytes(32, 'big'))
elif self.compressed:
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
else:
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
def verify_ecdsa(self, sig, msg, low_s=True):
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA verifier algorithm"""
assert(self.valid)
# Extract r and s from the DER formatted signature. Return false for
# any DER encoding errors.
if (sig[1] + 2 != len(sig)):
return False
if (len(sig) < 4):
return False
if (sig[0] != 0x30):
return False
if (sig[2] != 0x02):
return False
rlen = sig[3]
if (len(sig) < 6 + rlen):
return False
if rlen < 1 or rlen > 33:
return False
if sig[4] >= 0x80:
return False
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
return False
r = int.from_bytes(sig[4:4+rlen], 'big')
if (sig[4+rlen] != 0x02):
return False
slen = sig[5+rlen]
if slen < 1 or slen > 33:
return False
if (len(sig) != 6 + rlen + slen):
return False
if sig[6+rlen] >= 0x80:
return False
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
return False
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
# Verify that r and s are within the group order
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
return False
if low_s and s >= SECP256K1_ORDER_HALF:
return False
z = int.from_bytes(msg, 'big')
# Run verifier algorithm on r, s
w = modinv(s, SECP256K1_ORDER)
u1 = z*w % SECP256K1_ORDER
u2 = r*w % SECP256K1_ORDER
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
if R is None or R[0] != r:
return False
return True
def verify_schnorr(self, sig, msg):
assert(len(msg) == 32)
assert(len(sig) == 64)
assert(self.valid)
r = int.from_bytes(sig[0:32], 'big')
if r >= SECP256K1_FIELD_SIZE:
return False
s = int.from_bytes(sig[32:64], 'big')
if s >= SECP256K1_ORDER:
return False
e = int.from_bytes(TaggedHash("BIP0340/challenge", sig[0:32] + self.get_bytes() + msg), 'big') % SECP256K1_ORDER
R = SECP256K1.mul([(SECP256K1_G, s), (self.p, SECP256K1_ORDER - e)])
if not SECP256K1.has_even_y(R):
return False
if ((r * R[2] * R[2]) % SECP256K1_FIELD_SIZE) != R[0]:
return False
return True
def __add__(self, other):
"""Adds two ECPubKey points."""
assert isinstance(other, ECPubKey)
assert self.valid
assert other.valid
ret = ECPubKey()
ret.p = SECP256K1.add(other.p, self.p)
ret.valid = True
ret.compressed = self.compressed
return ret
def __radd__(self, other):
"""Allows this ECPubKey to be added to 0 for sum()"""
if other == 0:
return self
else:
return self + other
def __mul__(self, other):
"""Multiplies ECPubKey point with a scalar(int/32bytes/ECKey)."""
if isinstance(other, ECKey):
assert self.valid
assert other.secret is not None
multiplier = other.secret
else:
# int_or_bytes checks that other is `int` or `bytes`
multiplier = int_or_bytes(other)
assert multiplier < SECP256K1_ORDER
multiplier = multiplier % SECP256K1_ORDER
ret = ECPubKey()
ret.p = SECP256K1.mul([(self.p, multiplier)])
ret.valid = True
ret.compressed = self.compressed
return ret
def __rmul__(self, other):
"""Multiplies a scalar(int/32bytes/ECKey) with an ECPubKey point"""
return self * other
def __sub__(self, other):
"""Subtract one point from another"""
assert isinstance(other, ECPubKey)
assert self.valid
assert other.valid
ret = ECPubKey()
ret.p = SECP256K1.add(self.p, SECP256K1.negate(other.p))
ret.valid = True
ret.compressed = self.compressed
return ret
def tweak_add(self, tweak):
assert(self.valid)
t = int_or_bytes(tweak)
if t >= SECP256K1_ORDER:
return None
tweaked = SECP256K1.affine(SECP256K1.mul([(self.p, 1), (SECP256K1_G, t)]))
if tweaked is None:
return None
ret = ECPubKey()
ret.p = tweaked
ret.valid = True
ret.compressed = self.compressed
return ret
def mul(self, data):
"""Multiplies ECPubKey point with scalar data."""
assert self.valid
other = ECKey()
other.set(data, True)
return self * other
def negate(self):
self.p = SECP256K1.affine(SECP256K1.negate(self.p))
def rfc6979_nonce(key):
"""Compute signing nonce using RFC6979."""
v = bytes([1] * 32)
k = bytes([0] * 32)
k = hmac.new(k, v + b"\x00" + key, 'sha256').digest()
v = hmac.new(k, v, 'sha256').digest()
k = hmac.new(k, v + b"\x01" + key, 'sha256').digest()
v = hmac.new(k, v, 'sha256').digest()
return hmac.new(k, v, 'sha256').digest()
class ECKey():
"""A secp256k1 private key"""
def __init__(self):
self.valid = False
def __repr__(self):
return str(self.secret)
def __eq__(self, other):
assert isinstance(other, ECKey)
return self.secret == other.secret
def __hash__(self):
return hash(self.secret)
def set(self, secret, compressed=True):
"""Construct a private key object from either 32-bytes or an int secret and a compressed flag."""
secret = int_or_bytes(secret)
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
return self
def generate(self, compressed=True):
"""Generate a random private key (compressed or uncompressed)."""
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
return self
def get_bytes(self):
"""Retrieve the 32-byte representation of this key."""
assert(self.valid)
return self.secret.to_bytes(32, 'big')
def as_int(self):
return self.secret
def from_int(self, secret, compressed=True):
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
self.secret = secret
self.compressed = compressed
def __add__(self, other):
"""Add key secrets. Returns compressed key."""
assert isinstance(other, ECKey)
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret + other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
def __radd__(self, other):
"""Allows this ECKey to be added to 0 for sum()"""
if other == 0:
return self
else:
return self + other
def __sub__(self, other):
"""Subtract key secrets. Returns compressed key."""
assert isinstance(other, ECKey)
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret - other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
def __mul__(self, other):
"""Multiply a private key by another private key or multiply a public key by a private key. Returns compressed key."""
if isinstance(other, ECKey):
assert other.secret > 0 and other.secret < SECP256K1_ORDER
assert self.valid is True
ret_data = ((self.secret * other.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
ret = ECKey()
ret.set(ret_data, True)
return ret
elif isinstance(other, ECPubKey):
return other * self
else:
# ECKey().set() checks that other is an `int` or `bytes`
assert self.valid
second = ECKey().set(other, self.compressed)
return self * second
def __rmul__(self, other):
return self * other
def add(self, data):
"""Add key to scalar data. Returns compressed key."""
other = ECKey()
other.set(data, True)
return self + other
def mul(self, data):
"""Multiply key secret with scalar data. Returns compressed key."""
other = ECKey()
other.set(data, True)
return self * other
def negate(self):
"""Negate a private key."""
assert self.valid
self.secret = SECP256K1_ORDER - self.secret
@property
def is_valid(self):
return self.valid
@property
def is_compressed(self):
return self.compressed
def get_pubkey(self):
"""Compute an ECPubKey object for this secret key."""
assert(self.valid)
ret = ECPubKey()
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
ret.p = p
ret.valid = True
ret.compressed = self.compressed
return ret
def sign_ecdsa(self, msg, low_s=True, rfc6979=False):
"""Construct a DER-encoded ECDSA signature with this key.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA signer algorithm."""
assert(self.valid)
z = int.from_bytes(msg, 'big')
# Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation)
if rfc6979:
k = int.from_bytes(rfc6979_nonce(self.secret.to_bytes(32, 'big') + msg), 'big')
else:
k = random.randrange(1, SECP256K1_ORDER)
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
r = R[0] % SECP256K1_ORDER
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
if low_s and s > SECP256K1_ORDER_HALF:
s = SECP256K1_ORDER - s
# Represent in DER format. The byte representations of r and s have
# length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33
# bytes).
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb
def sign_schnorr(self, msg, aux=None):
"""Create a Schnorr signature (see BIP340)."""
if aux is None:
aux = bytes(32)
assert self.valid
assert len(msg) == 32
assert len(aux) == 32
t = (self.secret ^ int.from_bytes(TaggedHash("BIP0340/aux", aux), 'big')).to_bytes(32, 'big')
kp = int.from_bytes(TaggedHash("BIP0340/nonce", t + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
assert kp != 0
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
k = kp if SECP256K1.has_even_y(R) else SECP256K1_ORDER - kp
e = int.from_bytes(TaggedHash("BIP0340/challenge", R[0].to_bytes(32, 'big') + self.get_pubkey().get_bytes() + msg), 'big') % SECP256K1_ORDER
return R[0].to_bytes(32, 'big') + ((k + e * self.secret) % SECP256K1_ORDER).to_bytes(32, 'big')
def tweak_add(self, tweak):
"""Return a tweaked version of this private key."""
assert(self.valid)
t = int_or_bytes(tweak)
if t >= SECP256K1_ORDER:
return None
tweaked = (self.secret + t) % SECP256K1_ORDER
if tweaked == 0:
return None
ret = ECKey()
ret.set(tweaked.to_bytes(32, 'big'), self.compressed)
return ret
def generate_key_pair(secret=None, compressed=True):
"""Convenience function to generate a private-public key pair."""
d = ECKey()
if secret:
d.set(secret, compressed)
else:
d.generate(compressed)
P = d.get_pubkey()
return d, P
def generate_bip340_key_pair():
"""Convenience function to generate a BIP0340 private-public key pair."""
d = ECKey()
d.generate()
P = d.get_pubkey()
if P.get_y()%2 != 0:
d.negate()
P.negate()
return d, P
def generate_schnorr_nonce():
"""Generate a random valid BIP340 nonce.
See https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki.
This implementation ensures the y-coordinate of the nonce point is even."""
kp = random.randrange(1, SECP256K1_ORDER)
assert kp != 0
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)]))
k = kp if R[1] % 2 == 0 else SECP256K1_ORDER - kp
k_key = ECKey()
k_key.set(k.to_bytes(32, 'big'), True)
return k_key

View File

@@ -0,0 +1,17 @@
name: Tests
on: [push, pull_request]
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v5
- run: uvx ruff check .
mypy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v5
- run: uvx mypy .

View File

@@ -0,0 +1 @@
3.9

View File

@@ -0,0 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.0.0] - 2025-03-31
Initial release.

View File

@@ -0,0 +1,23 @@
The MIT License (MIT)
Copyright (c) 2009-2024 The Bitcoin Core developers
Copyright (c) 2009-2024 Bitcoin Developers
Copyright (c) 2025- The secp256k1lab Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -0,0 +1,13 @@
secp256k1lab
============
![Dependencies: None](https://img.shields.io/badge/dependencies-none-success)
An INSECURE implementation of the secp256k1 elliptic curve and related cryptographic schemes written in Python, intended for prototyping, experimentation and education.
Features:
* Low-level secp256k1 field and group arithmetic.
* Schnorr signing/verification and key generation according to [BIP-340](https://github.com/bitcoin/bips/blob/master/bip-0340.mediawiki).
* ECDH key exchange.
WARNING: The code in this library is slow and trivially vulnerable to side channel attacks.

View File

@@ -0,0 +1,34 @@
[project]
name = "secp256k1lab"
version = "1.0.0"
description = "An INSECURE implementation of the secp256k1 elliptic curve and related cryptographic schemes, intended for prototyping, experimentation and education"
readme = "README.md"
authors = [
{ name = "Pieter Wuille", email = "pieter@wuille.net" },
{ name = "Tim Ruffing", email = "me@real-or-random.org" },
{ name = "Jonas Nick", email = "jonasd.nick@gmail.com" },
{ name = "Sebastian Falbesoner", email = "sebastian.falbesoner@gmail.com" }
]
maintainers = [
{ name = "Tim Ruffing", email = "me@real-or-random.org" },
{ name = "Jonas Nick", email = "jonasd.nick@gmail.com" },
{ name = "Sebastian Falbesoner", email = "sebastian.falbesoner@gmail.com" }
]
requires-python = ">=3.9"
license = "MIT"
license-files = ["COPYING"]
keywords = ["secp256k1", "elliptic curves", "cryptography", "Bitcoin"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: Education",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Topic :: Security :: Cryptography",
]
dependencies = []
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -0,0 +1,73 @@
# The following functions are based on the BIP 340 reference implementation:
# https://github.com/bitcoin/bips/blob/master/bip-0340/reference.py
from .secp256k1 import FE, GE, G
from .util import int_from_bytes, bytes_from_int, xor_bytes, tagged_hash
def pubkey_gen(seckey: bytes) -> bytes:
d0 = int_from_bytes(seckey)
if not (1 <= d0 <= GE.ORDER - 1):
raise ValueError("The secret key must be an integer in the range 1..n-1.")
P = d0 * G
assert not P.infinity
return P.to_bytes_xonly()
def schnorr_sign(
msg: bytes, seckey: bytes, aux_rand: bytes, tag_prefix: str = "BIP0340"
) -> bytes:
d0 = int_from_bytes(seckey)
if not (1 <= d0 <= GE.ORDER - 1):
raise ValueError("The secret key must be an integer in the range 1..n-1.")
if len(aux_rand) != 32:
raise ValueError("aux_rand must be 32 bytes instead of %i." % len(aux_rand))
P = d0 * G
assert not P.infinity
d = d0 if P.has_even_y() else GE.ORDER - d0
t = xor_bytes(bytes_from_int(d), tagged_hash(tag_prefix + "/aux", aux_rand))
k0 = (
int_from_bytes(tagged_hash(tag_prefix + "/nonce", t + P.to_bytes_xonly() + msg))
% GE.ORDER
)
if k0 == 0:
raise RuntimeError("Failure. This happens only with negligible probability.")
R = k0 * G
assert not R.infinity
k = k0 if R.has_even_y() else GE.ORDER - k0
e = (
int_from_bytes(
tagged_hash(
tag_prefix + "/challenge", R.to_bytes_xonly() + P.to_bytes_xonly() + msg
)
)
% GE.ORDER
)
sig = R.to_bytes_xonly() + bytes_from_int((k + e * d) % GE.ORDER)
assert schnorr_verify(msg, P.to_bytes_xonly(), sig, tag_prefix=tag_prefix)
return sig
def schnorr_verify(
msg: bytes, pubkey: bytes, sig: bytes, tag_prefix: str = "BIP0340"
) -> bool:
if len(pubkey) != 32:
raise ValueError("The public key must be a 32-byte array.")
if len(sig) != 64:
raise ValueError("The signature must be a 64-byte array.")
try:
P = GE.from_bytes_xonly(pubkey)
except ValueError:
return False
r = int_from_bytes(sig[0:32])
s = int_from_bytes(sig[32:64])
if (r >= FE.SIZE) or (s >= GE.ORDER):
return False
e = (
int_from_bytes(tagged_hash(tag_prefix + "/challenge", sig[0:32] + pubkey + msg))
% GE.ORDER
)
R = s * G - e * P
if R.infinity or (not R.has_even_y()) or (R.x != r):
return False
return True

View File

@@ -0,0 +1,16 @@
import hashlib
from .secp256k1 import GE, Scalar
def ecdh_compressed_in_raw_out(seckey: bytes, pubkey: bytes) -> GE:
"""TODO"""
shared_secret = Scalar.from_bytes_checked(seckey) * GE.from_bytes_compressed(pubkey)
assert not shared_secret.infinity # prime-order group
return shared_secret
def ecdh_libsecp256k1(seckey: bytes, pubkey: bytes) -> bytes:
"""TODO"""
shared_secret = ecdh_compressed_in_raw_out(seckey, pubkey)
return hashlib.sha256(shared_secret.to_bytes_compressed()).digest()

View File

@@ -0,0 +1,15 @@
from .secp256k1 import GE, G
from .util import int_from_bytes
# The following function is based on the BIP 327 reference implementation
# https://github.com/bitcoin/bips/blob/master/bip-0327/reference.py
# Return the plain public key corresponding to a given secret key
def pubkey_gen_plain(seckey: bytes) -> bytes:
d0 = int_from_bytes(seckey)
if not (1 <= d0 <= GE.ORDER - 1):
raise ValueError("The secret key must be an integer in the range 1..n-1.")
P = d0 * G
assert not P.infinity
return P.to_bytes_compressed()

View File

@@ -0,0 +1,454 @@
# Copyright (c) 2022-2023 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test-only implementation of low-level secp256k1 field and group arithmetic
It is designed for ease of understanding, not performance.
WARNING: This code is slow and trivially vulnerable to side channel attacks. Do not use for
anything but tests.
Exports:
* FE: class for secp256k1 field elements
* GE: class for secp256k1 group elements
* G: the secp256k1 generator point
"""
# TODO Docstrings of methods still say "field element"
class APrimeFE:
"""Objects of this class represent elements of a prime field.
They are represented internally in numerator / denominator form, in order to delay inversions.
"""
# The size of the field (also its modulus and characteristic).
SIZE: int
def __init__(self, a=0, b=1):
"""Initialize a field element a/b; both a and b can be ints or field elements."""
if isinstance(a, type(self)):
num = a._num
den = a._den
else:
num = a % self.SIZE
den = 1
if isinstance(b, type(self)):
den = (den * b._num) % self.SIZE
num = (num * b._den) % self.SIZE
else:
den = (den * b) % self.SIZE
assert den != 0
if num == 0:
den = 1
self._num = num
self._den = den
def __add__(self, a):
"""Compute the sum of two field elements (second may be int)."""
if isinstance(a, type(self)):
return type(self)(self._num * a._den + self._den * a._num, self._den * a._den)
if isinstance(a, int):
return type(self)(self._num + self._den * a, self._den)
return NotImplemented
def __radd__(self, a):
"""Compute the sum of an integer and a field element."""
return type(self)(a) + self
@classmethod
# REVIEW This should be
# def sum(cls, *es: Iterable[Self]) -> Self:
# but Self needs the typing_extension package on Python <= 3.12.
def sum(cls, *es):
"""Compute the sum of field elements.
sum(a, b, c, ...) is identical to (0 + a + b + c + ...)."""
return sum(es, start=cls(0))
def __sub__(self, a):
"""Compute the difference of two field elements (second may be int)."""
if isinstance(a, type(self)):
return type(self)(self._num * a._den - self._den * a._num, self._den * a._den)
if isinstance(a, int):
return type(self)(self._num - self._den * a, self._den)
return NotImplemented
def __rsub__(self, a):
"""Compute the difference of an integer and a field element."""
return type(self)(a) - self
def __mul__(self, a):
"""Compute the product of two field elements (second may be int)."""
if isinstance(a, type(self)):
return type(self)(self._num * a._num, self._den * a._den)
if isinstance(a, int):
return type(self)(self._num * a, self._den)
return NotImplemented
def __rmul__(self, a):
"""Compute the product of an integer with a field element."""
return type(self)(a) * self
def __truediv__(self, a):
"""Compute the ratio of two field elements (second may be int)."""
if isinstance(a, type(self)) or isinstance(a, int):
return type(self)(self, a)
return NotImplemented
def __pow__(self, a):
"""Raise a field element to an integer power."""
return type(self)(pow(self._num, a, self.SIZE), pow(self._den, a, self.SIZE))
def __neg__(self):
"""Negate a field element."""
return type(self)(-self._num, self._den)
def __int__(self):
"""Convert a field element to an integer in range 0..SIZE-1. The result is cached."""
if self._den != 1:
self._num = (self._num * pow(self._den, -1, self.SIZE)) % self.SIZE
self._den = 1
return self._num
def sqrt(self):
"""Compute the square root of a field element if it exists (None otherwise)."""
raise NotImplementedError
def is_square(self):
"""Determine if this field element has a square root."""
# A more efficient algorithm is possible here (Jacobi symbol).
return self.sqrt() is not None
def is_even(self):
"""Determine whether this field element, represented as integer in 0..SIZE-1, is even."""
return int(self) & 1 == 0
def __eq__(self, a):
"""Check whether two field elements are equal (second may be an int)."""
if isinstance(a, type(self)):
return (self._num * a._den - self._den * a._num) % self.SIZE == 0
return (self._num - self._den * a) % self.SIZE == 0
def to_bytes(self):
"""Convert a field element to a 32-byte array (BE byte order)."""
return int(self).to_bytes(32, 'big')
@classmethod
def from_int_checked(cls, v):
"""Convert an integer to a field element (no overflow allowed)."""
if v >= cls.SIZE:
raise ValueError
return cls(v)
@classmethod
def from_int_wrapping(cls, v):
"""Convert an integer to a field element (reduced modulo SIZE)."""
return cls(v % cls.SIZE)
@classmethod
def from_bytes_checked(cls, b):
"""Convert a 32-byte array to a field element (BE byte order, no overflow allowed)."""
v = int.from_bytes(b, 'big')
return cls.from_int_checked(v)
@classmethod
def from_bytes_wrapping(cls, b):
"""Convert a 32-byte array to a field element (BE byte order, reduced modulo SIZE)."""
v = int.from_bytes(b, 'big')
return cls.from_int_wrapping(v)
def __str__(self):
"""Convert this field element to a 64 character hex string."""
return f"{int(self):064x}"
def __repr__(self):
"""Get a string representation of this field element."""
return f"{type(self).__qualname__}(0x{int(self):x})"
class FE(APrimeFE):
SIZE = 2**256 - 2**32 - 977
def sqrt(self):
# Due to the fact that our modulus p is of the form (p % 4) == 3, the Tonelli-Shanks
# algorithm (https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm) is simply
# raising the argument to the power (p + 1) / 4.
# To see why: (p-1) % 2 = 0, so 2 divides the order of the multiplicative group,
# and thus only half of the non-zero field elements are squares. An element a is
# a (nonzero) square when Euler's criterion, a^((p-1)/2) = 1 (mod p), holds. We're
# looking for x such that x^2 = a (mod p). Given a^((p-1)/2) = 1, that is equivalent
# to x^2 = a^(1 + (p-1)/2) mod p. As (1 + (p-1)/2) is even, this is equivalent to
# x = a^((1 + (p-1)/2)/2) mod p, or x = a^((p+1)/4) mod p.
v = int(self)
s = pow(v, (self.SIZE + 1) // 4, self.SIZE)
if s**2 % self.SIZE == v:
return type(self)(s)
return None
class Scalar(APrimeFE):
"""TODO Docstring"""
SIZE = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
class GE:
"""Objects of this class represent secp256k1 group elements (curve points or infinity)
GE objects are immutable.
Normal points on the curve have fields:
* x: the x coordinate (a field element)
* y: the y coordinate (a field element, satisfying y^2 = x^3 + 7)
* infinity: False
The point at infinity has field:
* infinity: True
"""
# TODO The following two class attributes should probably be just getters as
# classmethods to enforce immutability. Unfortunately Python makes it hard
# to create "classproperties". `G` could then also be just a classmethod.
# Order of the group (number of points on the curve, plus 1 for infinity)
ORDER = Scalar.SIZE
# Number of valid distinct x coordinates on the curve.
ORDER_HALF = ORDER // 2
@property
def infinity(self):
"""Whether the group element is the point at infinity."""
return self._infinity
@property
def x(self):
"""The x coordinate (a field element) of a non-infinite group element."""
assert not self.infinity
return self._x
@property
def y(self):
"""The y coordinate (a field element) of a non-infinite group element."""
assert not self.infinity
return self._y
def __init__(self, x=None, y=None):
"""Initialize a group element with specified x and y coordinates, or infinity."""
if x is None:
# Initialize as infinity.
assert y is None
self._infinity = True
else:
# Initialize as point on the curve (and check that it is).
fx = FE(x)
fy = FE(y)
assert fy**2 == fx**3 + 7
self._infinity = False
self._x = fx
self._y = fy
def __add__(self, a):
"""Add two group elements together."""
# Deal with infinity: a + infinity == infinity + a == a.
if self.infinity:
return a
if a.infinity:
return self
if self.x == a.x:
if self.y != a.y:
# A point added to its own negation is infinity.
assert self.y + a.y == 0
return GE()
else:
# For identical inputs, use the tangent (doubling formula).
lam = (3 * self.x**2) / (2 * self.y)
else:
# For distinct inputs, use the line through both points (adding formula).
lam = (self.y - a.y) / (self.x - a.x)
# Determine point opposite to the intersection of that line with the curve.
x = lam**2 - (self.x + a.x)
y = lam * (self.x - x) - self.y
return GE(x, y)
@staticmethod
def sum(*ps):
"""Compute the sum of group elements.
GE.sum(a, b, c, ...) is identical to (GE() + a + b + c + ...)."""
return sum(ps, start=GE())
@staticmethod
def batch_mul(*aps):
"""Compute a (batch) scalar group element multiplication.
GE.batch_mul((a1, p1), (a2, p2), (a3, p3)) is identical to a1*p1 + a2*p2 + a3*p3,
but more efficient."""
# Reduce all the scalars modulo order first (so we can deal with negatives etc).
naps = [(int(a), p) for a, p in aps]
# Start with point at infinity.
r = GE()
# Iterate over all bit positions, from high to low.
for i in range(255, -1, -1):
# Double what we have so far.
r = r + r
# Add then add the points for which the corresponding scalar bit is set.
for (a, p) in naps:
if (a >> i) & 1:
r += p
return r
def __rmul__(self, a):
"""Multiply an integer with a group element."""
if self == G:
return FAST_G.mul(Scalar(a))
return GE.batch_mul((Scalar(a), self))
def __neg__(self):
"""Compute the negation of a group element."""
if self.infinity:
return self
return GE(self.x, -self.y)
def __sub__(self, a):
"""Subtract a group element from another."""
return self + (-a)
def __eq__(self, a):
"""Check if two group elements are equal."""
return (self - a).infinity
def has_even_y(self):
"""Determine whether a non-infinity group element has an even y coordinate."""
assert not self.infinity
return self.y.is_even()
def to_bytes_compressed(self):
"""Convert a non-infinite group element to 33-byte compressed encoding."""
assert not self.infinity
return bytes([3 - self.y.is_even()]) + self.x.to_bytes()
def to_bytes_compressed_with_infinity(self):
"""Convert a group element to 33-byte compressed encoding, mapping infinity to zeros."""
if self.infinity:
return 33 * b"\x00"
return self.to_bytes_compressed()
def to_bytes_uncompressed(self):
"""Convert a non-infinite group element to 65-byte uncompressed encoding."""
assert not self.infinity
return b'\x04' + self.x.to_bytes() + self.y.to_bytes()
def to_bytes_xonly(self):
"""Convert (the x coordinate of) a non-infinite group element to 32-byte xonly encoding."""
assert not self.infinity
return self.x.to_bytes()
@staticmethod
def lift_x(x):
"""Return group element with specified field element as x coordinate (and even y)."""
y = (FE(x)**3 + 7).sqrt()
if y is None:
raise ValueError
if not y.is_even():
y = -y
return GE(x, y)
@staticmethod
def from_bytes_compressed(b):
"""Convert a compressed to a group element."""
assert len(b) == 33
if b[0] != 2 and b[0] != 3:
raise ValueError
x = FE.from_bytes_checked(b[1:])
r = GE.lift_x(x)
if b[0] == 3:
r = -r
return r
@staticmethod
def from_bytes_uncompressed(b):
"""Convert an uncompressed to a group element."""
assert len(b) == 65
if b[0] != 4:
raise ValueError
x = FE.from_bytes_checked(b[1:33])
y = FE.from_bytes_checked(b[33:])
if y**2 != x**3 + 7:
raise ValueError
return GE(x, y)
@staticmethod
def from_bytes(b):
"""Convert a compressed or uncompressed encoding to a group element."""
assert len(b) in (33, 65)
if len(b) == 33:
return GE.from_bytes_compressed(b)
else:
return GE.from_bytes_uncompressed(b)
@staticmethod
def from_bytes_xonly(b):
"""Convert a point given in xonly encoding to a group element."""
assert len(b) == 32
x = FE.from_bytes_checked(b)
r = GE.lift_x(x)
return r
@staticmethod
def is_valid_x(x):
"""Determine whether the provided field element is a valid X coordinate."""
return (FE(x)**3 + 7).is_square()
def __str__(self):
"""Convert this group element to a string."""
if self.infinity:
return "(inf)"
return f"({self.x},{self.y})"
def __repr__(self):
"""Get a string representation for this group element."""
if self.infinity:
return "GE()"
return f"GE(0x{int(self.x):x},0x{int(self.y):x})"
def __hash__(self):
"""Compute a non-cryptographic hash of the group element."""
if self.infinity:
return 0 # 0 is not a valid x coordinate
return int(self.x)
# The secp256k1 generator point
G = GE.lift_x(0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798)
class FastGEMul:
"""Table for fast multiplication with a constant group element.
Speed up scalar multiplication with a fixed point P by using a precomputed lookup table with
its powers of 2:
table = [P, 2*P, 4*P, (2^3)*P, (2^4)*P, ..., (2^255)*P]
During multiplication, the points corresponding to each bit set in the scalar are added up,
i.e. on average ~128 point additions take place.
"""
def __init__(self, p):
self.table = [p] # table[i] = (2^i) * p
for _ in range(255):
p = p + p
self.table.append(p)
def mul(self, a):
result = GE()
a = int(a)
for bit in range(a.bit_length()):
if a & (1 << bit):
result += self.table[bit]
return result
# Precomputed table with multiples of G for fast multiplication
FAST_G = FastGEMul(G)

View File

@@ -0,0 +1,24 @@
import hashlib
# This implementation can be sped up by storing the midstate after hashing
# tag_hash instead of rehashing it all the time.
def tagged_hash(tag: str, msg: bytes) -> bytes:
tag_hash = hashlib.sha256(tag.encode()).digest()
return hashlib.sha256(tag_hash + tag_hash + msg).digest()
def bytes_from_int(x: int) -> bytes:
return x.to_bytes(32, byteorder="big")
def xor_bytes(b0: bytes, b1: bytes) -> bytes:
return bytes(x ^ y for (x, y) in zip(b0, b1))
def int_from_bytes(b: bytes) -> int:
return int.from_bytes(b, byteorder="big")
def hash_sha256(b: bytes) -> bytes:
return hashlib.sha256(b).digest()