SpringBoot整合Apache Pulsar

快速启动

简单的开始只包括3个简单的步骤。

1. 添加Maven依赖

<dependency>
  <groupId>io.github.majusko</groupId>
  <artifactId>pulsar-java-spring-boot-starter</artifactId>
  <version>${version}</version>
</dependency>

2. 配置生产者

用你想注册的所有生产商创建你的配置类。

@Configuration
public class TestProducerConfiguration {

    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
            .addProducer("my-topic", MyMsg.class)
            .addProducer("other-topic", String.class);
    }
}

通过简单地将PulsarTemplate注入到你的服务中来使用注册的生产者。

@Service
class MyProducer {

	@Autowired
	private PulsarTemplate<MyMsg> producer;

	void send(MyMsg msg) {
		producer.send("my-topic", msg);
	}
}

3. 配置消费者

@PulsarConsumer注解来注解你的服务方法。

@Service
class MyConsumer {
    
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(MyMsg msg) { 
        producer.send(TOPIC, msg); 
    }
}

4. 最低限度的配置

pulsar.service-url=pulsar://localhost:6650

示例项目

Java Pulsar Example Project

文档

配置

默认配置:


#PulsarClient
pulsar.service-url=pulsar://localhost:6650
pulsar.io-threads=10
pulsar.listener-threads=10
pulsar.enable-tcp-no-delay=false
pulsar.keep-alive-interval-sec=20
pulsar.connection-timeout-sec=10
pulsar.operation-timeout-sec=15
pulsar.starting-backoff-interval-ms=100
pulsar.max-backoff-interval-sec=10
pulsar.consumer-name-delimiter=
pulsar.namespace=default
pulsar.tenant=public

#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
pulsar.consumer.default.ack-timeout-ms=3000

属性解释:

Pulsar客户端

  • pulsar.service-url - 链接 pulsar cluster 的URL.
  • pulsar.io-threads - 处理 brokers 连接得线程数量.
  • pulsar.listener-threads - 设置用于消息监听器/订阅器的线程数。
  • pulsar.enable-tcp-no-delay - 是否在连接上使用TCP无延时标志,以禁用Nagle算法。
  • pulsar.keep-alive-interval-sec - 每个client-broker-connection存活时间间隔。
  • pulsar.connection-timeout-sec - 等待与broker建立连接的时间长度。如果持续时间过了,没有来自经纪人的响应,连接尝试将被放弃。
  • pulsar.operation-timeout-sec - 操作超时。
  • pulsar.starting-backoff-interval-ms - 回退时间间隔的时间长度(重试算法)。
  • pulsar.max-backoff-interval-sec - 回退间隔的最大时间长度(重试算法)。
  • pulsar.consumer-name-delimiter - 消费者名称是由Bean名称和方法组成的连接,并有一个分隔符。默认情况下,没有分界符,单词被连接在一起。
  • pulsar.namespace - 命名空间分离。例如:app1/app2 或 dev/staging/prod。更多内容请见名称空间文档
  • pulsar.tenant - 支持Pulsar多租户。更多内容请见多租户文档

消费者

  • pulsar.consumer.default.dead-letter-policy-max-redeliver-count - pulsar 应该尝试多少次重试向消费者发送信息。
  • pulsar.consumer.default.ack-timeout-ms - 多久应该接受信息,多长时间死信机制会尝试重试发送信息。

其他用途

1. PulsarMessage Wrapper

如果你需要访问Pulsar元数据,你只需使用PulsarMessage作为包装,数据将被注入。

@Service
class MyConsumer {
    
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(PulsarMessage<MyMsg> myMsg) { 
        producer.send(TOPIC, msg.getValue()); 
    }
}

2. 支持SpeL

你可以在 application.properties 中配置主题名称。

my.custom.topic.name=foo
@PulsarConsumer(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}

原文:GitHub - majusko/pulsar-java-spring-boot-starter: Simple pulsar spring boot starter with annotation based consumer/producer registration.