一文彻底搞懂Producer端流程以及原理

news2025/1/18 10:28:41

一、引言

无论你是刚接触Pulsar还是使用Pulsar多年,相信你对下面这段代码都很熟悉,这就是生产者端最常写的代码没有之一,其中最核心的其实就三行代码,分别用红色数字标识出来了,其中对应的就是1、客户端对象创建 2、生产者对象创建 3、消息发送。今天就分别针对这三个步骤进行深入的探索。
在这里插入图片描述

二、创建客户端对象

无论是写生产者还是消费者端代码,第一步都是要创建客户端对象,那么客户端对象都做了些什么事情呢?这里将客户端对象创建的步骤绘制成以下图
在这里插入图片描述

客户端对象的创建以下几个东西,其中最重要的是前两个

  1. Lookup服务
  2. 连接池
  3. 线程池
  4. 内存控制器MemoryLimitController
  5. 创建客户端计数器HashedWheelTimer

1. Lookup服务

Lookup服务负责获取Topic的归属Broker地址以及Schema信息等,非常重要。默认有HTTP和二进制传输两种实现,如果创建客户端对象时.serviceUrl方法传入的地址是http开头则使用HTTP实现,否则就是二进制传输实现。

Lookup服务的创建如下

//初始化LookupService服务
if (conf.getServiceUrl().startsWith("http")) {
    lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
    lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
            conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
}

进入看看HttpLookupService的构造方法可以看到它是内部套了一个HttpClient对象来对外进行HTTP通信,在继续看HttpClient的构造函数可以它内部实际上是调用的DefaultAsyncHttpClient构造函数来创建,这个对象是外部包 async-http-client-2.12.1.jar的实现,跟了一下代码,底层也是基于Netty来进行实现的HTTP长连接通信

    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
            throws PulsarClientException {
        this.httpClient = new HttpClient(conf, eventLoopGroup);
        this.useTls = conf.isUseTls();
        this.listenerName = conf.getListenerName();
    }
    
    protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
    		....
    	  httpClient = new DefaultAsyncHttpClient(config);
    }

跟到这里就差不多了,我们知道创建客户端对象的时候会初始化Lookup服务,而Lookup服务初始化的时候会创建跟外部进行通信的异步HTTP客户端,用于在创建生产者时去查询Topic的归属Broker的IP地址,这样生产者才知道具体去跟哪台Broker创建TCP连接

2. 连接池ConnectionPool

连接池是池化技术在网络连接这个场景的使用,使用连接池可以避免重复创建关闭TCP连接造成资源浪费以及提升性能。在创建客户端对象的构造方法中,ConnectionPool的创建如下,可以是通过new方式进行创建,因此我们看下它的构造方法

connectionPoolReference =
                    connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);

ConnectionPool的构造方法如下,可以看到核心逻辑其实就是通过Bootstrap创建Netty客户端对象,通过 pool = new ConcurrentHashMap<>(); 这行代码也很重要,这个HashMap就是存储咱们的网络连接

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
                      Supplier<ClientCnx> clientCnxSupplier,
                      Optional<AddressResolver<InetSocketAddress>> addressResolver)
        throws PulsarClientException {
		....
    //启动Netty客户端
    pool = new ConcurrentHashMap<>();
    bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup);
    bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
		//Netty相关配置
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
    bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
    bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

    try {
        channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
        bootstrap.handler(channelInitializerHandler);
    } catch (Exception e) {
        log.error("Failed to create channel initializer");
        throw new PulsarClientException(e);
    }
		....
}

三、生产者对象创建

看完了客户端对象创建,再来看看生产者对象的创建,从这条语句进行切入 Producer<String> producer = pulsarClient.newProducer(...).create(); 在创建生产者对象时会进行一下几步
在这里插入图片描述

1. 指定路由策略

这个主要就是根据创建生产者对象时指定的,如果没有配置的话默认使用的轮询路由策略。setMessageRoutingMode 这个方法就是指定路由策略的,然后指定完后下面的代码可以看到,如果有配置拦截器的话在创建生产者对象时也会把它给拦截逻辑加载进去。

    public CompletableFuture<Producer<T>> createAsync() {
				....
        try {
            setMessageRoutingMode();
        } catch (PulsarClientException pce) {
            return FutureUtil.failedFuture(pce);
        }

        return interceptorList == null || interceptorList.size() == 0
                ? client.createProducerAsync(conf, schema, null)
                : client.createProducerAsync(conf, schema, new ProducerInterceptors(interceptorList));
    }

setMessageRoutingMode 逻辑如下,默认用轮询路由,如果配置其他的就用其他的,其次就是做路由规则的校验,看看用户是否配置的信息有冲突的提前感知并抛出去。

    private void setMessageRoutingMode() throws PulsarClientException {
        if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
            messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
        } else if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() != null) {
            messageRoutingMode(MessageRoutingMode.CustomPartition);
        } else if (conf.getMessageRoutingMode() == MessageRoutingMode.CustomPartition
                && conf.getCustomMessageRouter() == null) {
            throw new PulsarClientException("When 'messageRoutingMode' is " + MessageRoutingMode.CustomPartition
                + ", 'messageRouter' should be set");
        } else if (conf.getMessageRoutingMode() != MessageRoutingMode.CustomPartition
                && conf.getCustomMessageRouter() != null) {
            throw new PulsarClientException("When 'messageRouter' is set, 'messageRoutingMode' "
                    + "should be set as " + MessageRoutingMode.CustomPartition);
        }
    }

2. 获取Topic分区数和Schema

通过Lookup机制去Broker中查询这个Topic的Schema信息,可以看到如果服务端没有配置Schema信息的话则默认用 Schema.BYTES

return lookup.getSchema(TopicName.get(conf.getTopicName()))
        .thenCompose(schemaInfoOptional -> {
            if (schemaInfoOptional.isPresent()) {
                SchemaInfo schemaInfo = schemaInfoOptional.get();
                if (schemaInfo.getType() == SchemaType.PROTOBUF) {
                    autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo));
                } else {
                    autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
                }
            } else {
                autoProduceBytesSchema.setSchema(Schema.BYTES);
            }
            return createProducerAsync(topic, conf, schema, interceptors);
        });

咱们进去看看 getSchema 的实现,可以看到构造的是HTTP地址并通过GET方式请求Broker进行获取

public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
        CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>();

        String schemaName = topicName.getSchemaName();
        String path = String.format("admin/v2/schemas/%s/schema", schemaName);
        if (version != null) {
            if (version.length == 0) {
                future.completeExceptionally(new SchemaSerializationException("Empty schema version"));
                return future;
            }
            path = String.format("admin/v2/schemas/%s/schema/%s",
                    schemaName,
                    ByteBuffer.wrap(version).getLong());
        }
        httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
            if (response.getType() == SchemaType.KEY_VALUE) {
                SchemaData data = SchemaData
                        .builder()
                        .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
                                response.getData().getBytes(StandardCharsets.UTF_8)))
                        .type(response.getType())
                        .props(response.getProperties())
                        .build();
                future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
            } else {
                future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
            }
        }).exceptionally(ex -> {
            ....
        });
        return future;
    }

除了读取Schema,还会读取Topic的分区信息,从下面创建的代码可以看到,如果存在分区则创建PartitionedProducerImpl对象,不存在分区则创建ProducerImpl对象。获取分区的方法是 getPartitionedTopicMetadata,咱们进去看看它的实现

private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
                                                                   ProducerConfigurationData conf,
                                                                   Schema<T> schema,
                                                                   ProducerInterceptors interceptors) {
        CompletableFuture<Producer<T>> producerCreatedFuture = new CompletableFuture<>();

        getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            ....
            ProducerBase<T> producer;
            if (metadata.partitions > 0) {
                producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture,
                        metadata);
            } else {
                producer = newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture,
                        Optional.empty());
            }

            producers.add(producer);
        }).exceptionally(ex -> {
            ....
        });
        return producerCreatedFuture;
    }

getPartitionedTopicMetadata方法的核心逻辑如下,通过一路跟踪下去发现也是通过Lookup服务进行HTTP进行查询读取分区信息

    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {

        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();

        try {
            TopicName topicName = TopicName.get(topic);
            AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs());
            ....
            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
                                        metadataFuture, new ArrayList<>());
        } catch (IllegalArgumentException e) {
            ....
        }
        return metadataFuture;
    }

    private void getPartitionedTopicMetadata(TopicName topicName,
                                             Backoff backoff,
                                             AtomicLong remainingTime,
                                             CompletableFuture<PartitionedTopicMetadata> future,
                                             List<Throwable> previousExceptions) {
        long startTime = System.nanoTime();
        lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
            remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
            long nextDelay = Math.min(backoff.next(), remainingTime.get());
            // skip retry scheduler when set lookup throttle in client or server side which will lead to
            // `TooManyRequestsException`
            boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause())
                || e.getCause() instanceof PulsarClientException.AuthenticationException;
            if (nextDelay <= 0 || isLookupThrottling) {
                PulsarClientException.setPreviousExceptions(e, previousExceptions);
                future.completeExceptionally(e);
                return null;
            }
						....
        });
    }

    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
        String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
        return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
                PartitionedTopicMetadata.class);
    }

3. 创建生产者对象

在上一小结可以看到已经查到分区信息,在有分区的情况是会调用 newPartitionedProducerImpl 方法进行生产者对象的初始化,现在就从这里开始进行跟踪,可以看到这个方法只是封装了new PartitionedProducerImpl对象的操作,于是继续看PartitionedProducerImpl的构造函数的逻辑

    protected <T> PartitionedProducerImpl<T> newPartitionedProducerImpl(String topic,
                                                                        ProducerConfigurationData conf,
                                                                        Schema<T> schema,
                                                                        ProducerInterceptors interceptors,
                                                                        CompletableFuture<Producer<T>>
                                                                                producerCreatedFuture,
                                                                        PartitionedTopicMetadata metadata) {
        return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
                producerCreatedFuture, schema, interceptors);
    }

  public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                                   int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture,
                                   Schema<T> schema, ProducerInterceptors interceptors) {
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
        this.producers =
                ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
        this.topicMetadata = new TopicMetadataImpl(numPartitions);
    		//配置路由策略
        this.routerPolicy = getMessageRouter();
        stats = client.getConfiguration().getStatsIntervalSeconds() > 0
                ? new PartitionedTopicProducerStatsRecorderImpl()
                : null;

        //配置最大同时等待的消息数
        int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
        conf.setMaxPendingMessages(maxPendingMessages);

        final List<Integer> indexList;
        if (conf.isLazyStartPartitionedProducers()
                && conf.getAccessMode() == ProducerAccessMode.Shared) {
            // try to create producer at least one partition
            indexList = Collections.singletonList(routerPolicy
                    .choosePartition(((TypedMessageBuilderImpl<T>) newMessage()).getMessage(), topicMetadata));
        } else {
            // try to create producer for all partitions
            indexList = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toList());
        }

        firstPartitionIndex = indexList.get(0);
    		//这里是核心逻辑,从这里进去
        start(indexList);

        // start track and auto subscribe partition increasement
        if (conf.isAutoUpdatePartitions()) {
            topicsPartitionChangedListener = new TopicsPartitionChangedListener();
            partitionsAutoUpdateTimeout = client.timer()
                .newTimeout(partitionsAutoUpdateTimerTask,
                        conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
    }

   private void start(List<Integer> indexList) {
     		....
        final ProducerImpl<T> firstProducer = createProducer(indexList.get(0));
        firstProducer.producerCreatedFuture().handle((prod, createException) -> {
           ....
        }).thenApply(name -> {
            for (int i = 1; i < indexList.size(); i++) {
              	//循环分区数创建与之对应的ProducerImpl对象
                createProducer(indexList.get(i), name).producerCreatedFuture().handle((prod, createException) -> {
                    afterCreatingProducer.accept(false, createException);
                    return null;
                });
            }
            return null;
        });
    }

继续从 createProducer 方法进行跟踪

    private ProducerImpl<T> createProducer(final int partitionIndex, final Optional<String> overrideProducerName) {
      	//创建ProducerImpl后会统一放到producers这个Map中,key是分区号,value就是ProducerImpl对象
        return producers.computeIfAbsent(partitionIndex, (idx) -> {
            String partitionName = TopicName.get(topic).getPartition(idx).toString();
          	//继续进入看看ProducerImpl创建的逻辑
            return client.newProducerImpl(partitionName, idx,
                    conf, schema, interceptors, new CompletableFuture<>(), overrideProducerName);
        });
    }

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                        ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
        ....
        this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
				....
        //这里是核心逻辑,用于创建ProducerImpl对象跟对应的Broker的TCP网络连接
        grabCnx();
    }

4. 创建连接

从上一小节可以看到代码逻辑跟到了grabCnx 方法,继续一路跟踪下去,可以看到最后都会调用 state.client.getConnection, 继续往里面看看

    void grabCnx() {
        this.connectionHandler.grabCnx();
    }

    protected void grabCnx() {
        grabCnx(Optional.empty());
    }

    protected void grabCnx(Optional<URI> hostURI) {
				....
        cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
        ....
    }

    public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress logicalAddress,
                                                      final InetSocketAddress physicalAddress,
                                                      final int randomKeyForSelectConnection) {
      	//可以看到调用连接池进行网络连接的获取,继续进去看看实现细节
        return cnxPool.getConnection(logicalAddress, physicalAddress, randomKeyForSelectConnection);
    }

 public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, final int randomKey) {
        if (maxConnectionsPerHosts == 0) {
            // 如果配置没开启连接池,则每一次都重新新建一个TCP连接
            return createConnection(logicalAddress, physicalAddress, -1);
        }

        final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
                pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
   			// 新建TCP网络连接并放在连接池中以备之后的复用,继续进去看createConnection的逻辑
        CompletableFuture<ClientCnx> completableFuture = innerPool
                .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
        ....
    }

   private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
       ....
        // 继续进去看createConnection的逻辑
        createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
           ....
        }).exceptionally(exception -> {
           ....
        });

        return cnxFuture;
    }

    private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
                                                        InetSocketAddress unresolvedPhysicalAddress) {
        CompletableFuture<List<InetSocketAddress>> resolvedAddress;
        try {
						....
            return resolvedAddress.thenCompose(
                    inetAddresses -> connectToResolvedAddresses(
                            logicalAddress,
                            unresolvedPhysicalAddress,
                            inetAddresses.iterator(),
                            isSniProxy ? unresolvedPhysicalAddress : null)
            );
        } catch (URISyntaxException e) {
            ....
        }
    }

  private CompletableFuture<Channel> connectToResolvedAddresses(...) {
        CompletableFuture<Channel> future = new CompletableFuture<>();

        // 继续往下跟踪
        connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost)
               			....
                    return null;
                });
        return future;
    }

    private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
                                                        InetSocketAddress physicalAddress,
                                                        InetSocketAddress unresolvedPhysicalAddress,
                                                        InetSocketAddress sniHost) {
				....
        //终于跟踪到了,就是bootstrap.register() 这个方法,可以尝试往里面看看实现,从这里开始就是Netty的代码了
        return toCompletableFuture(bootstrap.register())
          .thenCompose(channelInitializerHandler::initSocks5IfConfig)
          .thenCompose(ch ->
                  channelInitializerHandler.initializeClientCnx(ch, logicalAddress,
                          unresolvedPhysicalAddress))
          .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
    }

    public ChannelFuture register() {
        validate();
      	//继续往下
        return initAndRegister();
    }

   final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
          	//新建一条网络通道到Broker
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
						....
        }
				....
        return regFuture;
    }

从上面的代码跟踪可以看到,生产者对象在创建的时候会通过Netty客户端跟Topic所在的Broker建立TCP网络连接,方便后续的通信

四、消息发送

消息发送流程大致如下图
在这里插入图片描述

  1. 在选择消息发送是,生产者对象会根据路由策略来决定用目标分区所对应的ProducerImpl对象进行处理
  2. 发送前会按照顺序对数据进行拦截器链逻辑处理(如果有配置的话),然后进行压缩最后再进行序列化操作(消息传输/存储必须序列化)
  3. 消息发送前会放到待确认队列中进行维护,每个分区都有一个对应的确认队列,在消息写入成功后会从对应的确认队列中将自己删除,否则这条消息不算写入成功。
  4. 将消息发送操作封装成一个任务交给线程池中的一个线程进行最后的发送操作
  5. Broker将数据写入成功后向客户端返回ack,客户端通过ack中携带的消息信息到待确认队列中进行消息的删除

那么老规矩,继续一起看下代码实现吧

1. 发送流程前置操作

就从常见的这条语句进行切入 producer.sendAsync("hello java API pulsar:"+i+", 当前时间为:"+new Date()); 。通过代码中可以看到sendAsync 方法是从其父类ProducerBase中继承的

    public CompletableFuture<MessageId> sendAsync(Message<?> message) {
      	//继续跟踪进去
        return internalSendAsync(message);
    }
		
		//这是一个抽象方法,有分区生产者和非分区生产者两种实现,跟踪分区生产者的实现逻辑
    abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);

    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
      	//继续往里面跟踪
        return internalSendWithTxnAsync(message, null);
    }

   CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
        ....
        //还有印象吗,在生产者创建时会维护一个以分区号为key,ProducerImpl为value的Map,现在从里面获取相对应的对象进行处理
        return producers.get(partition).internalSendWithTxnAsync(message, txn);
    }

2. 消息处理以及包装

又回到ProducerImpl对象的逻辑了,相当于分区生产者对象只是个壳,无论分区还是非分区最终都是调用的ProducerImpl进行消息发送的真正逻辑,不废话继续往下看

    CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
        ....
      	return internalSendAsync(message);
    }

   CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
       	....
        //拦截器的逻辑就是在这里生效
        MessageImpl<?> interceptorMessage = (MessageImpl) beforeSend(message);
        ....
        //核心逻辑,继续切入
        sendAsync(interceptorMessage, new SendCallback() {
      		....
        });
        return future;
    }

    public void sendAsync(Message<?> message, SendCallback callback) {
        checkArgument(message instanceof MessageImpl);
				....
        //核心逻辑,从名字可以看到就是对消息进行序列化并进行发送逻辑,继续跟进去
				serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
        readStartIndex, payloadChunkSize, compressedPayload, compressed,
        compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
        } catch (PulsarClientException e) {
						....
        }
    }

  private void serializeAndSendMessage(....) throws IOException {
   					//核心逻辑,op是OpSendMsg对象,封装了要发送的消息内容,继续往下跟
            processOpSendMsg(op);
        }
    }

3. 消息发送

数据已经处理包装得差不多了,接下来就是发送的逻辑,咱们顺着 processOpSendMsg 方法继续往下看

protected void processOpSendMsg(OpSendMsg op) {
						....
            //将消息加到OpSendMsgQueue队列中,这就是“等待确认队列”
            pendingMessages.add(op);
						....
            //ClientCnx对象维护着Netty实现跟Broker的TCP连接
            final ClientCnx cnx = getCnxIfReady();
            if (cnx != null) {
								....
                //WriteInEventLoopCallback方法的run方法执行会将数据发送出去,然后队列中维护消息的状态
                cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
                stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
            } else {
               	....
            }
        } catch (Throwable t) {
            ....
        }
    }

		//WriteInEventLoopCallback是一个线程类,被放到线程池里面执行,因此直接看它的run方法
    private static final class WriteInEventLoopCallback implements Runnable {
    	public void run() {
						....
            try {
              	//熟悉Netty的朋友相信对writeAndFlush方法不默认,就是通过之间建立好的TCP连接将数据发送到Broker去
                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                op.updateSentTimestamp();
            } finally {
                recycle();
            }
        }
    }

跟踪到这里基本就结束了,对Netty感兴趣的朋友可以再继续往下跟,这里可以简单说一下,其实它内部是对JDK的NIO做了包装和优化,最底层也是通过Java的Socket连接网络端口进行的数据发送。不知道有没有小伙伴发现,咦,怎么没有对返回结果进行处理的逻辑呢?这就是所谓的异步设计,Netty不就是一个异步通信框架吗,客户端发送的逻辑到这里就是彻底结束了,而消息的处理结束就是要等服务端Broker向客户端发起消息ack请求了,想知道Pulsar怎么实现的吗?那就跟着我一起瞅瞅吧~

4. 消息确认

消息确认流程是由Broker端发起的,那么生产者对象肯定是通过Netty客户端接收的,所以直接看Pulsar实现的ChannelInboundHandlerAdapter类的channelRead的逻辑即可

public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      ....
      switch (cmd.getType()) {
          	....
          	//可以看得到消息有写成功的后续处理动作,那就从这里看看
            case PRODUCER_SUCCESS:
                checkArgument(cmd.hasProducerSuccess());
                handleProducerSuccess(cmd.getProducerSuccess());
                break;

            case UNSUBSCRIBE:
                checkArgument(cmd.hasUnsubscribe());
                safeInterceptCommand(cmd);
                handleUnsubscribe(cmd.getUnsubscribe());
                break;
      }
    }
}

handleProducerSuccess这个方法的是由ClientCnx对象进行实现的,那就跟进来看看吧

  protected void handleProducerSuccess(CommandProducerSuccess success) {
        checkArgument(state == State.Ready);

        if (log.isDebugEnabled()) {
            log.debug("{} Received producer success response from server: {} - producer-name: {}", ctx.channel(),
                    success.getRequestId(), success.getProducerName());
        }
        long requestId = success.getRequestId();
        if (!success.isProducerReady()) {
						//还记得pendingRequests 这个“待确认队列”吗,现在会从里面查询对应的消息
            TimedCompletableFuture<?> requestFuture = pendingRequests.get(requestId);
            if (requestFuture != null) {
              	//日志进行打印
                log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
                        success.getProducerName(), requestId);
              	//标记为成功
                requestFuture.markAsResponded();
            }
            return;
        }
				//客户端处理的主要逻辑就是从队列中移除此消息
        CompletableFuture<ProducerResponse> requestFuture =
                (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
        ....
    }

五、总结

生产者写数据的流程到这里基本就结束了,怎么样,是不是没想象中那么可怕?也许你对1、客户端对象创建 2、生产者对象创建 3、消息发送 这三步之间的关系还有点迷迷糊糊,那就请允许我给你举个例子,1、客户端对象创建相当于一座新城市的创建,打好城市的地基,2、生产者对象创建 相当于在这个地基的基础上建起了城市,发电厂等等,最重要的是修建其通往其他各个城市的交通要道,最后的3、消息发送 相当于这个新城市的居民们乘坐高铁去其他的城市。这么说相信你一定已经明白了~如果还有其他疑问欢迎在下面的评论区一起讨论

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1533808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【软考高项】十四、信息系统工程之数据工程

1、数据建模 概念模型&#xff1a;信息模型&#xff0c;不依赖具体的计算机系统&#xff0c;也不对应具体的DBMS&#xff0c;概念级别&#xff1b;基本元素包括&#xff1a;实体、属性、域&#xff08;属性的取值范围&#xff09;、键&#xff08;标识符、身份证&#xff09;、…

【嵌入式硬件】步进电机

1.步进电机简介 1.1步进电机基本原理 步进电机的英文是stepping motor。step的中文意思是行走、迈步。所以仅从字面上我们就可以得知,步进电机就是一步一步移动的电动机。说的官方一点儿,步进电机是一种将电脉冲信号转换成相应角位移或者线位移的电动机(直线电机)。下图为…

用 ElementPlus的日历组件如何改为中文

文章目录 问题分析 问题 直接引入日历组件后&#xff0c;都是英文&#xff0c;应该如何把头部英文改为中文 <template><el-calendar><template #date-cell"{ data }"><p :class"data.isSelected ? is-selected : ">{{ data.da…

并发编程所需的底层基础

一、计算机运行的底层原理 1.多级层次的存储结构 ①:辅存 固态盘不是主要的应用对象&#xff0c;因为固态盘的使用次数是有限的&#xff0c;无法支撑高并发场景 磁盘存储的最基本原理是电生磁。 磁盘的磁道里边有很多的磁颗粒&#xff0c;磁颗粒上边有一层薄膜为了防止磁点氧…

微火共享wifi贴如何加盟?

最近看到有多人在求助&#xff0c;微火共享wifi贴码如何加盟&#xff0c;所以&#xff0c;小编特意整理了一下&#xff0c;将想加入的具体办法都写了出来&#xff0c;以供大家参考。 首先&#xff0c;要想加入微火共享wifi&#xff0c;可以先进入其官网&#xff0c;方法有两种&…

力扣---括号生成---回溯---dfs/二进制

暴力--二进制 采用与&#xff1a;力扣---子集---回溯&#xff08;子集型回溯&#xff09;---递归-CSDN博客 中二进制求解一样的思路&#xff0c;即遍历0~-1&#xff08;从二进制去考虑&#xff09;&#xff0c;如果这个数的第 i 位为0&#xff0c;则括号的第 i 位为‘&#xff…

HTTP系列之HTTP缓存 —— 强缓存和协商缓存

文章目录 HTTP缓存强缓存协商缓存状态码区别缓存优先级如何设置强缓存和协商缓存使用场景 HTTP缓存 HTTP缓存时利用HTTP响应头将所请求的资源在浏览器进行缓存&#xff0c;缓存方式分两种&#xff1a;强缓存和协商缓存。 浏览器缓存是指将之前请求过的资源在浏览器进行缓存&am…

鸿蒙Harmony应用开发—ArkTS-全局UI方法(日历选择器弹窗)

点击日期弹出日历选择器弹窗&#xff0c;可选择弹窗内任意日期。 说明&#xff1a; 该组件从API Version 10开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 本模块功能依赖UI的执行上下文&#xff0c;不可在UI上下文不明确的地方使用&…

第3章 数据治理

思维导图 数据治理的定义&#xff1a;是在管理数据资产过程中行使权力和管控&#xff0c;包括计划、监控、和实施。 职能&#xff1a;指导所有其他数据管理领域的活动。目的&#xff1a;确保根据数据管理制度和最佳实践正确地管理数据。整体驱动力&#xff1a;确保组织可以从其…

hashmap专题

hashmap专题 常见maphashMapjdk1.8HashMap新变化hashmap源码中重要的常量存储结构put 存入数据过程取数据过程get()扩容hashmap 树化/链化hashmap的容量 桶的数量为什么要是2的n次方&#xff1f;hashmap为什么线程不安全 常见map hashtable线程安全&#xff0c;但效率太低。ha…

《深入Linux内核架构》第2章 进程管理和调度 (6)

目录 2.8 调度器增强 2.8.1 SMP调度 2.8.2 调度域和控制组 2.8.3 内核抢占和低延迟相关工作 2.9 小结 2.8 调度器增强 2.8.1 SMP调度 进程迁移&#xff1a; 含义&#xff1a;把进程从一个CPU就绪队列迁移至另一个CPU就绪队列。 作用&#xff1a;CPU负荷均衡。 缺点&…

【评分标准】【网络系统管理】2019年全国职业技能大赛高职组计算机网络应用赛项H卷 无线网络勘测设计

第一部分&#xff1a;无线网络勘测设计评分标准 序号评分项评分细项评分点说明评分方式分值1点位设计图AP编号AP编号符合“AP型号位置编号”完全匹配5AP型号独立办公室、小型会议室选用WALL AP110完全匹配5员工寝室选用智分&#xff0c;其他用放装完全匹配5其它区域选用放装AP…

【原创】JDK17获取CPU占用率、内存占用率以及堆内存使用情况

前言 我之前一篇文章&#xff1a; 【原创】Java获取CPU占用率、内存占用率最简单的方式_java获取cpu使用率-CSDN博客 这篇文章虽然简单&#xff0c;但是只能针对JDK8&#xff0c;换成现在模块化的JDK后&#xff0c;OperatingSystemMXBean类就无法反射获取其中的信息了&#…

代码随想录 动态规划-股票问题

目录 121.买卖股票的最佳时机 122买卖股票的最佳时机II 123. 买卖股票的最佳时机III 188.买卖股票的最佳时机IV 309.买卖股票的最佳时机含冷冻期 714.买卖股票的最佳时机含手续费 121.买卖股票的最佳时机 121. 买卖股票的最佳时机 简单 给定一个数组 prices &…

【FLOOD FILL专题】【蓝桥杯备考训练】:扫雷、动态网格、走迷宫、画图、山峰和山谷【已更新完成】

目录 1、扫雷&#xff08;Google Kickstart2014 Round C Problem A&#xff09; 2、动态网格&#xff08;Google Kickstart2015 Round D Problem A&#xff09; 3、走迷宫&#xff08;模板&#xff09; 4、画图&#xff08;第六次CCF计算机软件能力认证&#xff09; 5、山…

Docker 安装 Skywalking以及UI界面

关于Skywalking 在现代分布式系统架构中&#xff0c;应用性能监控&#xff08;Application Performance Monitoring, APM&#xff09;扮演着至关重要的角色。本文将聚焦于一款备受瞩目的开源APM工具——Apache Skywalking&#xff0c;通过对其功能特性和工作原理的详细介绍&am…

cocos(困扰了我一晚上的代码)地图适配不同手机屏幕

Creator 版本&#xff1a;3.8.2 目标平台&#xff1a; 微信小游戏 有人反馈我的小游戏没有做屏幕适配。其实我是做了的。但是在不同手机分辨率的情况下。 适配出了问题。然后我改了一晚上的代码终于做好了。 设计的分辨率是960*640.也就是白色区域。 但是大部分手机是…

2024全国水科技大会:【协办单位】山东文远环保科技股份有限公司

山东文远环保科技股份有限公司坐落于千年古城齐国故都--临淄。初始成立于2011年&#xff0c;是淄博市首批国有资本参股的混合改制企业。 公司着力打造环保设备制造、环保工程及服务、环保水务/固废处理/新能源项目投资及运营管理、固废循环经济产业园等四大板块。是一家集投资、…

【spring】@Conditional注解学习

Conditional介绍 Conditional注解用于按照设定的条件进行判断&#xff0c;从而决定是否将某个bean注册到Spring容器中。 Conditional注解是在Spring 4.0版本中引入的&#xff0c;它提供了一种更加灵活的方式来控制bean的创建和注册。在此之前&#xff0c;开发者通常使用Profi…