Post
Topic
Board Development & Technical Discussion
Re: K nonce pollard's kangaroo
by
Paulfontrahel
on 30/03/2025, 18:26:26 UTC
I apologize, but can you give me a little more detail? I have a public key, but I want to find the k nonce by signatures, not the private key. Can you, as a newbie in this difficult matter, give me more details? Now I use the following code:

import os
import time
import gmpy2
from coincurve import PublicKey
from multiprocessing import Pool, cpu_count
import random
import signal
from datetime import datetime

# Parameters of the secp256k1 curve
n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141

# Files
input_file = "rsz_8.txt"
output_file = "k_8.txt"

# Global flag for execution control
stop_flag = False

def init_worker():
    """Worker initialization for interrupt handling"""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def parse_signature_file(filename):
    """Reading signatures with handling of leading zeros in R and S"""
    signatures = []
    with open(filename, 'r') as f:
        lines = f.readlines()
   
    current_r, current_s, current_z = None, None, None
    for line in lines:
        line = line.strip()
        if line.startswith("R ="):
            current_r = line.split('=')[1].strip().lower()
            current_r = current_r.lstrip("0")
            if len(current_r) > 64:
                continue
            current_r = current_r.zfill(64)
        elif line.startswith("S ="):
            current_s = line.split('=')[1].strip().lower()
            current_s = current_s.lstrip("0")
            if len(current_s) > 64:
                continue
            current_s = current_s.zfill(64)
        elif line.startswith("Z ="):
            current_z = line.split('=')[1].strip().lower()
            if current_r and current_s and current_z:
                signatures.append((current_r, current_s, current_z))
    return signatures

def validate_signature(k, r_hex, s_hex, z_hex):
    """Signature verification for given k (corrected version)"""
    r = int(r_hex, 16)
   
    # Calculate R = k*G directly via coincurve
    try:
        # Generate temporary public key from k (this is R = k*G)
        pubkey = PublicKey.from_secret(k.to_bytes(32, 'big'))
        # Get x-coordinate of R (first 32 bytes without prefix)
        expected_r_bytes = pubkey.format()[1:33]
        expected_r = int.from_bytes(expected_r_bytes, 'big') % n
    except:
        return False
   
    return expected_r == r

def kangaroo_algorithm(start, end, signatures, pubkey=None):
    """Kangaroo algorithm with CPU load limitation"""
    global stop_flag
   
    N = 1 << 18  # Number of points
    jumps = [1 << random.randint(0, 18) for _ in range(16)]
    last_print_time = time.time()
   
    def f(x):
        return x + jumps[x % len(jumps)]
   
    # Kangaroo initialization
    xT, aT = end, 0
    xW, aW = (end - start) // 2, 0
   
    # Main loop with pauses to reduce load
    for iteration in range(N):
        if stop_flag:
            return None
           
        xT = f(xT) % n
        aT += 1
       
        k_candidate = (xT - xW + aW) % n
        if start <= k_candidate <= end:
            for r, s, z in signatures:
                if validate_signature(k_candidate, r, s, z):
                    return k_candidate
       
        # Progress output every 5 seconds
        current_time = time.time()
        if current_time - last_print_time > 5:
            progress = (iteration / N) * 100
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Progress: {progress:.2f}% | Checked: {iteration}/{N} iterations | Current candidate: {hex(k_candidate)}")
            last_print_time = current_time
       
        # Pause to reduce load (10ms every 1000 iterations)
        if iteration % 1000 == 0:
            time.sleep(0.01)
   
    # Continue search with pauses
    while not stop_flag:
        xW = f(xW) % n
        aW += 1
       
        for d in range(-10, 10):
            k_candidate = (xW - xT + aT + d) % n
            if start <= k_candidate <= end:
                for r, s, z in signatures:
                    if validate_signature(k_candidate, r, s, z):
                        return k_candidate
       
        # Progress output every 5 seconds
        current_time = time.time()
        if current_time - last_print_time > 5:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Continuing search | Checked: {aW} additional iterations | Current candidate: {hex(k_candidate)}")
            last_print_time = current_time
       
        # Pause to reduce load
        if aW % 1000 == 0:
            time.sleep(0.02)
   
    return None

def worker(args):
    """Function for multi-threaded operation with load limitation"""
    try:
        chunk_start, chunk_end, signatures, pubkey = args
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting search in range {hex(chunk_start)} - {hex(chunk_end)}")
       
        # Artificial speed limit for this worker
        start_time = time.time()
        k = kangaroo_algorithm(chunk_start, chunk_end, signatures, pubkey)
       
        if k is not None:
            return [(hex(k), r, s, z) for r, s, z in signatures if validate_signature(k, r, s, z)]
    except KeyboardInterrupt:
        pass
    return []

def main():
    """Main function with load control"""
    global stop_flag
   
    try:
        open(output_file, 'w').close()
        signatures = parse_signature_file(input_file)
        if not signatures:
            print("No signatures to process!")
            return
       
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Loaded {len(signatures)} signatures")
        pubkey = None
       
        # Define search range
        start_k = n - 2**40
        end_k = n - 1
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Searching in range {hex(start_k)} - {hex(end_k)}")
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Using {max(1, cpu_count() // 2)} of {cpu_count()} CPU cores")
       
        # Use only part of cores to reduce load
        num_cores = max(1, cpu_count() // 2)  # Use half of the cores
        chunk_size = (end_k - start_k) // num_cores
        chunks = [(start_k + i*chunk_size,
                  start_k + (i+1)*chunk_size -1 if i < num_cores-1 else end_k,
                  signatures, pubkey)
                 for i in range(num_cores)]
       
        # Start search with process count limitation
        with Pool(num_cores, initializer=init_worker) as pool:
            results = pool.imap_unordered(worker, chunks)
           
            for res in results:
                for k, r, s, z in res:
                    with open(output_file, 'a') as f:
                        f.write(f"k: {k}\nR: {r}\nS: {s}\nZ: {z}\n\n")
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] Found k: {k}")
               
                # Check stop flag
                if stop_flag:
                    break
               
    except KeyboardInterrupt:
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Received interrupt signal, stopping...")
        stop_flag = True
    finally:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Execution time: {time.time() - start_time:.2f} sec")

if __name__ == "__main__":
    start_time = time.time()
    main()