文章目录
- 一、实训目的
- 二、实训要求
- 三、实训任务
- 四、完成任务
- (一)准备工作
- (二)实现步骤
- 1、创建Maven项目
- 2、添加相关依赖
- 3、创建日志属性文件
- 4、创建股票价格类
- (1)创建init()方法
- (2)创建client1()方法
- (3)创建client2()方法
- 5、运行程序,测试效果
一、实训目的
- 通过实训,使学生更加熟练掌握通过Java代码实现ZooKeeper节点的操作。
二、实训要求
- 认真完成实训任务,写一篇CSDN博客,记录操作过程。
三、实训任务
- 每支股票都受到很多因素的影响,比如包括政策、盈利、产品等,将这些因素量化成数字以后,可以呈现股价的增减。比如广深铁路上市时股价为
6.0
元,然后随着时间推移,政策发生变化提升股价0.1
元,盈利提升股价0.2
元,产品降低股价0.2
元,则此时股价为6.1
元。 - 创建节点
/guangshen
,数据为6
,并创建子节点:/policy
、/profit
、/product
开始默认值都为0
。模拟两个客户端,第一个客户端定时修改三个子节点/policy
、/profit
、/product
的值,要求值是随机的,范围在[-1, 1]
之间,当三个节点/policy
、/profit
和/product
的值变化时,则修改/guangshen
节点的值。另外一个客户端监控实时股价,并显示当前股价为多少。
四、完成任务
(一)准备工作
- 在master虚拟机上启动ZK服务
- 在slave1虚拟机上启动ZK服务
- 在slave2虚拟机上启动ZK服务
(二)实现步骤
1、创建Maven项目
- Maven项目 -
ZkStock
- 单击【Finish】按钮
2、添加相关依赖
- 在
pom.xml
文件里添加ZooKeeper和junit依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.hw.zk</groupId>
<artifactId>ZkStock</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>
</project>
3、创建日志属性文件
- 在
resources
目录里创建log4j.properties
文件
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/zkprice.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4、创建股票价格类
- 创建
net.hw.zk
包,在包里创建StockPrice
类
(1)创建init()方法
- 连接zk服务器,创建相关节点
package net.hw.zk;
import org.apache.zookeeper.*;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
/**
* 功能:股票价格
* 作者:华卫
* 日期:2022年12月10日
*/
public class StockPrice {
@Test
public void init() throws Exception {
final CountDownLatch cdl = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("master:2181",
3000,
new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接zk服务器集群成功!");
cdl.countDown();
}
}
);
cdl.await();
final CountDownLatch cdl1 = new CountDownLatch(1);
zk.create("/guangshen", "6".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("节点" + path + "创建成功!");
} else {
System.out.println("节点" + path + "创建失败!");
}
cdl1.countDown();
}
}, "stock");
cdl1.await();
final CountDownLatch cdl2 = new CountDownLatch(1);
zk.create("/guangshen/policy", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("节点" + path + "创建成功!");
} else {
System.out.println("节点" + path + "创建失败!");
}
cdl2.countDown();
}
}, "stock");
cdl2.await();
final CountDownLatch cdl3 = new CountDownLatch(1);
zk.create("/guangshen/profit", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("节点" + path + "创建成功!");
} else {
System.out.println("节点" + path + "创建失败!");
}
cdl3.countDown();
}
}, "stock");
cdl3.await();
final CountDownLatch cdl4 = new CountDownLatch(1);
zk.create("/guangshen/product", "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("节点" + path + "创建成功!");
} else {
System.out.println("节点" + path + "创建失败!");
}
cdl4.countDown();
}
}, "stock");
cdl4.await();
}
}
(2)创建client1()方法
- 定时修改节点的值
@Test
public void client1() throws Exception {
final CountDownLatch cdl = new CountDownLatch(1);
final ZooKeeper zk = new ZooKeeper("master:2181",
3000,
new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接zk服务器集群成功!");
cdl.countDown();
}
}
);
cdl.await();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Random random = new Random();
try {
zk.setData("/guangshen/policy", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
zk.setData("/guangshen/profit", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
zk.setData("/guangshen/product", String.valueOf(random.nextDouble() * 2 - 1).getBytes(), -1);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
while (true) {
final CountDownLatch cdl1 = new CountDownLatch(3);
try {
byte[] policy = zk.getData("/guangshen/policy", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
cdl1.countDown();
}
}
}, null);
byte[] profit = zk.getData("/guangshen/profit", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
cdl1.countDown();
}
}
}, null);
byte[] product = zk.getData("/guangshen/product", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
cdl1.countDown();
}
}
}, null);
cdl1.await();
double p1 = Double.parseDouble(new String(policy));
double p2 = Double.parseDouble(new String(profit));
double p3 = Double.parseDouble(new String(product));
double price = 6 + p1 + p2 + p3;
zk.setData("/guangshen", String.valueOf(price).getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
(3)创建client2()方法
- 实时监控股价并显示
@Test
public void client2() throws Exception {
final CountDownLatch cdl = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("master:2181",
3000,
new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接zk服务器集群成功!");
cdl.countDown();
}
}
);
cdl.await();
while (true) {
byte[] data = zk.getData("/guangshen", null, null);
String price = new String(data);
price = price.substring(0, price.indexOf(".") + 3);
System.out.println("当前股价:" + price + "元");
final CountDownLatch cdl1 = new CountDownLatch(1);
zk.getData("/guangshen", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
System.out.println("股价发生变化……");
cdl1.countDown();
}
}
}, null);
cdl1.await();
}
}
5、运行程序,测试效果
- 依次运行
init()
方法、client1()
方法、client2()
方法