As mentioned in the State documentation, Flink has two types of state: keyed and non-keyed state (also called operator state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the deprecation of the aligned window operators from Flink 1.1 (see Aligned Processing Time Window Operators).
The migration process serves two goals: (1) to allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling, and (2) to make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its Flink 1.1 predecessor.
After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 simply by taking a savepoint with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off.
As running examples for the remainder of this document we will use the CountMapper and the BufferingSink functions. The first is an example of a function with keyed state, while the second has non-keyed state. The code for these two functions in Flink 1.1 is presented below:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
}
The CountMapper is a RichFlatMapFunction which assumes a grouped-by-key input stream of the form (word, 1). The function keeps a counter for each incoming key (ValueState<Integer> counter) and, whenever the number of occurrences of a certain word reaches the user-provided threshold, a tuple is emitted containing the word itself and the number of occurrences, and the counter for that word is reset to 0.
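The per-key counting logic described above can be tried out without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class KeyedCounterSketch and its process method are hypothetical names introduced for illustration, and a HashMap stands in for the per-key ValueState<Integer> that Flink manages for you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the CountMapper logic; no Flink classes are used.
class KeyedCounterSketch {
    // Hypothetical substitute for Flink's per-key ValueState<Integer>.
    private final Map<String, Integer> counters = new HashMap<>();
    private final int numberElements;

    KeyedCounterSketch(int numberElements) {
        this.numberElements = numberElements;
    }

    // Mirrors flatMap(): returns the records emitted for this input, if any.
    List<String> process(String word) {
        List<String> out = new ArrayList<>();
        int count = counters.getOrDefault(word, 0) + 1;
        counters.put(word, count);
        if (count % numberElements == 0) {
            out.add(word + "=" + count);
            counters.put(word, 0); // reset to 0, as in CountMapper
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedCounterSketch mapper = new KeyedCounterSketch(2);
        for (String word : new String[] {"a", "b", "a", "a", "a"}) {
            System.out.println(word + " -> " + mapper.process(word));
        }
    }
}
```

Each key has its own independent counter, which is why keyed state can be redistributed by key when the parallelism changes.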
The BufferingSink is a SinkFunction that receives elements (potentially the output of the CountMapper) and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. This is a common way to avoid many expensive calls to a database or an external storage system. To do the buffering in a fault-tolerant manner, the buffered elements are kept in a list (bufferedElements) which is periodically checkpointed.
To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. After making these changes, you will be able to change the parallelism of your job (scale up or down), and the new version of your job is guaranteed to start from where its predecessor left off.
Keyed State: Something to note before delving into the details of the migration process is that if your function has only keyed state, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support for the new features and full backwards compatibility. Changes could be made just for better code organization, but this is just a matter of style.
With the above said, the rest of this section focuses on the non-keyed state.
The first modification is the transition from the old Checkpointed<T extends Serializable> state interface to the new ones. In Flink 1.2, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface, which is semantically closer to the old Checkpointed one.
In both cases, the non-keyed state is expected to be a List of serializable objects, independent from each other, and thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the BufferingSink contains elements (test1, 2) and (test2, 2), when increasing the parallelism to 2, (test1, 2) may end up in task 0, while (test2, 2) will go to task 1.
More details on the principles behind rescaling of both keyed state and non-keyed state can be found in the State documentation.
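To make the redistribution example concrete, the following is a minimal plain-Java sketch, not Flink's actual implementation. It assumes a simple round-robin split, and the class RescaleSketch and the method redistribute are hypothetical names introduced for illustration; the real assignment of list elements to tasks is decided by Flink and may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows why a list of independent objects can be rescaled.
class RescaleSketch {
    // Splits the checkpointed elements across newParallelism tasks.
    // Round-robin is an assumption made for this sketch.
    static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        List<List<T>> perTask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < checkpointed.size(); i++) {
            perTask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<String> state = Arrays.asList("(test1, 2)", "(test2, 2)");
        List<List<String>> tasks = redistribute(state, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + ": " + tasks.get(i));
        }
    }
}
```

Because the list elements are never split further, a task can receive zero, one, or several of them after rescaling, so restore logic should not assume a particular list size.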
The ListCheckpointed interface requires the implementation of two methods:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
Their semantics are the same as their counterparts in the old Checkpointed interface. The only difference is that now snapshotState() should return a list of objects to checkpoint, as stated earlier, and restoreState() has to handle this list upon recovery. If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) in snapshotState(). The updated code for BufferingSink is included below:
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
As shown in the code, the updated function also implements the CheckpointedRestoring interface. This is for backwards compatibility reasons; more details are given at the end of this section.
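The Collections.singletonList(MY_STATE) option mentioned earlier for state that is not re-partitionable can be sketched as follows. This is a plain-Java illustration under stated assumptions, not Flink code: ListCheckpointedLike is a hypothetical stand-in that mirrors the two ListCheckpointed methods so the example runs without Flink on the classpath, and TotalTracker is an invented function whose whole state is a single running total. The restore logic also assumes that, after rescaling, a task may be handed an empty list or several entries.

```java
import java.util.Collections;
import java.util.List;

class SingletonStateSketch {
    // Hypothetical stand-in mirroring the two ListCheckpointed methods.
    interface ListCheckpointedLike<T> {
        List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
        void restoreState(List<T> state) throws Exception;
    }

    // Invented example function: its entire state is one running total.
    static class TotalTracker implements ListCheckpointedLike<Long> {
        long total;

        void add(long value) {
            total += value;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // Wrap the indivisible state in a one-element list.
            return Collections.singletonList(total);
        }

        @Override
        public void restoreState(List<Long> state) {
            // Assumption: the list may be empty or hold several entries
            // after a change of parallelism, so merge whatever arrives.
            total = 0;
            for (long part : state) {
                total += part;
            }
        }
    }

    public static void main(String[] args) {
        TotalTracker tracker = new TotalTracker();
        tracker.add(3);
        tracker.add(4);
        System.out.println(tracker.snapshotState(1L, 0L));
    }
}
```

Wrapping the state this way keeps it in one piece at checkpoint time, at the cost of not being able to spread it over more tasks when scaling up.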
The CheckpointedFunction interface also requires the implementation of two methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
As in Flink 1.1, snapshotState() is called whenever a checkpoint is performed, but now initializeState() (which is the counterpart of restoreState()) is called every time the user-defined function is initialized, rather than only when recovering from a failure. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included. An implementation of the CheckpointedFunction interface for BufferingSink is presented below.
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
The initializeState() method takes a FunctionInitializationContext as its argument. This is used to initialize the non-keyed state “container”. This is a container of type ListState where the non-keyed state objects are going to be stored upon checkpointing:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
After initializing the container, we use the isRestored() method of the context to check if we are recovering after a failure. If this is true, i.e. we are recovering, the restore logic is applied.
As shown in the code of the modified BufferingSink, the ListState recovered during state initialization is kept in a class variable for future use in snapshotState(). There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the initializeState() method. This can be done using the FunctionInitializationContext given as argument, instead of the RuntimeContext, as was the case in Flink 1.1. If the CheckpointedFunction interface were to be used in the CountMapper example, the old open() method could be removed and the new snapshotState() and initializeState() methods would look like this:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//all managed, nothing to do.
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
}
Notice that the snapshotState() method is empty, as Flink itself takes care of snapshotting managed keyed state upon checkpointing.
So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my already running job from Flink 1.1 stopped?”
The answer is yes, and the way to do it is straightforward. For the keyed state, you have to do nothing. Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to implement the CheckpointedRestoring interface, as shown in the code above. This has a single method, the familiar restoreState() from the old Checkpointed interface from Flink 1.1. As shown in the modified code of the BufferingSink, the restoreState() method is identical to its predecessor.
In Flink 1.1, and only when operating on processing time with no specified evictor or trigger, calling timeWindow() on a keyed stream would instantiate a special type of WindowOperator. This could be either an AggregatingProcessingTimeWindowOperator or an AccumulatingProcessingTimeWindowOperator. Both of these operators are referred to as aligned window operators, as they assume that their input elements arrive in order. This assumption is valid when operating in processing time, because each element is assigned as its timestamp the wall-clock time at the moment it arrives at the window operator. These operators were restricted to using the memory state backend, and had optimized data structures for storing the per-window elements, which leveraged the in-order arrival of input elements.
In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic WindowOperator. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format that is compatible with the generic WindowOperator, and resume execution using the generic WindowOperator.
Note: Although deprecated, you can still use the aligned window operators in Flink 1.2 through special WindowAssigners introduced for exactly this purpose. These assigners are the SlidingAlignedProcessingTimeWindows and the TumblingAlignedProcessingTimeWindows assigners, for sliding and tumbling windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to resume execution from a Flink 1.1 savepoint while using these operators.
Attention: The aligned window operators provide no rescaling capabilities and no backwards compatibility with Flink 1.1.
The code to use the aligned window operators in Flink 1.2 is presented below, first in Java and then in Scala:
// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function);
// for sliding windows
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function);
// for tumbling windows
val window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows
val window2 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)