Debezium日常分享系列之:使用数据库中的数据流进行在线机器学习
- 一、背景介绍
- 二、数据集准备
- 三、使用 Apache Flink 进行分类
- 四、使用 Debezium 和 Kafka 作为源数据流
- 五、构建 Flink 流 k-means
- 六、评估模型
- 七、使用 Apache Spark 进行分类
- 八、定义数据流
- 九、定义和评估模型
- 十、结论
一、背景介绍
- 使用 Debezium 从数据库创建多个数据流,并使用其中一个流进行持续学习并改进我们的模型,使用第二个流对数据进行预测。
- 当模型不断改进或调整以适应最新的数据样本时,这种方法称为在线机器学习。在线学习仅适合某些用例,实现给定算法的在线变体可能具有挑战性,甚至是不可能的。然而,在可以进行在线学习的情况下,它成为一种非常强大的工具,因为它允许人们实时对数据的变化做出反应,并避免重新训练和重新部署新模型的需要,从而节省了时间。
- 硬件和运营成本。随着数据流变得越来越普遍,例如随着物联网的出现,可以预期在线学习将变得越来越流行。它通常非常适合在可能的用例中分析流数据。
- 目标不是为给定的用例构建最佳的模型,而是研究如何构建一个完整的管道,从将数据插入数据库到将数据传递到模型并将其用于模型训练和预测。为了简单起见,将使用机器学习教程中经常使用的另一个众所周知的数据样本。将探索如何使用 k 均值聚类算法的在线变体对各种鸢尾花进行分类。使用 Apache Flink 和 Apache Spark 来处理数据流。这两个框架都是非常流行的数据处理框架,并且包含一个机器学习库,除其他外,它还实现在线 k-means 算法。因此,我们可以专注于构建一个完整的管道,将数据从数据库传递到给定的模型中,实时处理它,而不必处理算法的实现细节。
二、数据集准备
- 将使用鸢尾花数据集,目标是根据鸢尾花的几个测量值来确定鸢尾花的种类:萼片长度、萼片宽度、花瓣长度和花瓣宽度。
- 该数据集可以从各种来源下载。可以利用这样一个事实:它已经在例如中进行了预处理。 scikit-learn 工具包并从那里使用它。每个样本行包含一个数据点(萼片长度、萼片宽度、花瓣长度和花瓣宽度)和标签。标签为数字 0、1 或 2,其中 0 代表 Iris setosa,1 代表 Iris versicolor,2 代表 Iris virginica。数据集很小 - 仅包含 150 个数据点。
- 将数据加载到数据库中时,将首先准备 SQL 文件,稍后将其传递到数据库。需要将原始数据样本分为三个子样本——两个用于训练,一个用于测试。初始训练将使用第一个训练数据样本。这个数据样本故意很小,以免在第一次测试模型时产生良好的预测,这样就可以看到当向模型提供更多数据时,模型的预测将如何实时增加。
- 可以使用随附演示存储库中的以下 Python 脚本来生成所有三个 SQL 文件。
$ ./iris2sql.py
- train1.sql 在启动时会自动加载到 Postgres 数据库中。 test.sql 和 train2.sql 稍后将手动加载到数据库中。
三、使用 Apache Flink 进行分类
首先,我们看一下如何在 Apache Flink 中进行在线鸢尾花分类和学习。下图描述了整个管道的高级架构。
我们将使用 Postgres 作为我们的源数据库。 Debezium 部署为 Kafka Connect 源连接器,跟踪数据库中的更改并创建从新插入的数据发送到 Kafka 的数据流。 Kafka 将这些流发送到 Apache Flink,后者采用流式 k-means 算法进行模型拟合和数据分类。测试数据流模型的预测作为另一个流生成并发送回 Kafka。
我们的数据库包含两个表。第一个存储我们的训练数据,第二个存储测试数据。因此,有两个数据流,每个数据流对应一张表——一个用于学习的数据流,一个需要分类的数据点。在实际应用中,可以仅使用一张表,或者相反,可以使用更多表。甚至可以部署更多 Debezium 连接器,从而合并来自多个数据库的数据。
四、使用 Debezium 和 Kafka 作为源数据流
Apache Flink 与 Kafka 具有出色的集成。可以传递 Debezium 记录,例如JSON 记录。对于创建 Flink 表,它甚至支持 Debezium 的记录格式,但对于流,需要提取部分 Debezium 消息,其中包含表中新存储的行。然而,这非常容易,因为 Debezium 提供了 SMT,提取新的记录状态 SMT,正是这样做的。完整的 Debezium 配置如下所示:
{
"name": "iris-connector-flink",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "flink",
"table.include.list": "public.iris_.*",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
该配置捕获公共架构中包含以 iris_ 前缀开头的表的所有表。由于将训练和测试数据存储在两个表中,因此分别创建了两个名为 flink.public.iris_train 和 flink.public.iris_test 的 Kafka 主题。 Flink 的 DataStreamSource 代表传入的数据流。当将记录编码为 JSON 时,它将是 JSON ObjectNode 对象的流。构建源流非常简单:
KafkaSource<ObjectNode> train = KafkaSource.<ObjectNode>builder()
.setBootstrapServers("kafka:9092")
.setTopics("flink.public.iris_train")
.setClientIdPrefix("train")
.setGroupId("dbz")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
.build();
DataStreamSource<ObjectNode> trainStream = env.fromSource(train, WatermarkStrategy.noWatermarks(), "Debezium train");
Flink 主要运行在 Table 抽象对象上。此外,机器学习模型仅接受表格作为输入,并且预测也以表格形式生成。因此,必须首先将输入流转换为 Table 对象。首先将输入数据流转换为表行流。需要定义一个映射函数,该函数将返回一个 Row 对象,其中包含一个数据点的向量。由于k-means算法属于无监督学习算法,即模型不需要数据点对应的“正确答案”,因此可以从向量中跳过标签字段:
private static class RecordMapper implements MapFunction<ObjectNode, Row> {
@Override
public Row map(ObjectNode node) {
JsonNode payload = node.get("value").get("payload");
StringBuffer sb = new StringBuffer();
return Row.of(Vectors.dense(
payload.get("sepal_length").asDouble(),
payload.get("sepal_width").asDouble(),
payload.get("petal_length").asDouble(),
payload.get("petal_width").asDouble()));
}
}
Flink内部管道的各个部分可以在不同的工作节点上运行,因此,还需要提供有关表的类型信息。这样就可以创建表对象了:
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
TypeInformation<?>[] types = {DenseVectorTypeInfo.INSTANCE};
String names[] = {"features"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream<Row> inputStream = trainStream.map(new RecordMapper()).returns(typeInfo);
Table trainTable = tEnv.fromDataStream(inputStream).as("features");
五、构建 Flink 流 k-means
一旦有了一个 Table 对象,就可以将它传递给模型。因此,创建一个并向其传递一个训练流以进行连续模型训练:
OnlineKMeans onlineKMeans = new OnlineKMeans()
.setFeaturesCol("features")
.setPredictionCol("prediction")
.setInitialModelData(tEnv.fromDataStream(env.fromElements(1).map(new IrisInitCentroids())))
.setK(3);
OnlineKMeansModel model = onlineKMeans.fit(trainTable);
为了让事情变得更简单,直接将所需簇的数量设置为 3,而不是通过挖掘数据(例如使用肘法)来找到最佳簇数。还为集群的中心设置了一些初始值,而不是使用随机数(Flink 提供了一个方便的方法 - KMeansModelData.generateRandomModelData() 如果你想尝试使用随机中心)。
为了获得测试数据的预测,再次需要将测试流转换为表。该模型将包含测试数据的表转换为包含预测的表。最后,将预测转换为流并保存,例如在Kafka主题中:
DataStream<Row> testInputStream = testStream.map(new RecordMapper()).returns(typeInfo);
Table testTable = tEnv.fromDataStream(testInputStream).as("features");
Table outputTable = model.transform(testTable)[0];
DataStream<Row> resultStream = tEnv.toChangelogStream(outputTable);
resultStream.map(new ResultMapper()).sinkTo(kafkaSink);
现在,已经准备好构建应用程序,并且几乎准备好将其提交给 Flink 执行。在此之前,需要先创建所需的 Kafka 主题。虽然主题可以为空,但 Flink 要求它们至少存在。由于在数据库启动时在 Postgres 训练表中包含了一小部分数据,因此 Debezium 在 Kafka Connect 中注册 Debezium Postgres 连接器时会创建相应的主题。由于测试数据表还不存在,需要在Kafka中手动创建主题:
$ docker compose -f docker-compose-flink.yaml exec kafka /kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic flink.public.iris_test
现在,准备向 Flink提交应用程序。
如果您不使用本演示源代码中提供的 Docker compose,请将 Flink ML 库包含在 Flink lib 文件夹中,因为 ML 库不是默认 Flink 发行版的一部分。
Flink 提供了友好的 UI,可以在 http://localhost:8081/ 上找到。在那里,除其他事项外,还可以检查您的工作状态,例如:作业执行计划以出色的图形表示形式:
六、评估模型
从用户的角度来看,与模型的所有交互都是通过将新记录插入数据库或读取带有预测的 Kafka 主题来发生的。由于在数据库启动时已经创建了一个非常小的初始训练数据样本,因此可以通过将测试数据样本插入数据库来直接检查模型预测:
$ psql -h localhost -U postgres -f postgres/iris_test.sql
插入会在 Kafka 中生成测试数据的即时数据流,将其传递到模型中并将预测发送回 iris_predictions Kafka 主题。在只有两个集群的非常小的数据集上训练模型时,预测不准确。下图显示了我们的初步预测:
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 2
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 2
[6.1, 2.8, 4.7, 1.2] is classified as 2
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 2
在我们的例子中,正确的答案应该是:
[5.4, 3.7, 1.5, 0.2] is 0
[4.8, 3.4, 1.6, 0.2] is 0
[7.6, 3.0, 6.6, 2.1] is 2
[6.4, 2.8, 5.6, 2.2] is 2
[6.0, 2.7, 5.1, 1.6] is 1
[5.4, 3.0, 4.5, 1.5] is 1
[6.7, 3.1, 4.7, 1.5] is 1
[5.5, 2.4, 3.8, 1.1] is 1
[6.1, 2.8, 4.7, 1.2] is 1
[4.3, 3.0, 1.1, 0.1] is 0
[5.8, 2.7, 3.9, 1.2] is 1
在比较结果时,由于初始样本训练数据的大小,只有 11 个数据点中的 5 个被正确分类。另一方面,由于并不是从完全随机的集群开始,所以预测也不是完全错误的。
当向模型提供更多训练数据时情况会发生什么变化:
$ psql -h localhost -U postgres -f postgres/iris_train2.sql
为了查看更新后的预测,再次将相同的测试数据样本插入数据库:
psql -h localhost -U postgres -f postgres/iris_test.sql
由于已经提供了所有三个类别,因此以下预测要好得多。还正确分类了 11 个数据点中的 7 个。
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 2
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 1
[6.1, 2.8, 4.7, 1.2] is classified as 2
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 1
由于整个数据样本非常小,为了进一步的模型训练,可以重复使用第二个训练数据样本:
$ psql -h localhost -U postgres -f postgres/iris_train2.sql
$ psql -h localhost -U postgres -f postgres/iris_test.sql
这导致以下预测。
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 1
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 1
[6.1, 2.8, 4.7, 1.2] is classified as 1
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 1
现在发现 11 个数据点中有 9 个被正确分类。虽然这仍然不是一个出色的结果,但预计结果只能部分准确,因为这只是一个预测。这里的主要动机是展示整个流程并证明该模型可以在添加新数据时改进预测,而无需重新训练和重新部署模型。
七、使用 Apache Spark 进行分类
从用户的角度来看,Apache Spark 与 Flink 非常相似,实现也非常相似。
Spark 有两种流模型:较旧的 DStreams(现在处于遗留状态)和较新且推荐的结构化流模型。但是,由于 Spark ML 库中包含的流式 k-means 算法仅适用于 DStream,因此为简单起见,本示例中使用了 DStream。更好的方法是使用结构化流并自己实现流 k-means。
Spark 支持使用 DStreams 从 Kafka 进行流式传输。然而,不支持将 DStream 写回 Kafka,尽管这是可能的,但并不简单。
同样,为了简单起见,跳过最后部分,仅将预测写入控制台,而不是将其写回 Kafka。管道的总体情况如下所示:
八、定义数据流
与 Flink 类似,从 Kafka 流创建 Spark 流非常简单,并且大多数参数都是不言自明的:
Set<String> trainTopic = new HashSet<>(Arrays.asList("spark.public.iris_train"));
Set<String> testTopic = new HashSet<>(Arrays.asList("spark.public.iris_test"));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "dbz");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JavaInputDStream<ConsumerRecord<String, String>> trainStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(trainTopic, kafkaParams));
JavaDStream<LabeledPoint> train = trainStream.map(ConsumerRecord::value)
.map(SparkKafkaStreamingKmeans::toLabeledPointString)
.map(LabeledPoint::parse);
在最后一行,将 Kafka 流转换为标记点流,Spark ML 库使用它来处理其 ML 模型。标记点应为格式化为数据点标签的字符串,数据点标签与空格分隔的数据点值之间用逗号分隔。所以地图函数看起来像这样:
private static String toLabeledPointString(String json) throws ParseException {
JSONParser jsonParser = new JSONParser();
JSONObject o = (JSONObject)jsonParser.parse(json);
return String.format("%s, %s %s %s %s",
o.get("iris_class"),
o.get("sepal_length"),
o.get("sepal_width"),
o.get("petal_length"),
o.get("petal_width"));
}
k 均值是一种无监督算法并且不使用数据点标签,这一点仍然适用。但是,将它们传递给 LabeledPoint 类很方便,稍后我们可以将它们与模型预测一起显示。
我们再链接一个映射函数来解析字符串并从中创建一个标记数据点。在本例中,它是 Spark LabeledPoint 的内置函数。
与Flink相反,Spark不需要Kafka主题预先存在,因此在部署模型时,不必创建主题。一旦创建了包含测试数据的表并填充了数据,就可以让 Debezium 创建它们。
九、定义和评估模型
定义流式 k-means 模型与 Flink 非常相似:
StreamingKMeans model = new StreamingKMeans()
.setK(3)
.setInitialCenters(initCenters, weights);
model.trainOn(train.map(lp -> lp.getFeatures()));
另外,在这种情况下,直接将簇数设置为3,并为簇提供相同的初始中心点。也只传递数据点进行训练,而不传递标签。
如上所述,我们可以使用标签将它们与预测一起显示:
JavaPairDStream<Double, Vector> predict = test.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()));
model.predictOnValues(predict).print(11);
将 11 个流元素打印到带有预测的结果流上的控制台,因为这是测试样本的大小。与 Flink 一样,在非常小的数据样本上进行初始训练后的结果可能会更好。元组中的第一个数字是数据点标签,而第二个数字是模型所做的相应预测:
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,0)
spark_1 | (1.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,0)
spark_1 | (0.0,0)
spark_1 | (1.0,0)
然而,当提供更多的训练数据时,预测会更好:
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,1)
spark_1 | (0.0,0)
spark_1 | (1.0,0)
如果再次传递第二个训练数据样本进行训练,模型会对整个测试样本做出正确的预测:
---
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (0.0,0)
spark_1 | (1.0,1)
----
预测是 k-means 算法创建的簇的编号,与数据样本中的标签无关。这意味着例如(0.0,1) 不一定是错误的预测。标签为 0 的数据点可能会被分配到正确的簇,但是 Spark 在内部将其标记为簇号 1。在评估模型时需要牢记这一点。
因此,与 Flink 类似,当传递更多训练数据时,会得到更好的结果,而无需重新训练和重新部署模型。在这种情况下,得到了比 Flink 模型更好的结果。
十、结论
- 展示了如何将数据库中的数据作为数据流实时传递到 Apache Flink 和 Apache Spark。在这两种情况下,集成都很容易设置并且运行良好。
- 示例中进行了演示,该示例允许我们使用在线学习算法,即在线 k-means 算法,来突出数据流的强大功能。在线机器学习使我们能够对数据流进行实时预测,并在新的训练数据到达时立即改进或调整模型。模型调整不需要在单独的计算集群上重新训练任何模型并重新部署新模型,从而使 ML-ops 更加简单且更具成本效益。