I. Introduction
Whether you are brand new to Pulsar or have used it for years, the code below should look familiar: it is the single most common piece of producer-side code there is. At its heart are just three lines, marked with the red numbers 1, 2, and 3: (1) creating the client object, (2) creating the producer object, and (3) sending the message. Today we will dig into each of these three steps in turn.
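For reference, here is a minimal sketch of that familiar three-step snippet (the service URL and topic name below are placeholders):
// 1. Create the client object
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
// 2. Create the producer object
Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic")
        .create();
// 3. Send a message
producer.send("hello pulsar");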
II. Creating the Client Object
Whether you are writing producer code or consumer code, the first step is always creating a client object. So what exactly does client creation do? The steps are sketched in the figure below.
Creating the client object sets up the following components, the first two being the most important:
- The Lookup service
- The connection pool
- Thread pools
- The memory limit controller (MemoryLimitController)
- The client timer (HashedWheelTimer)
1. Lookup Service
The Lookup service resolves which Broker owns a Topic, fetches Schema information, and so on; it is very important. There are two implementations: HTTP and the binary protocol. If the address passed to the .serviceUrl method when building the client starts with http, the HTTP implementation is used; otherwise the binary-protocol implementation is used.
The Lookup service is created like this:
// Initialize the LookupService
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
}
Stepping into the HttpLookupService constructor, we can see that it wraps an HttpClient object for outbound HTTP communication. Stepping further into the HttpClient constructor, it actually delegates to the DefaultAsyncHttpClient constructor; that class comes from the external dependency async-http-client-2.12.1.jar. Tracing the code down, its long-lived HTTP connections are ultimately implemented on top of Netty as well.
public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
}
protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
....
httpClient = new DefaultAsyncHttpClient(config);
}
That is deep enough for now. We know that creating the client object initializes the Lookup service, and that the Lookup service in turn creates an asynchronous HTTP client for outbound communication. When a producer is created, this is used to query the address of the Broker that owns the Topic, so the producer knows exactly which Broker to open a TCP connection to.
2. Connection Pool (ConnectionPool)
A connection pool is pooling applied to network connections: it avoids the waste of repeatedly opening and closing TCP connections and improves performance. In the client's constructor, the ConnectionPool is created as shown below; as you can see, when no pool is passed in it is created with new, so let's look at its constructor.
connectionPoolReference =
connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
The ConnectionPool constructor is shown below. Its core logic is creating the Netty client via Bootstrap. The line pool = new ConcurrentHashMap<>(); also matters: this map is where our network connections are stored.
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier,
Optional<AddressResolver<InetSocketAddress>> addressResolver)
throws PulsarClientException {
....
// Set up the connection map and the Netty client bootstrap
pool = new ConcurrentHashMap<>();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
// Netty options
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try {
channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
bootstrap.handler(channelInitializerHandler);
} catch (Exception e) {
log.error("Failed to create channel initializer");
throw new PulsarClientException(e);
}
....
}
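To make the pooling idea concrete before we meet it again in getConnection, here is a tiny self-contained sketch of the same computeIfAbsent caching pattern (the String "connection" is a stand-in for Pulsar's ClientCnx, not the real type):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TinyPool {
    // host -> (slot -> connection future), mirroring the two-level map we will see later
    private final ConcurrentMap<String, ConcurrentMap<Integer, CompletableFuture<String>>> pool =
            new ConcurrentHashMap<>();

    CompletableFuture<String> get(String host, int slot) {
        return pool.computeIfAbsent(host, h -> new ConcurrentHashMap<>())
                   .computeIfAbsent(slot, s -> connect(host)); // created once, reused afterwards
    }

    private CompletableFuture<String> connect(String host) {
        return CompletableFuture.completedFuture("cnx-to-" + host); // stand-in for the real async TCP connect
    }
}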
III. Creating the Producer Object
With client creation covered, let's turn to producer creation, entering from this statement: Producer<String> producer = pulsarClient.newProducer(...).create();
Creating a producer goes through the following steps.
1. Choosing the Routing Strategy
The routing mode is whatever was specified when building the producer; if nothing is configured, round-robin routing is used by default. The setMessageRoutingMode method is what fixes the routing mode, and the code below also shows that, if interceptors were configured, their logic gets wired in during producer creation as well (a sketch of registering one follows the snippet).
public CompletableFuture<Producer<T>> createAsync() {
....
try {
setMessageRoutingMode();
} catch (PulsarClientException pce) {
return FutureUtil.failedFuture(pce);
}
return interceptorList == null || interceptorList.size() == 0
? client.createProducerAsync(conf, schema, null)
: client.createProducerAsync(conf, schema, new ProducerInterceptors(interceptorList));
}
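As an aside, here is a hedged sketch of what registering such an interceptor looks like from the user side, assuming the org.apache.pulsar.client.api.interceptor.ProducerInterceptor interface and an existing PulsarClient named client (the logging itself is made up for illustration):
ProducerInterceptor logging = new ProducerInterceptor() {
    @Override public void close() {}
    @Override public boolean eligible(Message message) { return true; }
    @Override public Message beforeSend(Producer producer, Message message) {
        System.out.println("about to send " + message.getData().length + " bytes"); // illustrative only
        return message;
    }
    @Override public void onSendAcknowledgement(Producer producer, Message message,
                                                MessageId msgId, Throwable exception) {
        System.out.println("acked: " + msgId);
    }
};
Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic") // placeholder
        .intercept(logging) // ends up wrapped in the ProducerInterceptors seen above
        .create();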
The logic of setMessageRoutingMode is shown below: round-robin by default, otherwise whatever was configured. It also validates the routing settings, so any conflicting user configuration is detected up front and thrown.
private void setMessageRoutingMode() throws PulsarClientException {
if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
} else if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() != null) {
messageRoutingMode(MessageRoutingMode.CustomPartition);
} else if (conf.getMessageRoutingMode() == MessageRoutingMode.CustomPartition
&& conf.getCustomMessageRouter() == null) {
throw new PulsarClientException("When 'messageRoutingMode' is " + MessageRoutingMode.CustomPartition
+ ", 'messageRouter' should be set");
} else if (conf.getMessageRoutingMode() != MessageRoutingMode.CustomPartition
&& conf.getCustomMessageRouter() != null) {
throw new PulsarClientException("When 'messageRouter' is set, 'messageRoutingMode' "
+ "should be set as " + MessageRoutingMode.CustomPartition);
}
}
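To see the CustomPartition branch in action, here is a hedged sketch of a key-hash MessageRouter; the hashing scheme is an arbitrary choice for illustration, and client is assumed to be an existing PulsarClient:
MessageRouter keyHashRouter = new MessageRouter() {
    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        String key = msg.hasKey() ? msg.getKey() : "";
        // mask the sign bit so the index is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % metadata.numPartitions();
    }
};
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic") // placeholder
        .messageRoutingMode(MessageRoutingMode.CustomPartition)
        .messageRouter(keyHashRouter)
        .create();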
2. Fetching the Topic's Partition Count and Schema
The client uses the Lookup mechanism to ask the Broker for the Topic's Schema. As the code shows, if no Schema is configured on the server side, Schema.BYTES is used by default.
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent()) {
SchemaInfo schemaInfo = schemaInfoOptional.get();
if (schemaInfo.getType() == SchemaType.PROTOBUF) {
autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo));
} else {
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
}
} else {
autoProduceBytesSchema.setSchema(Schema.BYTES);
}
return createProducerAsync(topic, conf, schema, interceptors);
});
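This autoProduceBytesSchema branch is the one taken when the producer was built with AUTO_PRODUCE_BYTES, i.e. the client defers to whatever schema the Broker has registered; a minimal sketch, with the topic as a placeholder and client assumed to exist:
// Raw-bytes producer whose payloads are validated against the topic's registered schema
Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
        .topic("persistent://public/default/my-topic")
        .create();
producer.send("{\"name\":\"pulsar\"}".getBytes());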
Let's step into the getSchema implementation: it builds an HTTP path and queries the Broker with a GET request.
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>();
String schemaName = topicName.getSchemaName();
String path = String.format("admin/v2/schemas/%s/schema", schemaName);
if (version != null) {
if (version.length == 0) {
future.completeExceptionally(new SchemaSerializationException("Empty schema version"));
return future;
}
path = String.format("admin/v2/schemas/%s/schema/%s",
schemaName,
ByteBuffer.wrap(version).getLong());
}
httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
if (response.getType() == SchemaType.KEY_VALUE) {
SchemaData data = SchemaData
.builder()
.data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
response.getData().getBytes(StandardCharsets.UTF_8)))
.type(response.getType())
.props(response.getProperties())
.build();
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
} else {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
}
}).exceptionally(ex -> {
....
});
return future;
}
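Since this is just an admin REST endpoint, the same data can also be reached through PulsarAdmin; a hedged sketch, with the service URL and topic as placeholders:
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();
SchemaInfo info = admin.schemas().getSchemaInfo("persistent://public/default/my-topic");
System.out.println(info.getType());
admin.close();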
Besides the Schema, the client also reads the Topic's partition metadata. The creation code below shows that if the topic has partitions, a PartitionedProducerImpl is created; otherwise a plain ProducerImpl. The partitions are fetched by getPartitionedTopicMetadata, so let's look at its implementation.
private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
ProducerConfigurationData conf,
Schema<T> schema,
ProducerInterceptors interceptors) {
CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
....
ProducerBase<T> producer;
if (metadata.partitions > 0) {
producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture,
metadata);
} else {
producer = newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture,
Optional.empty());
}
producers.add(producer);
}).exceptionally(ex -> {
....
});
return producerCreatedFuture;
}
The core of getPartitionedTopicMetadata is shown below; following it all the way down, partition metadata is likewise fetched over HTTP via the Lookup service.
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
TopicName topicName = TopicName.get(topic);
AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs());
....
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
metadataFuture, new ArrayList<>());
} catch (IllegalArgumentException e) {
....
}
return metadataFuture;
}
private void getPartitionedTopicMetadata(TopicName topicName,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture<PartitionedTopicMetadata> future,
List<Throwable> previousExceptions) {
long startTime = System.nanoTime();
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
long nextDelay = Math.min(backoff.next(), remainingTime.get());
// skip retry scheduler when set lookup throttle in client or server side which will lead to
// `TooManyRequestsException`
boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause())
|| e.getCause() instanceof PulsarClientException.AuthenticationException;
if (nextDelay <= 0 || isLookupThrottling) {
PulsarClientException.setPreviousExceptions(e, previousExceptions);
future.completeExceptionally(e);
return null;
}
....
});
}
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
PartitionedTopicMetadata.class);
}
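The same endpoint backs the admin API's partition query, so you can check a topic's partition count yourself; a hedged sketch, again with a placeholder URL and topic:
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();
int partitions = admin.topics()
        .getPartitionedTopicMetadata("persistent://public/default/my-topic")
        .partitions; // 0 means a non-partitioned topic
System.out.println("partitions = " + partitions);
admin.close();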
3. Constructing the Producer
The previous subsection showed the partition metadata being fetched; when there are partitions, newPartitionedProducerImpl is called to initialize the producer, so let's pick up the trace there. The method merely wraps new PartitionedProducerImpl(...), so we continue into the PartitionedProducerImpl constructor.
protected <T> PartitionedProducerImpl<T> newPartitionedProducerImpl(String topic,
ProducerConfigurationData conf,
Schema<T> schema,
ProducerInterceptors interceptors,
CompletableFuture<Producer<T>>
producerCreatedFuture,
PartitionedTopicMetadata metadata) {
return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
producerCreatedFuture, schema, interceptors);
}
public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture,
Schema<T> schema, ProducerInterceptors interceptors) {
super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.producers =
ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
// Set the routing policy
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0
? new PartitionedTopicProducerStatsRecorderImpl()
: null;
// Cap pending messages per partition: min(per-producer limit, cross-partition limit / partitions);
// e.g. a cross-partition cap of 50000 over 10 partitions allows at most 5000 per partition producer
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
conf.setMaxPendingMessages(maxPendingMessages);
final List<Integer> indexList;
if (conf.isLazyStartPartitionedProducers()
&& conf.getAccessMode() == ProducerAccessMode.Shared) {
// try to create producer at least one partition
indexList = Collections.singletonList(routerPolicy
.choosePartition(((TypedMessageBuilderImpl<T>) newMessage()).getMessage(), topicMetadata));
} else {
// try to create producer for all partitions
indexList = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toList());
}
firstPartitionIndex = indexList.get(0);
// Here is the core logic; step in here
start(indexList);
// start track and auto subscribe partition increasement
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask,
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
}
private void start(List<Integer> indexList) {
....
final ProducerImpl<T> firstProducer = createProducer(indexList.get(0));
firstProducer.producerCreatedFuture().handle((prod, createException) -> {
....
}).thenApply(name -> {
for (int i = 1; i < indexList.size(); i++) {
// Loop over the remaining partitions and create a matching ProducerImpl for each
createProducer(indexList.get(i), name).producerCreatedFuture().handle((prod, createException) -> {
afterCreatingProducer.accept(false, createException);
return null;
});
}
return null;
});
}
Let's keep tracing from the createProducer method.
private ProducerImpl<T> createProducer(final int partitionIndex, final Optional<String> overrideProducerName) {
// Each new ProducerImpl is put into the producers map, keyed by partition index
return producers.computeIfAbsent(partitionIndex, (idx) -> {
String partitionName = TopicName.get(topic).getPartition(idx).toString();
// Step in to see how the ProducerImpl is created
return client.newProducerImpl(partitionName, idx,
conf, schema, interceptors, new CompletableFuture<>(), overrideProducerName);
});
}
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
....
this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
....
// The core logic: establish the TCP connection between this ProducerImpl and its Broker
grabCnx();
}
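The compressor assigned above comes straight from the builder's compressionType setting; a quick sketch of turning it on, where LZ4 is an arbitrary example (the default is NONE) and client is assumed to exist:
Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic") // placeholder
        .compressionType(CompressionType.LZ4) // picked up by CompressionCodecProvider
        .create();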
4. Establishing the Connection
The previous subsection ended at the grabCnx method. Following it all the way down, everything eventually calls state.client.getConnection, so let's look inside.
void grabCnx() {
this.connectionHandler.grabCnx();
}
protected void grabCnx() {
grabCnx(Optional.empty());
}
protected void grabCnx(Optional<URI> hostURI) {
....
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
....
}
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress logicalAddress,
final InetSocketAddress physicalAddress,
final int randomKeyForSelectConnection) {
// The connection pool is asked for the connection; step in for the details
return cnxPool.getConnection(logicalAddress, physicalAddress, randomKeyForSelectConnection);
}
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// If the connection pool is disabled, build a brand-new TCP connection every time
return createConnection(logicalAddress, physicalAddress, -1);
}
final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
// Build a TCP connection and cache it in the pool for later reuse; step into createConnection
CompletableFuture<ClientCnx> completableFuture = innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
....
}
private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, int connectionKey) {
....
// Step into the next createConnection overload
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
....
}).exceptionally(exception -> {
....
});
return cnxFuture;
}
private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress unresolvedPhysicalAddress) {
CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
....
return resolvedAddress.thenCompose(
inetAddresses -> connectToResolvedAddresses(
logicalAddress,
unresolvedPhysicalAddress,
inetAddresses.iterator(),
isSniProxy ? unresolvedPhysicalAddress : null)
);
} catch (URISyntaxException e) {
....
}
}
private CompletableFuture<Channel> connectToResolvedAddresses(...) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Keep tracing downward
connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost)
....
return null;
});
return future;
}
private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress,
InetSocketAddress unresolvedPhysicalAddress,
InetSocketAddress sniHost) {
....
// Here it is at last: bootstrap.register(). Feel free to step inside — from here on it is Netty code
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(ch ->
channelInitializerHandler.initializeClientCnx(ch, logicalAddress,
unresolvedPhysicalAddress))
.thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
public ChannelFuture register() {
validate();
// Keep going
return initAndRegister();
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Open a new channel to the Broker
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
....
}
....
return regFuture;
}
From the trace above we can see that, when the producer object is created, it establishes a TCP connection via the Netty client to the Broker that owns the Topic, ready for the communication that follows.
IV. Sending Messages
The message-send flow is roughly as shown in the figure below:
- When a message is sent, the producer uses the routing policy to pick the ProducerImpl that owns the target partition
- Before sending, the message passes through the interceptor chain in order (if one is configured), is then compressed, and is finally serialized (messages must be serialized for transport and storage)
- Before going on the wire, the message is placed in a pending-acknowledgement queue; each partition has its own queue, and a message removes itself from that queue only after the write succeeds — until then the write does not count as successful
- The send operation is wrapped as a task and handed to a thread for the actual send
- Once the Broker has persisted the data it returns an ack, and the client uses the message info carried in the ack to delete the message from the pending queue
As usual, let's walk through the code together.
1. Send-Path Preliminaries
Let's enter from this familiar statement: producer.sendAsync("hello java API pulsar:" + i + ", current time: " + new Date()); . Looking at the code, the sendAsync method is inherited from the parent class ProducerBase.
public CompletableFuture<MessageId> sendAsync(Message<?> message) {
// Step in
return internalSendAsync(message);
}
// An abstract method with two implementations, partitioned and non-partitioned; we follow the partitioned one
abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
// Keep tracing inward
return internalSendWithTxnAsync(message, null);
}
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
....
// Remember the map built at producer creation, keyed by partition index with ProducerImpl values? Fetch the matching producer from it
return producers.get(partition).internalSendWithTxnAsync(message, txn);
}
2. Message Processing and Wrapping
We are back in ProducerImpl. The partitioned producer is really just a shell: partitioned or not, the actual send logic always ends up in ProducerImpl. Without further ado, let's continue.
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
....
return internalSendAsync(message);
}
CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
....
// This is where the interceptor logic takes effect
MessageImpl<?> interceptorMessage = (MessageImpl) beforeSend(message);
....
// Core logic; step in
sendAsync(interceptorMessage, new SendCallback() {
....
});
return future;
}
public void sendAsync(Message<?> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);
....
// Core logic: as the name says, serialize the message and run the send logic; keep tracing
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
} catch (PulsarClientException e) {
....
}
}
private void serializeAndSendMessage(....) throws IOException {
//Core logic: op is an OpSendMsg wrapping the message content to send; keep following it down
processOpSendMsg(op);
}
3. Sending the Message
The data is now processed and wrapped; what remains is the send logic. Let's follow the processOpSendMsg method onward.
protected void processOpSendMsg(OpSendMsg op) {
....
//Add the message to the OpSendMsgQueue — this is the pending-acknowledgement queue
pendingMessages.add(op);
....
//ClientCnx holds the Netty-backed TCP connection to the Broker
final ClientCnx cnx = getCnxIfReady();
if (cnx != null) {
....
//Running WriteInEventLoopCallback's run() sends the data out, while the queue keeps tracking the message state
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
} else {
....
}
} catch (Throwable t) {
....
}
}
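The pendingMessages queue we just saw is exactly what the producer's queue-related settings bound; a hedged sketch of those knobs, with arbitrary values and an assumed existing client:
import java.util.concurrent.TimeUnit;

Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("my-topic") // placeholder
        .maxPendingMessages(1000) // cap on the pending-acknowledgement queue
        .blockIfQueueFull(true) // block sends instead of failing when the cap is hit
        .enableBatching(true)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .create();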
//WriteInEventLoopCallback is a Runnable handed to the connection's Netty event loop (as seen above), so go straight to its run() method
private static final class WriteInEventLoopCallback implements Runnable {
public void run() {
....
try {
//If you know Netty, writeAndFlush needs no introduction: it sends the data to the Broker over the TCP connection established earlier
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
} finally {
recycle();
}
}
}
The trace is basically done at this point. If you are interested in Netty you can keep digging; in short, it wraps and optimizes the JDK's NIO, and at the very bottom the data goes out through a Java socket. Did anyone notice something odd — where is the logic that handles the response? This is asynchronous design at work: Netty is, after all, an asynchronous communication framework, and the client-side send logic truly ends here. A message is only finished once the Broker sends an ack back to the client. Want to know how Pulsar implements that? Follow along.
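From the caller's point of view this asynchrony surfaces as the CompletableFuture returned by sendAsync, which completes only once the Broker's ack comes back; a minimal sketch:
producer.sendAsync("hello pulsar")
        .thenAccept(messageId -> System.out.println("acked by broker: " + messageId))
        .exceptionally(ex -> {
            System.err.println("send failed: " + ex);
            return null;
        });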
4. Message Acknowledgement
The confirmation flow is initiated by the Broker, so the producer necessarily receives it through its Netty client. That means we can go straight to the channelRead logic in Pulsar's ChannelInboundHandlerAdapter implementation.
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
....
switch (cmd.getType()) {
....
//Here we can see a follow-up action for a successful write; let's start from there
case PRODUCER_SUCCESS:
checkArgument(cmd.hasProducerSuccess());
handleProducerSuccess(cmd.getProducerSuccess());
break;
case UNSUBSCRIBE:
checkArgument(cmd.hasUnsubscribe());
safeInterceptCommand(cmd);
handleUnsubscribe(cmd.getUnsubscribe());
break;
}
}
}
handleProducerSuccess is implemented by ClientCnx, so let's step in.
protected void handleProducerSuccess(CommandProducerSuccess success) {
checkArgument(state == State.Ready);
if (log.isDebugEnabled()) {
log.debug("{} Received producer success response from server: {} - producer-name: {}", ctx.channel(),
success.getRequestId(), success.getProducerName());
}
long requestId = success.getRequestId();
if (!success.isProducerReady()) {
//ClientCnx keeps a pendingRequests map of in-flight requests (a sibling of the pending message queue seen earlier); look up the matching request in it
TimedCompletableFuture<?> requestFuture = pendingRequests.get(requestId);
if (requestFuture != null) {
//Log it
log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
success.getProducerName(), requestId);
//Mark it as responded
requestFuture.markAsResponded();
}
return;
}
//The client's main handling here: remove the request from the queue
CompletableFuture<ProducerResponse> requestFuture =
(CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
....
}
V. Summary
And that is essentially the whole producer write path — not as scary as you imagined, right? If the relationship between (1) client creation, (2) producer creation, and (3) message sending still feels fuzzy, allow me an analogy: creating the client is like founding a new city and laying its foundations; creating the producer is like building the city on top of those foundations — power plants and all — and, most importantly, building the highways that connect it to every other city; and sending messages is the residents of this new city taking the high-speed rail to those other cities. I am sure it clicks now. If you have any other questions, feel free to discuss them in the comments below.