from kazoo. client import KazooClient, KazooState
from kazoo. exceptions import NoNodeError, NodeExistsError, NotEmptyError
import json
# Module-level ZooKeeper client, configured for the server at 127.0.0.1:2181.
# It is created and started as a side effect of running/importing this module.
zk = KazooClient( hosts= '127.0.0.1:2181' )
zk. start( )
# Example znode path. The helper functions defined after this point take their
# own ``path`` argument, so they do not read this global directly.
path = "/main_node/sub_node"
def ensure_path_exists(zk, path):
    """
    Create every missing ancestor of ``path`` as an empty znode.

    Only the parents are created; the last component of ``path`` is left for
    the caller to create. Ancestors are always addressed as absolute paths
    (``/a``, ``/a/b``, ...), and empty segments produced by leading, trailing
    or doubled slashes are ignored.

    :param zk: KazooClient instance (only ``exists`` and ``create`` are used)
    :param path: slash-separated znode path whose parents must exist
    """
    segments = [segment for segment in path.strip('/').split('/') if segment]
    current_path = ""
    # segments[:-1] skips the leaf: it is the caller's job to create it.
    for segment in segments[:-1]:
        current_path = f"{current_path}/{segment}"
        if zk.exists(current_path):
            continue
        try:
            zk.create(current_path, b"")
        except NodeExistsError:
            # Another client created this ancestor between the existence
            # check and the create call; the goal is already met.
            pass
def create_node(zk, path, data):
    """
    Create a ZooKeeper node, creating its parent nodes first.

    Prints a confirmation on success, or a notice when the node (or, in a
    race, one of its parents) already exists. Returns nothing.

    :param zk: KazooClient instance
    :param path: path of the node to create
    :param data: node payload (bytes)
    """
    try:
        ensure_path_exists(zk, path)
        zk.create(path, data)
    except NodeExistsError:
        print(f"Node {path} already exists.")
        return
    # The node exists at this point, so building the message must not raise:
    # tolerate a missing payload and bytes that are not valid UTF-8.
    shown = None if data is None else data.decode('utf-8', errors='replace')
    print(f"Node {path} created successfully with data: {shown} ")
def update_node(zk, path, data):
    """
    Replace the data stored in an existing ZooKeeper node.

    Prints a confirmation on success, or a notice when the node does not
    exist. Returns nothing.

    :param zk: KazooClient instance
    :param path: path of the node to update
    :param data: new node payload (bytes)
    """
    try:
        zk.set(path, data)
    except NoNodeError:
        print(f"Node {path} does not exist. Cannot update non-existing node.")
        return
    # The write already succeeded, so building the message must not raise:
    # tolerate a missing payload and bytes that are not valid UTF-8.
    shown = None if data is None else data.decode('utf-8', errors='replace')
    print(f"Node {path} updated successfully with new data: {shown} ")
def delete_node(zk, path, recursive=False):
    """
    Delete a ZooKeeper node and report the outcome on stdout.

    Every failure is reported with a printed message rather than raised.

    :param zk: KazooClient instance
    :param path: path of the node to delete
    :param recursive: whether to delete the node together with all children
    """
    # Pick the outcome message first, then print it exactly once.
    try:
        zk.delete(path, recursive=recursive)
        outcome = f"Node {path} deleted successfully"
    except NoNodeError:
        outcome = f"Node {path} does not exist."
    except NotEmptyError:
        outcome = (
            f"Node {path} is not empty. "
            "Use recursive=True to delete it and its children."
        )
    except Exception as err:
        outcome = f"An error occurred: {err} "
    print(outcome)
def read_node(zk, path):
    """
    Read the data of a ZooKeeper node as UTF-8 text.

    :param zk: KazooClient instance
    :param path: path of the node to read
    :return: ``(text, stat)`` for an existing node, where ``text`` is ``None``
        if the node holds no data; ``(None, None)`` if the node does not exist
    :raises UnicodeDecodeError: if the node data is not valid UTF-8
    """
    try:
        data, stat = zk.get(path)
    except NoNodeError:
        print(f"Node {path} does not exist.")
        return None, None
    # A node may carry no payload at all; report that as None instead of
    # failing on ``None.decode``. Decode once and reuse the result.
    text = None if data is None else data.decode('utf-8')
    print(f"Node {path} data: {text} ")
    return text, stat
def write_json_to_node(zk, path, data):
    """
    Serialize ``data`` as JSON and store it in a ZooKeeper node.

    An existing node is overwritten; a missing node is created. Parent nodes
    are not created here, so an error from creating a node under a missing
    parent propagates to the caller. Returns nothing.

    :param zk: KazooClient instance
    :param path: path of the node to write
    :param data: JSON-serializable object (dict, list, ...)
    """
    # Serialize before touching ZooKeeper so that only genuine serialization
    # failures (TypeError, or ValueError for circular references) are
    # reported as such.
    try:
        payload = json.dumps(data).encode('utf-8')
    except (TypeError, ValueError):
        print("Provided data is not serializable to JSON.")
        return

    # Try the overwrite first and fall back to create, instead of checking
    # existence up front: the check could be stale by the time we act on it.
    try:
        zk.set(path, payload)
    except NoNodeError:
        pass
    else:
        print(f"JSON data written to node {path} ")
        return

    try:
        zk.create(path, payload)
    except NodeExistsError:
        # Another client created the node after our failed set; overwrite it
        # so the data is not silently dropped.
        zk.set(path, payload)
        print(f"JSON data written to node {path} ")
    else:
        print(f"Node {path} created and JSON data written.")
def read_json_from_node(zk, path):
    """
    Read a ZooKeeper node and parse its data as JSON.

    :param zk: KazooClient instance
    :param path: path of the node to read
    :return: the deserialized JSON value, or ``None`` if the node does not
        exist or its data is empty, not valid UTF-8, or not valid JSON
    """
    try:
        data, _stat = zk.get(path)
    except NoNodeError:
        print(f"Node {path} does not exist.")
        return None

    try:
        # A node without a payload is treated like an empty document, which
        # json.loads rejects, so it takes the same failure path as bad JSON.
        json_data = json.loads((data or b"").decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        print("Failed to decode JSON from node data.")
        return None

    print(f"JSON data read from node {path} : {json_data} ")
    return json_data