大数据技术之Kafka集成

news2024/9/22 6:44:53

一、集成Flume

 1.1 Flume生产者

(1)启动Kafka集群

zkServer.sh start

nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &

(2)启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server 192.168.153.139:9092 --topic first

(3)配置flume

配置flume

(4)启动flume

mkdir /opt/soft/kafka212/conf/jobs
vim  /opt/soft/kafka212/conf/jobs/file_to_kafka.conf

# 1. 定义组件
a1.sources = r1
a1.sink2 = k1
a1.channels = c1

# 2. 配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# 3. 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4. 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers
hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 5. 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)启动flume

cd /opt/soft/flume190/
./bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf ./jobs/file_to_kafka.conf

(6)追加数据,查看Kafka消费者消费情况 

1.2 Flume消费者

(1)配置flume

mkdir /opt/soft/kafka212/conf/jobs
vim  /opt/soft/kafka212/conf/jobs/kafka_to_file.conf

# 1. 定义组件
a1.sources = r1
a1.sink2 = k1
a1.channels = c1

# 2. 配置source
a1.sources.r1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# 3. 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4. 配置sink
a1.sinks.k1.type = logger

# 5. 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动flume

cd /opt/soft/flume190/
./bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf ./jobs/kafka_to_file.conf

(3)启动Kafka生产者

bin/kafka-console-producer.sh --bootstrap-server hadoop02:9092 --topic first

(4)输入数据并监控 

二、集成SpringBoot

 

1)在IDEA中安装lombok插件

2)SpringBoot环境准备

(1)创建一个Spring Initializr

(2)添加项目依赖

 (3)检查自动生成的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.1 SpringBoot生产者

(1)修改SpringBoot核心配置文件application.properties,添加生产者相关信息

# 应用名称
    spring.application.name=atguigu_springboot_kafka

# 指定 kafka 的地址
    spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#指定 key 和 value 的序列化器
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

(2)创建controller从浏览器接收数据,并写入指定的topic

package com.atguigu.springboot;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
 
    // Kafka 模板用来向 kafka 发送数据
    @Autowired
    KafkaTemplate<String, String> kafka;
 
    @RequestMapping("/atguigu")
    public String data(String msg) {
        kafka.send("first", msg);
        return "ok";
    }
}

(3)在浏览器中给/atguigu接口发送数据

http://localhost:8080/atguigu?msg=hello

3.2 SpringBoot消费者

(1)修改SpringBoot核心配置文件application.properties

# =========消费者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 指定 key 和 value 的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#指定消费者组的 group_id
spring.kafka.consumer.group-id=atguigu
# =========消费者配置结束=========

(2)创建类消费Kafka中指定topic的数据

package com.atguigu.springboot;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
 
    // 指定要监听的 topic
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg) { // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    }
}

(3)向first主题发送数据

 

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap=server hadoop102:9092 --topic first

三、集成Spark

1)scala环境准备

2)spark环境准备

(1)创建一个maven项目spark-kafka

(2)在项目spark-kafka上添加框架支持Add Framework Support,选择sacla

(3)在main下创建scala文件夹,添加为源码包

(4)添加配置文件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(5)将log4j.properties文件添加到resources中,更改打印日志级别为error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

3.1 spark生产者

(1)在当前包下创建scala object:SparkKafkaProducer

package com.atguigu.spark

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SparkKafkaProducer {
    def main(args: Array[String]): Unit = {

        // 0 kafka 配置信息
        val properties = new Properties()

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

        // 1 创建 kafka 生产者
        var producer = new KafkaProducer[String, String](properties)

        // 2 发送数据
        for (i <- 1 to 5){
            producer.send(new ProducerRecord[String,String]("first","atguigu" + i))
        }

        // 3 关闭资源
        producer.close()
    }
}

(2)启动Kafka消费者

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(3)执行SparkKafkaProducer程序,观察Kafka消费者控制台情况

3.2 spark消费者

(1)添加配置文件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(2)在当前包下创建scala object:SparkKafkaConsumer

package com.atguigu.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkKafkaConsumer {
    def main(args: Array[String]): Unit = {

        //1.创建 SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")

        //2.创建 StreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        //3.定义 Kafka 参数:kafka 集群地址、消费者组名称、key 序列化、value 序列化
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )

        //4.读取 Kafka 数据创建 DStream
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent, //优先位置
            ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)
        )

        //5.将每条消息的 KV 取出
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

         //6.计算 WordCount
        valueDStream.print()

        //7.开启任务
        ssc.start()
        ssc.awaitTermination()
    }
}

(3)启动SparkKafkaConsumer消费者

(4)启动Kafka生产者

[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(5)观察idea控制台数据打印

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/452927.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django框架之定义模型和表迁移

django3.0 定义表模型并通过定义好的模型实现源代码创建数据表。 概述 模型是一个用于表示数据的Python类&#xff0c;包含基本的数据字段和行为。 通常一个模型就代表一张数据库表。 模型继承自django.db.models.Model&#xff0c;模型的每一个属性代表一个数据的字段。 定…

SLAM面试笔记(2) - ORB-SLAM2

目录 1 四叉树实现特征点均匀化分布 2 Bow词袋模型 2.1 什么是词袋&#xff1f; 2.2 词袋在ORB-SLAM2中的作用 2.3 离线训练字典树流程 3 ORB-SLAM的跟踪方法 3.1 恒速模型跟踪 3.2 重定位跟踪 3.3 参考关键帧跟踪 持续更新中... 1 四叉树实现特征点均匀化分布 参考…

SpringBoot实战(十六) 集成Hystrix

目录 一、简介1.Hystrix 的定义&#xff1f;2.Hystrix 的用处&#xff1f;3.Hystrix 的三种状态&#xff1f;4.Hystrix 解决什么问题&#xff1f;5.Hystrix 的设计原理&#xff1f;6.Hystrix 的实现原理&#xff1f; 二、集成 Hystrix1.Maven 依赖2.application.yml简易版&…

【CSS3】CSS3 伪元素选择器 ( 伪元素选择器语法简介 | 伪元素选择器权重计算 | 代码示例 )

文章目录 一、CSS3 伪元素选择器二、CSS3 伪元素选择器权重二、代码示例 一、CSS3 伪元素选择器 CSS3 伪元素选择器 : ::before 选择符 : 在 指定的标签元素内部的 前面 插入内容 ;::after 选择符 : 在指定的标签元素内部的 后面 插入内容 ; CSS3 伪元素选择器注意事项 : con…

QT里的网络通信简介

QTcpSocket类简介 QTcpSocket类提供了一个TCP套接字。TCP&#xff08;传输控制协议&#xff09;是一种可靠的、面向流的、面向连接的传输协议。它特别适合数据的连续传输。QTcpSocket是QAbstractSocket的一个子类&#xff0c;它允许您建立TCP连接和传输数据流。有关详细信息&a…

连接器信号完整性仿真教程 二

在连接器信号完整性仿真教程一中Step by Step演示了如何进行连接器信号完整性仿真&#xff0c;看完这片博文后应该可以做类似产品的仿真。如果说&#xff0c;看了这篇博文就学会了连接器信号完整性仿真&#xff0c;那就有点过了。有人也许会说信号完整性仿真难学&#xff0c;不…

利用GPT2 预测 福彩3d预测

使用GPT2预测福彩3D项目 个人总结彩票数据是随机的,可以预测到1-2个数字,但是有一两位总是随机的 该项目紧做模型学习用,通过该项目熟练模型训练调用生成过程. 福彩3D数据下载 https://www.17500.cn/getData/3d.TXT data数据格式 处理后数据格式 每行 2023 03 08 9 7 3 训…

Java入门教程||Java 继承||Java 重写(Override)与重载(Overload)

Java 继承 继承是所有 OOP 语言和 Java 语言不可缺少的组成部分。 继承是 Java 面向对象编程技术的一块基石&#xff0c;是面向对象的三大特征之一&#xff0c;也是实现软件复用的重要手段&#xff0c;继承可以理解为一个对象从另一个对象获取属性的过程。 如果类 A 是类 B …

玩机搞机----mtk芯片机型 另类制作备份线刷包的方式 读写分区等等

前面分享了几期高通和mtk芯片机型备份字库的几种方法教程。这些针对与很多没有线刷包资源的手机机型玩机操作。前面对接一个友商的mtk芯片杂牌机。和另外一个国外mtk芯片级都是来制作线刷包。因为&#xff0c;这些机型没有固件流出。而同一批机型中安卓版本高低不固定。支持的资…

谷歌TAG警告说俄罗斯黑客在乌克兰进行网络钓鱼攻击

与俄罗斯军事情报机构有关的精英黑客与针对乌克兰数百名用户的大批量网络钓鱼活动有关&#xff0c;以提取情报并影响与战争有关的公共言论。 谷歌的威胁分析小组&#xff08;TAG&#xff09;正在监测这个名为FROZENLAKE的行为者的活动&#xff0c;该小组表示&#xff0c;这些攻…

3105—IIS部署子站点

一、父站点 1—web.config配置 新增并设定location段落 <configuration><location path"." allowOverride"false" inheritInChildApplications"false"><system.webServer><handlers><add name"aspNetCore"…

科海思除COD树脂,大孔树脂,除COD专用树脂

一、产品介绍 Tulsimer A-722 MP具有控制孔径的大孔强碱性Ⅰ型阴离子交换树脂 Tulsimer A-722 MP 是一款具有便于颜色和有机物去除的控制孔径的&#xff0c;专门开发的大孔强碱性Ⅰ型阴离子交换树脂。 Tulsimer A-722 MP&#xff08;氯型&#xff09;专门应用于去除COD…

Vite与WebPack的对比,及解决了什么痛点,及什么是ESM?

一、简要 ESM&#xff0c;是指构成ESM规范的一系列的JavaScript特性或者API 1、首先要明确的是&#xff0c;Vite跟WebPack的优势只在开发环境。当把包部署到了生产环境后&#xff0c;大家都是一样的&#xff0c;甚至webpack的兼容性可能会更好。 这也是为什么有人提出&#x…

图像描述算法排位赛:SceneXplain 与 MiniGPT4 谁将夺得桂冠?

如果你对图像描述算法的未来感到好奇&#xff0c;本场“图像描述算法排位赛”绝对是你不能错过的&#xff01;在这场较量中&#xff0c;SceneXplain 和 MiniGPT-4 将会比试&#xff0c;谁将摘得这场比赛的桂冠&#xff1f; 背景介绍 在上篇文章中&#xff0c;我们介绍了图像描述…

C++类与对象—上

本期我们来学习类与对象 目录 面向过程和面向对象初步认识 类的引入 访问限定符 类的定义 封装 类的作用域 类的实例化 this指针 C语言和C实现Stack的对比 面向过程和面向对象初步认识 C 语言是 面向过程 的&#xff0c; 关注 的是 过程 &#xff0c;分析出求解问题的…

研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案

研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案 目录 研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案 一、简单介绍 二、计算机网络第七版(谢希仁) 第一章 课后答案 1、 计算机网络向用户可以提供哪些服务&#xff1f; 2、 试简述分组交换的要点。 3…

人工神经网络

&#x1f351; CV专栏 1. 单个神经元 &#x1f351; 神经网络 即 模型 &#x1f364; 输入 四个参数 --> 结果 &#x1f351; 模型训练(学习) 例子 &#x1f351; 模型的输入x 乘 权值ω 减去阈值θ --> 激活函数 f &#x1f351; 输出 yi &#xff08;向下传递 或 直…

一步步带你学习Python编程:从零开始的查缺补漏

在快节奏的生活中&#xff0c;很难找到时间来学习新的技能。但有时候&#xff0c;我们会突然发现自己有一些空闲时间&#xff0c;而又不想虚度光阴。无聊的时候&#xff0c;我们可以选择学习一项新技能来充实自己。最近&#xff0c;我就因为有些无聊&#xff0c;决定重新学习Py…

linux实现网络程序

1️⃣ 在linux下&#xff0c;通过套接字实现服务器和客户端的通信。 2️⃣ 实现单线程、多线程通信。或者实现线程池来通信。 3️⃣ 优化通信&#xff0c;增加守护进程。 有情提醒&#xff0c;类里面默认的函数是内联。内联函数在调用的地方展开&#xff0c;没有函数地址&…

【Springboot系列】Springboot整合Swagger3不简单

1、缘由 Swagger是一个根据代码注解生成接口文档的工具&#xff0c;减少和前端之间的沟通&#xff0c;前端同学看着文档就可以开发了&#xff0c;提升了效率&#xff0c;之前很少写swagger&#xff0c;这次自己动手写&#xff0c;还是有点麻烦&#xff0c;不怎么懂&#xff0c;…