Flink:把状态State全都扒光,远走他乡:State初始,广播状态

Statechu初始:

State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在aggregation过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。

Flink:把状态State全都扒光,远走他乡:State初始,广播状态

 

State分类:Keyed State和Operator State

Keyed State

Keyed State是一种基于key的,它永远和key绑定,key和key之间的state没有关系,不会相互影响

Operator State

Operator State是一种基于Operate的,每个操作有状态的,每个操作之间不会相互影响。举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

Raw State和Managed State

  • Raw即原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
  • Managed State:即托管状态,托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。

Keyed State ---- Managed State

  • ValueState:这将保留一个可以更新和检索的值(如上所述,作用域为输入元素的键,因此该操作看到的每个键可能会有一个值)。该值可以使用设置update(T)和使用检索 T value()。
    实例:
package flinkscala.State.Keyed_Stateimport org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject valueStateTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L),(1L, 6L),(1L, 2L),(1L, 9L),(1L, 2L),(1L, 3L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()env.execute("average Test")}
}class CountWindowAverage extends RichFlatMapFunction[(Long,Long),(Long,Long)]{//定义一个ValueState,保存着(元素的个数,元素的和)private var sum: ValueState[(Long,Long)] = _override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {//先访问ValueState,取出ValueState中的和//当然,如果是空的话,也就是初次使用的话,就初始化为0,0var tmpCurrentSum = sum.value()val surrentSum = if(tmpCurrentSum !=null){tmpCurrentSum}else {(0L,0L)}/** 元素个数+1,元素和+当前进来的元素**/val newSum = (surrentSum._1+1,surrentSum._2+value._2)//更新状态Statesum.update(newSum)//如果达到了两个元素,就计算一次平均值if(newSum._1>=2){out.collect((value._1,newSum._2/newSum._1))sum.clear()//清空状态}}override def open(parameters: Configuration): Unit ={sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average",createTypeInformation[(Long,Long)]))}
}

Flink:把状态State全都扒光,远走他乡:State初始,广播状态

 

ListState:这保留了元素列表。您可以追加元素并检索Iterable 所有当前存储的元素。使用add(T)或添加元素addAll(List),可以使用检索Iterable Iterable get()。您还可以使用以下方法覆盖现有列表update(List)

ReducingState:这将保留一个值,该值代表添加到状态的所有值的集合。介面与相似,ListState但使用新增的元素 add(T)会使用指定的简化为汇总ReduceFunction。

Aggregatin


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部