RabbitMQ Connector

RabbitMQ连接器许可证

Flink下的RabbitMQ连接器位于一个maven依赖” RabbitMQ AMQP Java Clien”上,由Mozilla Public License v1.1 (MPL 1.1) 许可。

Flink本身不重写” RabbitMQ AMQP Java Clien”中的源码,也不对其进行打包成二进制文件。 用户基于flink的rabbitMQ连接器(即RabbitMQ AMQP Java Clien)创建和发布拓展开的工作,可能会受到Mozilla Public License v1.1 (MPL 1.1)说明的一些限制。

RabbitMQ连接器

该连接器访问流数据来源RabbitMQ。为使用连接器,请添加如下依赖在你的项目中,

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

译者注:上述方法是由maven构建项目时使用,当使用sbt构建项目时,需要在build.sbt中的正确子项目中添加如下:

("org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion).
              exclude("org.apache.flink","flink-shaded-hadoop1_2.10"),
如果使用scala2.10,则不需要exclude.

注意的是,流连接器当前都不是二元分布。更多集群执行请看这里

Back to top

安装RabbitMQ

阅读Rabbitmq下载页的介绍。安装后,服务器会自动启动,应用程序连接rabbitmq将会启动。

Back to top

Rabbitmq数据源

连接器提供一个类RMQSource,以消费源自于rabbitmq队列里的消息。消费RabbitMQ数据源可有三个不同层级保证,取决于flink的配置如何。

1, 仅有一次,为实现保证仅有一次消费rabbitmq数据源,如下是需要的–

  • 可检查点:检查点生效后,在检查点完成后,消息是互相确认的(因此,会把消息从rabbitmq中删除)。

  • 使用相关编号:相关编号是rabbitmq应用的特征,当提交一个消息进rabbitmq时,必须得在消息配置中设置一个相关编号。在检查点恢复是,源利用相关编号去重已经被处理过的数据,

  • 非并行的源:实现仅有一次,源必须非并行(并行度为1)。这个限制是因为rabbitmq是从一个单一队列存在多个消费者的调度消息方式。

2, 至少一次:当检查点生效,但是没有使用相关编号或者源是并行的,源仅仅提供至少消费一次的保证。

3, 没有保证:如果检查点未生效,源没有任何强分发的保证。这种设置下,替代flink的检查点,一旦接收和处理消息后,消息将自动确认。

如下代码是设置成仅一次消费的例子。注释内容解释哪部分设置可忽略,以得到更多灵活保证。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(...);// 仅一次或至少一次,检查点是必须的

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // rabbitmq连接的配置
        "queueName",                 // rabbitmq的队列名,消费的队列名
        true,                        // 使用相关编号,至少一次时设置为false
        new SimpleStringSchema()))   // 反序列化成java的对象
    .setParallelism(1);              // 非并行是仅一次所必须的
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(...)

val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

val stream = env
    .addSource(new RMQSource[String](
        connectionConfig,            // 配置
        "queueName",                 // 队列名
        true,                        // 使用相关编号
        new SimpleStringSchema))     // 反序列化
    .setParallelism(1)               // 非序列化

Back to top

Rabbitmq接收

连接器提供类RMQSink来发送消息到rabbitmq队列里,以下代码是一个rabbitmq接收的配置例子,

final DataStream<String> stream = ...

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

stream.addSink(new RMQSink<String>(
connectionConfig,
    "queueName",
new SimpleStringSchema()));  //序列化
val stream: DataStream[String] = ...
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build

stream.addSink(new RMQSink[String](
    connectionConfig,
"queueName",
    new SimpleStringSchema))

更多rabbitmq可从此了解

Back to top