Flink未来将与 Pulsar集成提供大规模的弹性数据处理

Python进阶学习交流
关注

未来整合

Pulsar可以以不同的方式与Apache Flink集成。一些潜在的集成包括使用流式连接器为流式工作负载提供支持,并使用批量源连接器支持批量工作负载。Pulsar还提供对schema 的本地支持,可以与Flink集成并提供对数据的结构化访问,例如使用Flink SQL作为在Pulsar中查询数据的方式。最后,集成这些技术的另一种方法可能包括使用Pulsar作为Flink的状态后端。由于Pulsar具有分层架构(Streams和Segmented Streams,由Apache Bookkeeper提供支持),因此将Pulsar用作存储层并存储Flink状态变得很自然。

从体系结构的角度来看,我们可以想象两个框架之间的集成,它使用Apache Pulsar作为统一的数据层视图,Apache Flink作为统一的计算和数据处理框架和API。

现有集成

两个框架之间的集成正在进行中,开发人员已经可以通过多种方式将Pulsar与Flink结合使用。例如,Pulsar可用作Flink DataStream应用程序中的流媒体源和流式接收器。开发人员可以将Pulsar中的数据提取到Flink作业中,该作业可以计算和处理实时数据,然后将数据作为流式接收器发送回Pulsar主题。这样的例子如下所示:

// create and configure Pulsar consumer

PulsarSourceBuilder<String>builder = PulsarSourceBuilder

.builder(new SimpleStringSchema())

.serviceUrl(serviceUrl)

.topic(inputTopic)

.subscriptionName(subscription);

SourceFunction<String> src = builder.build();

// ingest DataStream with Pulsar consumer

DataStream<String> words = env.addSource(src);

// perform computation on DataStream (here a simple WordCount)

DataStream<WordWithCount> wc = words

.flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {

collector.collect(new WordWithCount(word, 1));

})

.returns(WordWithCount.class)

.keyBy("word")

.timeWindow(Time.seconds(5))

.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->

new WordWithCount(c1.word, c1.count + c2.count));

// emit result via Pulsar producer

wc.addSink(new FlinkPulsarProducer<>(

serviceUrl,

outputTopic,

new AuthenticationDisabled(),

wordWithCount -> wordWithCount.toString().getBytes(UTF_8),

wordWithCount -> wordWithCount.word)

);

开发人员可以利用的两个框架之间的另一个集成包括将Pulsar用作Flink SQL或Table API查询的流式源和流式表接收器,如下例所示:

// obtain a DataStream with words

DataStream<String> words = ...

// register DataStream as Table "words" with two attributes ("word", "ts").

//   "ts" is an event-time timestamp.

tableEnvironment.registerDataStream("words", words, "word, ts.rowtime");

// create a TableSink that produces to Pulsar

TableSink sink = new PulsarJsonTableSink(

serviceUrl,

outputTopic,

new AuthenticationDisabled(),

ROUTING_KEY);

// register Pulsar TableSink as table "wc"

tableEnvironment.registerTableSink(

"wc",

sink.configure(

new String[]{"word", "cnt"},

new TypeInformation[]{Types.STRING, Types.LONG}));

// count words per 5 seconds and write result to table "wc"

tableEnvironment.sqlUpdate(

"INSERT INTO wc " +

"SELECT word, COUNT(*) AS cnt " +

"FROM words " +

"GROUP BY word, TUMBLE(ts, INTERVAL '5' SECOND)");

最后,Flink将批量工作负载与Pulsar集成为批处理接收器,其中所有结果在Apache Flink完成静态数据集中的计算后被推送到Pulsar。这样的例子如下所示:

// obtain DataSet from arbitrary computation

DataSet<WordWithCount> wc = ...

// create PulsarOutputFormat instance

OutputFormat pulsarOutputFormat = new PulsarOutputFormat(

serviceUrl,

topic,

new AuthenticationDisabled(),

wordWithCount -> wordWithCount.toString().getBytes());

// write DataSet to Pulsar

wc.output(pulsarOutputFormat);

结论

Pulsar和Flink都对应用程序的数据和计算级别如何以批量作为特殊情况流“流式传输”方式分享了类似的观点。通过Pulsar的Segmented Streams方法和Flink在一个框架下统一批处理和流处理工作负载的步骤,有许多方法将这两种技术集成在一起,以提供大规模的弹性数据处理。

声明: 本文由入驻OFweek维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。
侵权投诉

下载OFweek,一手掌握高科技全行业资讯

还不是OFweek会员,马上注册
打开app,查看更多精彩资讯 >
  • 长按识别二维码
  • 进入OFweek阅读全文
长按图片进行保存