I apologize, but can you give me a little more detail? I have a public key, but I want to find the k nonce by signatures, not the private key. Can you, as a newbie in this difficult matter, give me more details? Now I use the following code:
import os
import time
import gmpy2
from coincurve import PublicKey
from multiprocessing import Pool, cpu_count
import random
import signal
from datetime import datetime
# Parameters of the secp256k1 curve
n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
# Files
input_file = "rsz_8.txt"
output_file = "k_8.txt"
# Global flag for execution control
stop_flag = False
def init_worker():
"""Worker initialization for interrupt handling"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
def parse_signature_file(filename):
"""Reading signatures with handling of leading zeros in R and S"""
signatures = []
with open(filename, 'r') as f:
lines = f.readlines()
current_r, current_s, current_z = None, None, None
for line in lines:
line = line.strip()
if line.startswith("R ="):
current_r = line.split('=')[1].strip().lower()
current_r = current_r.lstrip("0")
if len(current_r) > 64:
continue
current_r = current_r.zfill(64)
elif line.startswith("S ="):
current_s = line.split('=')[1].strip().lower()
current_s = current_s.lstrip("0")
if len(current_s) > 64:
continue
current_s = current_s.zfill(64)
elif line.startswith("Z ="):
current_z = line.split('=')[1].strip().lower()
if current_r and current_s and current_z:
signatures.append((current_r, current_s, current_z))
return signatures
def validate_signature(k, r_hex, s_hex, z_hex):
"""Signature verification for given k (corrected version)"""
r = int(r_hex, 16)
# Calculate R = k*G directly via coincurve
try:
# Generate temporary public key from k (this is R = k*G)
pubkey = PublicKey.from_secret(k.to_bytes(32, 'big'))
# Get x-coordinate of R (first 32 bytes without prefix)
expected_r_bytes = pubkey.format()[1:33]
expected_r = int.from_bytes(expected_r_bytes, 'big') % n
except:
return False
return expected_r == r
def kangaroo_algorithm(start, end, signatures, pubkey=None):
"""Kangaroo algorithm with CPU load limitation"""
global stop_flag
N = 1 << 18 # Number of points
jumps = [1 << random.randint(0, 18) for _ in range(16)]
last_print_time = time.time()
def f(x):
return x + jumps[x % len(jumps)]
# Kangaroo initialization
xT, aT = end, 0
xW, aW = (end - start) // 2, 0
# Main loop with pauses to reduce load
for iteration in range(N):
if stop_flag:
return None
xT = f(xT) % n
aT += 1
k_candidate = (xT - xW + aW) % n
if start <= k_candidate <= end:
for r, s, z in signatures:
if validate_signature(k_candidate, r, s, z):
return k_candidate
# Progress output every 5 seconds
current_time = time.time()
if current_time - last_print_time > 5:
progress = (iteration / N) * 100
print(f"[{datetime.now().strftime('%H:%M:%S')}] Progress: {progress:.2f}% | Checked: {iteration}/{N} iterations | Current candidate: {hex(k_candidate)}")
last_print_time = current_time
# Pause to reduce load (10ms every 1000 iterations)
if iteration % 1000 == 0:
time.sleep(0.01)
# Continue search with pauses
while not stop_flag:
xW = f(xW) % n
aW += 1
for d in range(-10, 10):
k_candidate = (xW - xT + aT + d) % n
if start <= k_candidate <= end:
for r, s, z in signatures:
if validate_signature(k_candidate, r, s, z):
return k_candidate
# Progress output every 5 seconds
current_time = time.time()
if current_time - last_print_time > 5:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Continuing search | Checked: {aW} additional iterations | Current candidate: {hex(k_candidate)}")
last_print_time = current_time
# Pause to reduce load
if aW % 1000 == 0:
time.sleep(0.02)
return None
def worker(args):
"""Function for multi-threaded operation with load limitation"""
try:
chunk_start, chunk_end, signatures, pubkey = args
print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting search in range {hex(chunk_start)} - {hex(chunk_end)}")
# Artificial speed limit for this worker
start_time = time.time()
k = kangaroo_algorithm(chunk_start, chunk_end, signatures, pubkey)
if k is not None:
return [(hex(k), r, s, z) for r, s, z in signatures if validate_signature(k, r, s, z)]
except KeyboardInterrupt:
pass
return []
def main():
"""Main function with load control"""
global stop_flag
try:
open(output_file, 'w').close()
signatures = parse_signature_file(input_file)
if not signatures:
print("No signatures to process!")
return
print(f"[{datetime.now().strftime('%H:%M:%S')}] Loaded {len(signatures)} signatures")
pubkey = None
# Define search range
start_k = n - 2**40
end_k = n - 1
print(f"[{datetime.now().strftime('%H:%M:%S')}] Searching in range {hex(start_k)} - {hex(end_k)}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Using {max(1, cpu_count() // 2)} of {cpu_count()} CPU cores")
# Use only part of cores to reduce load
num_cores = max(1, cpu_count() // 2) # Use half of the cores
chunk_size = (end_k - start_k) // num_cores
chunks = [(start_k + i*chunk_size,
start_k + (i+1)*chunk_size -1 if i < num_cores-1 else end_k,
signatures, pubkey)
for i in range(num_cores)]
# Start search with process count limitation
with Pool(num_cores, initializer=init_worker) as pool:
results = pool.imap_unordered(worker, chunks)
for res in results:
for k, r, s, z in res:
with open(output_file, 'a') as f:
f.write(f"k: {k}\nR: {r}\nS: {s}\nZ: {z}\n\n")
print(f"[{datetime.now().strftime('%H:%M:%S')}] Found k: {k}")
# Check stop flag
if stop_flag:
break
except KeyboardInterrupt:
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Received interrupt signal, stopping...")
stop_flag = True
finally:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Execution time: {time.time() - start_time:.2f} sec")
if __name__ == "__main__":
start_time = time.time()
main()