前言
大数据应用开发——实时数据采集
大数据应用开发——实时数据处理
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
并在HBase中进行备份
大数据应用开发——数据可视化
hadoop,zookeeper,kafka,flink要开启
目录
题目
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
题目
按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
在IDEA下用maven创建flink项目:
# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号
# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号
修改pox.xml文件,将flink-connector-kafka_...依赖移出来
demo包下有两个.java
PS:一个用于批处理,另一个用于流处理
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置发送的
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("master:9092")
.setTopics("order")
.setGroupId("my_group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 配置接收的
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("master:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("dwd_order")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.NONE)
.build();
// 指定的源创建一个数据流
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 将数据里的'符号去掉
DataStream<String> text = stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.replace("'","");
}
});
// 打印处理结果到控制台
text.print();
// 发送
text.sinkTo(sink);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
将代码打包成.jar,可以先clean,再package
生成位置在当前项目位置/target/项目名称-...jar
放进主节点
# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar
最后,可以用flink控制台或kafka-console-consumer.sh查看