pulsar的简单使用
Apache Pulsar 是一个云原生的分布式消息传递和流媒体平台,最初由 Yahoo! 创建。现在是作为Apache软件基金会的顶级项目.
在pulsar这个项目中还大量使用了另外一个Apache的顶级项目Bookkeeper作为存储层的解决方案,在研究完成Pulsar之后也会对Bookkeeper进行研究.
本文用于记录本地安装Pulsar的过程,希望对你有所帮助,祝好!
安装Pulsar
根据官网介绍安装Pulsar一共有三种安装方式,分别是
- 本地运行
- docker中运行
- k8s中运行
本地运行
#下载
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.9.1/apache-pulsar-2.9.1-bin.tar.gz
#解压
$ tar xvfz apache-pulsar-2.9.1-bin.tar.gz
# 启动pulsar
$ cd apache-pulsar-2.9.1
$ bin/pulsar standalone
docker中运行
- pulsar
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone -nfw
- pulsar admin
# 启动pulsar admin
docker run -it -p 9527:9527 -p 7750:7750 -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager:v0.2.0
# 修改密码
curl -H 'X-XSRF-TOKEN: $CSRF_TOKEN' -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' -H "Content-Type: application/json" -X PUT http://localhost:7750/pulsar-manager/users/superuser -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
- pulsar admin
k8s中运行
k8s中需要使用helm来运行
helm install
--values examples/values-minikube.yaml
--set initialize=true
--namespace pulsar
pulsar-mini apache/pulsar
pulsar示例
使用kotlin来演示
普通消息功能
- PulsarUtil
/**
* 用于创建Pulasr使用中用到的一些工具
*/
class PulsarUtil
/**
* 创建pulsar客户端对象
*/
fun createClient(): PulsarClient {
return PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build()
}
/**
* 创建Topic
*/
fun createProduct(topic: String, client: PulsarClient): Producer<String> {
return client.newProducer(Schema.STRING)
.topic(topic)
.create()
}
- PulsarProducer
class PulsarProducer
fun main(){
val client = createClient()
val product = createProduct("pulsar-topic-1", client)
for (i in 0..100){
product.send("message:$i")
}
}
- PulsarConsumer
fun main() {
val client = createClient()
val subscribe = client.newConsumer()
.topic("pulsar-topic-1")
.subscriptionName("pulsar-topic-1-sub")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while (true) {
val receive = subscribe.receive()
println("message:" + String(receive.data))
subscribe.acknowledge(receive.messageId)
}
}
第一个例子只是使用一个普通消息来演示pulsar的消息功能,推送完成后可以看到消息被消费了.
异步消息功能
异步消息分为’异步发送’和’异步接收’,异步接收应该是puslar特有的功能,虽然业务代码在使用MQ时也可以通过自定义线程池的方式来完成异步接收的效果
下面展示一个发送消息和异步接收消息的示例
- PulsarAsyncProducer
/**
* 异步消息
*/
private fun asyncMsg(product: Producer<String>) {
println(Thread.currentThread().name + ":提交任务")
for (i in 0..10) {
product.sendAsync("message:$i|" + LocalDateTime.now()).thenAccept {
println("Message with ID $it successfully sent|" + Thread.currentThread().name)
}
}
}
- 执行结果
可以看到回调线程不是main,而是单独的一个线程池中的线程