Kafka高级特性解析之主题

news2024/11/15 21:43:34

1、管理

使用kafka-topics.sh脚本:

选项说明
--config <String: name=value>为创建的或修改的主题指定配置信息。支持下述配置条目:
  1. cleanup.policy
  2. compression.type
  3. delete.retention.ms
  4. file.delete.delay.ms
  5. flush.messages
  6. flush.ms
  7. follower.replication.throttled.replicas
  8. index.interval.bytes
  9. leader.replication.throttled.replicas
  10. max.message.bytes
  11. message.format.version
  12. message.timestamp.difference.max.ms
  13. message.timestamp.type
  14. min.cleanable.dirty.ratio
  15. min.compaction.lag.ms
  16. min.insync.replicas
  17. preallocate
  18. retention.bytes
  19. retention.ms
  20. segment.bytes
  21. segment.index.bytes
  22. segment.jitter.ms
  23. segment.ms
  24. unclean.leader.election.enable
--create创建一个新主题
--delete删除一个主题
--delete-config <String: name>删除现有主题的一个主题配置条目。这些条目就是在 --config中给出的配置条目。
--alter更改主题的分区数量,副本分配和/或配置条目。
--describe列出给定主题的细节。
--disable-rack-aware禁用副本分配的机架感知。
--force抑制控制台提示信息
--help打印帮助信息
--if-exists如果指定了该选项,则在修改或删除主题的时候,只有主题存在才可以执行。
--if-not-exists在创建主题的时候,如果指定了该选项,则只有主题不存在的时候才可以执行命令。
--list列出所有可用的主题。
--partitions <Integer: # of partitions>要创建或修改主题的分区数。
--replica-assignment
<String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2
,broker_id_for_part2_replica1
:broker_id_for_part2_replica2 , ...>
当创建或修改主题的时候手动指定partition-to-broker的分配关系。
--replication-factor <Integer:replication factor>要创建的主题分区副本数。1表示只有一个副本,也就是Leader副本。
--topic <String: topic>要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。
--topics-with-overridesif set when describing topics, only show topics that have overridden configs
--unavailable-partitionsif set when describing topics, only show partitions whose leader is not available
--under-replicated-partitionsif set when describing topics, only show under replicated partitions
--zookeeper <String: urls>必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。

主题中可以使用的参数定义:

属性默认值服务器默认属性说明
cleanup.policydeletelog.cleanup.policy要么是”delete“要么是” compact“; 这个字符串指明了针对旧日志部分的利用方式;默认方式("delete")将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩
compression.typenoneproducer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
delete.retention.ms86400000 (24hours)log.cleaner.delete.retention.ms对于压缩日志保留的最长时间,也是客户端消费消息的最长时间,通log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。此项配置可以在topic创建时的置顶参数覆盖
flush.msNonelog.flush.interval.ms此项配置用来置顶强制进行fsync日志到磁盘的时间间隔;例如,如果设置为1000,那么每1000ms就需要进行一次fsync。一般不建议使用这个选项
flush.messagesNonelog.flush.interval.messages此项配置指定时间间隔:强制进行fsync日志。例如,如果这个选项设置为1,那么每条消息之后都需要进行fsync,如果设置为5,则每5条消息就需要进行一次fsync。一般来说,建议你不要设置这个值。此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
index.interval.bytes4096log.index.interval.bytes默认设置保证了我们每4096个字节就对消息添加一个索引,更多的索引使得阅读的消息更加靠近,但是索引规模却会由此增大;一般不需要改变这个选项
max.message.bytes1000000max.message.byteskafka追加消息的最大尺寸。注意如果你增大这个尺寸,你也必须增大你consumer的fetch 尺寸,这样consumer才能fetch到这些最大尺寸的消息。
min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项配置控制log压缩器试图进行清除日志的频率。默认情况下,将避免清除压缩率超过50%的日志。这个比率避免了最大的空间浪费
min.insync.replicas1min.insync.replicas当producer设置request.required.acks为-1时, min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer会产生异常。
retention.bytesNonelog.retention.bytes如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制
retention.ms7 dayslog.retention.minutes如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间。
segment.bytes1GBlog.segment.byteskafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小
segment.index.bytes10MBlog.index.size.max.bytes此配置是有关offsets和文件位置之间映射的索引文件的大小;一般不需要修改这个配置
segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.
segment.ms7 dayslog.roll.hours即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件
unclean.leader.election.enabletrue指明了是否能够使不在ISR中replicas设置用来作为leader

(1)创建主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

(2)查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe

(3)修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576

kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760

kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

(4)删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

给主题添加删除的标记: 

要过一段时间删除。 

2、增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通过--alter修改主题的分区数,增加分区。

kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2

3、分区副本的分配-了解

副本分配的三个目标:

  • 均衡地将副本分散于各个broker上
  • 对于某个broker上分配的分区,它的其他副本在其他broker上
  • 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  • 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  • 其余副本通过增加偏移进行分配。

4、必要参数配置

kafka-topics.sh --config xx=xx --config yy=yy

配置给主题的参数:

属性默认值服务器默认属性说明
cleanup.policydeletelog.cleanup.policy要么是”delete“要么是” compact“; 这个字符串指明了针对旧日志部分的利用方式;默认方式("delete")将会丢弃旧的部分当他们的回收时间或者尺寸限制到达时。”compact“将会进行日志压缩
compression.typenoneproducer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy、 lz4。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
max.message.bytes1000000max.message.byteskafka追加消息的最大字节数。注意如果你增大这个字节数,也必须增大consumer的fetch字节数,这样consumer才能fetch到这些最大字节数的消息。
min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项配置控制log压缩器试图进行清除日志的频率。默认情况下,将避免清除压缩率超过50%的日志。这个比率避免了最大的空间浪费
min.insync.replicas1min.insync.replicas当producer设置request.required.acks为-1时, min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到, producer会产生异常。
retention.bytesNonelog.retention.bytes如果使用“delete”的retention 策略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制
retention.ms7 dayslog.retention.minutes如果使用“delete”的retention策略,这项配置就是指删除日志前日志保存的时间。
segment.bytes1GBlog.segment.byteskafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小
segment.index.bytes10MBlog.index.size.max.bytes此配置是有关offsets和文件位置之间映射的索引文件的大小;一般不需要修改这个配置
segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.
segment.ms7 dayslog.roll.hours即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件
unclean.leader.election.enabletrue指明了是否能够使不在ISR中replicas设置用来作为lea

5、KafkaAdminClient应用

说明

        除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient

功能与原理介绍

        Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。

KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

名称api
创建主题createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
删除主题deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)
列出所有主题listTopics(final ListTopicsOptions options)
查询主题describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)
查询集群信息describeCluster(DescribeClusterOptions options)
查询配置信息describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)
修改配置信息alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
修改副本的日志目录alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
查询节点的日志目录信息describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)
查询副本的日志目录信息describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options)
增加分区createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。

用到的参数:

属性说明重要性
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。
这个列表仅影响用来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,...
由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。
一般最好两台,以防其中一台宕掉。
high
client.id生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。
一般该id是跟业务有关的字符串。
medium
connections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000medium
receive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认65536,可选值:[-1,...]medium
request.timeout.ms客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000medium
security.protocol跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL.
string类型值,默认:PLAINTEXT
medium
send.buffer.bytes用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。int类型值,默认值:131072medium
reconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
long型值,默认1000,可选值:[0,...]
medium
reconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。 long型值,默认:50low
retriesThe maximum number of times to retry a call before failing it.重试的次数,达到此值,失败。 int类型值,默认5。low
retry.backoff.ms在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。该时间的存在避免了密集循环。
long型值,默认值:100。
low

主要操作步骤:

  • 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
  • 客户端发送请求至Kafka Broker。
  • Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
  • 客户端接收相应的回执并进行解析处理。
  • 和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
package com.lagou.kafka.demo;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class MyAdminClient {

    private KafkaAdminClient client;

    @Before
    public void before() {

        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        configs.put("client.id", "admin_001");

        client = (KafkaAdminClient) KafkaAdminClient.create(configs);
    }

    @After
    public void after() {
        // 关闭admin客户端
        client.close();
    }

    @Test
    public void testListTopics() throws ExecutionException, InterruptedException {
        // 列出主题
//        final ListTopicsResult listTopicsResult = client.listTopics();

        ListTopicsOptions options = new ListTopicsOptions();
        // 列出内部主题
        options.listInternal(true);
        // 设置请求超时时间,单位是毫秒
        options.timeoutMs(500);

        final ListTopicsResult listTopicsResult = client.listTopics(options);

//        final Set<String> strings = listTopicsResult.names().get();
//
//        strings.forEach(name -> {
//            System.out.println(name);
//        });

        // 将请求变成同步的请求,直接获取结果
        final Collection<TopicListing> topicListings = listTopicsResult.listings().get();

        topicListings.forEach(new Consumer<TopicListing>() {
            @Override
            public void accept(TopicListing topicListing) {

                // 该主题是否是内部主题
                final boolean internal = topicListing.isInternal();
                // 该主题的名字
                final String name = topicListing.name();


                System.out.println("主题是否是内部主题:" + internal);
                System.out.println("主题的名字:" + name);
                System.out.println(topicListing);
                System.out.println("=====================================");
            }
        });

    }


    @Test
    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
        final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));

        final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
                = describeLogDirsResult.all().get();

        integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
            @Override
            public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
                System.out.println("broker.id = " + integer);
//                log.dirs可以设置多个目录
                stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
                    @Override
                    public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
                        System.out.println("logdir = " + s);
                        final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;

                        replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
                            @Override
                            public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
                                System.out.println("主题分区:" + topicPartition.partition());
                                System.out.println("主题:" + topicPartition.topic());
//                                final boolean isFuture = replicaInfo.isFuture;
//                                final long offsetLag = replicaInfo.offsetLag;
//                                final long size = replicaInfo.size;
                            }
                        });

                    }
                });
            }
        });


    }





}

6、偏移量管理

  • Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。
  • 早期由zookeeper管理消费组的偏移量。

查询方法:

  • 通过原生 kafka 提供的工具脚本进行查询。
  • 工具脚本的位置与名称为bin/kafka-consumer-groups.sh

首先运行脚本,查看帮助:

参数说明
--all-topics将所有关联到指定消费组的主题都划归到reset-offsets 操作范围。
--bootstrapserver<String: server to connect to>必须:(基于消费组的新的消费者): 要连接的服务器地址。
--by-duration <String: duration>距离当前时间戳的一个时间段。格式:'PnDTnHnMnS'
--command-config<String: command config property file>指定配置文件,该文件内容传递给Admin Client和消费者。
--delete传值消费组名称,删除整个消费组与所有主题的各个分区偏移量和所有者关系。
如: --group g1 --group g2。
传值消费组名称和单个主题,仅删除该消费组到指定主题的分区偏移量和所属关系。
如: --group g1 --group g2 --topic t1。
传值一个主题名称,仅删除指定主题与所有消费组分区偏移量以及所属关系。
如: --topic t1
注意:消费组的删除仅对基于ZK保存偏移量的消费组有效,并且要小心使用,仅删除不活跃的消费组。
--describe描述给定消费组的偏移量差距(有多少消息还没有消费)。
--execute执行操作。支持的操作: reset-offsets。
--export导出操作的结果到CSV文件。支持的操作: reset-offsets。
--from-file <String: path to CSV file>重置偏移量到CSV文件中定义的值。
--group <String: consumer group>目标消费组。
--list列出所有消费组。
--new-consumer使用新的消费者实现。这是默认值。随后的发行版中会删除这一操作。
--reset-offsets

重置消费组的偏移量。当前一次操作只支持一个消费组,并且该消费组应该是不活跃的。
有三个操作选项

  •  (默认)plan:要重置哪个偏移量。
  •  execute:执行 reset-offsets操作。
  • process:配合 --export将操作结果导出到CSV格式。可以使用如下选项:
    • --to-datetime
    • --by-period
    • --to-earliest
    • --to-latest
    • --shift-by
    • --from-file
    • --to-current。
    • 必须选择一个选项使用。
    • 要定义操作的范围,使用:
    • --all-topics
    • --topic。
    • 必须选择一个,除非使用 --from-file选项。
--shift-by <Long: number-of-offsets>重置偏移量n个。n可以是正值,也可以是负值。
--timeout <Long: timeout (ms)>对某些操作设置超时时间。
如:对于描述指定消费组信息,指定毫秒值的最大等待时间,以获取正常数据(如刚创建的消费组,或者消费组做了一些更改操作)。默认时间: 5000。
--to-current重置到当前的偏移量。
--to-datetime <String: datetime>重置偏移量到指定的时间戳。格式:'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest重置为最早的偏移量
--to-latest重置为最新的偏移量
--to-offset <Long:offset>重置到指定的偏移量。
--topic <String: topic>指定哪个主题的消费组需要删除,或者指定哪个主题的消费组需要包含到 reset-offsets操作中。对于 reset-offsets操作,还可以指定分区: topic1:0,1,2。其中0,1,2表示要包含到操作中的分区号。重置偏移量的操作支持多个主题一起操作。
--zookeeper <String: urls>必须,它的值,你懂的。 --zookeeper node1:2181/myKafka。

这里我们先编写一个生产者,消费者的例子:

        我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询

由于kafka 消费者记录group的消费偏移量有两种方式 :

  • kafka 自维护 (新)
  • zookpeer 维护 (旧) ,已经逐渐被废弃

        所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将--bootstrap-server 换成--zookeeper 即可。

(1)查看有那些 group ID 正在进行消费:

注意: 

  • 这里面是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。
  • 注意: 重名的 group.id 只会显示一次

(2)查看指定group.id 的消费者消费情况

kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group

如果消费者停止,查看偏移量信息:

将偏移量设置为最早的: 

将偏移量设置为最新的: 

分别将指定主题的指定分区的偏移量向前移动10个消息: 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/71190.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不影响1,4丁炔二醇(BYD)的情况下去除铜离子的工艺

1,4-丁炔二醇BYD&#xff08;but-2-yne-1,4-diol&#xff09;是一种重要的中间体化工原料&#xff0c;广泛应用于生产丁二醇及其下游产品、维生素B6的主要原料&#xff0c;还可以用于镀镍的增亮剂、防腐抑制剂等领域。 1,4&#xff0d;丁二醇&#xff08;BDO&#xff09;是一种…

(附源码)ssm人才市场招聘信息系统 毕业设计 271621

基于jsp的人才市场招聘信息系统的设计与实现 摘 要 人才市场招聘信息系统采用B/S结构、java开发语言、以及Mysql数据库等技术。系统主要分为管理员、用户、两部分&#xff0c;管理员管理主要功能包括&#xff1a;首页&#xff0c;站点管理&#xff08;轮播图、公告栏&#xf…

实验(七):串行口实验

一、实验目的与任务 实验目的&#xff1a; 1&#xff0e;运行Keil开发环境&#xff0c;完成串行口通信软件编程&#xff1b; 2&#xff0e;利用单片机串行口方式1与主机通信&#xff0c;建立Proteus仿真模型。 3&#xff0e;完成系统仿真与调试。。 任务&#xff1a; 1.根据要求…

mongodb 存引擎及配置

上次我们分享到了 wiredTiger 引擎以及他对于以前默认的 MMAPV1 引擎的优势 关于 wiredTiger 引擎 配置这里补充一下&#xff1a; storage:journal:enabled: truedbPath: /data/xiaomotong/mongo1/directoryPerDB: trueengine: wiredTigerwiredTiger:engineConfig:cacheSizeGB:…

Kotlin 开发Android app(二十):悬浮框WindowManager和动画AnimationDrawable

安卓的悬浮框&#xff0c;悬浮框相当于对桌面的一种控制&#xff0c;在安卓中是允许这样的自定义的小窗体出现在桌面的&#xff0c;其实这种小桌面可以使某些应用调用起来非常的方便&#xff0c;而动画的展现使得程序看起来更加有爱。 悬浮框 悬浮框的使用&#xff0c;通常是跟…

【负荷预测】长短期负荷预测(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f389;作者研究&#xff1a;&#x1f3c5;&#x1f3c5;&#x1f3c5;主要研究方向是电力系统和智能算法、机器学…

Python学习-8.1.3 标准库(turtle库的基础与实例)

2.3 turtle库 turtle库是能够进行基本的图形绘制的标准库。 turtle库包含100多个功能函数&#xff0c;主要包括三类&#xff1a;窗体函数、画笔运动函数、画笔状态函数 2.3.1 窗体函数 注&#xff1a;像素是指组成图像的小方格&#xff0c;每个小方格都有一个明确的位置和被…

图解LeetCode——1812. 判断国际象棋棋盘中一个格子的颜色(难度:简单)

一、题目 给你一个坐标 coordinates &#xff0c;它是一个字符串&#xff0c;表示国际象棋棋盘中一个格子的坐标。下图是国际象棋棋盘示意图。 如果所给格子的颜色是白色&#xff0c;请你返回 true&#xff0c;如果是黑色&#xff0c;请返回 false 。 给定坐标一定代表国际象…

少走弯路 → PlantUML网站推荐

PlantUML官网 Real World PlantUML 建议画图前从这里拷贝模板 PlantUML在线编辑 虽然简洁但是比官网好用的多 PlantUML 在线编辑器画面美观推荐使用 文章目录类图类图 一直都没搞懂 关联&#xff0c;依赖&#xff0c;组合&#xff0c;聚合的关系&#xff0c;看了视频稍微…

hdfs-over-ftp使用说明

hdfs-over-ftp使用说明 一、介绍 hdfs-over-ftp可以将hdfs文件系统通过ftp服务方式暴露出来,可以通过ftp客户端下载和上传hadoop文件。 二、编译及安装配置 原作者很久不更新了https://github.com/iponweb/hdfs-over-ftp 如果要支持hadoop2、hadoop3需要自己编译&#xff1b;可…

阿里妈妈展示广告召回之多场景建模算法

丨目录&#xff1a; 摘要 背景 方法 实验分析 总结 参考文献1. 摘要工业推荐系统通常拥有多个业务场景&#xff0c;并需要同时为这些场景提供推荐服务。在召回阶段&#xff0c;从大量商品库中选出的个高质量商品需要针对不同场景进行相应调整。以阿里妈妈展示广告为例&#xf…

认识 MySQL数据库和Redis缓存的数据一致性问题

文章目录1. 什么是数据的一致性2. 数据不一致情况及应对策略3. 数据一致性中需要注意的其他问题有哪些&#xff1f;1. 什么是数据的一致性 “数据一致”一般指的是&#xff1a;缓存中有数据&#xff0c;缓存的数据值 数据库中的值。 但根据缓存中是有数据为依据&#xff0c;…

微信外卖点餐小程序毕业设计,微信订餐小程序系统设计与实现,微信小程序毕业设计论文怎么写毕设源码开题报告需求分析怎么做

基于微信小程序的毕业设计题目(5)php点菜外卖小程序(含开题报告、任务书、中期报告、答辩PPT、论文模板) 项目背景和意义 目的&#xff1a;本课题主要目标是设计并能够实现一个基于微信小程序外卖点菜系统&#xff0c;前台用户使用小程序&#xff0c;后台管理使用基PHPMySql的B…

【AI入门】利用Paddle实现简单的数字识别

梳理逻辑 整个流程 准备好Paddle的环境准备好训练样本设计模型(定义模型)训练模型模型测试 1、准备好环境 #加载飞桨和相关类库 import paddle from paddle.nn import Linear import paddle.nn.functional as F import os import numpy as np import matplotlib.pyplot as plt…

Kafka 为什么那么快?

有人说&#xff1a;他曾在一台配置较好的机子上对 Kafka 进行性能压测&#xff0c;压测结果是 Kafka 单个节点的极限处理能力接近每秒 2000万 条消息&#xff0c;吞吐量达到每秒 600MB。 那 Kafka 为什么这么快&#xff1f;如何做到这个高的性能&#xff1f; 本篇文章主要从这…

梯度消失、梯度爆炸和梯度裁剪(Gradient Clipping)

消失梯度 网络训练过程中&#xff0c;如果每层网络的梯度都小于 1&#xff0c;各层梯度的偏导数会与后面层 传递而来的梯度相乘得到本层的梯度&#xff0c;并向前一层传递。该过程循环进行&#xff0c;最后导 致梯度指数级地减小&#xff0c;这就产生了梯度消失现象。这种情况…

第6季2:H264编码原理与基本概念

以下内容源于网络资源的学习与整理&#xff0c;如有侵权请告知删除。 参考博客 &#xff08;1&#xff09;H264 编码基本原理_ByteSaid的博客-CSDN博客_h264编码原理 &#xff08;2&#xff09;H264 编码简介_mydear_11000的博客-CSDN博客 &#xff08;3&#xff09;什么是I帧…

鸿蒙3.0应用开发体验

鸿蒙os3.0发布以来&#xff0c;华为官方开始主推etsarkui开发模式&#xff0c;逐渐抛弃java&#xff0c;为以后去安卓化做铺垫&#xff0c;但目前在笔者体验来看&#xff0c;仍需要大力完善&#xff0c;还有很长的路要走&#xff01; 什么是ets&#xff1f;ts是js的超集&#x…

日志、logback

logback下载步骤&#xff1a; logback官网https://logback.qos.ch/index.html教程http://t.csdn.cn/xSK0I 点击SLF4J API进去&#xff0c;注意看右上角的标题是有变化的&#xff0c;是什么目录下就会显示什么目录&#xff0c;点击下载 然后一样点击右下角的Maven 下载这三个&a…

centos7 基于Dledger搭建rocketmq 5.0.0并集成到微服务(1主2从)

小伙伴们&#xff0c;你们好呀&#xff0c;我是老寇&#xff0c;好久不见啦&#xff0c;甚是想念。 rocketmq和rocketmq-console安装包&#xff1a;https://pan.baidu.com/s/1swrV9ffJnmz4S0mfkuBbIw 提取码&#xff1a;1111 1.准备三台主机 192.168.1.1rocketmq&#xff0…