- Create a .NET 6 API
- Install the dependency package (Confluent.Kafka; see the commands below)
- Create the Kafka producer
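The first two steps can be done with the .NET CLI. A minimal sketch, assuming the project is named CoreKafka like the folder in the linked repository:

dotnet new webapi -n CoreKafka -f net6.0
cd CoreKafka
dotnet add package Confluent.Kafka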
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using KafkaHelper.Config;
using Microsoft.Extensions.Options;
namespace KafkaHelper
{
public class KafkaProducer
{
private readonly IOptionsMonitor<KafkaConfig> _kafkaconfig;
public KafkaProducer(IOptionsMonitor<KafkaConfig> kafkaconfig)
{
_kafkaconfig = kafkaconfig;
}
public void sendMessage()
{
}
// Create a topic
public async Task<bool> createTopic(string topicName,short factorNum,int partitionNum)
{
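// Build an admin client against the broker address (Host:Port) taken from configuration.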
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}"}).Build())
{
try
{
await adminClient.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }});
return true;
}
catch (CreateTopicsException e)
{
Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
return false;
}
}
}
// Delete topics
public async Task<bool> deleteTopic(List<string> topicName)
{
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build())
{
try
{
await adminClient.DeleteTopicsAsync(topicName, null);
return true;
}
catch (Exception)
{
return false;
}
}
}
// Check whether a topic exists
public async Task<bool> checkTopic(string topicName)
{
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build())
{
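// Fetch cluster metadata (10 second timeout) and look for the topic name in the returned topic list.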
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
var topicsMetadata = metadata.Topics;
var topicNames = metadata.Topics.Select(a => a.Topic).ToList();
return topicNames.Contains(topicName);
}
}
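// Delivery report callback: invoked once per produced message, after the broker acknowledges it or the send fails.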
public static void handler(DeliveryReport<Null, string> deliveryReport)
{
Console.WriteLine(!deliveryReport.Error.IsError
? $"Delivered message to {deliveryReport.TopicPartitionOffset}"
: $"Delivery Error: {deliveryReport.Error.Reason}");
}
public bool sendMessage(string topicName)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
MessageSendMaxRetries = 2
};
try
{
using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
{
p.Produce(topicName, new Message<Null, string> { Value = $"my message" }, handler);
p.Flush(TimeSpan.FromSeconds(10));
}
return true;
}
catch (Exception)
{
return false;
}
}
// Send messages to a chosen partition of a topic, so that the partition stores only one specific type of data
public bool sendMessagePartition(string topicName)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
MessageSendMaxRetries = 2
};
try
{
using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
{
// Dispose the admin client too; it is only needed here to look up the topic's partitions.
using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build();
var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
var topic = meta.Topics.SingleOrDefault(t => t.Topic == topicName);
var topicPartitions = topic.Partitions;
// Partition 1 is hard-coded here as an example; any partition listed in topicPartitions could be used.
TopicPartition topicPartition = new TopicPartition(topicName, new Partition(1));
p.Produce(topicPartition, new Message<Null, string> { Value = $"my message" }, handler);
p.Flush(TimeSpan.FromSeconds(10));
}
return true;
}
catch (Exception)
{
return false;
}
}
// Add headers
public bool sendMessageHeader(string topicName)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
MessageSendMaxRetries = 2
};
try
{
using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
{
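// Headers attach string-keyed byte[] metadata that travels with the message.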
var header = new Headers();
header.Add("ellis", Encoding.UTF8.GetBytes("{\"ellis\":\"dalian\"}"));
p.Produce(topicName, new Message<Null, string> { Value = $"my message",Headers=header }, handler);
p.Flush(TimeSpan.FromSeconds(10));
}
return true;
}
catch (Exception)
{
return false;
}
}
public static void keyhandler(DeliveryReport<string, string> deliveryReport)
{
Console.WriteLine(!deliveryReport.Error.IsError
? $"Delivered message to {deliveryReport.TopicPartitionOffset}"
: $"Delivery Error: {deliveryReport.Error.Reason}");
}
// Specify a key so that Kafka selects the partition from the hash of the key; messages with the same key are sent to the same partition
public bool sendMessageKey(string topicName,string key)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
MessageSendMaxRetries = 2,
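// Compress batches with gzip, wait for acks from all in-sync replicas, and enable idempotence so retries cannot create duplicates.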
CompressionType= CompressionType.Gzip,
Acks= Acks.All,
EnableIdempotence= true,
};
try
{
using (var p = new ProducerBuilder<string, string>(producerConfig).Build())
{
p.Produce(topicName, new Message<string, string> { Key=key, Value = $"my test" }, keyhandler);
p.Flush(TimeSpan.FromSeconds(10));
}
return true;
}
catch (Exception)
{
return false;
}
}
}
}
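The KafkaConfig options class from KafkaHelper.Config and its registration are not shown above. Below is a minimal sketch, assuming the class only exposes the Host and Port that KafkaProducer reads, and assuming an appsettings.json section named "Kafka"; the section name and the /send endpoint are illustrative, not taken from the repository.

// KafkaConfig.cs (assumed shape: only Host and Port are referenced by KafkaProducer)
namespace KafkaHelper.Config
{
    public class KafkaConfig
    {
        // e.g. "localhost"
        public string Host { get; set; }
        // e.g. 9092
        public int Port { get; set; }
    }
}

// Program.cs (.NET 6 minimal hosting)
using KafkaHelper;
using KafkaHelper.Config;

var builder = WebApplication.CreateBuilder(args);
// Bind the assumed "Kafka" configuration section so IOptionsMonitor<KafkaConfig> can be injected.
builder.Services.Configure<KafkaConfig>(builder.Configuration.GetSection("Kafka"));
builder.Services.AddSingleton<KafkaProducer>();

var app = builder.Build();
// Illustrative endpoint that triggers a send through the producer.
app.MapGet("/send/{topic}", (string topic, KafkaProducer producer) => producer.sendMessage(topic));
app.Run();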
About p.Flush(TimeSpan.FromSeconds(10)): Flush waits for all outstanding delivery report callbacks to complete; the argument is the timeout, i.e. the maximum time to wait. The operation cannot be cancelled, so a relatively short timeout should be used. Also note where Flush is placed: do not let this blocking call sit inside a loop.
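As a sketch of that advice (a method in the same style as those above, reusing handler and _kafkaconfig; the message contents are placeholders), queue all the messages first and flush a single time after the loop:

// Produce a batch of messages and flush once, outside the loop.
public bool sendMessageBatch(string topicName, int count)
{
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}"
    };
    using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
    {
        for (var i = 0; i < count; i++)
        {
            // Produce only queues the message and returns immediately; it does not block.
            p.Produce(topicName, new Message<Null, string> { Value = $"message {i}" }, handler);
        }
        // Wait for the outstanding delivery reports a single time, bounded by the timeout.
        p.Flush(TimeSpan.FromSeconds(10));
    }
    return true;
}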
Note that when the producer does not specify a key, messages are distributed evenly across the partitions. We can specify a message key so that messages with the same key are sent to the same partition, or specify the target partition directly. Messages within a single partition are ordered.
About the Kafka producer configuration
https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
Examples
https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples
Source code for this post
https://github.com/xdqt/asp.net-core-efcore-jwt-middleware/tree/master/CoreKafka