pulsar的简单使用


pulsar的简单使用

Apache Pulsar 是一个云原生的分布式消息传递和流媒体平台,最初由 Yahoo! 创建。现在是作为Apache软件基金会的顶级项目.
在pulsar这个项目中还大量使用了另外一个Apache的顶级项目Bookkeeper作为存储层的解决方案,在研究完成Pulsar之后也会对Bookkeeper进行研究.
本文用于记录本地安装Pulsar的过程,希望对你有所帮助,祝好!

安装Pulsar

根据官网介绍安装Pulsar一共有三种安装方式,分别是

  • 本地运行
  • docker中运行
  • k8s中运行

本地运行

#下载
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.9.1/apache-pulsar-2.9.1-bin.tar.gz
#解压
$ tar xvfz apache-pulsar-2.9.1-bin.tar.gz
# 启动pulsar
$ cd apache-pulsar-2.9.1
$ bin/pulsar standalone

docker中运行

  • pulsar
docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone -nfw
  • pulsar admin

# 启动pulsar admin
docker run -it  -p 9527:9527 -p 7750:7750 -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager:v0.2.0

# 修改密码
curl  -H 'X-XSRF-TOKEN: $CSRF_TOKEN' -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;'  -H "Content-Type: application/json" -X PUT http://localhost:7750/pulsar-manager/users/superuser -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
  • pulsar admin

pulsar admin

k8s中运行

k8s中需要使用helm来运行


helm install 
    --values examples/values-minikube.yaml 
    --set initialize=true 
    --namespace pulsar 
    pulsar-mini apache/pulsar

pulsar示例

使用kotlin来演示

普通消息功能

  • PulsarUtil
/**
 * 用于创建Pulasr使用中用到的一些工具
 */
class PulsarUtil

/**
 * 创建pulsar客户端对象
 */
fun createClient(): PulsarClient {
    return PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build()
}


/**
 * 创建Topic
 */
fun createProduct(topic: String, client: PulsarClient): Producer<String> {
    return client.newProducer(Schema.STRING)
        .topic(topic)
        .create()
}
  • PulsarProducer
class PulsarProducer

fun main(){
    val client = createClient()
    val product = createProduct("pulsar-topic-1", client)

    for (i in 0..100){
        product.send("message:$i")
    }
}
  • PulsarConsumer

fun main() {
    val client = createClient()
    val subscribe = client.newConsumer()
        .topic("pulsar-topic-1")
        .subscriptionName("pulsar-topic-1-sub")
        .ackTimeout(10, TimeUnit.SECONDS)
        .subscriptionType(SubscriptionType.Exclusive)
        .subscribe();

    while (true) {
        val receive = subscribe.receive()
        println("message:" + String(receive.data))
        subscribe.acknowledge(receive.messageId)
    }
}

第一个例子只是使用一个普通消息来演示pulsar的消息功能,推送完成后可以看到消息被消费了.

异步消息功能

异步消息分为’异步发送’和’异步接收’,异步接收应该是puslar特有的功能,虽然业务代码在使用MQ时也可以通过自定义线程池的方式来完成异步接收的效果
下面展示一个发送消息和异步接收消息的示例

  • PulsarAsyncProducer
/**
 * 异步消息
 */
private fun asyncMsg(product: Producer<String>) {
    println(Thread.currentThread().name + ":提交任务")
    for (i in 0..10) {
        product.sendAsync("message:$i|" + LocalDateTime.now()).thenAccept {
            println("Message with ID $it successfully sent|" + Thread.currentThread().name)
        }
    }
}
  • 执行结果

异步消息执行结果

可以看到回调线程不是main,而是单独的一个线程池中的线程


  TOC