gradle开发pulsar应用程序

spring-boot-starter-pulsar工程

使用gradle工程来进行开发程序,首先查看gradle工程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
plugins {
	java
	id("org.springframework.boot") version "3.4.3"
	id("io.spring.dependency-management") version "1.1.7"
	id("io.freefair.lombok") version "8.0.1" // 使用 FreeFair 的 Lombok 插件
}

group = "com.pulsar"
version = "0.0.1-SNAPSHOT"

java {
	toolchain {
		languageVersion = JavaLanguageVersion.of(17)
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation("org.springframework.boot:spring-boot-starter-web"){
		exclude(group = "org.springframework.boot", module = "spring-boot-starter-logging")
	}
//	testImplementation("org.springframework.boot:spring-boot-starter-test")
//	testRuntimeOnly("org.junit.platform:junit-platform-launcher")
	implementation("org.springframework.boot:spring-boot-starter-log4j2")
	//pulsar
	implementation("org.springframework.boot:spring-boot-starter-pulsar"){
		exclude(group = "org.springframework.boot", module = "spring-boot-starter-logging")
	}
	//lombok
		compileOnly("org.projectlombok:lombok:1.18.28") // 仅在编译时使用 Lombok
		annotationProcessor("org.projectlombok:lombok:1.18.28") // 为注解处理器添加 Lombok
//		testCompileOnly("org.projectlombok:lombok:1.18.28") // 用于测试时编译
//		testAnnotationProcessor("org.projectlombok:lombok:1.18.28") // 测试注解处理器
}

tasks.withType<Test> {
	useJUnitPlatform()
}

pulsar消息队列需要注意的消费者模式: Apache Pulsar 提供了四种主要的消费者消费模式,用于灵活地处理消息传递的不同场景需求。以下是它们的详细介绍:

1. Exclusive 模式(独占模式)

  • 特点:一个订阅只能由一个消费者占用。
  • 场景:确保某个消息只能被一个消费者处理,适合订单处理等需要强一致性的场景。
  • 说明:如果另一个消费者尝试使用同一个订阅连接,它将收到错误。

2. Failover 模式(故障转移模式)

  • 特点:一个订阅有多个消费者,但只有一个是活动消费者,其他的处于备用状态。
  • 故障切换:当活动消费者断开连接时,备用消费者将接管。
  • 场景:高可用性要求,例如需要在消费者崩溃时确保消息继续被处理。

3. Shared 模式(共享模式,又称 Round-robin)

  • 特点:同一个订阅下的多个消费者可以共同消费消息,消息被平均分配(轮询机制)。
  • 场景:高吞吐量需求,例如流处理任务或负载均衡。

4. Key_Shared 模式(基于键的共享模式)

  • 特点:消息根据 Key 分组,同一个 Key 的消息始终由同一个消费者处理。
  • 场景:需要按特定逻辑分组处理的场景,例如按用户 ID 处理请求。

总结

根据应用需求选择消费模式:

  • 独占模式:适用于单一消费者场景。
  • 故障转移模式:确保消息处理的高可用性。
  • 共享模式:提升吞吐量和负载分布。
  • 键共享模式:按业务逻辑分组处理消息。

实现的效果就是: {04B68079-4DE0-49C2-BF91-881A9C177CF0}.png 在24秒收到了14秒的信息。 发送延迟的信息的java代码如下: {3018DC29-A21A-45D3-BFE4-2355BE0FCF2D}.png 在 Apache Pulsar 中,延迟消息(Delayed Message)的功能允许生产者设置消息的延迟时间,使消息在指定的时间后才被消费者接收。这种功能在以下场景中非常有用:

  1. 任务调度

    • 可以用来实现定时任务。例如,发送一条消息,指示某个任务在未来的特定时间点执行。
  2. 重试机制

    • 在分布式系统中,如果某个任务失败,可以通过延迟消息来实现重试逻辑。例如,延迟一段时间后重新尝试处理失败的任务。
  3. 流量控制

    • 在高并发场景下,可以通过延迟消息来缓解系统压力,避免瞬时流量过大导致系统崩溃。
  4. 用户提醒

    • 用于发送延迟的通知或提醒消息,例如在用户注册后的一段时间发送欢迎邮件,或者在某个事件即将发生时发送提醒。
  5. 事件驱动的工作流

    • 在事件驱动的架构中,可以通过延迟消息来协调不同服务之间的时间依赖性。

延迟消息的实现通常依赖于消息头中的时间戳,Pulsar 的 Broker 会根据时间戳决定何时将消息投递给消费者。如果您想了解更多技术细节,可以参考 Pulsar 的延迟消息文档。 如何消费了,这里使用pulsarListener来启用代码如下所示: {478DD4A1-BF9C-4D4F-A47D-676042474A1E}.png

以下是 @PulsarListener 注解中各参数的详细解释:

参数解释

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@PulsarListener(
    schemaType = SchemaType.STRING,
    subscriptionName = "subscription-Shared",
    subscriptionType = SubscriptionType.Shared,
    ackMode = AckMode.MANUAL,
    topics = {"${pulsar.topic.test-topic: test-topic}"},
    autoStartup = "true",
    batch = true,
    properties = { "consumerName=consumerSharedA" },
    consumerCustomizer = "consumerBatchReceiveCustomizer"
)

1. schemaType = SchemaType.STRING

  • 含义:指定消息的序列化模式,SchemaType.STRING 表示消费的消息是字符串格式。

2. subscriptionName = "subscription-Shared"

  • 含义:定义消费组的订阅名称。这个名称唯一标识一个订阅。
  • 作用:Pulsar 使用订阅名称来追踪消息的消费进度。

3. subscriptionType = SubscriptionType.Shared

  • 含义:设置订阅模式为 Shared 模式。
    • 多个消费者可以使用同一个订阅,消息会以轮询方式分发给多个消费者。
    • 提高了消息消费的并发性和吞吐量。

4. ackMode = AckMode.MANUAL

  • 含义:消息的确认方式设置为手动模式。
    • 消费者需要在业务逻辑中显式调用 consumer.acknowledge() 方法来确认消息已被成功处理。
    • 提供对消息确认更精细的控制。

5. topics = {"${pulsar.topic.test-topic: test-topic}"}

  • 含义:指定监听的主题(topic)。
    • 使用占位符 ${pulsar.topic.test-topic},可以从外部配置文件加载主题名称。
    • 如果未设置外部配置,则使用默认值 test-topic

6. autoStartup = "true"

  • 含义:当应用启动时,自动启动该监听器。
    • 设置为 true 表示自动启用监听;设置为 false 则需要手动启动监听器。

7. batch = true

  • 含义:启用批量消费模式。
    • 消费者每次接收一批消息,而不是一条条处理。
    • 提高了高吞吐量场景下的处理效率。

8. properties = { "consumerName=consumerSharedA" }

  • 含义:设置消费者的自定义属性。
    • 在此示例中,消费者的名称被设置为 consumerSharedA
    • 用途:用于标识该消费者,方便日志或监控中进行识别。

9. consumerCustomizer = "consumerBatchReceiveCustomizer"

  • 含义:引用一个自定义的 PulsarListenerConsumerBuilderCustomizer 配置。
    • 作用:进一步定制消费者的行为,例如设置批量接收策略、队列大小等。

总结

这段注解的整体作用是定义一个消费 test-topic 主题的监听器,使用 Shared 订阅模式,允许多个消费者共同消费消息;手动确认消息,支持批量接收。同时,通过配置 consumerNameconsumerCustomizer,可以实现消费者的精细化配置。

comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up