springcloudsteam整合kafka进行消息发送与接收-创新互联
spring cloud steam :
Binder和Binding
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
整合配置yml文件:
spring:
cloud:
# function:
# definition: testChannel
stream:
default-binder: kafka #默认的binder是kafka(粘合剂粘合的类型为kafka)
#可以动态绑定的目标列表(如:动态路由),如果设置,则只能绑定列出的目的地
# dynamic-destinations:
#绑定信息
bindings:
#消息报错后,数据保存的topic
error:
destination: myError
testChannel-in-0:
#消费者
consumer:
# bindingName: myInConsumer
#消费者并发 默认为1
concurrency: 1
#是否分区接收数据 默认false
partitioned: false
#头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
header-mode: headers
#重试次数
max-attempts: 3
#初始回退间隔时间
back-off-initial-interval: 1000
#大回退间隔时间
back-off-max-interval: 10000
#回退倍数
back-off-multiplier: 2.0
#大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
instance-index: -1
#大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
instance-count: -1
content-type: application/json
#生产者
producer:
#一个确定如何分配出站数据的SpEL表达式
partition-key-expression: headers.cs
#一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
#partition-key-extractor-class:
#一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
#partition-selector-class:
#partition-selector-expression:
#如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
partition-count: 3
#消息发送失败的处理逻辑默认是关闭的 test.errors
error-channel-enabled: true
destination: test #目标主题 相当于kafka的topic
binder: kafka #粘合器
content-type: application/json
# content-type: text/html
group: group2
testChannel-out-0:
#消费者
consumer:
# bindingName: myOutConsumer
#消费者并发 默认为1
concurrency: 1
#是否分区接收数据 默认false
partitioned: false
#头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
header-mode: headers
#重试次数
max-attempts: 3
#初始回退间隔时间
back-off-initial-interval: 1000
#大回退间隔时间
back-off-max-interval: 10000
#回退倍数
back-off-multiplier: 2.0
#大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
instance-index: -1
#大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
instance-count: -1
# #生产者
# producer:
# #一个确定如何分配出站数据的SpEL表达式
# partition-key-expression: headers.cs
# #一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
# #partition-key-extractor-class:
# #一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
# #partition-selector-class:
# #partition-selector-expression:
# #如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
# partition-count: 3
# #消息发送失败的处理逻辑默认是关闭的 test.errors
# error-channel-enabled: true
destination: test #本例子创建了另外一个topic (test1)用于区分不同的功能区分。
binder: kafka
content-type: application/json
group: group1
# producer:
# error-channel-enabled: true
## partitionSelectorName: customPartitionSelector
## partitionKeyExtractorName: customPartitionKeyExtractor
# partitionCount: 3
# partitionKeyExpression: headers.cs
## partition-key-extractor-name: customPartitionKeyExtractor
## partition-selector-name: customPartitionSelector
binders:
kafka:
binder:
#kafka brokers,默认localhost
brokers: localhost
#kafka 端口号,默认9092
default-broker-port: 9092
#kafka zk节点,默认localhost
zk-nodes: localhost
#zookeeper 端口
default-zk-port: 2181
#配置,map
#configuration:
#自定义标题列表
headers: cs
#偏移量保存时间(ms)窗口,0:忽略,默认10000(ms)
offset-update-time-window: 10000
#偏移量保存次数,与时间窗口互斥
offset-update-count: 0
#broker 需要的ack数量
required-acks: 1
#只有设置autoCreateTopics或autoAddPartitions才有效
min-partition-count: 1
#自动创建topic时 生成的副本数量
replication-factor: 1
#自动创建主题
auto-create-topics: true
#如果设置为true,则绑定器将根据需要创建新的分区。如果设置为false,则绑定器将依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定器将无法启动。
auto-add-partitions: true
#socket 缓冲区大小
socket-buffer-size: 2097152
bootstrap-servers: 127.0.0.1:9092 #kafka服务地址,集群部署的时候需要配置多个
#配置,map
configuration:
acks: -1
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
# value:
# serializer: org.apache.kafka.common.serialization.StringSerializer
max:
poll:
records: 200
retries: 3
session:
timeout:
ms: 40000 # 每次消费的处理时间
#绑定
bindings:
testChannel-out-put:
#消费者
consumer:
#主题分区消费者组成员之间自动平衡
auto-rebalance-enabled: true
#自动提交偏移量
auto-commit-offset: true
# auto-commit-on-error:
#连接恢复尝试之间的间隔,以毫秒为单位。
recovery-interval: 5000
#是否将消费者偏移量重置为start-offset提供的值
reset-offsets: false
#新组的起始偏移量,或resetOffsets为true时的起始偏移量。允许的值:earliest,latest,默认值:null(相当于earliest)
start-offset: earliest
enable-dlq: false
# configuration:
#接收错误消息的DLQ主题的名称。默认值:null(如果未指定,将导致错误的消息将转发到名为error::的主题)。:
# dlq-name:
#生产者
producer:
# configuration:
buffer-size: 16348
#生产者是否是同步的
sync: true
#生产者在发送之前等待多长时间,以便允许更多消息在同一批次中累积。(通常,生产者根本不等待,并且简单地发送在先前发送进行中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。
batch-timeout: 0
# key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
health-timeout: 60
消息生产者:
@Component
public class MessageProducer {private final StreamBridge streamBridge;
@Autowired(required = false)
private BinderAwareChannelResolver resolver;
public MessageProducer(StreamBridge streamBridge) {this.streamBridge = streamBridge;
}
public String resolverSendMessage(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
sendMessageDto.setIp(UUID.randomUUID().toString());
sendMessageDto.setMessage(messages);
sendMessageDto.setTiem(new Date().toString());
MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
stringMessageBuilder.setHeader("cs","1");
// stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
// Messagebuild = ;
GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
resolver.resolveDestination("testChannel-in-0").send(stringMessage);
return "yes!";
}
public String send(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
sendMessageDto.setIp(UUID.randomUUID().toString());
sendMessageDto.setMessage(messages);
sendMessageDto.setTiem(new Date().toString());
// String s = JSON.toJSONString(sendMessageDto);
MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
stringMessageBuilder.setHeader("cs","1");
// stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
// Messagebuild = ;
GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
streamBridge.send("testChannel-in-0", stringMessage);
return "发送消息: " + messages;
}
}
消息消费者
//注意这里采用的是函数式编程,向spring 容器中注入名为testChannel 的bean, 应为高版本的spring cloud steam 弃用了
//@StreamListener ,@Input等注解,而是提倡函数式接口
//testChannel 与生产者写入消息的通道名“testChannel-in-0” 有所差异,-in-0是spring cloud steam存在的默认规则
@Bean(name = "testChannel")
ConsumertestChannel( ) {return str ->{System.out.println("消费者处理消息:" +str );
};
}
如果需要指定消息的分区,需要在配置文件中自定义分区的计算逻辑属性为:
partition-key-expression: headers.cs
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
标题名称:springcloudsteam整合kafka进行消息发送与接收-创新互联
路径分享:http://azwzsj.com/article/jscoi.html