用Python实现一条区块链
点击链接获取可执行文件
本文使用 python 实现了一条简单的区块链。主要分两个模块:实现每个区块的结构和相关的方法、实现整条区块链的结构和相关的方法。下面是对这两个模块的描述。
每个区块主要包括两个成员:区块头和区块体。其中,区块头存储和区块本身相关的一些信息,例如索引值、时间戳、前驱区块的哈希值、本区块的哈希值等;区块体存储交易序列,用一个 list 结构来表示。区块中的方法主要包括两个:根据交易序列获取哈希值、结构化打印本区块的信息。
区块的整体结构如下所示:
# 实现每个区块的结构和相关的方法
import json
from datetime import datetime
# 区块链中每一个block的结构
class Block():
def __init__(self, index=0, timestamp=0, previous_hash=0,
merkle_root=0, nonce=0, transactions=None):
self.block_header = {'index': index,
'timestamp': datetime.timestamp(datetime.now()),
'previous_hash': previous_hash,
'merkle root': merkle_root, 'nonce': nonce}
self.transactions = transactions
# 根据交易信息来计算merkle_root
self.block_header['merkle root'] = self.get_merkle_root().hash_value
# merkle tree中的结点
class MerkleNode:
def __init__(self, node_value, node_id):
self.left = None
self.right = None
self.node_value = node_value
self.node_id = node_id
self.hash_value = hashlib.sha256(str(node_value).encode('utf-8')).hexdigest()
# 获取merkle tree根节点的哈希值
def get_merkle_root(self):
node_id = 0
nodes = []
for transaction in self.transactions:
nodes.append(self.MerkleNode(transaction, node_id))
node_id += 1
while len(nodes) != 1:
# 为当前层的结点创建父结点
parent_nodes = []
# 为每一对node生成一个parent
for i in range(0, len(nodes), 2):
left_node = nodes[i]
if i + 1 < len(nodes):
right_node = nodes[i + 1]
else:
parent_nodes.append(nodes[i])
break
combined_hash_value = left_node.hash_value + right_node.hash_value
parent_node = self.MerkleNode(combined_hash_value, node_id)
node_id += 1
parent_node.left = left_node
parent_node.right = right_node
parent_nodes.append(parent_node)
# 进行下一层结点哈希值的计算
nodes = parent_nodes
return nodes[0]
# 打印本区块的信息
def print_block(self):
print_format = {'block header': self.block_header,
'transactions': self.transactions}
print(json.dumps(print_format, indent=4))
整条区块链主要包括五个成员变量,分别用于定义各区块的容量、存储总的交易序列、存储每一个区块的数据结构、存储新构建的当前区块、挖矿难度。
创世区块产生的逻辑为:当用户初始化一条区块链后,此时区块链中的数据变量也初始化了,但还没有一个实例化的区块。随着交易的累积,到达产生新块的阈值后,区块链会构造出它的第一个新区块,称为创世区块。相比于别的区块,它的特殊点仅仅在于nonce初始值为10,且不需要工作量证明操作,而后续区块的nonce值都根据前驱区块的nonce值计算出来。
新区块产生的逻辑为:每当交易池达到阈值后,分布式网络上的矿工会根据前驱区块的nonce值来不断寻找一个符合要求的数值。当成功找到该值后,该矿工会可以获得构造新块的权力,并将该值作为nonce存储在当前区块中。
工作量证明算法的逻辑为:记前驱区块的nonce值为p,矿工随机选取一个数值p’(本实验中假设该值均为0),循环计算target=hash(p*p’),直到target[0:k]都是0为止(k是设置的挖矿难度)。当成功找到该值后,矿工获得构造新块的权力,并将该值作为nonce存储到新块中。
# 实现整条区块链的结构和相关的方法
class Blockchain(object):
def __init__(self, block_transaction_capacity, difficulty):
"""
创建交易池、本地区块链并添加创世区块
"""
# 每个区块最多储存block_transaction_capacity笔交易
self.block_transaction_capacity = block_transaction_capacity
self.transaction_pool = []
self.chain = []
# TODO: 创建创世区块并加入区块链
self.current_block = None
# 挖矿难度
self.difficulty = difficulty
def add_block(self, proof, previous_hash=None):
"""
TODO: 创建一个新的区块到区块链中,并返回创建成功的区块
:param proof: <int> 由工作证明算法生成的证明
:param previous_hash: <str> 前一个区块的 hash 值
:return: <dict> 新区块
"""
self.current_block = Block(nonce=proof, transactions=self.transaction_pool
[-self.block_transaction_capacity:], previous_hash=previous_hash,
index=len(self.chain))
self.chain.append(self.current_block)
def add_transaction(self, sender, receiver, amount):
"""
TODO: 添加一笔新的交易到交易池中
:param sender: <str> 发送者
:param receiver: <str> 接收者
:param amount: <int> 金额
:return: <bool> 是否添加成功
"""
transaction = str(sender) + ' ' + str(receiver) + ' ' + str(amount)
self.transaction_pool.append(transaction)
pool_size = len(self.transaction_pool)
# 交易池就绪,应该创建新区块
if pool_size % self.block_transaction_capacity == 0:
# 先处理创世块
if len(self.chain) == 0:
self.current_block = Block(nonce=100, transactions=self.transaction_pool
[-self.block_transaction_capacity:])
self.chain.append(self.current_block)
else:
# 获取当前块的哈希值
current_block_hash = self.__last_block()
# 执行计算任务,并获取工作量证明
pre_proof = self.current_block.block_header['nonce']
proof = self.proof_of_work(pre_proof)
# 计算完成,添加区块
self.add_block(proof, current_block_hash)
return True
def __last_block(self):
"""
TODO: 返回本地区块链(最长链)上的最后一个区块的 hash 值
return: <str>
"""
return self.current_block.block_header['merkle root']
def __hash(block):
"""
给一个区块生成 SHA-256 值
:param block: <dict>
:return: <str>
"""
block_string = json.dumps(block, sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()
def proof_of_work(self, pre_proof):
"""
TODO: 简易工作量证明算法
- 找到一个数字p,使得p*p'的hash值的前2个字符是0
- 其中p'是前一个区块的proof,找到的p是工作量证明
:param pre_proof: <int>
:return: <int>
"""
p = 0
target = '1' * 100
while not target[0:difficulty] == '0' * self.difficulty:
target = hashlib.sha256(str(p * pre_proof).encode('utf-8')).hexdigest()
p += 1
return p
def print(self):
"""
TODO: 打印区块链
"""
print('\033[1;31m',
len(self.chain), 'blocks and', len(self.transaction_pool),
'transactions in this private chain:',
':\033[0m')
idx = 0
for block in self.chain:
print('\033[1;31m', 'Block', idx, ':\033[0m')
idx += 1
block.print_block()
pending_num = len(self.transaction_pool) % self.block_transaction_capacity
if pending_num == 0:
print('\033[1;31m',
'0 pending transactions in this private chain.'
'\033[0m')
else:
print('\033[1;31m',
pending_num, ' pending transactions in this private chain:'
'\033[0m')
print(self.transaction_pool[-pending_num:])
下面是测试部分。首先定义好区块链的结构后,随机生成交易双方的用户名和交易额,不断将这些交易信息输入到区块链中,最后打印整条区块链的结构。
import random
# 定义每个块所能存储的交易数
block_transaction_capacity = 4
# 定义总的交易数量
num_all_transactions = 9
# 定义挖矿难度(为k表示要求哈希前k位为0)
difficulty = 5
# 初始化本地区块链
blockchain = Blockchain(block_transaction_capacity, difficulty)
seed = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWX0123456789'
def generate_transactions(num_all_transactions):
for i in range(num_all_transactions):
random_sender, random_receiver = [], []
for i in range(6):
random_sender.append(random.choice(seed))
random_receiver.append(random.choice(seed))
random_sender = ''.join(random_sender)
random_receiver = ''.join(random_receiver)
yield random_sender, random_receiver, random.uniform(0, 100000)
for sender, receiver, amount in generate_transactions(num_all_transactions):
blockchain.add_transaction(sender, receiver, amount)
# 打印区块链结构
blockchain.print()
程序运行结果为: