一文详解Flink知识体系

园陌
关注

4) Flink 关联 Hive 分区表

Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。通过这种方式,用户无需编写 DataStream 程序即可完成 Kafka 流实时关联最新的 Hive 分区实现数据打宽。

具体用法:

在 Sql Client 中注册 HiveCatalog:

vim conf/sql-client-defaults.yaml
catalogs:
 - name: hive_catalog
   type: hive
   hive-conf-dir: /disk0/soft/hive-conf/ #该目录需要包hive-site.xml文件

创建 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
   master Row

Flink 事实表与 Hive 最新分区数据关联

dim_extend_shop_info 是 Hive 中已存在的表,所以我们用 table hint 动态地开启维表参数。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,  
    ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
   from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
      JOIN hive_catalog.flink_db.dim_extend_shop_info  
 + OPTIONS('streaming-source.enable'='true',  
    'streaming-source.partition.include' = 'latest',  
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name')
   FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表  
   ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
   where groupID in (202042)) t  where t.rn = 1

参数解释:

streaming-source.enable 开启流式读取 Hive 数据。

streaming-source.partition.include 有以下两个值:

latest 属性: 只读取最新分区数据。all: 读取全量分区数据 ,默认值为 all,表示读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能直接读取最新分区数据。

streaming-source.monitor-interval 监听新分区生成的时间、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。需要注意的是,1.12.1 放开了这个限制,但仍建议按照实际业务不要配个太短的 interval。

streaming-source.partition-order 分区策略,主要有以下 3 种,其中最为推荐的是 partition-name:

partition-name 使用默认分区名称顺序加载最新分区create-time 使用分区文件创建时间顺序partition-time 使用分区时间顺序六、Flink 状态管理

我们前面写的 wordcount 的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

因此可以说flink因为引入了state和checkpoint所以才支持的exactly once

首先区分一下两个概念:

state:

state一般指一个具体的task/operator的状态:

state数据默认保存在java的堆内存中,TaskManage节点的内存中。

operator表示一些算子在运行的过程中会产生的一些中间结果。

checkpoint:

checkpoint可以理解为checkpoint是把state数据定时持久化存储了,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。

注意:task(subTask)是Flink中执行的基本单位。operator指算子(transformation)

State可以被记录,在失败的情况下数据还可以恢复。

Flink中有两种基本类型的State:

Keyed State

Operator State

Keyed State和Operator State,可以以两种形式存在:

原始状态(raw state)

托管状态(managed state)

托管状态是由Flink框架管理的状态。

我们说operator算子保存了数据的中间结果,中间结果保存在什么类型中,如果我们这里是托管状态,则由flink框架自行管理

原始状态由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

1. State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。

保存state的数据结构:

ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

MapState

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。

1. ValueState

使用ValueState保存中间结果对下面数据进行分组求和。

开发步骤:

1. 获取流处理执行环境
 2. 加载数据源
 3. 数据分组
 4. 数据转换,定义ValueState,保存中间结果
 5. 数据打印
 6. 触发执行

ValueState:测试数据源:

List(
  (1L, 4L),
  (2L, 3L),
  (3L, 1L),
  (1L, 2L),
  (3L, 2L),
  (1L, 2L),
  (2L, 2L),
  (2L, 9L)
)

示例代码:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object TestKeyedState {
 class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
   *
    * ValueState状态句柄. 第一个值为count,第二个值为sum。
   
   private var sum: ValueState[(Long, Long)] = _
   override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
     // 获取当前状态值
     val tmpCurrentSum: (Long, Long) = sum.value
     // 状态默认值
     val currentSum = if (tmpCurrentSum != null) {
       tmpCurrentSum
     } else {
       (0L, 0L)
     }
     // 更新
     val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
     // 更新状态值
     sum.update(newSum)
     // 如果count >=3 清空状态值,重新计算
     if (newSum._1 >= 3) {
       out.collect((input._1, newSum._2 / newSum._1))
       sum.clear()
     }
   }
   override def open(parameters: Configuration): Unit = {
     sum = getRuntimeContext.getState(
       new ValueStateDescriptor[(Long, Long)]("average", // 状态名称
         TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型
     )
   }
 }  
 def main(args: Array[String]): Unit = {
   //初始化执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //构建数据源
   val inputStream: DataStream[(Long, Long)] = env.fromCollection(
     List(
       (1L, 4L),
       (2L, 3L),
       (3L, 1L),
       (1L, 2L),
       (3L, 2L),
       (1L, 2L),
       (2L, 2L),
       (2L, 9L))
   )
   //执行数据处理
   inputStream.keyBy(0)
     .flatMap(new CountWithKeyedState)
     .setParallelism(1)
     .print
   //运行任务
   env.execute
 }
}  
2. MapState

使用MapState保存中间结果对下面数据进行分组求和:

1. 获取流处理执行环境
 2. 加载数据源
 3. 数据分组
 4. 数据转换,定义MapState,保存中间结果
 5. 数据打印
 6. 触发执行

MapState:测试数据源:

List(
  ("java", 1),
  ("python", 3),
  ("java", 2),
  ("scala", 2),
  ("python", 1),
  ("java", 1),
  ("scala", 2)
)  

示例代码:

object MapState {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   *
     * 使用MapState保存中间结果对下面数据进行分组求和
     * 1.获取流处理执行环境
     * 2.加载数据源
     * 3.数据分组
     * 4.数据转换,定义MapState,保存中间结果
     * 5.数据打印
     * 6.触发执行
     
   val source: DataStream[(String, Int)] = env.fromCollection(List(
     ("java", 1),
     ("python", 3),
     ("java", 2),
     ("scala", 2),
     ("python", 1),
     ("java", 1),
     ("scala", 2)))
 
   source.keyBy(0)
     .map(new RichMapFunction[(String, Int), (String, Int)] {
       var mste: MapState[String, Int] = _
       override def open(parameters: Configuration): Unit = {
         val msState = new MapStateDescriptor[String, Int]("ms",
           TypeInformation.of(new TypeHint[(String)] {}),
           TypeInformation.of(new TypeHint[(Int)] {}))
         mste = getRuntimeContext.getMapState(msState)
       }
       override def map(value: (String, Int)): (String, Int) = {
         val i: Int = mste.get(value._1)
         mste.put(value._1, value._2 + i)
         (value._1, value._2 + i)
       }
     }).print()
   env.execute()
 }
}  
2. State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state。

保存state的数据结构:

ListState

举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

步骤:

获取执行环境

设置检查点机制:路径,重启策略

自定义数据源

需要继承并行数据源和CheckpointedFunction设置listState,通过上下文对象context获取数据处理,保留offset制作快照

数据打印

触发执行

示例代码:

import java.util
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object ListOperate {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   env.enableCheckpointing(5000)
   env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //重启策略
   env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
   //模拟kakfa偏移量
   env.addSource(new MyRichParrelSourceFun)
     .print()
   env.execute()
 }
}
class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
 with CheckpointedFunction {
 var listState: ListState[Long] = _
 var offset: Long = 0L
 //任务运行
 override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
   val iterState: util.Iterator[Long] = listState.get().iterator()
   while (iterState.hasNext) {
     offset = iterState.next()
   }
   while (true) {
     offset += 1
     ctx.collect("offset:"+offset)
     Thread.sleep(1000)
     if(offset > 10){
       1/0
     }
   }
 }
 //取消任务
 override def cancel(): Unit = ???
 //制作快照
 override def snapshotState(context: FunctionSnapshotContext): Unit = {
   listState.clear()
   listState.add(offset)
 }
 //初始化状态
 override def initializeState(context: FunctionInitializationContext): Unit = {
   listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
     "listState", TypeInformation.of(new TypeHint[Long] {})
   ))
 }
}
3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

1) API介绍

通常,我们首先会创建一个Keyed或Non-Keyed的Data Stream,然后再创建一个Broadcasted Stream,最后通过Data Stream来连接(调用connect方法)到Broadcasted Stream上,这样实现将Broadcast State广播到Data Stream下游的每个Task中。

如果Data Stream是Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用KeyedBroadcastProcessFunction来实现,下面是KeyedBroadcastProcessFunction的API,代码如下所示:

public abstract class KeyedBroadcastProcessFunction

上面泛型中的各个参数的含义,说明如下:

KS:表示Flink程序从最上游的Source Operator开始构建Stream,当调用keyBy时所依赖的Key的类型;IN1:表示非Broadcast的Data Stream中的数据记录的类型;IN2:表示Broadcast Stream中的数据记录的类型;OUT:表示经过KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。

如果Data Stream是Non-Keyed Stream,则连接到Broadcasted Stream后,添加处理ProcessFunction时需要使用BroadcastProcessFunction来实现,下面是BroadcastProcessFunction的API,代码如下所示:

public abstract class BroadcastProcessFunction

上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction的泛型类型中的后3个含义相同,只是没有调用keyBy操作对原始Stream进行分区操作,就不需要KS泛型参数。

注意事项:

Broadcast State 是Map类型,即K-V类型。

Broadcast State 只有在广播一侧的方法中processBroadcastElement可以修改;在非广播一侧方法中processElement只读。

Broadcast State在运行时保存在内存中。

2) 场景举例

动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广播到下游Task中。

实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为广播状态广播到下游Task中。

七、Flink的容错1. Checkpoint介绍

checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。

CheckpointCoordinator(检查点协调器) 周期性的向该流应用的所有source算子发送 barrier(屏障)。

当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理

下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。

每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。

当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败。

如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示:

假设算子C有A和B两个输入源

在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier 先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。

当输入源B发出的barrier到来时,算子C制作自身快照并向 CheckpointCoordinator 报告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。

当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被处理一次(Exactly Once)。

2. 持久化存储1) MemStateBackend

该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及快照的数据量非常小时使用,并不推荐用作大规模商业部署。

MemoryStateBackend 的局限性:

默认情况下,每个状态的大小限制为 5 MB。可以在MemoryStateBackend的构造函数中增加此值。

无论配置的最大状态大小如何,状态都不能大于akka帧的大小(请参阅配置)。

聚合状态必须适合 JobManager 内存。

建议MemoryStateBackend 用于:

本地开发和调试。

状态很少的作业,例如仅包含一次记录功能的作业(Map,FlatMap,Filter,...),kafka的消费者需要很少的状态。

2) FsStateBackend

该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。

建议FsStateBackend:

具有大状态,长窗口,大键 / 值状态的作业。

所有高可用性设置。

3) RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。

4) 语法val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 设置checkpoint的执行模式,最多执行一次或者至少执行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置checkpoint的超时时间
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照过程中出现错误,是否让整体任务失败:true是  false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//设置同一时间有多少 个checkpoint可以同时执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
5) 修改State Backend的两种方式

第一种:单任务调整

修改当前任务代码

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

第二种:全局调整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

6) Checkpoint的高级选项

默认checkpoint功能是disabled的,想要使用的时候需要先启用checkpoint开启之后,默认的checkPointMode是Exactly-once

//配置一秒钟开启一个checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的执行模式
//两种可选:
//CheckpointingMode.EXACTLY_ONCE:默认值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情况下选择CheckpointingMode.EXACTLY_ONCE,除非场景要求极低的延迟(几毫秒)
注意:如果需要保证EXACTLY_ONCE,source和sink要求必须同时保证EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默认情况下,检查点不被保留,仅用于在故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点,注意,在这种情况下,您必须在取消后手动清理检查点状态
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被cancel时,删除检查点,检查点仅在作业失败时可用
//设置checkpoint超时时间
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超时时间,超时时间内没有完成则被终止
//Checkpointing最小时间间隔,用于指定上一个checkpoint完成之后
//最小等多久可以触发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//设置同一个时间是否可以有多个checkpoint执行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定运行中的checkpoint最多可以有多少个
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint发生异常的时候,是否应该fail该task,默认是true,如果设置为false,则task会拒绝checkpoint然后继续运行
2. Flink的重启策略

Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启。集群可以通过默认的重启策略来重启,这个默认的重启策略通常在未指定重启策略的情况下使用,而如果Job提交的时候指定了重启策略,这个重启策略就会覆盖掉集群的默认重启策略。

1) 概览

默认的重启策略是通过Flink的 flink-conf.yaml 来指定的,这个配置参数 restart-strategy 定义了哪种策略会被采用。如果checkpoint未启动,就会采用 no restart 策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用 fixed-delay 策略,重试 Integer.MAX_VALUE 次。请参考下面的可用重启策略来了解哪些值是支持的。

每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

重启策略重启策略值Fixed delayfixed-delayFailure ratefailure-rateNo restartNone

除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在 ExecutionEnvironment 中调用 setRestartStrategy() 方法来程序化地调用,注意这种方式同样适用于 StreamExecutionEnvironment。

下面的例子展示了如何为Job设置一个固定延迟重启策略,一旦有失败,系统就会尝试每10秒重启一次,重启3次。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重启次数
 Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
))
2) 固定延迟重启策略(Fixed Delay Restart Strategy)

固定延迟重启策略会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:

restart-strategy: fixed-delay
配置参数描述默认值restart-strategy.fixed-delay.attempts在Job最终宣告失败之前,Flink尝试执行的次数1,如果启用checkpoint的话是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延迟重启意味着一个执行失败之后,并不会立即重启,而是要等待一段时间。akka.ask.timeout,如果启用checkpoint的话是1s

例子:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延迟重启也可以在程序中设置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重启次数
 Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
))
3) 失败率重启策略

失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

声明: 本文由入驻OFweek维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。
侵权投诉

下载OFweek,一手掌握高科技全行业资讯

还不是OFweek会员,马上注册
打开app,查看更多精彩资讯 >
  • 长按识别二维码
  • 进入OFweek阅读全文
长按图片进行保存