Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

news2025/1/17 6:06:05

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
在这里插入图片描述

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.4

kafka_2.10-0.8.2.2

spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

在这里插入图片描述

3、操作完成后,再点击Add Library选项

在这里插入图片描述

4、进入以下界面

在这里插入图片描述

5、点击完成即可
6、最后创建如下项目结构的文件

在这里插入图片描述

四、编写代码,运行程序

编写生产者代码

package my.kafka;  
import java.io.BufferedReader;  
import java.io.File;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.Properties;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class KafkaSend {  
    private final Producer<String, String> producer;  
  
    public final static String TOPIC = "kafkasendspark";  
  
    public KafkaSend(){  
        Properties props = new Properties();  
        // 此处配置的是kafka的端口  
        props.put("metadata.broker.list", "localhost:9092");  
        // 配置value的序列化类  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        // 配置key的序列化类  
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
        props.put("request.required.acks", "-1");  
        producer = new Producer<String, String>(new ProducerConfig(props));  
    }  
  
    void produce() {  
        int lineNo = 1;  
        File file = new File("/data/case6/buyer_favorite1");  
        BufferedReader reader = null;  
        try {  
            reader = new BufferedReader(new FileReader(file));  
            String tempString = null;  
  
            while ( (tempString = reader.readLine()) != null ) {  
                String key = String.valueOf(lineNo);  
                String data = tempString;  
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));  
                System.out.println(data);  
                lineNo++;  
  
                Thread.sleep(100);  
  
            }  
        } catch (FileNotFoundException e) {  
            System.err.println(e.getMessage());  
        } catch (IOException e) {  
            System.err.println(e.getMessage());  
        } catch (InterruptedException e) {  
            System.err.println(e.getMessage());  
        }  
    }  
    public static void main(String[] args) {  
        System.out.println("start");  
        new KafkaSend().produce();  
        System.out.println("finish");  
    }  
}  

编写消费者代码

package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  
import scala.collection.immutable.Map  
import org.apache.spark.streaming.kafka.KafkaUtils  
import kafka.serializer.StringDecoder  
import kafka.serializer.StringDecoder  
object SparkReceive {  
  def main(args: Array[String]) {  
  
    val sparkConf = new SparkConf().setAppName("countuser").setMaster("local")  
    val ssc = new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint("checkpoint")  
    val topics = Set("kafkasendspark")  
    val brokers = "localhost:9092"  
    val zkQuorum = "localhost:2181"  
  
    val kafkaParams = Map[String, String](  
        "metadata.broker.list" -> brokers,  
        "serializer.class" -> "kafka.serializer.StringEncoder"  
    )  
  
  
    val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)  
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {  
      //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  
      val currentCount = currValues.sum  
      // 已累加的值  
      val previousCount = prevValueState.getOrElse(0)  
      // 返回累加后的结果,是一个Option[Int]类型  
      Some(currentCount + previousCount)  
    }  
    val result=lines.map(line => (line._2.split("\t")) ).map( row => (row(0),1) ).updateStateByKey[Int](addFunc).print()  
  
    ssc.start();  
    ssc.awaitTermination()  
  }  
}  

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/731849.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(LFPAK56)BUK7Y7R0-40HX 40V、N 通道BUK9Y6R5-40HX表面贴装汽车用MOSFET器件

汽车用MOSFET将低压超级结技术与先进的封装设计相结合&#xff0c;以实现高性能和耐用性。Trench 9 MOSFET系列产品全部符合AEC-Q101标准&#xff0c;且超越了这一国际汽车级标准的要求&#xff0c;在包括温度循环 (TC)、耐高温栅极偏置 (HTGB)、耐高温反向偏置 (HTRB) 和断续工…

DBETR-1X/180G24K4M反馈型比例压力阀放大器

DBETR-1X/30G24K4M&#xff0c;DBETR-1X/315G24K4M&#xff0c;DBETR-1X/80G24K4M&#xff0c;DBETR-1X/180G24K4M&#xff0c;DBETR-1X/230G24K4M&#xff0c;DBETR-1X/350G24K4M比例溢流阀是一种遥控阀。其设计结构为座阀式直动溢流阀&#xff0c;搭配外置式比例放大器。 这…

挑选适合自己的英文原版书

很多人在阅读英文原版小说时感觉十分吃力&#xff0c;有很多生词或长难句。如何寻找适合自己英文阅读水平的书籍呢&#xff1f;下面推荐一种按蓝思值挑选英文原版书的方法。 首先根据自己的受教育程度&#xff0c;选择对应蓝思级别的英文书。如博士可以选择蓝思值为1300L的英文…

图神经网络:(图像分割)三维网格图像分割

文章说明&#xff1a; 1)参考资料&#xff1a;PYG的文档。文档超链。斯坦福大学的机器学习课程。课程超链。(要挂梯子)。博客原文。原文超链。(要挂梯子)。原文理论参考文献。提取码8848。 2)我在百度网盘上传这篇文章的jupyter notebook以及预训练模型。提取码8848. 3)博主水平…

qt信号与槽

信号与槽的概念&#xff1a; 1>信号&#xff1a;信号就是信号函数&#xff0c;可以是组件自身提供&#xff0c;也可以是用户自己定义&#xff0c;自定义时&#xff0c;需要类体的signals权限下进行定义&#xff0c;该函数是一个不完整的函数&#xff0c;只有声明&#xff0…

输入一个链表,输出该链表的倒数第 k 的结点

一、思路 假设 K 是 2&#xff0c;根据下面的图片可以看出&#xff0c;倒数第 K 个结点就是 45。 需要注意的前提是&#xff0c;K 不能是负数也不能是 0 并且也不能超过链表的结点个数&#xff0c;因为要保证 K 是在链表的范围里&#xff0c;才能找到 K&#xff0c;然后返回这…

【网络】TCP三次握手和四次挥手(感性理解)

目录 三次握手 文字描述三次握手过程 为什么是三次握手&#xff1f; 什么是SYN洪水&#xff1f; 连接和半连接队列 一次、两次握手行不行&#xff0c;四/五/六次握手行不行&#xff1f; 三次握手一定会成功吗&#xff1f; 三次握手的过程中可不可以携带数据 TCP中的IS…

模块化规范

常用模块化有两种规范&#xff0c;commonJS和ES6 一&#xff1a;两者区别 二&#xff1a;如何转义&#xff1f; 我们常遇到的使用场景是&#xff0c;在commonJS的模块里需要引入ES6规范的模块。这时就需要把ES6模块转译为commonJS规范的模块&#xff0c;否则报错 转义工具有…

javassist 02 implement interface

创建 interface package com.wsd;public interface AccountDao {int delete(); }利用 javassist 生产一个 类A, Class A implements AccountDao package com.wsd;import javassist.ClassPool; import javassist.CtClass; import javassist.CtMethod; import javassist.Modifi…

mac桌面时钟 浮动 (python)

浮动时钟&#xff0c;多地时区 app store的都要钱&#xff0c;于是。。。。我们让chatgpt来实现一个吧&#xff1a; 数字&#xff1a; 代码&#xff1a; import sys import datetime import pytzfrom PyQt5.QtWidgets import QApplication, QMainWindow, QGraphicsView, QGr…

深度学习不同数据增广方法的选用分析

一般情况下&#xff0c;可以将数据扩增方法分为单数据变形、多数据混合、学习数据分布规律生成新数据和学习增广策略等4 类方法。以上顺序也在一定程度上反映了数据增广方法的发展历程。如果与Shorten和Khoshgoftaar的成果对照&#xff0c;就图像数据而言&#xff0c;基于数据变…

抖音矩阵源码搭建开发技术部署分析

目录 一、 什么是抖音矩阵&#xff1f;源码搭建开发注意事项&#xff1f; 1. 抖音矩阵概述 2. 源码搭建开发注意事项&#xff1a; 二、 使用步骤及开发代码展示 一、 什么是抖音矩阵&#xff1f;源码搭建开发注意事项&#xff1f; 1. 抖音矩阵概述 首先&#xff0c;抖音账…

21夜间车牌识别(matlab程序)

1.简述 简单说一下实现思路&#xff1a; 读取图片&#xff0c;转灰度&#xff0c;计算灰度直方图&#xff0c;估算阈值&#xff08;这里的阈值计算很重要&#xff0c;经过阈值算法&#xff0c;选取一个最恰当的阈值&#xff09;&#xff0c;之后二值化。显示图像即可。 实现目…

爬虫爬取公众号文章

前言 自从chatGPT出现后&#xff0c;对于文本处理的能力直接上升了一个维度。在这之前&#xff0c;我们爬取到网络上的文本内容之后&#xff0c;都需要写一个文本清理的程序&#xff0c;对文本进行清洗&#xff0c;而现在&#xff0c;有了chatGPT的加持&#xff0c;我们只需要…

解决程序占用较多内存的问题

今天发现自己开发的一个程序占用了大量内存而且不会自动释放 &#xff0c;我的程序在windows中运行的&#xff0c;解决办法如下&#xff1a; 第一步&#xff1a;打开任务管理器&#xff0c;打到正在运行程序 &#xff08;这里以sql server为例&#xff09;&#xff0c;然后右击…

设计合并排序算法实现对N个整数排序。

1.题目 设计合并排序算法实现对N个整数排序 2.设计思路 先将无序序列利用分治法划分为子序列,直至每个子序列只有一个元素,然后再对有序子序列逐步进行合并排序。合并方法是循环的将两个有序子序列当前的首元素进行比较,较小的元素取出,置入合并序列的左边空置位,直至其中…

特征选择算法 | Matlab 基于最大相关最小冗余特征选择算法(mRMR)的分类数据特征选择

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 特征选择算法 | Matlab 基于最大相关最小冗余特征选择算法(mRMR)的分类数据特征选择 部分源码 %--------------------

Redis实战案例12-添加秒杀券实现秒杀下单及相关问题解决

1. 添加优惠券 该项目没有后台管理的界面&#xff0c;所以采用postman发送请求 http://localhost:8081/voucher/seckill注意end时间要大于当前系统时间 {"shopId": 2,"title": "100元代金券","subTitle": "周一至周五均可使用&qu…

c++查漏补缺

c语言的struct只能包含变量&#xff0c;而c中的class除了包含变量&#xff0c;还可以包含函数。 通过结构体定义出来的变量还是变量&#xff0c;而通过类定义出来有了新的名称&#xff0c;叫做对象。C语言中&#xff0c;会将重复使用或具有某项功能的代码封装成一个函数&#x…

【剑指offer】8. 斐波那契数列(java)

文章目录 斐波那契数列描述输入描述&#xff1a;返回值描述&#xff1a;示例1示例2示例3思路非递归递归 完整代码 斐波那契数列 描述 大家都知道斐波那契数列&#xff0c;现在要求输入一个正整数 n &#xff0c;请你输出斐波那契数列的第 n 项。 斐波那契数列是一个满足 f …