Kafka搭建
单机版的kafka搭建非常简单,不过我们今天采用Docker搭建kafka。Kafka使用Zookeeper存储Consumer、Broker信息,安装kafak的时候,需要先安装Zookeeper。
Zookeeper安装:
docker run -d --name zookeeper -p 3181:3181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
讲解:/etc/localtime:/etc/localtime:使容器与宿主机时间能够同步
Kafka安装:
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.223:3181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.223:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
讲解:
KAFKA_BROKER_ID:当前Kafka的唯一ID
KAFKA_ZOOKEEPER_CONNECT:当前Kafka使用的Zookeeper配置信息
KAFKA_ADVERTISED_LISTENERS:对外发布(暴露)的监听器,对外发布监听端口、地址
KAFKA_LISTENERS:监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka服务。
IP更改:
外部程序如果想链接Kafka,需要根据IP链接,所以我们可以给Kafka一个IP名字,编辑:/opt/kafka_2.12-2.4.1/config/server.properties,在文件最末尾添加如下代码:
host.name=192.168.211.137
队列创建
进入kafka容器,创建队列:
docker exec -it kafka /bin/sh
cd /opt/kafka_2.12-2.4.1/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic itemaccess
讲解:
解释:使用kafka-topics.sh创建队列
--create:执行创建一个新的队列操作
--bootstrap-server:需要链接的kafka配置,必填
--replication-factor 1:设置分区的副本数量
--topic itemaccess:队列的名字叫itemaccess
消息发布
在kafka容器中执行消息发送(接着上面的步骤执行):
./kafka-console-producer.sh --broker-list localhost:9092 --topic itemaccess
讲解:
解释:使用kafka-console-producer.sh实现向kafka的test队列发送消息
--broker-list:指定将消息发给指定的Kafka服务的链接列表配置 HOST1:Port1,HOST2:Port2
--topic itemaccess:指定要发送消息的队列名字
我们发送的消息如下(输入信息,回车即可发送):
{"actime":"2020-4-10 9:50:10","uri":"http://www-seckill.zhushanglin.net/items/333.html","IP":"119.123.33.231","Token":"Bearer zhushanglin"}
消息订阅
在kafka容器中执行消息订阅(接着上面的步骤执行,但要先按ctrl+c退出控制台):
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic itemaccess --from-beginning
讲解:
解释:使用kafka-console-consumer.sh从kafka中消费test队列的数据
--bootstrap-server:从指定的kafka中读取消息
--topic itemaccess:读取队列的名字
--from-beginning:从最开始的数据读取,也就是读取所有数据的意思
查看已经存在的主题:
./kafka-topics.sh --zookeeper localhost:3181 --list
删除主题:
./kafka-topics.sh --zookeeper localhost:3181 --delete --topic itemaccess
查看主题信息:
/kafka-topics.sh --zookeeper localhost:3181 --describe --topic itemaccess
信息查看
上面执行整个流程如下图:
Kafka注册信息查看:
我们进入到zookeeper中,可以查看到kafka的注册信息,相关操作命令如下:
docker exec -it zookeeper /bin/bash
cd bin
./zkCli.sh
ls /
效果如下:
关于Kafka的学习,大家可以直接参考:http://kafka.apache.org/quickstart
收集日志-Lua
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。
OpenResty® 通过汇聚各种设计精良的 Nginx 模块(主要由 OpenResty 团队自主开发),从而将 Nginx 有效地变成一个强大的通用 Web 应用平台。这样,Web 开发人员和系统工程师可以使用 Lua 脚本语言调动 Nginx 支持的各种 C 以及 Lua 模块,快速构造出足以胜任 10K 乃至 1000K 以上单机并发连接的高性能 Web 应用系统。
关于Lua的基本知识,我们这里就不学习了,直接进入日志收集的使用操作。
OpenRestry安装
关于OpenRestry的学习,大家可以参考:http://openresty.org/cn/
下载OpenRestry:
wget https://openresty.org/download/openresty-1.11.2.5.tar.gz
解压:
tar -xf openresty-1.11.2.5.tar.gz
安装(进入到解压目录进行安装):
cd openresty-1.11.2.5
./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module
make
make install
软件会安装到/usr/local/openresty,这里面会包含nginx。
配置环境变量:
vi /etc/profile
export PATH=/usr/local/openresty/nginx/sbin:$PATH
source /etc/profile
详情页发布
商品详情页生成后会存储在/usr/local/server/web/items目录下,详情页是静态网页,我们可以使用Nginx直接发布。
商品详情页的访问:http://192.168.211.137/items/S1235433012716498944.html,我们可以让所有以/items/
的请求直接到/usr/local/server/web/
目录下找。
修改nginx.conf:
cd /usr/local/openresty/nginx/conf/
vi nginx.conf
修改内容如下:
启动nginx,并访问测试:http://192.168.211.137/items/S1235433012716498944.html
Lua日志收集
使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka
1)收集流程
日志收集流程如下:
用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。
2)插件配置
lua-restry-kafka:https://github.com/doujiang24/lua-resty-kafka
在资料\lua中已经提供了该包lua-resty-kafka-master.zip,我们需要将该文件上传到/usr/local/openrestry目录下,并解压,再配置使用。
解压:
unzip lua-resty-kafka-master.zip
配置:
修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:
lua_package_path "/usr/local/openresty/lua-resty-kafka-master/lib/?.lua;;";
配置效果图如下:
3)日志收集
用户访问详情页的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。
我们定义一个消息格式:
{
"actime": "2020-4-10 9:50:30",
"uri": "http://192.168.211.137/items/S1235433012716498944.html",
"ip": "119.123.33.231",
"token": "Bearer ITHEIMAOOPJAVAITCAST"
}
生产者脚本:
定义好了消息格式后,创建一个生产者,往Kafka中发送详情页的访问信息。我们创建一个lua脚本,items-access.lua,脚本内容如下:
上图脚本内容如下:
--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
{ host = "192.168.211.137", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer ITHEIMA"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))
--页面跳转
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
ngx.exec(uri)
4)nginx配置
按照上面的流程图,我们需要配置nginx的2个location,修改nginx.conf,代码如下:
上图代码如下:
server {
listen 80;
server_name localhost;
#/web开始的请求,做日志记录,然后跳转到下面的location
location /web/items/ {
content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
}
#商品详情页,以/items/开始的请求,直接在详情页目录下找文件
location /items/ {
#日志处理
#content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
root /usr/local/server/web/;
}
}
5)日志收集测试
请求地址:http://192.168.211.137/web/items/S1235433012716498944.html
查看Kafka的itemaccess队列数据: