本文章利用 Python 实现一个简单的功能较为完善的区块链系统(包括区块链结构、账户、钱包、转账),采用的共识机制是 POW。
一、区块与区块链结构
Block.py
import hashlib
from datetime import datetime
class Block:
"""
区块链结构:
prev_hash: 父区块哈希值
data: 区块内容
timestamp: 区块创建时间
hash: 区块哈希值
"""
def __init__(self, data, prev_hash):
# 将传入的父区块哈希值和数据保存到变量中
self.prev_hash = prev_hash
self.data = data
# 获得当前的时间
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 计算区块哈希值
# 获取哈希对象
message = hashlib.sha256()
# 先将数据内容转为字符串并进行编码,再将它们哈希
# 注意:update() 方法现在只接受 bytes 类型的数据,不接收 str 类型
message.update(str(self.prev_hash).encode('utf-8'))
message.update(str(self.prev_hash).encode('utf-8'))
message.update(str(self.prev_hash).encode('utf-8'))
# update() 更新 hash 对象,连续的调用该方法相当于连续的追加更新
# 返回字符串类型的消息摘要
self.hash = message.hexdigest()
BlockChain.py
from Block import Block class BlockChain: """ 区块链结构体 blocks: 包含区块的列表 """ def __init__(self): self.blocks = [] def add_block(self, block): """ 添加区块 :param block: :return: """ self.blocks.append(block) # 新建区块 genesis_block = Block(data="创世区块", prev_hash="") new_block1 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash) new_block2 = Block(data="张三转给王五三个比特币", prev_hash=genesis_block.hash) # 新建一个区块链对象 blockChain = BlockChain() # 将刚才新建的区块加入区块链 blockChain.add_block(genesis_block) blockChain.add_block(new_block1) blockChain.add_block(new_block2) # 打印区块链信息 print("区块链包含区块个数为:%d\n" % len(blockChain.blocks)) blockHeight = 0 for block in blockChain.blocks: print(f"本区块高度为:{blockHeight}") print(f"父区块哈希:{block.prev_hash}") print(f"区块内容:{block.data}") print(f"区块哈希:{block.hash}") print() blockHeight += 1
测试结果
二、加入工作量证明(POW)
将工作量证明加入到 Block.py 中
import hashlib
from datetime import datetime
from time import time
class Block:
"""
区块链结构:
prev_hash: 父区块哈希值
data: 区块内容
timestamp: 区块创建时间
hash: 区块哈希值
nonce: 随机数
"""
def __init__(self, data, prev_hash):
# 将传入的父区块哈希值和数据保存到变量中
self.prev_hash = prev_hash
self.data = data
# 获得当前的时间
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置随机数、哈希初始值为 None
self.nonce = None
self.hash = None
# 类的 __repr__() 方法定义了实例化对象的输出信息
def __repr__(self):
return f"区块内容:{self.data}\n区块哈希值:{self.hash}"
class ProofOfWork:
"""
工作量证明:
block: 区块
difficulty: 难度值
"""
def __init__(self, block, difficult=5):
self.block = block
# 定义出块难度,默认为 5,表示有效哈希值以 5 个零开头
self.difficulty = difficult
def mine(self):
"""
挖矿函数
:return:
"""
i = 0
prefix = '0' * self.difficulty
while True:
message = hashlib.sha256()
message.update(str(self.block.prev_hash).encode('utf-8'))
message.update(str(self.block.data).encode('utf-8'))
message.update(str(self.block.timestamp).encode('utf-8'))
message.update(str(i).encode('utf-8'))
# digest() 返回摘要,作为二进制数据字符串值
# hexdigest() 返回摘要,作为十六进制数据字符串值
digest = message.hexdigest()
# str.startswith(prefix) 检测字符串是否是以 prefix(字符串)开头,返回布尔值
if digest.startswith(prefix):
# 幸运数字
self.block.nonce = i
# 区块哈希值为十六进制数据字符串摘要
self.block.hash = digest
return self.block
i += 1
def validate(self):
"""
验证有效性
:return:
"""
message = hashlib.sha256()
message.update(str(self.block.prev_hash).encode('utf-8'))
message.update(str(self.block.data).encode('utf-8'))
message.update(str(self.block.timestamp).encode('utf-8'))
message.update(str(self.block.nonce).encode('utf-8'))
digest = message.hexdigest()
prefix = '0' * self.difficulty
return digest.startswith(prefix)
# ++++++++测试++++++++
# 定义一个区块
b = Block(data="测试", prev_hash="")
# 定义一个工作量证明
w = ProofOfWork(b)
# 开始时间
start_time = time()
# 挖矿,并统计函数执行时间
print("+++开始挖矿+++")
valid_block = w.mine()
# 结束时间
end_time = time()
print(f"挖矿花费时间:{end_time - start_time}秒")
# 验证区块
print(f"区块哈希值是否符合规则:{w.validate()}")
print(f"区块哈希值为:{b.hash}")
测试结果
更新 BlockChain.py
from Block import Block, ProofOfWork
class BlockChain:
"""
区块链结构体
blocks: 包含区块的列表
"""
def __init__(self):
self.blocks = []
def add_block(self, block):
"""
添加区块
:param block:
:return:
"""
self.blocks.append(block)
# 新建一个区块链对象
blockChain = BlockChain()
# 新建区块
block1 = Block(data="创世区块", prev_hash="")
w1 = ProofOfWork(block1)
genesis_block = w1.mine()
blockChain.add_block(genesis_block)
block2 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash)
w2 = ProofOfWork(block2)
block = w2.mine()
blockChain.add_block(block)
block3 = Block(data="张三转给王五三个比特币", prev_hash=block.hash)
w3 = ProofOfWork(block3)
block = w3.mine()
blockChain.add_block(block)
# 打印区块链信息
print("区块链包含区块个数为:%d\n" % len(blockChain.blocks))
blockHeight = 0
for block in blockChain.blocks:
print(f"本区块高度为:{blockHeight}")
print(f"父区块哈希:{block.prev_hash}")
print(f"区块内容:{block.data}")
print(f"区块哈希:{block.hash}")
print()
blockHeight += 1
测试结果
三、实现钱包、账户、交易功能
实现钱包、账户、交易功能要先安装非对称加密算法库 ecdsa。如果网速慢,引用下面这个网站
-i https://pypi.tuna.tsinghua.edu.cn/simple
添加钱包、账户功能 Wallet.py
import base64
import binascii
from hashlib import sha256
# 导入椭圆曲线算法
from ecdsa import SigningKey, SECP256k1, VerifyingKey
class Wallet:
"""
钱包
"""
def __init__(self):
"""
钱包初始化时基于椭圆曲线生成一个唯一的秘钥对,代表区块链上一个唯一的账户
"""
# 生成私钥
self._private_key = SigningKey.generate(curve=SECP256k1)
# 基于私钥生成公钥
self._public_key = self._private_key.get_verifying_key()
@property
def address(self):
"""
这里通过公钥生成地址
"""
h = sha256(self._public_key.to_pem())
# 地址先由公钥进行哈希算法,再进行 Base64 计算而成
return base64.b64encode(h.digest())
@property
def pubkey(self):
"""
返回公钥字符串
"""
return self._public_key.to_pem()
def sign(self, message):
"""
生成数字签名
"""
h = sha256(message.encode('utf8'))
# 利用私钥生成签名
# 签名生成的是一串二进制字符串,为了便于查看,这里转换为 ASCII 字符串进行输出
return binascii.hexlify(self._private_key.sign(h.digest()))
def verify_sign(pubkey, message, signature):
"""
验证签名
"""
verifier = VerifyingKey.from_pem(pubkey)
h = sha256(message.encode('utf8'))
return verifier.verify(binascii.unhexlify(signature), h.digest())
实现转账功能 Transaction.py
import json
class Transaction:
"""
交易的结构
"""
def __init__(self, sender, recipient, amount):
"""
初始化交易,设置交易的发送方、接收方和交易数量
"""
# 交易发送者的公钥
self.pubkey = None
# 交易的数字签名
self.signature = None
if isinstance(sender, bytes):
sender = sender.decode('utf-8')
self.sender = sender # 发送方
if isinstance(recipient, bytes):
recipient = recipient.decode('utf-8')
self.recipient = recipient # 接收方
self.amount = amount # 交易数量
def set_sign(self, signature, pubkey):
"""
为了便于验证这个交易的可靠性,需要发送方输入他的公钥和签名
"""
self.signature = signature # 签名
self.pubkey = pubkey # 发送方公钥
def __repr__(self):
"""
交易大致可分为两种,一是挖矿所得,而是转账交易
挖矿所得无发送方,以此进行区分显示不同内容
"""
if self.sender:
s = f"从{self.sender}转自{self.recipient}{self.amount}个加密货币"
elif self.recipient:
s = f"{self.recipient}挖矿所得{self.amount}个加密货币"
else:
s = "error"
return s
class TransactionEncoder(json.JSONEncoder):
"""
定义Json的编码类,用来序列化Transaction
"""
def default(self, obj):
if isinstance(obj, Transaction):
return obj.__dict__
else:
return json.JSONEncoder.default(self, obj)
# return super(TransactionEncoder, self).default(obj)
更新 Block.py
import hashlib
import json
from datetime import datetime
from Transaction import Transaction, TransactionEncoder
class Block:
"""
区块结构
prev_hash: 父区块哈希值
transactions: 交易对
timestamp: 区块创建时间
hash: 区块哈希值
Nonce: 随机数
"""
def __init__(self, transactions, prev_hash):
# 将传入的父哈希值和数据保存到类变量中
self.prev_hash = prev_hash
# 交易列表
self.transactions = transactions
# 获取当前时间
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置Nonce和哈希的初始值为None
self.nonce = None
self.hash = None
# 类的 __repr__() 方法定义了实例化对象的输出信息
def __repr__(self):
return f"区块内容:{self.transactions}\n区块哈希值:{self.hash}"
class ProofOfWork:
"""
工作量证明
block: 区块
difficulty: 难度值
"""
def __init__(self, block, miner, difficult=5):
self.block = block
self.miner = miner
# 定义工作量难度,默认为5,表示有效的哈希值以5个“0”开头
self.difficulty = difficult
# 添加挖矿奖励
self.reward_amount = 1
def mine(self):
"""
挖矿函数
"""
i = 0
prefix = '0' * self.difficulty
# 设置挖矿自动生成交易信息,添加挖矿奖励
t = Transaction(
sender="",
recipient=self.miner.address,
amount=self.reward_amount,
)
sig = self.miner.sign(json.dumps(t, cls=TransactionEncoder))
t.set_sign(sig, self.miner.pubkey)
self.block.transactions.append(t)
while True:
message = hashlib.sha256()
message.update(str(self.block.prev_hash).encode('utf-8'))
# 更新区块中的交易数据
# message.update(str(self.block.data).encode('utf-8'))
message.update(str(self.block.transactions).encode('utf-8'))
message.update(str(self.block.timestamp).encode('utf-8'))
message.update(str(i).encode("utf-8"))
digest = message.hexdigest()
if digest.startswith(prefix):
self.block.nonce = i
self.block.hash = digest
return self.block
i += 1
def validate(self):
"""
验证有效性
"""
message = hashlib.sha256()
message.update(str(self.block.prev_hash).encode('utf-8'))
# 更新区块中的交易数据
# message.update(str(self.block.data).encode('utf-8'))
message.update(json.dumps(self.block.transactions).encode('utf-8'))
message.update(str(self.block.timestamp).encode('utf-8'))
message.update(str(self.block.nonce).encode('utf-8'))
digest = message.hexdigest()
prefix = '0' * self.difficulty
return digest.startswith(prefix)
更新 BlockChain.py
from Block import Block, ProofOfWork
from Transaction import Transaction
from Wallet import Wallet, verify_sign
class BlockChain:
"""
区块链结构体
blocks: 包含的区块列表
"""
def __init__(self):
self.blocks = []
def add_block(self, block):
"""
添加区块
"""
self.blocks.append(block)
def print_list(self):
print(f"区块链包含个数为:{len(self.blocks)}")
for block in self.blocks:
height = 0
print(f"区块链高度为:{height}")
print(f"父区块为:{block.prev_hash}")
print(f"区块内容为:{block.transactions}")
print(f"区块哈希值为:{block.hash}")
height += 1
print()
为了方便我们对区块链进行操作,我们可以在 BlockChain.py 中补充一些方法
# 传入用户和区块链,返回用户的“余额”
def get_balance(user, blockchain):
balance = 0
for block in blockchain.blocks:
for t in block.transactions:
if t.sender == user.address.decode():
balance -= t.amount
elif t.recipient == user.address.decode():
balance += t.amount
return balance
# user生成创世区块(新建区块链),并添加到区块链中
def generate_genesis_block(user):
blockchain = BlockChain()
new_block = Block(transactions=[], prev_hash="")
w = ProofOfWork(new_block, user)
genesis_block = w.mine()
blockchain.add_block(genesis_block)
# 返回创世区块
return blockchain
# 用户之间进行交易并记入交易列表
def add_transaction(sender, recipient, amount):
# 新建交易
new_transaction = Transaction(
sender=sender.address,
recipient=recipient.address,
amount=amount
)
# 生成数字签名
sig = sender.sign(str(new_transaction))
# 传入付款方的公钥和签名
new_transaction.set_sign(sig, sender.pubkey)
return new_transaction
# 验证交易,若验证成功则加入交易列表
def verify_new_transaction(new_transaction, transactions):
if verify_sign(new_transaction.pubkey,
str(new_transaction),
new_transaction.signature
):
# 验证交易签名没问题,加入交易列表
print("交易验证成功")
transactions.append(new_transaction)
else:
print("交易验证失败")
# 矿工将全部验证成功的交易列表打包出块
def generate_block(miner, transactions, blockchain):
new_block = Block(transactions=transactions,
prev_hash=blockchain.blocks[len(blockchain.blocks) - 1].hash)
print("生成新的区块...")
# 挖矿
w = ProofOfWork(new_block, miner)
block = w.mine()
print("将新区块添加到区块链中")
blockchain.add_block(block)
进行测试
# 新建交易列表
transactions = []
# 创建 3 个用户
alice = Wallet()
tom = Wallet()
bob = Wallet()
print("alice创建创世区块...")
blockchain = generate_genesis_block(alice)
print()
print(f"alice 的余额为{get_balance(alice, blockchain)}个比特币")
print(f"tom 的余额为{get_balance(tom, blockchain)}个比特币")
print(f"bob 的余额为{get_balance(bob, blockchain)}个比特币")
print()
# 打印区块链信息
blockchain.print_list()
print("新增交易:alice 转账 0.5 比特币给 tom")
nt = add_transaction(alice, tom, 0.5)
print()
verify_new_transaction(nt, transactions)
print(f"矿工 bob 将全部验证成功的交易列表打包出块...")
generate_block(bob, transactions, blockchain)
print("添加完成\n")
# 打印区块链信息
blockchain.print_list()
测试结果