原文地址: https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
使用 Hibernate 和 Debezium 实现聚合视图
2018 年 9 月 20 日 作者: Gunnar Morling
讨论 实例
数据更改后更新外部全文搜索索引(例如Elasticsearch)是更改数据捕获 (CDC) 的一个非常流行的用例。
正如我们之前在博客文章中讨论的那样,Debezium 的 CDC 源连接器和 Confluence 的Elasticsearch 接收器连接器的组合可以直接捕获 MySQL、Postgres 等中的数据更改,并将它们近乎实时地推送到 Elasticsearch 。这会导致源数据库中的表与 Elasticsearch 中相应的搜索索引之间形成 1:1 的关系,这对于许多用例来说非常适合。
但如果您想将整个聚合放入单个索引中,它会变得更具挑战性。例如,客户及其所有地址;这些通常存储在 RDBMS 中的两个单独的表中,通过外键链接,而您希望 Elasticsearch 中只有一个索引,其中包含嵌入了地址的客户文档,使您能够根据以下条件高效地搜索客户:他们的地址。
继我们最近描述的基于 KStreams 的解决方案之后,我们希望在这篇文章中提出一种替代方案,用于具体化由应用程序层驱动的此类聚合视图。
概述
这个想法是在原始数据发生更改时在源数据库中的单独表中具体化视图。
聚合被序列化为 JSON 结构(自然可以表示任何嵌套对象结构)并存储在特定的表中。这是在更改数据的实际事务中完成的,这意味着聚合视图始终与主数据一致。特别是,这种方法不容易像上面链接的帖子中讨论的基于 KStreams 的解决方案那样暴露中间聚合。
整体架构如下图:
图片来自于官网
将物化聚合视图流式传输到 Elasticsearch
这里聚合视图通过Hibernate ORM的一个小扩展实现,它在源数据库中存储 JSON 聚合(注意“聚合视图”在概念上可以被认为与不同 RDBMS 中已知的“物化视图”相同,如它们实现了“连接”操作的结果,但从技术上讲,我们不使用后者来存储聚合视图,而是使用常规表)。然后 Debezium 捕获对该聚合表的更改,并将其传输到每种聚合类型的一个主题。Elasticsearch接收器连接器可以订阅这些主题并更新相应的全文索引。
您可以在我们的示例存储库中找到此想法的概念验证实现(即 Hibernate 扩展和相关代码)。当然,总体思路并不限于 Hibernate ORM 或 JPA,您可以使用用于访问数据的任何其他 API 来实现类似的功能。
通过 Hibernate ORM 创建聚合视图
接下来,我们假设我们在数据库中保存一个简单的域模型(包含一个Customer实体和一些相关的模型,例如Address,(客户)等)。使用 Hibernate 可以让我们使用Hibernate 事件监听器使Category聚合的创建对实际应用程序代码完全透明。由于其可扩展的架构,我们只需将此类侦听器添加到类路径即可将其插入 Hibernate,在引导实体管理器/会话工厂时将自动从该类路径中获取它。
我们的示例侦听器对注释做出反应,@MaterializeAggregate该注释标记了那些应该是物化聚合根的实体类型。
@Entity
@MaterializeAggregate(aggregateName=“customers-complete”)
public class Customer {
@Id
private long id;
private String firstName;
@OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
private Set<Address> addresses;
@ManyToOne
private Category category;
...
}
现在,如果通过 Hibernate 插入、更新或删除带有 注释的任何实体@MaterializeAggregate,侦听器将启动并具体化聚合根(客户)及其关联实体(地址、类别)的 JSON 视图。
在底层,Jackson API用于将模型序列化为 JSON。这意味着您可以使用其任何注释来自定义 JSON 输出,例如@JsonIgnore排除Addressto 的逆关系Customer:
@Entity
public class Address {
@Id
private long id;
@ManyToOne
@JoinColumn(name = "customer_id")
@JsonIgnore
private Customer customer;
private String street;
private String city;
...
}
请注意,Address它本身没有标记为@MaterializeAggregate,即它本身不会具体化为聚合视图。
使用 JPAEntityManager插入或更新一些客户后,让我们看一下aggregates侦听器填充的表(为了简洁起见,省略了值模式):
select * from aggregates;
| rootType | keySchema | rootId | materialization | valueSchema |
| customers-complete
| {
“schema” : {
“type” : “struct”,
“fields” : [ {
“type” : “int64”,
“optional” : false,
“field” : “id”
} ],
“optional” : false,
“name” : “customers-complete.Key”
}
}
| { “id” : 1004 }
| { “schema” : { … } }
| {
“id” : 1004,
“firstName” : “Anne”,
“lastName” : “Kretchmar”,
“email” : “annek@noanswer.org”,
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category” : {
“id” : 100001,
“name” : “Retail”
},
“addresses” : [ {
“id” : 16,
“street” : “1289 University Hill Road”,
“city” : “Canehill”,
“state” : “Arkansas”,
“zip” : “72717”,
“type” : “SHIPPING”
} ]
} |
该表包含以下列:
rootType@MaterializeAggregate:注释中给出的聚合名称
rootId:聚合的 id 作为序列化 JSON
materialization:聚合本身作为序列化 JSON;在这种情况下,客户及其地址、类别等。
keySchema:行键的 Kafka Connect 架构
valueSchema:物化的 Kafka Connect 模式
让我们稍微讨论一下两个模式列。JSON 本身就其支持的数据类型而言非常有限。例如,我们会丢失有关数字字段的值范围(int 与 long 等)的信息,而无需任何附加信息。因此,侦听器从实体模型中派生出键和聚合视图的相应架构信息,并将其存储在聚合记录中。
现在 Jackson 本身只支持 JSON Schema,这对于我们的目的来说有点太有限了。因此,示例实现为 Jackson 的模式系统提供了自定义序列化器,这允许我们发出 Kafka Connect 的模式表示(具有更精确的类型信息)而不是普通的 JSON 模式。当我们想要将键和值的基于字符串的 JSON 表示形式扩展为正确类型的 Kafka Connect 记录时,这将在下面派上用场。
捕获聚合表的更改
我们现在拥有一种机制,每当通过 Hibernate 更改应用程序数据时,该机制可以透明地将聚合持久保存到源数据库中的单独表中。请注意,这发生在源事务的边界内,因此如果由于某种原因回滚同一事务,聚合视图也不会更新。
Hibernate 侦听器在编写聚合视图时使用插入或更新语义,即对于给定的聚合根,聚合表中始终存在一个反映其当前状态的对应条目。如果删除聚合根实体,侦听器也会从聚合表中删除该条目。
现在让我们设置 Debezium 来捕获对aggregates表的任何更改:
curl -i -X POST
-H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/ -d @- <<-EOF
{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”,
“database.whitelist”: “inventory”,
“table.whitelist”: “.*aggregates”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”
}
}
EOF
这会将 MySQL 连接器注册到“inventory”数据库(我们使用Debezium 教程中模式的扩展版本),捕获对“aggregates”表的任何更改。
扩展 JSON
如果我们现在要浏览相应的 Kafka 主题,我们会看到表中所有更改的已知 Debezium 格式的数据更改事件aggregates。
不过,具有记录“之后”状态的“物化”字段仍然是包含 JSON 字符串的单个字段。我们更希望拥有的是强类型的 Kafka Connect 记录,其模式准确地描述了聚合结构及其字段的类型。为此,示例项目提供了一个 SMT(单消息转换),它采用 JSON 物化和相应的内容valueSchema,并将其转换为成熟的 Kafka Connect 记录。对键也是如此。DELETE 事件被重写为墓碑事件。最后,SMT 将每条记录重新路由到以聚合根命名的主题,允许消费者仅订阅特定聚合类型的更改。
因此,让我们在注册 Debezium CDC 连接器时添加该 SMT:
…
“transforms”:“expandjson”,
“transforms.expandjson.type”:“io.debezium.aggregation.smt.ExpandJsonSmt”,
…
现在浏览“customers-complete”主题时,我们将看到我们期望的强类型 Kafka Connect 记录:
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “int64”,
“optional”: false,
“field”: “id”
}
],
“optional”: false,
“name”: “customers-complete.Key”
},
“payload”: {
“id”: 1004
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [ … ],
“optional”: true,
“name”: “urn:jsonschema:com:example:domain:Customer”
},
“payload”: {
“id”: 1004,
“firstName”: “Anne”,
“lastName”: “Kretchmar”,
“email”: “annek@noanswer.org”,
“active”: true,
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category”: {
“id”: 100001,
“name”: “Retail”
},
“addresses”: [
{
“id”: 16,
“street”: “1289 University Hill Road”,
“city”: “Canehill”,
“state”: “Arkansas”,
“zip”: “72717”,
“type”: “LIVING”
}
]
}
}
要确认这些是实际键入的 Kafka Connect 记录,而不仅仅是单个 JSON 字符串字段,您可以使用Avro 消息转换器并检查架构注册表中的消息架构。
将聚合消息放入 Elasticsearch
最后缺少的一步是注册 Confluence Elasticsearch 接收器连接器,将其与“customers-complete”主题挂钩,并让它将任何更改推送到相应的索引:
curl -i -X POST
-H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/ -d @- <<-EOF
{
“name”: “es-customers”,
“config”: {
“connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”,
“tasks.max”: “1”,
“topics”: “customers-complete”,
“connection.url”: “http://elastic:9200”,
“key.ignore”: “false”,
“schema.ignore” : “false”,
“behavior.on.null.values” : “delete”,
“type.name”: “customer-with-addresses”,
“transforms” : “key”,
“transforms.key.type”: “org.apache.kafka.connect.transforms.ExtractField$Key”,
“transforms.key.field”: “id”
}
}
EOF
这使用 Connect 的ExtractField转换从键结构中获取实际的 id 值,并将其用作相应 Elasticsearch 文档的键。指定“behavior.on.null.values”选项将使连接器在遇到逻辑删除消息(即带有键但没有值的消息)时从索引中删除相应的文档。
最后,我们可以使用 Elasticsearch REST API 来浏览索引,当然还可以使用其强大的全文查询语言通过地址或嵌入到聚合结构中的任何其他属性来查找客户:
curl -X GET -H “Accept:application/json”
http://localhost:9200/customers-complete/_search?pretty
{
“_shards”: {
“failed”: 0,
“successful”: 5,
“total”: 5
},
“hits”: {
“hits”: [
{
“_id”: “1004”,
“_index”: “customers-complete”,
“_score”: 1.0,
“_source”: {
“active”: true,
“addresses”: [
{
“city”: “Canehill”,
“id”: 16,
“state”: “Arkansas”,
“street”: “1289 University Hill Road”,
“type”: “LIVING”,
“zip”: “72717”
}
],
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category”: {
“id”: 100001,
“name”: “Retail”
},
“email”: “annek@noanswer.org”,
“firstName”: “Anne”,
“id”: 1004,
“lastName”: “Kretchmar”,
“scores”: [],
“someBlob”: null,
“tags”: []
},
“_type”: “customer-with-addresses”
}
],
“max_score”: 1.0,
“total”: 1
},
“timed_out”: false,
“took”: 11
}
现在您已经看到了:客户的完整数据,包括他们的地址、类别、标签等,在 Elasticsearch 中具体化为单个文档。如果您使用 JPA 更新客户,您将看到索引中的数据近乎实时地相应更新。
优点和缺点
那么,与基于 KStreams 的方法相比,这种从多个源表实现聚合的方法有哪些优点和缺点呢?
最大的优点是一致性和事务边界意识,而建议形式的基于 KStreams 的解决方案很容易暴露中间聚合。例如,如果您要存储一个客户和三个地址,则流式查询可能会首先创建客户和先插入的两个地址的聚合,然后不久创建包含所有三个地址的完整聚合。此处讨论的方法并非如此,因为您只能将完整的聚合流式传输到 Kafka。此外,这种方法感觉更“轻量级”,即一个简单的标记注释(与一些用于微调发出的 JSON 结构的 Jackson 注释一起)就足以实现域模型中的聚合,
通过应用程序层驱动聚合的缺点是它不能完全不知道您访问主要数据的方式。如果绕过应用程序,例如直接在数据库中修补数据,这些更新自然会丢失,从而需要刷新受影响的聚合。尽管这也可以通过更改数据捕获和 Debezium 来完成:源表的更改事件可以由应用程序本身捕获和使用,从而允许它在外部数据更改后重新实现聚合。您还可能会争辩说,在源事务中运行 JSON 序列化以及在源数据库中存储聚合会产生一些开销。不过,这通常是可以接受的。
另一个要问的问题是,与简单地将 REST 请求发布到 Elasticsearch 相比,在中间聚合表上使用更改数据捕获有何优势。答案是大幅提高的稳健性和容错能力。如果由于某种原因无法访问 Elasticsearch 集群,一旦接收器再次启动,Kafka 和 Kafka Connect 的机制将确保最终传播任何更改事件。Elasticsearch 之外的其他消费者也可以订阅聚合主题,日志可以从头开始重播等。
请注意,虽然我们主要讨论使用 Elasticsearch 作为数据接收器,但还有其他支持复杂结构化记录的数据存储和连接器。一个例子是 MongoDB 和Hans-Peter Grahsl 维护的接收器连接器,可以使用该连接器将客户聚合接收到 MongoDB 中,例如通过单个主键查找即可高效检索客户及其所有相关数据。
外表
Hibernate ORM 扩展以及本文中讨论的 SMT 可以在我们的示例存储库中找到。目前它们应该被认为处于“概念验证”级别。
话虽这么说,我们正在考虑使其成为一个合适的 Debezium 组件,让您只需引入这个新组件即可在基于 Hibernate 的应用程序中采用这种聚合方法。不过,为此我们必须首先改进一些事情。最重要的是,需要一个 API,它可以让您按需(重新)创建聚合,例如针对现有数据或通过 Criteria API / JPQL 批量更新更新的数据(听众会错过)。如果任何引用的实体发生变化,聚合也应该自动重新创建(在当前的 PoC 中,只有对客户实例本身的更改才会触发其聚合视图的重建,但不会对其地址之一进行更改)。
如果您喜欢这个想法,请告诉我们,以便我们评估对此的普遍兴趣。另外,如果您有兴趣为 Debezium 项目做出贡献,这将是一个很好的项目。期待您的来信,例如在下面的评论部分或我们的邮件列表中。
非常感谢 Hans-Peter Grahsl 对本文早期版本的反馈!