spring-boot-starter-pulsar工程
使用gradle工程来进行开发程序,首先查看gradle工程。
|
|
pulsar消息队列需要注意的消费者模式: Apache Pulsar 提供了四种主要的消费者消费模式,用于灵活地处理消息传递的不同场景需求。以下是它们的详细介绍:
1. Exclusive 模式(独占模式)
- 特点:一个订阅只能由一个消费者占用。
- 场景:确保某个消息只能被一个消费者处理,适合订单处理等需要强一致性的场景。
- 说明:如果另一个消费者尝试使用同一个订阅连接,它将收到错误。
2. Failover 模式(故障转移模式)
- 特点:一个订阅有多个消费者,但只有一个是活动消费者,其他的处于备用状态。
- 故障切换:当活动消费者断开连接时,备用消费者将接管。
- 场景:高可用性要求,例如需要在消费者崩溃时确保消息继续被处理。
3. Shared 模式(共享模式,又称 Round-robin)
- 特点:同一个订阅下的多个消费者可以共同消费消息,消息被平均分配(轮询机制)。
- 场景:高吞吐量需求,例如流处理任务或负载均衡。
4. Key_Shared 模式(基于键的共享模式)
- 特点:消息根据 Key 分组,同一个 Key 的消息始终由同一个消费者处理。
- 场景:需要按特定逻辑分组处理的场景,例如按用户 ID 处理请求。
总结
根据应用需求选择消费模式:
- 独占模式:适用于单一消费者场景。
- 故障转移模式:确保消息处理的高可用性。
- 共享模式:提升吞吐量和负载分布。
- 键共享模式:按业务逻辑分组处理消息。
实现的效果就是:
在24秒收到了14秒的信息。
发送延迟的信息的java代码如下:
在 Apache Pulsar 中,延迟消息(Delayed Message)的功能允许生产者设置消息的延迟时间,使消息在指定的时间后才被消费者接收。这种功能在以下场景中非常有用:
-
任务调度:
- 可以用来实现定时任务。例如,发送一条消息,指示某个任务在未来的特定时间点执行。
-
重试机制:
- 在分布式系统中,如果某个任务失败,可以通过延迟消息来实现重试逻辑。例如,延迟一段时间后重新尝试处理失败的任务。
-
流量控制:
- 在高并发场景下,可以通过延迟消息来缓解系统压力,避免瞬时流量过大导致系统崩溃。
-
用户提醒:
- 用于发送延迟的通知或提醒消息,例如在用户注册后的一段时间发送欢迎邮件,或者在某个事件即将发生时发送提醒。
-
事件驱动的工作流:
- 在事件驱动的架构中,可以通过延迟消息来协调不同服务之间的时间依赖性。
延迟消息的实现通常依赖于消息头中的时间戳,Pulsar 的 Broker 会根据时间戳决定何时将消息投递给消费者。如果您想了解更多技术细节,可以参考 Pulsar 的延迟消息文档。
如何消费了,这里使用pulsarListener来启用代码如下所示:
以下是 @PulsarListener
注解中各参数的详细解释:
参数解释
|
|
1. schemaType = SchemaType.STRING
- 含义:指定消息的序列化模式,
SchemaType.STRING
表示消费的消息是字符串格式。
2. subscriptionName = "subscription-Shared"
- 含义:定义消费组的订阅名称。这个名称唯一标识一个订阅。
- 作用:Pulsar 使用订阅名称来追踪消息的消费进度。
3. subscriptionType = SubscriptionType.Shared
- 含义:设置订阅模式为
Shared
模式。- 多个消费者可以使用同一个订阅,消息会以轮询方式分发给多个消费者。
- 提高了消息消费的并发性和吞吐量。
4. ackMode = AckMode.MANUAL
- 含义:消息的确认方式设置为手动模式。
- 消费者需要在业务逻辑中显式调用
consumer.acknowledge()
方法来确认消息已被成功处理。 - 提供对消息确认更精细的控制。
- 消费者需要在业务逻辑中显式调用
5. topics = {"${pulsar.topic.test-topic: test-topic}"}
- 含义:指定监听的主题(topic)。
- 使用占位符
${pulsar.topic.test-topic}
,可以从外部配置文件加载主题名称。 - 如果未设置外部配置,则使用默认值
test-topic
。
- 使用占位符
6. autoStartup = "true"
- 含义:当应用启动时,自动启动该监听器。
- 设置为
true
表示自动启用监听;设置为false
则需要手动启动监听器。
- 设置为
7. batch = true
- 含义:启用批量消费模式。
- 消费者每次接收一批消息,而不是一条条处理。
- 提高了高吞吐量场景下的处理效率。
8. properties = { "consumerName=consumerSharedA" }
- 含义:设置消费者的自定义属性。
- 在此示例中,消费者的名称被设置为
consumerSharedA
。 - 用途:用于标识该消费者,方便日志或监控中进行识别。
- 在此示例中,消费者的名称被设置为
9. consumerCustomizer = "consumerBatchReceiveCustomizer"
- 含义:引用一个自定义的
PulsarListenerConsumerBuilderCustomizer
配置。- 作用:进一步定制消费者的行为,例如设置批量接收策略、队列大小等。
总结
这段注解的整体作用是定义一个消费 test-topic
主题的监听器,使用 Shared
订阅模式,允许多个消费者共同消费消息;手动确认消息,支持批量接收。同时,通过配置 consumerName
和 consumerCustomizer
,可以实现消费者的精细化配置。