Amazon AWS Kinesis Streams Connector

The Kinesis connector provides access to Amazon AWS Kinesis Streams.

To use the connector, add the following Maven dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

The flink-connector-kinesis_2.10 artifact depends on code licensed under the Amazon Software License (ASL). Linking to flink-connector-kinesis will include ASL-licensed code in your application.

Because of this licensing issue, the flink-connector-kinesis_2.10 artifact is not deployed to Maven Central as part of Flink releases. You therefore need to build the connector yourself from source.

Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:

mvn clean install -Pinclude-kinesis -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests

The streaming connectors are not part of the binary distribution. See how to link with them for cluster execution here.

Using the Amazon Kinesis Streams Service

Follow the instructions in the Amazon Kinesis Streams Developer Guide to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read from and write to the Kinesis streams.

Kinesis Consumer

The FlinkKinesisConsumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will change as shards are closed and created by Kinesis.
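To make the relationship between shards and subtasks more concrete, the following is a minimal sketch of how a set of shards could be spread across parallel subtasks, and how that spread changes after a reshard. It is an illustration under stated assumptions, not the connector's actual assignment logic: the class ShardAssignmentSketch, the method shardsForSubtask, and the modulo-on-hash scheme are all hypothetical, and the shard IDs are made-up examples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: a hypothetical modulo scheme, not the connector's real assignment logic.
final class ShardAssignmentSketch {

    // Returns the shards that the given subtask would read under the modulo scheme.
    static List<String> shardsForSubtask(List<String> shardIds, int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (String shardId : shardIds) {
            // floorMod keeps the bucket non-negative even when hashCode() is negative.
            if (Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex) {
                owned.add(shardId);
            }
        }
        return owned;
    }

    // Prints which shards each subtask would own for the given shard list.
    static void printAssignment(String label, List<String> shardIds, int parallelism) {
        System.out.println(label);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("  subtask " + subtask + " -> "
                + shardsForSubtask(shardIds, subtask, parallelism));
        }
    }

    public static void main(String[] args) {
        int parallelism = 2;
        List<String> shards = new ArrayList<>(Arrays.asList(
            "shardId-000000000000", "shardId-000000000001", "shardId-000000000002"));
        printAssignment("before resharding", shards, parallelism);

        // Simulate a split: once the closed parent shard has been fully read it
        // drops out of the list, and two newly created child shards appear.
        shards.remove("shardId-000000000001");
        shards.add("shardId-000000000003");
        shards.add("shardId-000000000004");
        printAssignment("after resharding", shards, parallelism);
    }
}
```

Whatever scheme is used, the property that matters is that every shard is read by exactly one subtask at any time, so the per-subtask shard count shifts as shards are closed and created while the total coverage stays complete.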

Before consuming data from Kinesis streams, make sure that all streams are created with the status “ACTIVE” in the AWS dashboard.

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

// The same example in Scala:
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

val env = StreamExecutionEnvironment.getExecutionEnvironment

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))

The above is a simple example of using the consumer. Configuration for the consumer is supplied with a java.util.Properties instance, the configuration keys for which can be found in ConsumerConfigConstants. The example demonstrates consuming a single Kinesis stream in the AWS region “us-east-1”. The AWS credentials are supplied using the basic method in which the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting ConsumerConfigCons