【pulsar学习】Pulsar Functions

文章目录

  • 1、pulsar Fuction背景
  • 2、编程模型
  • 3、简单实例
  • 4 总结

1、pulsar Fuction背景

当我们进行流式处理的时候,很多情况下,我们的需求可能只是下面这些简单的操作:简单的 ETL 操作\聚合计算操作等相关服务。

但为了实现这些功能,我们不得不去部署一整套 SPE 服务。部署成功后才发现需要的仅是 SPE(流处理引擎) 服务中的一小部分功能,部署 SPE 的成本可能比用户开发这个功能本身更困难。由于SPE 本身 API 的复杂性,我们需要了解这些算子的使用场景,明白不同算子之间有哪些区别,什么情况下,应该使用什么算子来处理相应的逻辑。

基于以上原因,我们设计并实现了 Pulsar Functions,在 Pulsar Functions 中,用户只需关心计算逻辑本身,而不需要去了解或者部署 SPE 的相关服务,当然你也可以将 pulsar-function 与现有的 SPE 服务一起使用。也就是说,在 Pulsar Functions 中,无需部署 SPE 的整套服务,就可以达到与 SPE 服务同样的优势 。

2、编程模型

在这里插入图片描述

  • Input topic 是数据的来源,在 Pulsar Functions 中,所有的数据均来自 input topic。当数据进入input topic 中,Pulsar Functions 充当消费者的角色,去 input topic 中消费消息;当从 input topic 中拿到需要处理的消息时,Pulsar Functions 充当生产者的角色往 output topic 或者 log topic 中生产消息。
  • Output topic 和 log topic 都可以看作是 Pulsar Functions 的输出。从是否会有 output 这个点来看,我们可以将 Pulsar Functions 分为两类,当有输出的时候 Pulsar Functions 会将相应的 output 输出到 output topic 中。log topic 主要存储用户的日志信息,当 Pulsar Functions 出现问题时,方便用户定位错误并调试。
  • 综上所述:我们不难看出 Pulsar Functions 充当了一个消息处理和转运的角色。可以独立承担如数据清洗、转换等操作,而且每个操作支持不同的语言如java和python进行编写。

3、简单实例

(1)修改配置文件:conf文件夹下的broker.conf文件

# 修改1161行:由false改为true,集群中所有的机器都需要修改
functionsWorkerEnabled=false  

接下来重启broker。

(2)java程序编写
引入依赖:

<dependency><groupId>org.apache.pulsargroupId><artifactId>pulsar-functions-apiartifactId><version>2.8.1version>
dependency>

编写程序:继承Function接口,实现process函数,其中就是简单的日期字符转换操作。

package www.whuhhh.cn.funtion;import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;/*** @author hhhSir* @create 2022-06-05 21:02*/
public class WordCountFunction implements Function<String, String> {private SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd HH/mm/ss");private SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic String process(String input, Context context) throws Exception {Date oldDate = format1.parse(input);return format2.format(oldDate);}
}

项目进行打包
在这里插入图片描述
将jar包传入服务器:

bin/pulsar-admin functions create \
--jar myFunctions/pulsarUpper-timeTrans.jar \
--classname www.whuhhh.cn.funtion.WordCountFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/test \
--tenant public \
--namespace default \
--name test2

显示:"Created successfully"就说明创建成功了。

其中:

bin/pulsar-admin functions
属性说明:functions: 可选值:localrun: 创建本地function进行运行create: 在集群模式下创建delete: 删除在集群中运行的functionget: 获取function的相关信息restart: 重启stop : 停止运行start: 启动status: 检查状态stats: 查看状态list: 查看特定租户和名称空间下的所有的function
--classname: 设置function执行类
--jar 设置function对应的jar包
--inputs : 输入的topic
--output : 输出的topic
--tenant : 设置function运行在那个租户中
--namespace: 设置function运行在那个名称空间中
--name : 定义function的名称

(3)程序测试
trigger 触发启动, 并向函数发送数据测试

bin/pulsar-admin functions trigger --name test2 --trigger-value "2021/10/10 15/30/30"

结果就显示了输出后的结果,说明程序运行成功。
在这里插入图片描述
可以编写程序向persistent://public/default/exclamation-input发送数据,并在persistent://public/default/test接收数据,java程序写法如上一篇博客介绍。测试结果如下:
在这里插入图片描述
在这里插入图片描述
说明数据转化成功。
(4)pulsar Functions查看

./bin/pulsar-admin functions list --tenant public --namespace default

(5)pulsar Functions重启

./bin/pulsar-admin functions restart \
--tenant public \
--namespace default \
--name ExclamationFunctio123“Restarted successfully”

(6)pulsar Functions更新

./bin/pulsar-admin functions update --tenant public --namespace default --name ExclamationFunctio123 --cpu 10"Updated successfully"

(7)pulsar Function删除

./bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name ExclamationFunctio123"Deleted successfully"

4 总结

成长也有期限的


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部