快速启动
简单的开始只包括3个简单的步骤。
1. 添加Maven依赖
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>${version}</version>
</dependency>
2. 配置生产者
用你想注册的所有生产商创建你的配置类。
@Configuration
public class TestProducerConfiguration {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
.addProducer("my-topic", MyMsg.class)
.addProducer("other-topic", String.class);
}
}
通过简单地将PulsarTemplate注入到你的服务中来使用注册的生产者。
@Service
class MyProducer {
@Autowired
private PulsarTemplate<MyMsg> producer;
void send(MyMsg msg) {
producer.send("my-topic", msg);
}
}
3. 配置消费者
用@PulsarConsumer
注解来注解你的服务方法。
@Service
class MyConsumer {
@PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
void consume(MyMsg msg) {
producer.send(TOPIC, msg);
}
}
4. 最低限度的配置
pulsar.service-url=pulsar://localhost:6650
示例项目
Java Pulsar Example Project
文档
配置
默认配置:
#PulsarClient
pulsar.service-url=pulsar://localhost:6650
pulsar.io-threads=10
pulsar.listener-threads=10
pulsar.enable-tcp-no-delay=false
pulsar.keep-alive-interval-sec=20
pulsar.connection-timeout-sec=10
pulsar.operation-timeout-sec=15
pulsar.starting-backoff-interval-ms=100
pulsar.max-backoff-interval-sec=10
pulsar.consumer-name-delimiter=
pulsar.namespace=default
pulsar.tenant=public
#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
pulsar.consumer.default.ack-timeout-ms=3000
属性解释:
Pulsar客户端
-
pulsar.service-url
- 链接 pulsar cluster 的URL. -
pulsar.io-threads
- 处理 brokers 连接得线程数量. -
pulsar.listener-threads
- 设置用于消息监听器/订阅器的线程数。 -
pulsar.enable-tcp-no-delay
- 是否在连接上使用TCP无延时标志,以禁用Nagle算法。 -
pulsar.keep-alive-interval-sec
- 每个client-broker-connection存活时间间隔。 -
pulsar.connection-timeout-sec
- 等待与broker建立连接的时间长度。如果持续时间过了,没有来自经纪人的响应,连接尝试将被放弃。 -
pulsar.operation-timeout-sec
- 操作超时。 -
pulsar.starting-backoff-interval-ms
- 回退时间间隔的时间长度(重试算法)。 -
pulsar.max-backoff-interval-sec
- 回退间隔的最大时间长度(重试算法)。 -
pulsar.consumer-name-delimiter
- 消费者名称是由Bean名称和方法组成的连接,并有一个分隔符。默认情况下,没有分界符,单词被连接在一起。 -
pulsar.namespace
- 命名空间分离。例如:app1/app2 或 dev/staging/prod。更多内容请见名称空间文档。 -
pulsar.tenant
- 支持Pulsar多租户。更多内容请见多租户文档。
消费者
-
pulsar.consumer.default.dead-letter-policy-max-redeliver-count
- pulsar 应该尝试多少次重试向消费者发送信息。 -
pulsar.consumer.default.ack-timeout-ms
- 多久应该接受信息,多长时间死信机制会尝试重试发送信息。
其他用途
1. PulsarMessage Wrapper
如果你需要访问Pulsar元数据,你只需使用PulsarMessage
作为包装,数据将被注入。
@Service
class MyConsumer {
@PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
void consume(PulsarMessage<MyMsg> myMsg) {
producer.send(TOPIC, msg.getValue());
}
}
2. 支持SpeL
你可以在 application.properties
中配置主题名称。
my.custom.topic.name=foo
@PulsarConsumer(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}