Debezium系列之:Outbox Event Router
- 一、认识Outbox Event Router
- 二、使用发件箱模式进行可靠的微服务数据交换
- 三、双写问题
- 四、发件箱模式
- 五、基于变更数据捕获的实现
- 六、发件箱表
- 七、发送事件到发件箱
- 八、注册 Debezium 连接器
- 九、主题路由
- 十、Apache Kafka 中的事件
- 十一、消费服务中的重复检测
- 十二、概括
- 十三、发件箱消息示例
- 十四、基本发件箱表
- 十五、基本配置
- 十六、有选择地应用转换的选项
- 十七、使用 Avro 作为负载格式
- 十八、发出带有附加字段的消息
- 十九、将转义的 JSON 字符串扩展为 JSON
- 二十、配置选项
一、认识Outbox Event Router
发件箱模式是一种在多个(微)服务之间安全可靠地交换数据的方式。发件箱模式实现避免了服务的内部状态(通常保存在其数据库中)与需要相同数据的服务使用的事件中的状态之间的不一致。
要在 Debezium 应用程序中实现发件箱模式,请将 Debezium 连接器配置为:
- 捕获发件箱表中的更改
- 应用 Debezium 发件箱事件路由器单消息转换 (SMT)
配置为应用发件箱 SMT 的 Debezium 连接器应仅捕获发件箱表中发生的更改。
只有当每个发件箱表具有相同的结构时,连接器才能捕获多个发件箱表中的更改。
请参阅使用发件箱模式进行可靠的微服务数据交换以了解发件箱模式为何有用及其工作原理。
有关可以运行的示例,请参阅 Debezium 示例存储库中的发件箱模式演示。它包括一个示例,说明如何配置 Debezium 连接器以运行发件箱事件路由器 SMT。
注意:
- 发件箱事件路由器 SMT 与 MongoDB 连接器不兼容。
- MongoDB 用户可以运行 MongoDB 发件箱事件路由器 SMT。
二、使用发件箱模式进行可靠的微服务数据交换
作为其业务逻辑的一部分,微服务通常不仅需要更新自己的本地数据存储,而且还需要将发生的数据更改通知其他服务。发件箱模式描述了一种让服务以安全一致的方式执行这两项任务的方法;它为源服务提供即时的“读你自己写的”语义,同时提供跨服务边界的可靠、最终一致的数据交换。
更新(2019 年 9 月 13 日):为了简化发件箱模式的使用,Debezium 现在提供了一个随时可用的 SMT 来路由发件箱事件。
如果你已经构建了几个微服务,你可能会同意它们最难的部分是数据:微服务不是孤立存在的,它们经常需要在彼此之间传播数据和数据更改。
例如,考虑管理采购订单的微服务:下新订单时,有关该订单的信息可能必须中继到发货服务(因此它可以组装一个或多个订单的发货)和客户服务(因此它可以根据新订单更新客户的总信用余额等内容)。
有不同的方法可以让订单服务知道另外两个关于新采购订单的信息;例如它可以调用这些服务提供的一些 REST、grpc 或其他(同步)API。但是,这可能会产生一些不需要的耦合:发送服务必须知道要调用哪些其他服务以及在哪里可以找到它们。它还必须为这些服务暂时不可用做好准备。 Istio 等服务网格可以提供请求路由、重试、断路器等功能,从而发挥作用。
任何同步方法的一般问题是,一个服务在没有它调用的其他服务的情况下无法真正运行。虽然缓冲和重试在其他服务只需要通知某些事件的情况下可能会有所帮助,但如果服务实际上需要查询其他服务以获取信息,则情况并非如此。例如,下采购订单时,订单服务可能需要从库存服务中获取所购买商品的库存次数信息。
这种同步方法的另一个缺点是它缺乏可重玩性,即新消费者在事件发送后到达并且仍然能够从头开始消费整个事件流的可能性。
这两个问题都可以通过使用异步数据交换方法来解决:即让订单、库存和其他服务通过持久消息日志(如 Apache Kafka)传播事件。通过订阅这些事件流,每个服务都会收到有关其他服务数据更改的通知。它可以对这些事件做出反应,并在需要时使用根据自己的需要定制的表示在自己的数据存储中创建该数据的本地表示。例如,此类视图可能会被非规范化以有效支持特定的访问模式,或者它可能仅包含与消费服务相关的原始数据的一个子集。
持久日志还支持重玩性,即可以根据需要添加新的消费者,启用您最初可能没有想到的用例,并且无需触及源服务。例如。考虑一个数据仓库,它应该保存关于所有曾经下过的订单的信息,或者一些基于 Elasticsearch 的采购订单的全文搜索功能。一旦采购订单事件在 Kafka 主题中(Kafka 主题的保留策略设置可用于确保事件在给定用例和业务需求需要时保留在主题中),新消费者可以订阅、处理主题从一开始就具体化微服务数据库、搜索索引、数据仓库等中所有数据的视图。
处理主题增长
根据数据量(记录的数量和大小、更改频率),将事件保留在主题中很长时间甚至无限期可能可行也可能不可行。通常,在给定时间点之后,从业务角度来看,与给定数据项(例如特定采购订单)有关的一些甚至所有事件可能符合删除条件。
三、双写问题
为了提供它们的功能,微服务通常会有自己的本地数据存储。例如,订单服务可以使用关系数据库来保存有关采购订单的信息。下新订单时,这可能会导致在服务数据库中的表 PurchaseOrder 中执行 INSERT 操作。同时,该服务可能希望向 Apache Kafka 发送有关新订单的事件,以便将该信息传播给其他感兴趣的服务。
但是,简单地发出这两个请求可能会导致潜在的不一致。原因是我们不能拥有一个跨越服务数据库和 Apache Kafka 的共享事务,因为后者不支持在分布式 (XA) 事务中登记。因此,在不幸的情况下,我们最终可能会在本地数据库中保留新的采购订单,但没有将相应的消息发送到 Kafka(例如,由于某些网络问题)。或者,反过来,我们可能已将消息发送到 Kafka,但无法将采购订单保存在本地数据库中。这两种情况都是不可取的;这可能会导致看似已成功下达的订单无法发货。或者创建了一个发货,但是在订单服务本身中没有关于相应采购订单的踪迹。
那么如何避免这种情况呢?答案是只修改两种资源中的一种(数据库或 Apache Kafka),并以最终一致的方式驱动基于此的第二种资源的更新。让我们首先考虑只写入 Apache Kafka 的情况。
当收到新的采购订单时,订单服务不会同步插入其数据库;相反,它只会将描述新订单的事件发送到 Kafka 主题。因此一次只能修改一个资源,如果出现问题,我们会立即发现并向订单服务的调用者报告请求失败。
同时,该服务本身将订阅该 Kafka 主题。这样,当新消息到达主题时它会收到通知,并且它可以将新的采购订单保存在其数据库中。不过,这里有一个微妙的挑战,那就是缺乏“读你自己写”的语义。例如。让我们假设订单服务也有一个 API 用于搜索给定客户的所有采购订单。在下新订单后立即调用该 API 时,由于处理来自 Kafka 主题的消息的异步性质,采购订单可能尚未保存在服务的数据库中,因此不会被该查询返回。这可能会导致非常混乱的用户体验,因为用户可能会错过他们购物历史中新下的订单。有一些方法可以处理这种情况,例如该服务可以将新下达的采购订单保存在内存中,并根据该订单回答后续查询。尽管在实现更复杂的查询或考虑到订单服务可能还包含集群设置中的多个节点时,这很快就会变得非常重要,这将需要在集群内传播该数据。
现在,当仅同步写入数据库并基于此驱动将消息导出到 Apache Kafka 时,情况会怎样?这就是发件箱模式的用武之地。
四、发件箱模式
这种方法的想法是在服务的数据库中有一个“发件箱”表。当接收到放置采购订单的请求时,不仅执行了对 PurchaseOrder 表的 INSERT 操作,而且作为同一事务的一部分,还将表示要发送的事件的记录插入到该发件箱表中。
该记录描述了服务中发生的事件,例如,它可以是一个 JSON 结构,表示已下达新采购订单的事实,包括订单本身的数据、订单行以及上下文信息,例如使用案例标识符。通过发件箱表中的记录显式发出事件,可以确保事件的结构适合外部消费者。这也有助于确保事件消费者不会在例如更改内部域模型或 PurchaseOrder 表时中断。
一个异步进程监视该表的新条目。如果有,它将事件作为消息传播到 Apache Kafka。这为我们提供了一个非常好的特性平衡:通过同步写入 PurchaseOrder 表,源服务受益于“读取您自己的写入”语义。一旦第一个事务被提交,随后的采购订单查询将返回新的持久化订单。同时,我们通过 Apache Kafka 将可靠的、异步的、最终一致的数据传播到其他服务。
现在,发件箱模式实际上并不是一个新想法。它已经使用了相当长的一段时间。事实上,即使在使用实际上可以参与分布式事务的 JMS 样式的消息代理时,它也可能是避免任何耦合和远程资源(例如消息代理)停机的潜在影响的更好选择。您还可以在 Chris Richardson 的优秀 microservices.io 网站上找到该模式的描述。
尽管如此,该模式得到的关注远低于其应有的水平,但它在微服务环境中尤其有用。正如我们将看到的,发件箱模式可以使用变更数据捕获和 Debezium 以非常优雅和高效的方式实现。下面,我们就来探讨一下。
五、基于变更数据捕获的实现
基于日志的变更数据捕获 (CDC) 非常适合捕获发件箱表中的新条目并将它们流式传输到 Apache Kafka。与任何基于轮询的方法相反,事件捕获以非常低的开销近乎实时地发生。 Debezium 带有用于多个数据库的 CDC 连接器,例如 MySQL、Postgres 和 SQL Server。以下示例将使用 Postgres 的 Debezium 连接器。
可以在 GitHub 上找到示例的完整源代码。有关构建和运行示例代码的详细信息,请参阅 README.md。该示例以两个微服务为中心,订单服务和发货服务。两者都是用 Java 实现的,使用 CDI 作为组件模型,使用 JPA/Hibernate 访问它们各自的数据库。订单服务在 WildFly 上运行,并公开了一个简单的 REST API,用于下采购订单和取消特定订单行。它使用 Postgres 数据库作为其本地数据存储。发货服务基于Thorntail;通过 Apache Kafka,它接收订单服务导出的事件,并在自己的 MySQL 数据库中创建相应的发货条目。 Debezium 跟踪订单服务的 Postgres 数据库的事务日志(“预写日志”,WAL),以便捕获发件箱表中的任何新事件并将它们传播到 Apache Kafka。
解决方案的整体架构如下图所示:
请注意,该模式与这些特定的实现选择没有任何关系。它同样可以使用替代技术来实现,例如 Spring Boot(例如,利用 Spring Data 对域事件的支持)、纯 JDBC 或完全不同于 Java 的其他编程语言。
现在让我们仔细看看该解决方案的一些相关组件。
六、发件箱表
发件箱表驻留在订单服务的数据库中,具有以下结构:
Column | Type | Modifiers
--------------+------------------------+-----------
id | uuid | not null
aggregatetype | character varying(255) | not null
aggregateid | character varying(255) | not null
type | character varying(255) | not null
payload | jsonb | not null
它的列是这些:
- id:每条消息的唯一id;消费者可以使用它来检测任何重复事件,例如失败后重新启动以读取消息时。创建新事件时生成。
- aggregatetype:与给定事件相关的聚合根的类型;这个想法是,基于领域驱动设计的相同概念,导出的事件应该引用一个聚合(“可以被视为一个单元的域对象集群”),其中聚合根提供唯一的入口点用于访问聚合中的任何实体。例如,这可以是“采购订单”或“客户”。该值将用于将事件路由到 Kafka 中的相应主题,因此所有与采购订单相关的事件都有一个主题,所有与客户相关的事件都有一个主题等。请注意,与包含在其中的子实体相关的事件一个这样的聚合应该使用相同的类型。所以例如表示取消单个订单行(采购订单聚合的一部分)的事件也应使用其聚合根的类型“order”,确保此事件也将进入“order”Kafka 主题。
- aggregateid:受给定事件影响的聚合根的id;例如,这可以是采购订单的 ID 或客户 ID;与聚合类型类似,属于聚合中包含的子实体的事件应使用包含聚合根的 id,例如订单行取消事件的采购订单 ID。此 id 将用作稍后 Kafka 消息的密钥。这样,与一个聚合根或其包含的任何子实体有关的所有事件都将进入该 Kafka 主题的同一分区,这确保了该主题的消费者将消费与该聚合中的一个和同一聚合相关的所有事件生产时的确切顺序。
- 类型:事件的类型,例如“订单已创建”或“订单行已取消”。允许消费者触发合适的事件处理程序。
- 有效负载:具有实际事件内容的 JSON 结构,例如包含采购订单、有关采购商的信息、包含的订单行、它们的价格等。
七、发送事件到发件箱
为了将事件“发送”到发件箱,订单服务中的代码通常可以只对发件箱表执行 INSERT 操作。然而,使用稍微抽象一点的 API 是个好主意,这样可以在需要时更轻松地调整发件箱的实现细节。 CDI 事件对此非常有用。它们可以在应用程序代码中引发,并由发件箱事件发送器同步处理,发件箱事件发送器将执行所需的 INSERT 操作到发件箱表中。
所有发件箱事件类型都应实现以下契约,类似于之前显示的发件箱表的结构:
public interface ExportedEvent {
String getAggregateId();
String getAggregateType();
JsonNode getPayload();
String getType();
}
为了产生这样的事件,应用程序代码使用注入的 Event 实例,例如在 OrderService 类中:
@ApplicationScoped
public class OrderService {
@PersistenceContext
private EntityManager entityManager;
@Inject
private Event<ExportedEvent> event;
@Transactional
public PurchaseOrder addOrder(PurchaseOrder order) {
order = entityManager.merge(order);
event.fire(OrderCreatedEvent.of(order));
event.fire(InvoiceCreatedEvent.of(order));
return order;
}
@Transactional
public PurchaseOrder updateOrderLine(long orderId, long orderLineId,
OrderLineStatus newStatus) {
// ...
}
}
在 addOrder() 方法中,JPA 实体管理器用于将传入订单保存在数据库中,注入的事件用于触发相应的 OrderCreatedEvent 和 InvoiceCreatedEvent。同样,请记住,尽管有“事件”的概念,但这两件事发生在同一个事务中。即在这个交易中,三个记录将被插入到数据库中:一个在采购订单表中,两个在发件箱表中。
实际的事件实现是直截了当的;例如,这是 OrderCreatedEvent 类:
public class OrderCreatedEvent implements ExportedEvent {
private static ObjectMapper mapper = new ObjectMapper();
private final long id;
private final JsonNode order;
private OrderCreatedEvent(long id, JsonNode order) {
this.id = id;
this.order = order;
}
public static OrderCreatedEvent of(PurchaseOrder order) {
ObjectNode asJson = mapper.createObjectNode()
.put("id", order.getId())
.put("customerId", order.getCustomerId())
.put("orderDate", order.getOrderDate().toString());
ArrayNode items = asJson.putArray("lineItems");
for (OrderLine orderLine : order.getLineItems()) {
items.add(
mapper.createObjectNode()
.put("id", orderLine.getId())
.put("item", orderLine.getItem())
.put("quantity", orderLine.getQuantity())
.put("totalPrice", orderLine.getTotalPrice())
.put("status", orderLine.getStatus().name())
);
}
return new OrderCreatedEvent(order.getId(), asJson);
}
@Override
public String getAggregateId() {
return String.valueOf(id);
}
@Override
public String getAggregateType() {
return "Order";
}
@Override
public String getType() {
return "OrderCreated";
}
@Override
public JsonNode getPayload() {
return order;
}
}
请注意 Jackson 的 ObjectMapper 如何用于创建事件负载的 JSON 表示。
现在让我们看一下使用任何触发的 ExportedEvent 并对发件箱表执行相应写入的代码:
@ApplicationScoped
public class EventSender {
@PersistenceContext
private EntityManager entityManager;
public void onExportedEvent(@Observes ExportedEvent event) {
OutboxEvent outboxEvent = new OutboxEvent(
event.getAggregateType(),
event.getAggregateId(),
event.getType(),
event.getPayload()
);
entityManager.persist(outboxEvent);
entityManager.remove(outboxEvent);
}
}
这相当简单:对于每个事件,CDI 运行时将调用 onExportedEvent() 方法。 OutboxEvent 实体的实例保存在数据库中⟩-⟩并立即删除!
起初这可能令人惊讶。但是记住基于日志的 CDC 是如何工作的是有道理的:它不检查数据库中表的实际内容,而是跟踪仅追加的事务日志。一旦事务提交,对 persist() 和 remove() 的调用将在日志中创建一个 INSERT 和一个 DELETE 条目。之后,Debezium 将处理这些事件:对于任何 INSERT,带有事件有效负载的消息将发送到 Apache Kafka。另一方面,可以忽略 DELETE 事件,因为从发件箱表中删除只是一种技术问题,不需要任何传播到消息代理。所以我们可以通过CDC的方式捕获到发件箱表中添加的事件,但是在查看表本身的内容时,它永远是空的。这意味着该表不需要额外的磁盘空间(日志文件元素在某些时候会自动丢弃),也不需要单独的内务处理过程来阻止它无限增长。
八、注册 Debezium 连接器
发件箱实施到位后,就该注册 Debezium Postgres 连接器了,这样它就可以捕获发件箱表中的任何新事件并将它们中继到 Apache Kafka。这可以通过将以下 JSON 请求发送到 Kafka Connect 的 REST API 来完成:
{
"name": "outbox-connector",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max" : "1",
"database.hostname" : "order-db",
"database.port" : "5432",
"database.user" : "postgresuser",
"database.password" : "postgrespw",
"database.dbname" : "orderdb",
"database.server.name" : "dbserver1",
"schema.whitelist" : "inventory",
"table.whitelist" : "inventory.outboxevent",
"tombstones.on.delete" : "false",
"transforms" : "router",
"transforms.router.type" : "io.debezium.examples.outbox.routingsmt.EventRouter"
}
}
这将设置 io.debezium.connector.postgresql.PostgresConnector 的实例,从指定的 Postgres 实例捕获更改。请注意,通过表白名单,仅捕获发件箱事件表中的更改。它还应用名为 EventRouter 的单个消息转换 (SMT)。
从 Kafka 主题中删除事件
通过将 tombstones.on.delete 设置为 false,当从发件箱表中删除事件记录时,连接器将不会发出删除标记(“逻辑删除”)。这是有道理的,因为从发件箱表中删除不应该影响相应 Kafka 主题中事件的保留。相反,可以在 Kafka 中配置事件主题的特定保留时间,例如将所有采购订单事件保留 30 天。
或者,可以使用紧凑的主题。这将需要对发件箱表中的事件设计进行一些更改:
- 他们必须描述整个聚合体;因此,例如,表示取消单个订单行的事件也应描述包含采购订单的完整当前状态;这样,在日志压缩运行后,消费者在仅看到与给定订单有关的最后一个事件时也能够获得采购订单的完整状态。
- 它们必须有一个布尔属性来指示特定事件是否代表事件聚合根的删除。这样的事件(例如 OrderDeleted 类型的事件)随后可以被下一节中描述的事件路由 SMT 使用,以生成该聚合根的删除标记。当其 OrderDeleted 事件已写入主题时,日志压缩将删除与给定采购订单有关的所有事件。
当然,当删除事件时,事件流将不再从头开始重新播放。根据特定的业务需求,仅保留给定采购订单、客户等的最终状态可能就足够了。这可以使用压缩主题和主题的 delete.retention.ms 设置的足够值来实现。另一种选择是将历史事件移动到某种冷存储(例如 Amazon S3 存储桶),如果需要可以从那里检索它们,然后从 Kafka 主题中读取最新事件。遵循哪种方法取决于具体要求、预期的数据量以及开发和运营解决方案的团队的专业知识。
九、主题路由
默认情况下,Debezium 连接器会将源自一个给定表的所有更改事件发送到同一主题,即我们最终会得到一个名为 dbserver1.inventory.outboxevent 的单个 Kafka 主题,它将包含所有事件,无论是订单事件、客户事件等
为了简化只对特定事件类型感兴趣的消费者的实现,拥有多个主题更有意义,例如OrderEvents、CustomerEvents 等。例如,货运服务可能对任何客户事件都不感兴趣。通过只订阅 OrderEvents 主题,它将确保永远不会收到任何客户事件。
为了将从发件箱表捕获的更改事件路由到不同的主题,使用了自定义 SMT EventRouter。这是其 apply() 方法的代码,Kafka Connect 将针对 Debezium 连接器发出的每条记录调用该方法:
@Override
public R apply(R record) {
// Ignoring tombstones just in case
if (record.value() == null) {
return record;
}
Struct struct = (Struct) record.value();
String op = struct.getString("op");
// ignoring deletions in the outbox table
if (op.equals("d")) {
return null;
}
else if (op.equals("c")) {
Long timestamp = struct.getInt64("ts_ms");
Struct after = struct.getStruct("after");
String key = after.getString("aggregateid");
String topic = after.getString("aggregatetype") + "Events";
String eventId = after.getString("id");
String eventType = after.getString("type");
String payload = after.getString("payload");
Schema valueSchema = SchemaBuilder.struct()
.field("eventType", after.schema().field("type").schema())
.field("ts_ms", struct.schema().field("ts_ms").schema())
.field("payload", after.schema().field("payload").schema())
.build();
Struct value = new Struct(valueSchema)
.put("eventType", eventType)
.put("ts_ms", timestamp)
.put("payload", payload);
Headers headers = record.headers();
headers.addString("eventId", eventId);
return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,
record.timestamp(), headers);
}
// not expecting update events, as the outbox table is "append only",
// i.e. event records will never be updated
else {
throw new IllegalArgumentException("Record of unexpected op type: " + record);
}
}
当接收到删除事件 (op = d) 时,它将丢弃该事件,因为从发件箱表中删除事件记录与下游消费者无关。当接收到创建事件 (op = c) 时,事情变得更有趣了。这样的记录将传播到 Apache Kafka。
Debezium 的更改事件具有复杂的结构,其中包含所表示行的旧(之前)和新(之后)状态。要传播的事件结构是从后状态获得的。捕获的事件记录中的聚合类型值用于构建要将事件发送到的主题的名称。例如,aggregatetype 设置为 Order 的事件将被发送到 OrderEvents 主题。 aggregateid 用作消息键,确保该聚合的所有消息都将进入该主题的同一分区。消息值是一个结构,包含原始事件负载(编码为 JSON)、指示事件产生时间的时间戳和事件类型。最后,事件 UUID 作为 Kafka 标头字段传播。这允许消费者进行有效的重复检测,而无需检查实际的消息内容。
十、Apache Kafka 中的事件
现在让我们来看看 OrderEvents 和 CustomerEvents 主题。
如果您已经检查了示例源并通过 Docker Compose 启动了所有组件(有关更多详细信息,请参阅示例项目中的 README.md 文件),您可以通过订单服务的 REST API 下订单,如下所示:
cat resources/data/create-order-request.json | http POST http://localhost:8080/order-service/rest/orders
同样,可以取消特定的订单行:
cat resources/data/cancel-order-line-request.json | http PUT http://localhost:8080/order-service/rest/orders/1/lines/2
当使用诸如非常实用的 kafkacat 实用程序之类的工具时,您现在应该会在 OrderEvents 主题中看到如下消息:
kafkacat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents
Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd
Key: "4"
Value: {"eventType":"OrderCreated","ts_ms":1550307598558,"payload":"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"}
Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c
Key: "4"
Value: {"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"}
具有消息值的负载字段是原始事件的字符串化 JSON 表示。 Debezium Postgres 连接器将 JSONB 列作为字符串发出(使用 io.debezium.data.Json 逻辑类型名称),这就是引号被转义的原因。 jq 实用程序,更具体地说,它的 fromjson 运算符,可以方便地以更易读的方式显示事件有效负载:
kafkacat -b kafka:9092 -C -o beginning -t Order | jq '.payload | fromjson'
{
"id": 4,
"lineItems": [
{
"id": 7,
"item": "Debezium in Action",
"status": "ENTERED",
"quantity": 2,
"totalPrice": 39.98
},
{
"id": 8,
"item": "Debezium for Dummies",
"status": "ENTERED",
"quantity": 1,
"totalPrice": 29.99
}
],
"orderDate": "2019-01-31T12:13:01",
"customerId": 123
}
{
"orderId": 4,
"newStatus": "CANCELLED",
"oldStatus": "ENTERED",
"orderLineId": 7
}
您还可以查看 CustomerEvents 主题以检查表示在添加采购订单时创建发票的事件。
十一、消费服务中的重复检测
至此,我们对发件箱模式的实现功能齐全;当订单服务收到下订单(或取消订单行)的请求时,它将在其数据库的 purchaseorder 和 orderline 表中保留相应的状态。同时,在同一个事务内,相应的事件条目将被添加到同一个数据库中的发件箱表中。 Debezium Postgres 连接器捕获对该表的任何插入,并将事件路由到与给定事件表示的聚合类型相对应的 Kafka 主题中。
总结一下,让我们探索另一个微服务(例如货运服务)如何使用这些消息。该服务的入口点是常规的 Kafka 消费者实现,这并不太令人兴奋,因此为了简洁起见在这里省略。您可以在示例存储库中找到它的源代码。对于 Order 主题上的每条传入消息,消费者调用 OrderEventHandler:
@ApplicationScoped
public class OrderEventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);
@Inject
private MessageLog log;
@Inject
private ShipmentService shipmentService;
@Transactional
public void onOrderEvent(UUID eventId, String key, String event) {
if (log.alreadyProcessed(eventId)) {
LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
return;
}
JsonObject json = Json.createReader(new StringReader(event)).readObject();
JsonObject payload = json.containsKey("schema") ? json.getJsonObject("payload") :json;
String eventType = payload.getString("eventType");
Long ts = payload.getJsonNumber("ts_ms").longValue();
String eventPayload = payload.getString("payload");
JsonReader payloadReader = Json.createReader(new StringReader(eventPayload));
JsonObject payloadObject = payloadReader.readObject();
if (eventType.equals("OrderCreated")) {
shipmentService.orderCreated(payloadObject);
}
else if (eventType.equals("OrderLineUpdated")) {
shipmentService.orderLineUpdated(payloadObject);
}
else {
LOGGER.warn("Unkown event type");
}
log.processed(eventId);
}
}
onOrderEvent() 所做的第一件事是检查之前是否处理过具有给定 UUID 的事件。如果是这样,将忽略对同一事件的任何进一步调用。这是为了防止由该数据管道的“至少一次”语义引起的任何重复处理事件。例如,Debezium 连接器或消费服务可能会在分别使用源数据库或消息代理确认特定事件的检索之前失败。在这种情况下,在 Debezium 或消费服务重启后,一些事件可能会被第二次处理。将事件 UUID 作为 Kafka 消息头传播可以有效地检测和排除消费者中的重复项。
如果是第一次收到消息,则解析消息值,并使用事件负载调用特定事件类型对应的 ShippingService 方法的业务方法。最后,消息在消息日志中被标记为已处理。
此 MessageLog 仅跟踪服务本地数据库中表中的所有消费事件:
@ApplicationScoped
public class MessageLog {
@PersistenceContext
private EntityManager entityManager;
@Transactional(value=TxType.MANDATORY)
public void processed(UUID eventId) {
entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
}
@Transactional(value=TxType.MANDATORY)
public boolean alreadyProcessed(UUID eventId) {
return entityManager.find(ConsumedMessage.class, eventId) != null;
}
}
这样,如果事务由于某种原因被回滚,原始消息也不会被标记为已处理,并且异常会冒泡到 Kafka 事件消费者循环。这允许稍后重新尝试处理消息。
请注意,在将任何无法处理的消息重新路由到死信队列或类似队列之前,更完整的实现应该注意仅重试给定消息一定次数。消息日志表也应该有一些内务处理;周期性地,所有早于消费者提交给代理的当前偏移量的事件都可能被删除,因为它确保了这些消息不会再传播给消费者。
十二、概括
发件箱模式是在不同微服务之间传播数据的好方法。
通过仅修改单个资源(源服务自己的数据库),它避免了同时更改不共享一个公共事务上下文(数据库和 Apache Kafka)的多个资源的任何潜在不一致。通过首先写入数据库,源服务具有即时的“读取您自己的写入”语义,这对于一致的用户体验很重要,允许在写入后调用的查询方法立即反映任何数据更改。
同时,该模式支持异步事件传播到其他微服务。 Apache Kafka 充当服务间消息传递的高度可扩展且可靠的主干。给定正确的主题保留设置,新的消费者可能会在事件最初产生很久之后出现,并根据事件历史建立自己的本地状态。
将 Apache Kafka 置于整体架构的中心也确保了相关服务的解耦。例如,如果解决方案的单个组件出现故障或在一段时间内不可用,例如在更新期间,事件将在稍后处理:重新启动后,Debezium 连接器将继续从之前停止的位置开始跟踪发件箱表。同样,任何消费者都将继续处理其先前偏移量的主题。通过跟踪已成功处理的消息,可以检测重复项并将其排除在重复处理之外。
自然地,不同服务之间的这种事件管道最终是一致的,即消费者(例如运输服务)可能会落后于生产者(例如订单服务)。不过,通常这很好,并且可以根据应用程序的业务逻辑来处理。例如,通常不需要在下订单的同一秒内创建发货。此外,由于基于日志的更改数据捕获允许近乎实时地发出事件,因此整体解决方案的端到端延迟通常很低(秒甚至亚秒范围内)。
最后要记住的是,通过发件箱公开的事件结构应该被视为发送服务 API 的一部分。 IE。在需要时,应仔细调整它们的结构并考虑兼容性因素。这是为了确保在升级生产服务时不会意外破坏任何消费者。同时,消费者在处理消息时应该保持宽容,例如在接收到的事件中遇到未知属性时不要失败。
十三、发件箱消息示例
To understand how the Debezium outbox event router SMT is configured, review the following example of a Debezium outbox message:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
配置为应用发件箱事件路由器 SMT 的 Debezium 连接器通过像这样转换 Debezium 原始消息来生成上述消息:
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
"before": null,
"after": {
"id": "406c07f3-26f0-4eea-a50c-109940064b8f",
"aggregateid": "1",
"aggregatetype": "Order",
"payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
"timestamp": 1556890294344,
"type": "OrderCreated"
},
"source": {
"version": "2.2.1.Final",
"connector": "postgresql",
"name": "dbserver1-bare",
"db": "orderdb",
"ts_usec": 1556890294448870,
"txId": 584,
"lsn": 24064704,
"schema": "inventory",
"table": "outboxevent",
"snapshot": false,
"last_snapshot_record": null,
"xmin": null
},
"op": "c",
"ts_ms": 1556890294484
}
这个 Debezium 发件箱消息示例基于默认发件箱事件路由器配置,它假定发件箱表结构和基于聚合的事件路由。为了自定义行为,发件箱事件路由器 SMT 提供了许多配置选项。
十四、基本发件箱表
要应用默认发件箱事件路由器 SMT 配置,发件箱表假定具有以下列:
Column | Type | Modifiers
--------------+------------------------+-----------
id | uuid | not null
aggregatetype | character varying(255) | not null
aggregateid | character varying(255) | not null
type | character varying(255) | not null
payload | jsonb |
表 1. 预期发件箱表列的说明
字段 | 描述 |
---|---|
id | 包含事件的唯一 ID。在发件箱消息中,此值是标题。例如,您可以使用此 ID 来删除重复的消息。要从不同的发件箱表列获取事件的唯一 ID,请在连接器配置中设置 table.field.event.id SMT 选项。 |
aggregatetype | 包含 SMT 附加到连接器向其发出发件箱消息的主题的名称的值。默认行为是此值替换 route.topic.replacement SMT 选项中的默认 r o u t e d B y V a l u e 变量。例如,在默认配置中, r o u t e . b y . f i e l d S M T 选项设置为 a g g r e g a t e t y p e , r o u t e . t o p i c . r e p l a c e m e n t S M T 选项设置为 o u t b o x . e v e n t . {routedByValue} 变量。例如,在默认配置中,route.by.field SMT 选项设置为 aggregatetype,route.topic.replacement SMT 选项设置为 outbox.event. routedByValue变量。例如,在默认配置中,route.by.fieldSMT选项设置为aggregatetype,route.topic.replacementSMT选项设置为outbox.event.{routedByValue}。假设您的应用程序向发件箱表添加了两条记录。在第一条记录中,aggregatetype 列中的值为 customers。在第二条记录中,aggregatetype 列中的值为 orders。连接器将第一条记录发送到 outbox.event.customers 主题。连接器将第二条记录发送到 outbox.event.orders 主题。要从不同的发件箱表列获取此值,请在连接器配置中设置 route.by.field SMT 选项。 |
aggregateid | 包含事件密钥,它为负载提供 ID。 SMT 使用此值作为发出的发件箱消息中的键。这对于维护 Kafka 分区中的正确顺序很重要。要从不同的发件箱表列获取事件键,请在连接器配置中设置 table.field.event.key SMT 选项。 |
payload | 发件箱更改事件的表示。默认结构是 JSON。默认情况下,Kafka 消息值仅由负载值组成。但是,如果发件箱事件配置为包含附加字段,则 Kafka 消息值包含一个封装了负载和附加字段的信封,并且每个字段单独表示。有关详细信息,请参阅发出带有附加字段的消息。要从不同的发件箱表列获取事件负载,请在连接器配置中设置 table.field.event.payload SMT 选项。 |
Additional custom columns | 发件箱表中的任何其他列都可以添加到有效负载部分内的发件箱事件或作为消息标头。一个例子可能是一列 eventType,它传达了一个用户定义的值,有助于对事件进行分类或组织。 |
十五、基本配置
要配置 Debezium 连接器以支持发件箱模式,请配置 outbox.EventRouter SMT。要获得 SMT 的默认行为,请将其添加到连接器配置中而不指定任何选项,如以下示例所示:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
自定义配置
连接器可能会发出多种类型的事件消息(例如,心跳消息、墓碑消息或有关事务或架构更改的元数据消息)。要仅将转换应用于发件箱表中产生的事件,请定义一个 SMT 谓词语句,选择性地将转换仅应用于这些事件。
十六、有选择地应用转换的选项
除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于架构更改和事务的元数据消息。由于这些其他消息的结构不同于 SMT 旨在处理的更改事件消息的结构,因此最好将连接器配置为有选择地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT 谓词。
- 为 SMT 使用 route.topic.regex 配置选项。
十七、使用 Avro 作为负载格式
发件箱事件路由器 SMT 支持任意负载格式。发件箱表中的负载列值是透明传递的。使用 JSON 的另一种方法是使用 Avro。这对于消息格式治理和确保发件箱事件模式以向后兼容的方式发展是有益的。
源应用程序如何为发件箱消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。为确保 Kafka 消息值是准确的 Avro 二进制数据,请将以下配置应用于连接器:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
默认情况下,有效负载列值(Avro 数据)是唯一的消息值。配置 BinaryDataConverter 作为值转换器将有效负载列值按原样传播到 Kafka 消息值中。
Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(支持因连接器而异)。 BinaryDataConverter 无法序列化这些事件,因此必须提供额外的配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了在没有模式的情况下使用 Apache Kafka JsonConverter:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
委托转换器实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如使用 schemas.enable=false 禁用上面显示的模式。
注意:
自 Debezium 1.9 版以来,转换器 io.debezium.converters.ByteBufferConverter 已被弃用,并已在 2.0 中删除。此外,在使用 Kafka Connect 时,必须在升级到 Debezium 2.x 之前更新连接器的配置
十八、发出带有附加字段的消息
您的发件箱表可能包含您要将其值添加到发出的发件箱消息中的列。例如,考虑一个发件箱表,它在 aggregatetype 列中具有 purchase-order 的值,而另一列 eventType 的可能值是 order-created 和 order-shipped。可以使用语法 column:placement:alias 添加其他字段。
放置的允许值是: - header - envelope - partition
要在发件箱消息标头中发出 eventType 列值,请像这样配置 SMT:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息的标题,其中类型作为其键,eventType 列的值作为其值。
要在发件箱消息信封中发出 eventType 列值,请像这样配置 SMT:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type
要控制在哪个分区上生成发件箱消息,请像这样配置 SMT:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition
请注意,对于分区放置,添加别名将不起作用。
十九、将转义的 JSON 字符串扩展为 JSON
您可能已经注意到 Debezium 发件箱消息包含表示为字符串的有效负载。因此,当这个字符串实际上是 JSON 时,它在结果 Kafka 消息中显示为已转义,如下所示:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
发件箱事件路由器允许您将此消息内容扩展为“真正的”JSON,并从 JSON 文档本身推导出伴随模式。这样 Kafka 消息中的结果如下所示:
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}
要启用此转换,您必须将 table.expand.json.payload 设置为 true 并使用如下所示的 JsonConverter:
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter
二十、配置选项
下表描述了您可以为发件箱事件路由器 SMT 指定的选项。在表格中,Group列表示Kafka的配置选项分类。
表 2. 发件箱事件路由器 SMT 配置选项的描述
选项 | 默认值 | 组 | 描述 |
---|---|---|---|
table.op.invalid.behavior | warn | Table | 确定在发件箱表上有 UPDATE 操作时 SMT 的行为。可能的设置是:警告 - SMT 记录警告并继续处理下一个发件箱表记录。错误 - SMT 记录错误并继续处理下一个发件箱表记录。致命 - SMT 记录错误并且连接器停止处理。发件箱表中的所有更改都应该是 INSERT 操作。即发件箱表起到队列的作用;不允许更新发件箱表中的记录。 SMT 自动过滤掉对发件箱表的 DELETE 操作。 |
table.field.event.id | id | Table | 指定包含唯一事件 ID 的发件箱表列。此 ID 将存储在 id 键下的已发出事件的标头中。 |
table.field.event.key | aggregateid | Table | 指定包含事件键的发件箱表列。当此列包含值时,SMT 使用该值作为发出的发件箱消息中的键。这对于维护 Kafka 分区中的正确顺序很重要。 |
table.field.event.timestamp | Table | 默认情况下,发出的发件箱消息中的时间戳是 Debezium 事件时间戳。要在发件箱消息中使用不同的时间戳,请将此选项设置为发件箱表列,该列包含您希望在发出的发件箱消息中使用的时间戳。 | |
table.field.event.payload | payload | Table | 指定包含事件负载的发件箱表列。 |
table.expand.json.payload | false | Table | 指定是否应完成 String 有效负载的 JSON 扩展。如果未找到内容或出现解析错误,则内容将保持“原样”。 |
table.json.payload.null.behavior | ignore | Table | 当启用 JSON 扩展属性 table.expand.json.payload 时,确定在发件箱表上包含空值的 json 有效负载的行为。可能的设置是:ignore - 忽略空值。optional_bytes - 保留空值,并将空值视为连接的可选字节。 |
table.fields.additional.placement | Table, Envelope | 指定要添加到发件箱邮件标题或信封的一个或多个发件箱表列。指定以逗号分隔的对列表。在每一对中,指定列的名称以及您希望该值位于标题中还是位于信封中。用冒号分隔对中的值,例如:id:header,my-field:信封,要为列指定别名,请指定一个以别名作为第三个值的三元组,例如:id:header,my-field✉️my-alias第二个值是位置,它必须始终是页眉或信封。配置示例是在 Debezium 发件箱消息中发出附加字段。 | |
table.field.event.schema.version | Table, Schema | 设置后,此值用作 Kafka Connect Schema Javadoc 中描述的架构版本。 | |
route.by.field | aggregatetype | Router | 指定发件箱表中列的名称。默认行为是此列中的值成为连接器向其发出发件箱消息的主题名称的一部分。一个示例在预期发件箱表的描述中。 |
route.topic.regex | (?<routedByValue>.*) | Router | 指定发件箱 SMT 在 RegexRouter 中应用于发件箱表记录的正则表达式。此正则表达式是 route.topic.replacement SMT 选项设置的一部分。默认行为是 SMT 将 route.topic.replacement SMT 选项设置中的默认 ${routedByValue} 变量替换为 route.by.field 发件箱 SMT 选项设置。 |
route.topic.replacement | outbox.event.${routedByValue} | Router | 指定连接器向其发出发件箱消息的主题的名称。默认主题名称是 outbox.event。后跟发件箱表记录中的 aggregatetype 列值。例如,如果 aggregatetype 值为 customers,则主题名称为 outbox.event.customers。要更改主题名称,您可以:将 route.by.field 选项设置为不同的列。将 route.topic.regex 选项设置为不同的正则表达式。 |
route.tombstone.on.empty.payload | false | Router | 指示空负载或空负载是否会导致连接器发出逻辑删除事件。 |
tracing.span.context.field | tracingspancontext | Tracing | 包含跟踪跨度上下文的字段的名称。 |
tracing.operation.name | debezium-read | Tracing | 代表 Debezium 处理跨度的操作名称。 |
tracing.with.context.field.only | false | Tracing | 当为 true 时,应仅跟踪具有序列化上下文字段的事件。 |