早在一年前, 就调研过主流的流处理平台, 但是对于其原理一直知之甚少, 借着实践的机会, 特意整理一些流处理的内容以供学习
在交易系统, 物联网, 电信系统中广泛用到流处理系统. 流处理的特点是处理的数据特点在于数据流的规模是无限的, 反馈是实时的, 关注的对象应该是其服务, 而不是数据本身的联系.
典型的流处理平台有 Spark Streaming, Storm, Flink, Samza 等等. 本篇主要总结一下 Spark Streaming 与 Storm 的区别.
描述一个流处理系统时, 我们常常用有向无环图描述其拓扑结构
一般的流处理系统基本不是单机系统, 基本都会把拓扑结点放到分布式平台上. 在总结之前, 先介绍如下几个概念:
编程模型/运行时模型
运行时模型是流处理系统最大的特点, 也是 Spark Streaming 与 Storm 最大的不同, 其差异均来自于这个特性. 原生的流处理系统实现的都是原生流处理模型, 每一个记录到达之后会立即被处理. 非原生的流处理系统实现的是批处理模型, 数据会被人为的按照一定规则分成一批一批的数据, 每一批数据到达之后会进行统一处理.
原生流处理系统
批处理系统
Storm 是典型的原生流处理系统, 但是也可以支持批处理, Spark Streaming 将 Spark 中的 RDD 变成了 DStream, 是批处理的最小单元. 这也导致了编程时模型会有比较大的不同.
例如在 Storm 中, 我们如果想要定义拓扑结构, 大概是以这样的方式:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
...
Map counts = new HashMap();
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
counts.put(word, count);
collector.emit(new Values(word, count));
}
其中, Spout 与 Bolt 都是 Storm 的组件, 可以简单的理解为 Spout 用于产生或者接受数据, Bolt 用于处理或者消费数据.
这种方法被称为组合式编程, 通过提供基本的构建模块, 并以接口的方式进行拼接.
例如在 Spark Streaming 中, 我们如果想要定义拓扑结构, 大概是这样的方式:
val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))
val text = ...
val counts = text.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()
ssc 是StreamingContext, 类似于 Spark 中的 SparkContext, 是流处理单元的容器, 我们还定义了批处理的窗口大小是 1s ,可以看到我们定义了一些管道并且拼装到 counts 变量中, 这是函数式编程的方法, 在之前博客 lambda 表达式中有介绍过 Java 8 带来的函数式编程, 而 flatMap 传入恰好是一个 lambda 表达式.
这种方法被称为声明式编程. 声明式API操作是定义的高阶函数, 它允许我们用抽象类型和方法来写函数代码, 并且系统创建拓扑和优化拓扑. 声明式API经常也提供更多高级的操作, 比如,窗口函数或者状态管理.
流处理平台流处理平台应该能提供丰富的功能函数, 比如, map或者filter这类易扩展, 处理单条信息的函数, 处理多条信息的函数aggregation, 跨数据流, 不易扩展的操作 join 等等
状态管理
对于流处理平台而言, 状态管理是一大挑战, 所谓状态管理是指流处理的系统的状态会随着数据的变化而变化.相对于无状态的操作 (只有一个输入数据, 处理过程和输出结果), 有状态的应用会有一个输入数据和一个状态信息, 然后处理过程, 接着输出结果和修改状态信息. 在原生的 Storm 中是不支持状态管理的, 这也符合函数式编程无状态的特征. 但是在实际应用中, 我们往往需要对平台加入状态信息. 无状态的流处理系统是幂等的, 无论多少次输入同样的数据输出都相同, 但是有状态信息的流处理系统必须要保证 exactly once, 否则多次操作可能会导致状态信息的不一致. Trident 是基于 Storm 的流处理平台, 它支持状态管理, 靠定义了 tuple (小批数据)作为处理单元. 并且还定义了三种不同类型的可以容错的spout: “non-transactional”, “transactional” 和”opaque transactional”. 以及3种容错状态: “non-transactional”, “transactional” 和 “opaque transactional”
以上面计算单词出现次数的topology为例, 最后结果是将单词的出现次数以key/value对的形式存储到数据库中. 存储一个数量是不足以知道你是否已经处理过一个batch的, 所以可以通过将value和txid一起存储到数据库中. 这样的话, 当更新这个count之前, 可以先去比较数据库中存储的txid和现在要存储的txid. 如果一样,就跳过什么都不做, 因为这个value之前已经被处理过了. 如果不一样, 就执行存储. 这个逻辑可以工作的前提是 txid永不改变, 并且Trident保证状态的更新是在batch之间严格顺序进行的. 上述三种容错状态确保了这一前提. (具体内容是 Storm 处理细节, 在这里不展开)
在 Spark Streaming 中, 状态信息也被当做微批数据流, 在处理信息时, 状态信息被加载, 接着通过函数操作获得处理后的微批量数据结果并修改加载过的状态信息.
例如如下代码中:
// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])
val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
Some(output)
}
val stateDstream = wordDstream.trackStateByKey(StateSpec.function(trackStateFunc).initialState(initialRDD))
首先创建了一个RDD来初始化状态并执行 Map 的变形, 然后定义了一个状态函数, 将当前状态与计数加起来, 赋给新的状态, 并且输出. 最后定义了状态信息流, 其中就包含了单词这一状态数.
消息保障与容错
需要单独介绍三种保证机制: + at most once: 消息最多传递一次, 消息可能会丢失 + at least once: 消息最少传递一次, 消息可能重复但不会丢失 + exactly once: 消息只传递一次, 消息既不会丢失也不会重复
Storm 采用的上游数据备份以及消息确认的机制保障消息失败后会重新处理. Topology 会备份所有未被确认确认的消息, 当收到确认消息后, 备份才会失效. 如果不是所有的消息处理确认信息收到, 那么数据记录会被源数据替换. 这保证了 at least once, 消息不会丢失但是可能重复. Storm 对每个源数据记录仅仅要求几个字节存储空间来跟踪确认消息. 纯数据记录消息确认架构下尽管性能不错, 但并不能保证exactly once消息传输机制, 所有应用开发者需要处理重复数据. 这也为 Storm 带来在吞吐比较高的情况下的性能问题.
(同样作为原生的流处理框架, Flink 是利用 Snapshot 机制, 支持分布式数据流的轻量级异步快照恢复)
Spark Streaming 由于是在各个 worker 节点上处理微批数据, 微批数据在结点上是不可变的, 而且有持久化策略, 如果计算出现错误, 重新计算即可.
延迟性与吞吐量
一般而言, 原生的流处理平台延迟普遍比较短, 大约在几十毫秒这个级别. 微批处理的延时一般在秒的级别. 随着状态管理, 消息保障与容错的要求提高, 流处理系统的性能可能会受到比较大的影响. 而且由于流处理平台都是分布式的, 机器之间的通信会消耗大量时间资源, 所以应当考虑尽可能地利用机器本身的本地数据, 以及序列化的方案.
对于 Storm 而言, Storm 非常适合任务量小但是对时间敏感的场景, Storm 的容错恢复和 Trident 状态管理都会降低性能水平. Spark Streaming 由于本身源自于 Spark, 可以很好地利用例如 MLlib这样的 Spark 库, 再加上 Spark Streaming 声明式API的方式, 如果需要 Lambda 架构则可以很好考虑. 一般用于对时间不敏感的场景.