#!/usr/bin/env python3
"""
Bitcoin Signature Script Pattern Analyzer
This script analyzes Bitcoin transaction signature scripts to:
- Parse signature scripts completely
- Extract public keys and signatures
- Identify repeated signing patterns
- Verify signature compatibility with public keys
- Detect pattern repetition across multiple scripts
Requirements:
- pip install ecdsa hashlib
"""
import binascii
import hashlib
import struct
import time
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Optional

import requests
class BlockchainScraper:
    """Fetch Bitcoin transaction data from public block-explorer APIs.

    Supported providers: mempool.space (default), BlockCypher and
    Blockchain.info; an optional API key is forwarded to BlockCypher.
    Network/API errors are printed and converted into empty results so
    callers can keep scraping. Requires the third-party ``requests``
    package (see module imports).
    """

    def __init__(self, api_key: Optional[str] = None):
        # BUG FIX: the original declared `def init`, which Python never
        # calls, so instances were created without any of these attributes.
        self.api_key = api_key
        self.base_urls = {
            'blockchair': 'https://api.blockchair.com/bitcoin',
            'blockchain_info': 'https://blockchain.info/rawaddr',
            'blockcypher': 'https://api.blockcypher.com/v1/btc/main',
            'mempool_space': 'https://mempool.space/api'
        }
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Bitcoin-SigScript-Analyzer/1.0'
        })

    def get_address_transactions(self, address: str, limit: int = 50, api_provider: str = 'mempool_space') -> List[Dict]:
        """Fetch up to ``limit`` transactions for ``address``; [] on error."""
        try:
            if api_provider == 'mempool_space':
                return self._get_mempool_transactions(address, limit)
            elif api_provider == 'blockcypher':
                return self._get_blockcypher_transactions(address, limit)
            elif api_provider == 'blockchain_info':
                return self._get_blockchain_info_transactions(address, limit)
            else:
                raise ValueError(f"Unsupported API provider: {api_provider}")
        except Exception as e:
            # Best-effort scraping: report and return an empty list.
            print(f"Error fetching transactions for {address}: {e}")
            return []

    def _get_mempool_transactions(self, address: str, limit: int) -> List[Dict]:
        """Fetch transactions using mempool.space API"""
        url = f"{self.base_urls['mempool_space']}/address/{address}/txs"
        response = self.session.get(url)
        response.raise_for_status()
        # Slicing already handles the len(transactions) <= limit case.
        return response.json()[:limit]

    def _get_blockcypher_transactions(self, address: str, limit: int) -> List[Dict]:
        """Fetch transactions using BlockCypher API"""
        url = f"{self.base_urls['blockcypher']}/addrs/{address}"
        params = {'limit': limit}
        if self.api_key:
            params['token'] = self.api_key
        response = self.session.get(url, params=params)
        response.raise_for_status()
        return response.json().get('txrefs', [])

    def _get_blockchain_info_transactions(self, address: str, limit: int) -> List[Dict]:
        """Fetch transactions using Blockchain.info API"""
        url = f"{self.base_urls['blockchain_info']}/{address}"
        params = {'limit': limit, 'format': 'json'}
        response = self.session.get(url, params=params)
        response.raise_for_status()
        return response.json().get('txs', [])

    def get_transaction_details(self, txid: str, api_provider: str = 'mempool_space') -> Dict:
        """Fetch detailed transaction data by transaction ID; {} on error."""
        try:
            if api_provider == 'mempool_space':
                url = f"{self.base_urls['mempool_space']}/tx/{txid}"
            elif api_provider == 'blockcypher':
                url = f"{self.base_urls['blockcypher']}/txs/{txid}"
                if self.api_key:
                    url += f"?token={self.api_key}"
            elif api_provider == 'blockchain_info':
                url = f"https://blockchain.info/rawtx/{txid}?format=json"
            else:
                raise ValueError(f"Unsupported API provider: {api_provider}")
            response = self.session.get(url)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching transaction {txid}: {e}")
            return {}

    def get_block_transactions(self, block_height: int, api_provider: str = 'mempool_space') -> List[Dict]:
        """Fetch all transactions of a block (BlockCypher returns txids)."""
        try:
            if api_provider == 'mempool_space':
                # Resolve height -> block hash, then fetch the block's txs.
                url = f"{self.base_urls['mempool_space']}/block-height/{block_height}"
                response = self.session.get(url)
                block_hash = response.text.strip()
                url = f"{self.base_urls['mempool_space']}/block/{block_hash}/txs"
                response = self.session.get(url)
                return response.json()
            elif api_provider == 'blockcypher':
                url = f"{self.base_urls['blockcypher']}/blocks/{block_height}"
                params = {}
                if self.api_key:
                    params['token'] = self.api_key
                response = self.session.get(url, params=params)
                return response.json().get('txids', [])
            # BUG FIX: the original fell through and implicitly returned
            # None for unsupported providers; return [] for consistency
            # with the error path.
            return []
        except Exception as e:
            print(f"Error fetching block {block_height} transactions: {e}")
            return []

    def extract_signature_scripts(self, transactions: List[Dict]) -> List[str]:
        """Extract scriptSig / witness hex strings from transaction dicts.

        Handles the field names used by the different APIs and keeps only
        strings at least 20 hex characters long.
        """
        sig_scripts = []
        for tx in transactions:
            # Handle different API response formats.
            inputs = tx.get('vin', tx.get('inputs', []))
            for input_data in inputs:
                # BUG FIX: in the original, the trailing ternary bound to
                # the entire `or` chain, so inputs WITHOUT witness data
                # produced None even when scriptSig/script was present.
                # Parenthesize the witness fallback so each source is
                # tried in turn.
                script_sig = (
                    input_data.get('scriptSig', {}).get('hex')
                    or input_data.get('script')
                    or (input_data.get('witness', [{}])[0]
                        if input_data.get('witness') else None)
                )
                if script_sig and isinstance(script_sig, str):
                    sig_scripts.append(script_sig)
                elif script_sig and isinstance(script_sig, list):
                    # Some APIs return witness data as a list of hex items.
                    for witness_item in script_sig:
                        if isinstance(witness_item, str) and len(witness_item) > 10:
                            sig_scripts.append(witness_item)
        return [script for script in sig_scripts if script and len(script) >= 20]

    def scrape_recent_transactions(self, num_blocks: int = 10, api_provider: str = 'mempool_space') -> List[str]:
        """Scrape signature scripts from the most recent blocks."""
        print(f"Scraping signature scripts from last {num_blocks} blocks...")
        try:
            # Determine the latest block height.
            if api_provider == 'mempool_space':
                response = self.session.get(f"{self.base_urls['mempool_space']}/blocks/tip/height")
                latest_height = int(response.text.strip())
            else:
                # Fallback: approximate current height, update as needed.
                latest_height = 820000
            all_sig_scripts = []
            for i in range(num_blocks):
                block_height = latest_height - i
                print(f"Processing block {block_height}...")
                transactions = self.get_block_transactions(block_height, api_provider)
                # If we only got transaction IDs, hydrate them into full txs.
                if transactions and isinstance(transactions[0], str):
                    detailed_txs = []
                    for txid in transactions[:20]:  # Limit to avoid rate limits
                        tx_detail = self.get_transaction_details(txid, api_provider)
                        if tx_detail:
                            detailed_txs.append(tx_detail)
                        time.sleep(0.1)  # Rate limiting
                    transactions = detailed_txs
                sig_scripts = self.extract_signature_scripts(transactions)
                all_sig_scripts.extend(sig_scripts)
                print(f"Found {len(sig_scripts)} signature scripts in block {block_height}")
                time.sleep(0.5)  # Rate limiting between blocks
            print(f"Total signature scripts collected: {len(all_sig_scripts)}")
            return all_sig_scripts
        except Exception as e:
            print(f"Error scraping recent transactions: {e}")
            return []

    def scrape_address_patterns(self, addresses: List[str], api_provider: str = 'mempool_space') -> List[str]:
        """Scrape signature scripts from a list of Bitcoin addresses."""
        print(f"Scraping signature scripts from {len(addresses)} addresses...")
        all_sig_scripts = []
        for address in addresses:
            print(f"Processing address: {address}")
            transactions = self.get_address_transactions(address, limit=50, api_provider=api_provider)
            # ROBUSTNESS: only build/consume detailed_txs when transactions
            # exist, so an empty result cannot reference an unbound local.
            if transactions:
                detailed_txs = []
                for tx in transactions[:10]:  # Limit to avoid rate limits
                    if isinstance(tx, dict) and 'txid' in tx:
                        tx_detail = self.get_transaction_details(tx['txid'], api_provider)
                        if tx_detail:
                            detailed_txs.append(tx_detail)
                    elif isinstance(tx, dict):
                        detailed_txs.append(tx)
                    time.sleep(0.1)  # Rate limiting
                sig_scripts = self.extract_signature_scripts(detailed_txs)
                all_sig_scripts.extend(sig_scripts)
                print(f"Found {len(sig_scripts)} signature scripts for {address}")
            time.sleep(0.5)  # Rate limiting between addresses
        print(f"Total signature scripts collected: {len(all_sig_scripts)}")
        return all_sig_scripts
class SigScriptAnalyzer:
    """Parse Bitcoin signature scripts and track repeated signing patterns."""

    def __init__(self):
        # BUG FIX: the original declared `def init`, which Python never
        # calls, so these stores were missing and analyze_scripts crashed.
        self.patterns = defaultdict(list)            # pattern string -> scripts
        self.pubkey_patterns = defaultdict(list)     # pubkey hex -> scripts
        self.signature_patterns = defaultdict(list)  # r/s prefix -> scripts

    def parse_script(self, script_hex: str) -> Dict:
        """Parse a signature script and extract its components.

        Returns a dict with components, signatures, pubkeys and a pattern
        string, or {"error": ..., "script_hex": ...} on invalid hex.
        """
        try:
            script_bytes = bytes.fromhex(script_hex)
            components = []
            i = 0
            while i < len(script_bytes):
                opcode = script_bytes[i]
                i += 1
                if opcode == 0x00:  # OP_0
                    components.append({"type": "OP_0", "data": None})
                elif 1 <= opcode <= 75:  # Direct push of `opcode` bytes
                    if i + opcode <= len(script_bytes):
                        data = script_bytes[i:i+opcode]
                        components.append({
                            "type": "PUSH_DATA",
                            "length": opcode,
                            "data": data.hex(),
                            "raw_data": data
                        })
                        i += opcode
                    else:
                        # Truncated push: stop parsing the remainder.
                        break
                elif opcode == 0x76:  # OP_DUP
                    components.append({"type": "OP_DUP", "data": None})
                elif opcode == 0x88:  # OP_EQUALVERIFY
                    components.append({"type": "OP_EQUALVERIFY", "data": None})
                elif opcode == 0x87:  # OP_EQUAL
                    components.append({"type": "OP_EQUAL", "data": None})
                elif opcode == 0xA9:  # OP_HASH160
                    components.append({"type": "OP_HASH160", "data": None})
                elif opcode == 0xAC:  # OP_CHECKSIG
                    components.append({"type": "OP_CHECKSIG", "data": None})
                elif opcode == 0xAE:  # OP_CHECKMULTISIG
                    components.append({"type": "OP_CHECKMULTISIG", "data": None})
                else:
                    components.append({"type": f"OP_{opcode:02X}", "data": None})
            return {
                "script_hex": script_hex,
                "components": components,
                "signatures": self.extract_signatures(components),
                "pubkeys": self.extract_pubkeys(components),
                "pattern": self.generate_pattern(components)
            }
        except Exception as e:
            return {"error": str(e), "script_hex": script_hex}

    def extract_signatures(self, components: List[Dict]) -> List[Dict]:
        """Extract DER signatures (plus sighash byte) from push components."""
        signatures = []
        for comp in components:
            if comp["type"] == "PUSH_DATA" and comp["raw_data"]:
                data = comp["raw_data"]
                # DER sequences start with 0x30; data[1] is the DER length.
                if len(data) > 6 and data[0] == 0x30:
                    sig_len = data[1]
                    if sig_len + 2 <= len(data):
                        signatures.append({
                            "der_signature": data[:sig_len + 2].hex(),
                            # Trailing byte after the DER blob is the sighash type.
                            "sighash_type": data[sig_len + 2] if sig_len + 2 < len(data) else None,
                            "r_value": self.extract_r_value(data),
                            "s_value": self.extract_s_value(data)
                        })
        return signatures

    def extract_pubkeys(self, components: List[Dict]) -> List[Dict]:
        """Extract SEC-encoded public keys from push components."""
        pubkeys = []
        for comp in components:
            if comp["type"] == "PUSH_DATA" and comp["raw_data"]:
                data = comp["raw_data"]
                # Compressed pubkey (33 bytes, prefix 0x02 or 0x03).
                if len(data) == 33 and data[0] in [0x02, 0x03]:
                    pubkeys.append({
                        "type": "compressed",
                        "pubkey": data.hex(),
                        "x_coord": data[1:].hex()
                    })
                # Uncompressed pubkey (65 bytes, prefix 0x04).
                elif len(data) == 65 and data[0] == 0x04:
                    pubkeys.append({
                        "type": "uncompressed",
                        "pubkey": data.hex(),
                        "x_coord": data[1:33].hex(),
                        "y_coord": data[33:].hex()
                    })
        return pubkeys

    def extract_r_value(self, der_sig: bytes) -> Optional[str]:
        """Extract the R value from a DER signature, or None if malformed."""
        try:
            if len(der_sig) < 6 or der_sig[0] != 0x30:
                return None
            r_len = der_sig[3]  # length byte of the R INTEGER
            if r_len + 4 > len(der_sig):
                return None
            return der_sig[4:4+r_len].hex()
        except IndexError:
            # Narrowed from a bare `except:` — only indexing can fail here.
            return None

    def extract_s_value(self, der_sig: bytes) -> Optional[str]:
        """Extract the S value from a DER signature, or None if malformed."""
        try:
            if len(der_sig) < 6 or der_sig[0] != 0x30:
                return None
            r_len = der_sig[3]
            s_start = 4 + r_len + 2  # Skip R value and the S INTEGER header
            if s_start >= len(der_sig):
                return None
            s_len = der_sig[s_start - 1]
            if s_start + s_len > len(der_sig):
                return None
            return der_sig[s_start:s_start+s_len].hex()
        except IndexError:
            # Narrowed from a bare `except:` — only indexing can fail here.
            return None

    def generate_pattern(self, components: List[Dict]) -> str:
        """Build a human-readable pattern string from script components."""
        pattern_parts = []
        for comp in components:
            if comp["type"] == "PUSH_DATA":
                if comp["raw_data"]:
                    data = comp["raw_data"]
                    # Classify pushed data by its recognizable shape.
                    if len(data) == 33 and data[0] in [0x02, 0x03]:
                        pattern_parts.append("PUBKEY_COMPRESSED")
                    elif len(data) == 65 and data[0] == 0x04:
                        pattern_parts.append("PUBKEY_UNCOMPRESSED")
                    elif len(data) > 6 and data[0] == 0x30:
                        pattern_parts.append("DER_SIGNATURE")
                    else:
                        pattern_parts.append(f"DATA_{len(data)}")
                else:
                    pattern_parts.append("EMPTY_PUSH")
            else:
                pattern_parts.append(comp["type"])
        return " -> ".join(pattern_parts)

    def analyze_scripts(self, scripts: List[str]) -> Dict:
        """Analyze multiple signature scripts and summarize reuse.

        Also accumulates pattern/pubkey/signature reuse into the
        instance-level stores for later use by find_common_patterns().
        """
        parsed_scripts = []
        all_patterns = []
        for script in scripts:
            parsed = self.parse_script(script)
            parsed_scripts.append(parsed)
            # Skip scripts whose parse failed (no "pattern" key).
            if "pattern" in parsed:
                all_patterns.append(parsed["pattern"])
                self.patterns[parsed["pattern"]].append(script)
                for pubkey in parsed.get("pubkeys", []):
                    self.pubkey_patterns[pubkey["pubkey"]].append(script)
                for sig in parsed.get("signatures", []):
                    if sig.get("r_value") and sig.get("s_value"):
                        # Truncated r/s prefix serves as a compact reuse key.
                        sig_pattern = f"{sig['r_value'][:8]}...{sig['s_value'][:8]}"
                        self.signature_patterns[sig_pattern].append(script)
        pattern_counts = Counter(all_patterns)
        repeated_patterns = {k: v for k, v in pattern_counts.items() if v > 1}
        return {
            "total_scripts": len(scripts),
            "parsed_scripts": parsed_scripts,
            "pattern_counts": dict(pattern_counts),
            "repeated_patterns": repeated_patterns,
            "unique_patterns": len(pattern_counts),
            "pubkey_reuse": {k: len(v) for k, v in self.pubkey_patterns.items() if len(v) > 1},
            "signature_reuse": {k: len(v) for k, v in self.signature_patterns.items() if len(v) > 1}
        }

    def verify_pubkey_signature_compatibility(self, pubkey_hex: str, signature_hex: str, message_hash: str) -> bool:
        """Check basic format compatibility of a pubkey/signature pair.

        NOTE: this is only a structural check; real verification would
        require ECDSA verification against the signed message hash.
        """
        try:
            pubkey_bytes = bytes.fromhex(pubkey_hex)
            sig_bytes = bytes.fromhex(signature_hex)
            # Pubkey must be compressed (33) or uncompressed (65).
            if len(pubkey_bytes) not in [33, 65]:
                return False
            # Signature must be a DER sequence.
            if len(sig_bytes) < 6 or sig_bytes[0] != 0x30:
                return False
            # Ensure the S component would fit inside the blob.
            r_len = sig_bytes[3]
            s_start = 4 + r_len + 2
            if s_start >= len(sig_bytes):
                return False
            return True
        except Exception:
            return False

    def find_common_patterns(self) -> Dict:
        """Return the most common script pattern and most reused pubkey."""
        common_patterns = {}
        if self.patterns:
            most_common_pattern = max(self.patterns.keys(), key=lambda k: len(self.patterns[k]))
            common_patterns["most_common_script_pattern"] = {
                "pattern": most_common_pattern,
                "count": len(self.patterns[most_common_pattern]),
                "scripts": self.patterns[most_common_pattern]
            }
        if self.pubkey_patterns:
            most_reused_pubkey = max(self.pubkey_patterns.keys(), key=lambda k: len(self.pubkey_patterns[k]))
            common_patterns["most_reused_pubkey"] = {
                "pubkey": most_reused_pubkey,
                "count": len(self.pubkey_patterns[most_reused_pubkey]),
                "scripts": self.pubkey_patterns[most_reused_pubkey]
            }
        return common_patterns
def main():
    """Interactive entry point.

    Collects signature scripts (by scraping recent blocks, scraping
    specific addresses, or falling back to bundled examples), runs the
    pattern analysis and prints a report to stdout.
    """
    print("Bitcoin Signature Script Pattern Analyzer with Blockchain Integration")
    print("=" * 70)
    # Initialize scraper and analyzer
    scraper = BlockchainScraper()
    analyzer = SigScriptAnalyzer()
    # Choose data source
    print("\nData Source Options:")
    print("1. Scrape recent blocks from blockchain")
    print("2. Analyze specific addresses")
    print("3. Use example data")
    try:
        choice = input("Enter choice (1-3) or press Enter for option 3: ").strip()
        if choice == "1":
            # Scrape recent blocks
            num_blocks = int(input("Number of recent blocks to analyze (default 5): ") or "5")
            api_provider = input("API provider (mempool_space/blockcypher/blockchain_info) [default: mempool_space]: ").strip() or "mempool_space"
            print(f"\nScraping signature scripts from last {num_blocks} blocks using {api_provider}...")
            sig_scripts = scraper.scrape_recent_transactions(num_blocks, api_provider)
        elif choice == "2":
            # Analyze specific addresses
            addresses_input = input("Enter Bitcoin addresses (comma-separated): ").strip()
            addresses = [addr.strip() for addr in addresses_input.split(",") if addr.strip()]
            if not addresses:
                print("No valid addresses provided. Using example data.")
                sig_scripts = get_example_scripts()
            else:
                api_provider = input("API provider (mempool_space/blockcypher/blockchain_info) [default: mempool_space]: ").strip() or "mempool_space"
                sig_scripts = scraper.scrape_address_patterns(addresses, api_provider)
        else:
            # Default: bundled example data
            print("Using example signature scripts...")
            sig_scripts = get_example_scripts()
    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
        return
    except Exception as e:
        print(f"Error during data collection: {e}")
        print("Falling back to example data...")
        sig_scripts = get_example_scripts()
    if not sig_scripts:
        print("No signature scripts found. Exiting.")
        return
    print(f"\nAnalyzing {len(sig_scripts)} signature scripts...")
    print("=" * 50)
    # Analyze the scripts
    results = analyzer.analyze_scripts(sig_scripts)
    print(f"Total scripts analyzed: {results['total_scripts']}")
    print(f"Unique patterns found: {results['unique_patterns']}")
    print("\nPattern Distribution:")
    for pattern, count in results['pattern_counts'].items():
        print(f"  {count}x: {pattern}")
    if results['repeated_patterns']:
        print("\nRepeated Patterns:")
        for pattern, count in results['repeated_patterns'].items():
            print(f"  {count}x: {pattern}")
            print(f"  Example scripts using this pattern:")
            example_scripts = analyzer.patterns[pattern][:2]  # Show first 2 examples
            for i, script in enumerate(example_scripts):
                print(f"    {i+1}. {script[:60]}...")
    if results['pubkey_reuse']:
        print("\nPublic Key Reuse:")
        for pubkey, count in results['pubkey_reuse'].items():
            print(f"  {count}x: {pubkey[:40]}...")
    if results['signature_reuse']:
        print("\nSignature Pattern Reuse:")
        for sig_pattern, count in results['signature_reuse'].items():
            print(f"  {count}x: {sig_pattern}")
    # Find common patterns
    common = analyzer.find_common_patterns()
    if common:
        print("\nMost Common Patterns:")
        for pattern_type, info in common.items():
            print(f"  {pattern_type}: {info['count']} occurrences")
    # Statistical summary
    print("\nStatistical Summary:")
    print("-" * 30)
    total_sigs = sum(len(script.get('signatures', [])) for script in results['parsed_scripts'] if 'signatures' in script)
    total_pubkeys = sum(len(script.get('pubkeys', [])) for script in results['parsed_scripts'] if 'pubkeys' in script)
    print(f"Total signatures extracted: {total_sigs}")
    print(f"Total public keys extracted: {total_pubkeys}")
    # `sig_scripts` is non-empty here, so parsed_scripts is too; `or 1`
    # is a belt-and-braces guard against division by zero.
    n_parsed = len(results['parsed_scripts']) or 1
    print(f"Average signatures per script: {total_sigs/n_parsed:.2f}")
    print(f"Average public keys per script: {total_pubkeys/n_parsed:.2f}")
    # Show detailed analysis for most common pattern
    if results['repeated_patterns']:
        most_common = max(results['repeated_patterns'].items(), key=lambda x: x[1])
        print(f"\nDetailed Analysis of Most Common Pattern:")
        print(f"Pattern: {most_common[0]}")
        print(f"Occurrences: {most_common[1]}")
        example_scripts = analyzer.patterns[most_common[0]]
        if example_scripts:
            example_parsed = analyzer.parse_script(example_scripts[0])
            print("Example breakdown:")
            for i, component in enumerate(example_parsed.get('components', [])):
                # BUG FIX: opcode components store data=None, and since the
                # key exists, .get('data', 'N/A') returned None — the
                # original then crashed on None[:40]. Coalesce with `or`.
                data_repr = component.get('data') or 'N/A'
                suffix = '...' if len(data_repr) > 40 else ''
                print(f"  {i+1}. {component['type']}: {data_repr[:40]}{suffix}")
def get_example_scripts():
    """Return a small, fixed set of hex signature scripts for offline testing.

    The last entry deliberately duplicates the first so pattern-repetition
    detection has something to find.
    """
    p2pkh_a = "483045022100f3581e1972ae8ac7c7367a7a253bc1135223adb9a468bb3a59233f45bc578380022059af01ca17d00e41928954ac7c28a9b5e9e8f4e3f0c5b8b3d8b5c2a1b2c3d4e501210279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"
    p2pkh_b = "47304402204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860a4acdd12909d831cc56cbbac4622082221a8768d1d0901210279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798"
    multisig = "004730440220181522ec8eca07de4860a4acdd12909d831cc56cbbac4622082221a8768d1d09022041f4d4b1b0c4c4b2b2a2b2d2e2f2a2b2c2d2e2f2a2b2c2d2e2f2a2b2c2d2e2f201"
    p2sh = "160014b7536c788837f88b0c8e8e6f3d5f7c5a4c8b2d1e473044022041b1a0c5c9b7e1e3f1d1c1b1a1918171615141312110090807060504030201000220123456789abcdef123456789abcdef123456789abcdef123456789abcdef1234"
    return [p2pkh_a, p2pkh_b, multisig, p2sh, p2pkh_a]
if name == "main":
main()