Flink 支持多种不同的重启策略,这些策略控制了在失败情况下工作要如何重启。 集群在启动时会伴随一个默认的重启策略,在没有定义具体工作重启策略时会使用该默认策略。 如果在工作提交时制定一个重启策略,该策略会覆盖集群的默认设定。
默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml
指定。
配置参数 restart-strategy 定义了哪个策略被使用。
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略,其中 Integer.MAX_VALUE
参数是尝试重启次数。
参阅下列可用的重启策略来了解什么值能被支持。
每个重启策略都有自己的一组参数来控制策略的行为。 这些值也可以在配置文件中设置。 每个重启策略的描述包括了更多关于对应配置值的信息。
重启策略 | 对应值 |
---|---|
固定间隔 (Fixed delay) | fixed-delay |
失败率 (Failure rate) | failure-rate |
无重启 (No restart) | none |
除了定义默认的重启策略,也可以为每个 Flink 工作定义一个具体的重启策略。
这个重启策略可以通过调用 ExecutionEnvironment
的 setRestartStrategy
方法在编程时设置。
该方法对 StreamExecutionEnvironment
同样有效。
下列例子展示我们如何为我们的工作设置一个固定间隔重启策略。 在失败的情况下,系统会重启工作 3 次,并在连续两次尝试重启中等待 10 秒。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
))
以下各部分描述与具体重启策略相关的配置选项。
固定截个重启策略会根据指定的次数尝试重启工作。 如果超过了最大尝试次数,则工作最终失败。 在连续两次的重启尝试中,重启策略会等待一段固定的时间。
这个策略可以通过设置 flink-conf.yaml
中的下列配置参数作为默认策略启动。
restart-strategy: fixed-delay
配置参数 | 描述 | 默认值 |
---|---|---|
restart-strategy.fixed-delay.attempts |
在工作宣布失败之前 Flink 尝试重启的次数. | 1, 或 Integer.MAX_VALUE 如果被 checkpointing 激活 |
restart-strategy.fixed-delay.delay |
间隔重启指的是在一次失败的执行之后,并不会立即重新开始另一次执行,而是在一段间隔之后再开始. 当程序与外部系统进行交互时,比如连接或待定事务需要在 Flink 尝试重新执行之前超时,该方法是有助的. | akka.ask.timeout , 或 10s 如果被 checkpointing 激活 |
举个例子:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
固定间隔重启策略也可以在编程时进行设定:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
))
失败率重启策略在失败后重启工作,当超过 failure rate
(一个时间段内的失败次数) 时工作宣告失败。
在连续两次重启尝试中,该重启策略会等待一端固定的时间。
这个策略可以通过设置 flink-conf.yaml
中的下列配置参数作为默认策略启动。
restart-strategy: failure-rate
配置参数 | 描述 | 默认值 |
---|---|---|
在工作宣告失败之前,给定时间段内的最大重启次数 | 1 | |
衡量失败率的时间段. | 1 minute | |
连续两次重启尝试间的间隔 |
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
失败率重启策略也可以在编程时进行设定:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
))
没有任何工作重启,若工作失败则直接宣告失败。
restart-strategy: none
无重启策略也可以通过编程来设定:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
使用集群定义的重启策略。 该策略对启用 checkpointing 的流式程序有帮助。 如果没有其他的定义的重启策略,每一次默认启用都会启动固定间隔重启策略,