Post
Topic
Board Development & Technical Discussion
Topic OP
Attacking ECDSA by Lattice Sieving: Bridging Gap with Fourier Analysis Attacks
by
krashfire
on 12/10/2024, 04:47:35 UTC
I had chance upon this research.. https://eprint.iacr.org/2024/296

The author had through multiple signatures found the private key through less than 4 bits of nonce leakage.

My problem is, i do not know the k nonce. there is no leakage. I do not know the private key. i do not know the message that was used..  So i modify the script accordingly.  i believe if it takes less than 4 bits of nonce leakage to eventually gain the private key,

I could 'leak' the 6 bits of the k nonce. try on every pattern (Total This script will try all 262,144 (64^3) possible combinations) and eventually get the right 'leak' for the full k nonce.

Notwithstanding using GPU, CPU ONLY, it will take me roughly 22 hours to complete the full pattern.

What i want to ask is, why do you think my method won't work?

Thank You. I Appreciate your opinion.

Code:

import os
os.environ['CUDA_PATH'] = '/usr/local/cuda-11.5'

import time
import logging
import multiprocessing
import itertools
import numpy as np
import cupy as cp
import cupyx
from cupy.linalg import qr
import pickle

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Print CUDA version for verification
logger.info(f"CUDA version: {cp.cuda.runtime.runtimeGetVersion()}")
logger.info(f"CuPy version: {cp.__version__}")

p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
a, b = 0, 7
E = EllipticCurve(GF(p), [a, b])
G = E(0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798,
      0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8)
n = G.order()

def ecdsa_to_hnp(signatures, l):
    logger.info(f"Converting {len(signatures)} ECDSA signatures to HNP instances")
    q = n
    w = q // (2**(l+1))
    samples = []
    for sig in signatures:
        r = int(sig['r'], 16)
        s = int(sig['s'], 16)
        z = int(sig['z'], 16)
        t = (Mod(r, q) * Mod(s, q)**(-1)) % q
        a = (Mod(z, q) * Mod(s, q)**(-1) - w) % q
        samples.append((t, a))
    logger.info(f"Conversion complete. Generated {len(samples)} HNP samples")
    return samples

def construct_lattice(samples, x, l):
    logger.info(f"Constructing lattice with {len(samples)} samples, x={x}, l={l}")
    m = len(samples)
    q = int(n)
    w = q // (2**(l+1))
    B = matrix(ZZ, m+1, m+1)
    for i in range(m-1):
        B[i, i] = q
    B[m-1, :m-1] = [x * t for t, _ in samples]
    B[m-1, m-1] = x
    B[m, :m-1] = [a for _, a in samples]
    B[m, m] = int((w * w // 3)**0.5)
    logger.info(f"Lattice construction complete. Dimension: {B.nrows()} x {B.ncols()}")
    return cp.array(B.numpy())

def gpu_lll(B, delta=0.99):
    logger.info(f"Starting GPU-accelerated LLL reduction on {B.shape[0]}x{B.shape[1]} matrix")
    Q, R = qr(B)
    n = B.shape[0]
    k = 1
    while k < n:
        for j in range(k-1, -1, -1):
            mu = cp.round(R[j,k] / R[j,j])
            if mu != 0:
                R[:j+1,k] -= mu * R[:j+1,j]
                Q[:,k] -= mu * Q[:,j]
        if delta * R[k-1,k-1]**2 > R[k,k]**2 + R[k-1,k]**2:
            R[[k-1,k]] = R[[k,k-1]]
            Q[:,[k-1,k]] = Q[:,[k,k-1]]
            k = max(k-1, 1)
        else:
            k += 1
    logger.info("GPU-accelerated LLL reduction complete")
    return Q @ R

def gpu_bkz(B, block_size=20):
    logger.info(f"Starting GPU-accelerated BKZ reduction with block size {block_size}")
    n = B.shape[0]
    for i in range(0, n - block_size + 1):
        logger.debug(f"Processing block {i}/{n - block_size}")
        block = B[i:i+block_size, i:i+block_size]
        block = gpu_lll(block)
        B[i:i+block_size, i:i+block_size] = block
    logger.info("GPU-accelerated BKZ reduction complete")
    return B

def gauss_sieve(B, target_norm, max_list_size=None):
    logger.info(f"Starting Gauss sieve with target norm {target_norm}")
    L = []
    S = []
    C = B.get().tolist()  # Convert CuPy array to list
   
    while C:
        v = C.pop(0)
        v = cp.array(v)  # Convert back to CuPy array for GPU operations
        if cp.linalg.norm(v) > target_norm:
            continue
       
        if not L:
            L.append(v)
            continue
       
        changed = True
        while changed:
            changed = False
            for w in L:
                if cp.linalg.norm(v - w) < cp.linalg.norm(v):
                    v = v - w
                    changed = True
                    break
                elif cp.linalg.norm(v + w) < cp.linalg.norm(v):
                    v = v + w
                    changed = True
                    break
       
        if cp.linalg.norm(v) <= target_norm:
            L.append(v)
            if max_list_size and len(L) > max_list_size:
                L.sort(key=lambda x: cp.linalg.norm(x))
                L = L[:max_list_size]
        else:
            S.append(v)
   
    logger.info(f"Gauss sieve complete. Found {len(L)} vectors")
    return L

def interval_reduction(low, high, samples, q, l):
    logger.info(f"Starting interval reduction: [{low}, {high}]")
    M = high - low + 1
    N = int(np.log2(M).ceil())
    R = [(low, high)]
   
    for t, a in samples[:N]:
        R_new = []
        for interval in R:
            low, high = interval
            n_min = ((t * low - a - q/(2**(l+1))) // q).ceil()
            n_max = ((t * high - a + q/(2**(l+1))) // q).floor()
            for n in range(n_min, n_max + 1):
                new_low = max(low, ((a + n*q - q/(2**(l+1))) // t).ceil())
                new_high = min(high, ((a + n*q + q/(2**(l+1))) // t).floor())
                if new_low <= new_high:
                    R_new.append((new_low, new_high))
        R = R_new
   
    logger.info(f"Interval reduction complete. Resulting intervals: {len(R)}")
    return R

def pre_screening(alpha0, samples, q, l, x):
    logger.debug(f"Pre-screening candidate α₀: {alpha0}")
    w = q // (2**(l+1))
    result = all(abs(((x * t * alpha0 - a + q//2) % q) - q//2) <= w + q//(2**(l+4)) for t, a in samples)
    logger.debug(f"Pre-screening result: {'Passed' if result else 'Failed'}")
    return result

def improved_linear_predicate(v, samples, q, l, tau):
    logger.debug(f"Checking improved linear predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Predicate failed initial checks")
        return None
   
    k0 = -np.sign(v[1]) * v[0] + q/(2**(l+1))
    alpha = (Mod(samples[0][1] + k0, q) * Mod(samples[0][0], q)**(-1)) % q
   
    N = 2 * int(np.log2(q).ceil())
    M = sum(1 for t, a in samples[:N] if abs((t * alpha - a) % q) < q/(2**l))
   
    if M > N * (1 - np.log2(q)/(2**l) + 2**(-l)) / 2:
        logger.debug(f"Predicate passed. Potential α: {alpha}")
        return int(alpha)
    logger.debug("Predicate failed final check")
    return None

def decomposition_predicate(v, samples, q, l, tau, x):
    logger.debug(f"Checking decomposition predicate for v: {v}")
    if v[0] == 0 or abs(v[0]) > q/(2**(l+1)) or abs(v[1]) != tau:
        logger.debug("Decomposition predicate failed initial checks")
        return None
   
    low = -np.sign(v[1]) * v[0] - x//2
    high = -np.sign(v[1]) * v[0] + x//2
   
    R = interval_reduction(low, high, samples, q, l)
   
    for interval in R:
        for h in range(interval[0], interval[1] + 1):
            alpha = improved_linear_predicate((h, -tau), samples, q, l, tau)
            if alpha is not None and pre_screening(alpha, samples, q, l, x):
                logger.info(f"Decomposition predicate found potential solution: {alpha}")
                return alpha
   
    logger.debug("Decomposition predicate failed to find a solution")
    return None

def progressive_bkz_sieve(B, predicate, start_dim=20, step=5, max_dim=None):
    if max_dim is None:
        max_dim = B.shape[0]
   
    for d in range(start_dim, max_dim + 1, step):
        logger.info(f"Processing dimension {d}")
        B_sub = B[:d, :d]
       
        B_sub = gpu_bkz(B_sub, block_size=min(20, d))
       
        target_norm = cp.sqrt(4/3) * cp.linalg.det(B_sub)**(1/d)
        logger.info(f"Target norm for dimension {d}: {target_norm}")
        sieved_vectors = gauss_sieve(B_sub, target_norm, max_list_size=d*10)
       
        logger.info(f"Checking predicates for {len(sieved_vectors)} vectors")
        for v in sieved_vectors:
            sk = predicate(v[-2:])
            if sk is not None:
                logger.info(f"Found potential solution: {sk}")
                return sk
   
    logger.info("Progressive BKZ-sieve completed without finding a solution")
    return None

def try_nonce_patterns(args):
    signatures, l, x, pubkey, patterns = args
    logger.info(f"Trying nonce pattern: {patterns}")
    modified_sigs = [{**sig, 'r': hex(int(sig['r'], 16) ^ (p << (256 - l)))[2:].zfill(64)}
                     for sig, p in zip(signatures, patterns)]
    samples = ecdsa_to_hnp(modified_sigs, l)
    B = construct_lattice(samples, x, l)
   
    def predicate(v):
        return decomposition_predicate(v, samples, int(n), l, int(B[-1, -1].get()), x)
   
    sk = progressive_bkz_sieve(B, predicate, start_dim=20)
   
    if sk:
        recovered_pubkey_point = sk * G
        recovered_pubkey = '04' + hex(recovered_pubkey_point[0])[2:].zfill(64) + hex(recovered_pubkey_point[1])[2:].zfill(64)
        if recovered_pubkey == pubkey:
            logger.info(f"Successfully recovered private key: {hex(sk)}")
            return sk
    logger.info(f"Failed to recover private key for pattern: {patterns}")
    return None

def save_progress(completed_patterns):
    logger.info(f"Saving progress. Completed patterns: {len(completed_patterns)}")
    with open("progress.pkl", "wb") as f:
        pickle.dump(completed_patterns, f)

def load_progress():
    if os.path.exists("progress.pkl"):
        with open("progress.pkl", "rb") as f:
            completed_patterns = pickle.load(f)
        logger.info(f"Loaded progress. Completed patterns: {len(completed_patterns)}")
        return completed_patterns
    logger.info("No previous progress found. Starting from scratch.")
    return set()


def main():
    signatures = [
        {
            'r': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        },
        {
            'r': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        },
        {
            'r': '5caxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            's': '21bxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
            'z': 'hexadecimalxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        }
    ]
   
    l = 6  # 6-bit guessing
    x = 2**15
    pubkey = "04xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
   
    all_patterns = list(itertools.product(range(2**l), repeat=len(signatures)))
    completed_patterns = load_progress()
    patterns_to_try = [p for p in all_patterns if p not in completed_patterns]
   
    num_processors = min(24, multiprocessing.cpu_count())
    logger.info(f"Using {num_processors} processors")

    try:
        with multiprocessing.Pool(num_processors) as pool:
            args = [(signatures, l, x, pubkey, pattern) for pattern in patterns_to_try]
            for i, result in enumerate(pool.imap_unordered(try_nonce_patterns, args)):
                if result is not None:
                    print(f"Successfully recovered private key: {hex(result)}")
                    return
                completed_patterns.add(patterns_to_try[i])
                if (i+1) % 1000 == 0:
                    save_progress(completed_patterns)
                    logger.info(f"Completed {i+1}/{len(patterns_to_try)} pattern combinations")
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Saving progress...")
        save_progress(completed_patterns)
   
    print("Failed to recover the private key.")

if __name__ == "__main__":
    main()


The code is pretty much self explanatory. i already extended some definitions that has no #comments so its easier for you to read and understand what the code is achieving..