简而言之,此功能允许用户从外部查询flink托管的分区的状态(see 使用状态)。在某些场景,可查询状态取消了需要和外部系统进行分布式交互的依赖,例如在实践中经常是瓶颈的键值存储。
为了使用可查询状态,通过设置配置参数 query.server.enable
为 true
(当前的默认值),是全局
激活可查询状态服务的第一步。然后某些的状态需要可查询可以通过
* QueryableStateStream
, 是一个方便的对象,其行为像一个接收器,可对流入的值进行状态查询,或
* StateDescriptor#setQueryable(String queryableStateName)
,使某个算子的键值状态可查询。
下面的段落解释这两种方法的使用。
键值流
可以通过下面的方法可以将他的值作为可查询状态:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
ListState
sink 由于不断增长的列表可能不可以清理,
将会占用大量内存。
调用这些方法将会返回QueryableStateStream
,无法再做其他操作, 且当前只保存可查询状态的名称以及
值和键序列化的可查询状态流。它与水槽(sinlk)相当,不能再进一步转变。
在内部一个 QueryableStateStream
被翻译成一个使用所有流入记录来更新可查询状态的算子的实例。
下面的项目,所有键值流的记录将会被用来更新状态实例,通过ValueState#update(value)
或者
AppendingState#add(value)
, 依赖于所选状态的转化:
stream.keyBy(0).asQueryableState("query-name")
这样做就像Scala API的“flatMapWithState”。
一个算子的被管理键值状态
(see 使用托管的键值状态)
可以通过使适当的状态描述符可查询来进行查询,例如下面的例子通过StateDescriptor#setQueryable(String queryableStateName)
:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
QueryableStateClient
帮助器类可用于针对内部服务状态的“KvState”实例的查询。它需要设置一个有效的 JobManager
地址和端口, 创建方式如下:
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
QueryableStateClient client = new QueryableStateClient(config);
查询的方法是:
Future<byte[]> getKvState(
JobID jobID,
String queryableStateName,
int keyHashCode,
byte[] serializedKeyAndNamespace)
对这个方法的调用返回一个Future
,最终持有ID为“jobID”的job,它的“queryableStateName”标识的可查询状态实例拥
有的序列化状态值。keyHashCode
是由Object.hashCode()'返回的键的哈希码,而
serializedKeyAndNamespace`
是序列化的键和命名空间。
QueryableStateClient#shutdown()
关闭来释放资源,当不需要的时候。
目前的实现仍然是比较低级别的,因为它只适用于此用于提供键/命名空间和返回结果的序列化数据。用户(或一些后续实用程序) 需要为此设置序列化程序。这样做的好处是查询服务不需要担心任何类加载问题等。
有一些序列化帮助方法用于键/命名空间和值序列化在KvStateRequestSerializer
里。
下面的案例继承了 CountWindowAverage
, 案例
(see 使用托管的键值状态)
如何使它可查询,且如何查询这个值:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // 状态名字
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 类型信息
Tuple2.of(0L, 0L)); // 如果没有,将此值设为默认值。
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
一旦在job中使用,您可以检索作业ID,然后从该算子中查询任何键的当前状态:
final Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
QueryableStateClient client = new QueryableStateClient(config);
final TypeSerializer<Long> keySerializer =
TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
Future<byte[]> serializedResult =
client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
//现在等待结果并返回它
final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
byte[] serializedValue = Await.result(serializedResult, duration);
Tuple2<Long, Long> value =
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
创建“TypeSerializer”实例时,请使用可用的Scala扩展。 添加以下导入:
import org.apache.flink.streaming.api.scala._
现在你可以通过下面的方式创建 type serializers:
val keySerializer = createTypeInformation[Long]
.createSerializer(new ExecutionConfig)
如果你不这样做,则可能会遇到Flink作业和客户端代码中使用的序列化器之间的不匹配,因为类似于scala.Long
的类型在运行时无法获取。
以下配置参数会影响可查询状态服务器和客户端的行为。它们定义在“QueryableStateOptions”中。
query.server.enable
: 标记是否开启可查询状态服务端query.server.port
: 端口绑定到内部的KvStateServer
(0 => 选择随机可用端口)query.server.network-threads
: KvStateServer
(0 => #slots)的网络(事件循环)线程数query.server.query-threads
: KvStateServerHandler
(0 => #slots)的异步查询线程数。QueryableStateClient
)query.client.network-threads
: KvStateClient
(0 =>可用内核数)的网络(事件循环)线程数query.client.lookup.num-retries
: 位置查找失败次数。query.client.lookup.retry-delay
: 位置查找失败重试延时间隔(毫秒)