二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

news2025/1/22 21:52:01

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较正常,对象中包含数组

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

三、Java代码

package com.kgc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaKafkaEvaluation {
    // 添加 Kafka Producer 配置
    private static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        return props;
    }

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 每一个消费,都要定义不同的Group_ID
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));
        ObjectMapper mapper = new ObjectMapper();

        // 初始化 Kafka Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    JsonNode rootNode = mapper.readTree(record.value());

                    System.out.println("原始数据"+rootNode);

                    String device_no = rootNode.get("deviceNo").asText();
                    String source_device_type = rootNode.get("sourceDeviceType").asText();
                    String sn = rootNode.get("sn").asText();
                    String model = rootNode.get("model").asText();
                    String create_time = rootNode.get("createTime").asText();
                    String cycle = rootNode.get("data").get("cycle").asText();

                    JsonNode evaluationList = rootNode.get("data").get("evaluationList");
                    for (JsonNode evaluationItem : evaluationList) {
                        String lane_no = evaluationItem.get("laneNo").asText();
                        String lane_type = evaluationItem.get("laneType").asText();
                        String volume = evaluationItem.get("volume").asText();
                        String queue_len_max = evaluationItem.get("queueLenMax").asText();
                        String sample_num = evaluationItem.get("sampleNum").asText();
                        String stop_avg = evaluationItem.get("stopAvg").asText();
                        String delay_avg = evaluationItem.get("delayAvg").asText();
                        String pass_rate = evaluationItem.get("passRate").asText();
                        String travel_dist = evaluationItem.get("travelDist").asText();
                        String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();

                        String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",
                                device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,
                                volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);

                        // 发送数据到 Kafka
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);
                        producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            consumer.commitAsync();
        }
    }

}

1、服务器IP都是   192.168.0.70

2、消费Kafka主题(数据源):topic_internal_data_evaluation

3、生产Kafka主题(目标源):topic_db_data_evaluation

4、注意:字段顺序与ODS层表结构字段顺序一致!!!

四、开启Kafka主题topic_db_data_evaluation消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning

五、运行测试

1、启动项目

2、消费者输出数据

然后再用Flume采集写入HDFS就行了,不过ODS层表结构需要转变

六、ODS层新表结构

create external table  if not exists  hurys_dc_ods.ods_evaluation(
    device_no           string        COMMENT '设备编号',
    source_device_type  string        COMMENT '设备类型',
    sn                  string        COMMENT '设备序列号 ',
    model               string        COMMENT '设备型号',
    create_time         timestamp     COMMENT '创建时间',
    cycle               int           COMMENT '评价数据周期',
    lane_no             int           COMMENT '车道编号',
    lane_type           int           COMMENT '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    volume              int           COMMENT '车道内过停止线流量(辆)',
    queue_len_max       float         COMMENT '车道内最大排队长度(m)',
    sample_num          int           COMMENT '评价数据计算样本量',
    stop_avg            float         COMMENT '车道内平均停车次数(次)',
    delay_avg           float         COMMENT '车道内平均延误时间(s)',
    pass_rate           float         COMMENT '车道内一次通过率',
    travel_dist         float         COMMENT '车道内检测行程距离(m)',
    travel_time_avg     float         COMMENT '车道内平均行程时间'
)
comment '评价数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

--刷新表分区
msck repair table ods_evaluation;
--查看表分区
show partitions hurys_dc_ods.ods_evaluation;
--查看表数据
select * from hurys_dc_ods.ods_evaluation
where day='2024-09-03';

搞定,这样就不需要在Hive中解析JSON数据了!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2103560.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

启动.cmd文件一闪而过,看不到报错信息

在window的环境中&#xff0c;双击.cmd文件&#xff0c;有报错信息&#xff0c;但是一闪而过 例如启动zookeeper时&#xff0c;没有zoo.cfg文件会报错&#xff0c;但是启动一闪而过&#xff0c;你看不到报错信息 有文本工具编辑cmd文件&#xff0c;在最后添加 pause 再次启…

Linux 之 lsblk 【可用块的设备信息】

功能介绍 在 Linux 系统中&#xff0c;“lsblk”&#xff08;list block devices&#xff09;命令用于列出所有可用的块设备信息 应用场景 查看存储设备信息&#xff1a;“lsblk” 命令可以帮助你快速了解系统中的存储设备&#xff0c;包括硬盘、固态硬盘、U 盘等。你可以查…

9_4_QTextEdit

QTextEdit //核心属性//获取文本 toPlainText(); toHtml(); toMarkdown(); //输入框为空时的提示功能 placeHolderText(); //只读 readOnly();//定义文本光标 QTextcursor cursorcursor.position(); cursor.selectedText();//核心信号//文本改变 textChanged(); //选中范围 se…

【黑马点评】附近商户

需求 选择商铺类型后&#xff0c;按照距离当前用户所在位置从近到远的顺序&#xff0c;分页展示该类型的所有商铺。 接口&#xff1a; 参数&#xff1a; typeId&#xff1a;商铺类型current&#xff1a;页码x&#xff1a;经度y&#xff1a;纬度 返回值&#xff1a;所有typeId…

LVS 负载均衡集群指南

1. 引言 LVS (Linux Virtual Server) 虚拟服务器&#xff0c;是 Linux 内核中实现的负载均衡技术&#xff0c;以其高性能、高可靠性和高可用性而闻名。LVS 工作在 TCP/IP 协议栈的第四层 (传输层)&#xff0c;通过将流量分配到多个后端服务器&#xff0c;提高系统性能、可用性…

硬件工程师笔试面试知识器件篇——电阻

目录 1、电阻 1.1 基础 电阻原理图 阻实物图 1.1.1、定义 1.1.2、工作原理 1.1.3、类型 1.1.4、材料 1.1.5、标记 1.1.6、应用 1.1.7、特性 1.1.8、测量 1.1.9、计算 1.1.10、颜色编码 1.1.11、公差 1.1.12、功率 1.1.13、重要性 1.2、相关问题 1.2.1、电阻…

数组和指针 笔试题(1)

目录 0.复习 1.笔试题1 2.笔试题2 3.笔试题3 4.笔试题4 5.笔试题5 0.复习 在做笔试题之前&#xff0c;我们首先复习一下数组名的理解 数组名的所有情况&#xff1a; 1.&数组名&#xff0c;取出的是整个数组的地址 2.sizeof&#xff08;数组名&#xff09;&#x…

LLM常见问题(Attention 优化部分)

1. 传统 Attention 存在哪些问题&#xff1f; 传统的 Attention 机制忽略了源端或目标端句子中词与词之间的依赖关系。传统的 Attention 机制过度依赖 Encoder-Decoder 架构上。传统的 Attention 机制依赖于Decoder的循环解码器&#xff0c;所以依赖于 RNN,LSTM 等循环结构。传…

【Transformer】Tokenization

文章目录 直观理解分词方式词粒度-Word字粒度-Character子词粒度-Subword&#xff08;目前最常使用&#xff09; 词表大小的影响参考资料 直观理解 在理解Transformer或者大模型对输入进行tokenize之前&#xff0c;需要理解什么是token&#xff1f; 理工科的兄弟姐妹们应该都…

027集——goto语句用法——C#学习笔记

goto语句可指定代码的跳行运行&#xff1a; 实例如下&#xff1a; 代码如下&#xff1a; using System; using System.Collections.Generic; using System.Linq; using System.Security.Policy; using System.Text; using System.Threading.Tasks;namespace ConsoleApp2 { //…

采用基于企业服务总线(ESB)的面向服务架构(SOA)集成方案实现统一管理维护的银行信息系统

目录 案例 【题目】 【问题 1】(7 分) 【问题 2】(12 分) 【问题 3】(6 分) 【答案】 【问题 1】解析 【问题 2】解析 【问题 3】解析 相关推荐 案例 阅读以下关于 Web 系统设计的叙述&#xff0c;在答题纸上回答问题 1 至问题 3。 【题目】 某银行拟将以分行为主体…

是噱头还是低成本新宠?加州大学用视觉追踪实现跨平台的机器手全掌控?

导读&#xff1a; 在当今科技飞速发展的时代&#xff0c;机器人的应用越来越广泛。从工业生产到医疗保健&#xff0c;从物流运输到家庭服务&#xff0c;机器人正在逐渐改变我们的生活方式。而机器人的有效操作和控制&#xff0c;离不开高效的遥操作系统。今天&#xff0c;我们要…

OHIF Viewer (3.9版本最新版) 适配移动端——最后一篇

根据一些调用资料和尝试,OHIF 的底层用的是Cornerstonejs ,这个是基于web端写的,如果说写在微信小程序里,确实有很多报错, 第一个问题就是 npm下载的依赖, 一、运行环境差异 微信小程序的运行环境与传统的 Node.js 环境有很大不同。小程序在微信客户端中运行,有严格的…

传输大咖38 | 如何应对汽车行业内外网文件交换挑战?

在数字化浪潮的推动下&#xff0c;汽车行业正经历着前所未有的变革。随着智能网联汽车的兴起&#xff0c;内外网文件的安全交换成为了一个亟待解决的问题。本文将探讨汽车行业在内外网文件交换中遇到的难题&#xff0c;并介绍镭速如何为这些问题提供有效的解决方案。 跨网文件传…

js封装上传组件 点击拖拽上传

效果图 上传组件代码 <template><div id"appp"><label for"fileInput" class"upload" dragover"fileDragover" drop"fileDrop" v-if"log ! checkLog"><!-- <div class"jia" …

CTFHub技能树-备份文件下载-.DS_Store

目录 方法一&#xff1a;直接访问/.DS_Store文件 方法二&#xff1a;使用Python-dsstore工具解析.Dsstore文件 方法三&#xff1a;直接使用crul访问/./.DS_Store文件 .DS_Store 是 Mac OS 保存文件夹的自定义属性的隐藏文件。通过.DS_Store可以知道这个目录里面所有文件的清单…

中西结合治疗帕金森怎么样

中西结合治疗帕金森怎么样 中西结合治疗帕金森病是一种综合运用中医和西医治疗方法的策略&#xff0c;旨在通过各自的优势来改善患者的症状、延缓病情进展&#xff0c;并提高生活质量。西医治疗帕金森病主要依赖药物治疗&#xff0c;如左旋多巴和多巴胺受体激动剂&#xff0c;…

基于web知识库管理系统设计与实现

第二章 系统分析 2.1 知识库管理系统可行性分析 可行性分析对系统的开发至关重要&#xff0c;可以大幅减少不必要的损失&#xff0c;保证系统开发的顺利进行。因此要对系统进行技术可行性、经济可行性两方面的系统可行性分析。 2.1.1技术可行性 随着网络技术日新月异的高速…

换热站可视化:提升热能管理效率

通过图扑 HT 搭建换热站可视化解决方案&#xff0c;实时监控与数据展示&#xff0c;优化热能分配与运行管理&#xff0c;提高能源效率并降低运营成本。

俄罗斯Ozon选品三要素,简单实用的选品方法

在 Ozon 上选品可以参考以下三个要素&#xff1a; 要素一&#xff1a;市场需求 关注热门品类&#xff1a;从 Ozon 的销售数据和市场趋势来看&#xff0c;像电子产品&#xff08;如手机、耳机、智能穿戴设备等&#xff09;、时尚服饰&#xff08;包括流行服装、鞋类、配饰&…