下面是一个简单的 Spark Streaming 离散流的例子:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 SparkContext,这是 Spark 通信的入口点
sc = SparkContext("local[2]", "SparkStreamingExample")
# 创建 StreamingContext,每隔1秒处理一次数据
ssc = StreamingContext(sc, 1)
# 从 TCP socket 读取数据流
lines = ssc.socketTextStream("localhost", 9999)
# 在每个批次中对数据进行处理
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
# 输出每个批次的单词计数结果
word_counts.pprint()
# 启动 StreamingContext
ssc.start()
# 等待终止
ssc.awaitTermination()
在这个例子中:
1. ssc.socketTextStream("localhost", 9999) 从本地的 TCP socket 读取数据流。
2. flatMap 操作用于将每个批次中的行拆分成单词。
3. map 和 reduceByKey 操作用于计算每个单词的出现次数。
4. pprint() 用于输出每个批次的结果到控制台。
5. ssc.start() 用于启动 StreamingContext。
在 Spark Streaming 中,这种处理方式允许你以类似于批处理的方式来处理实时数据流,通过将实时数据划分为小的时间窗口,并在这些时间窗口上执行批处理操作。这种离散流的模型使得开发者可以使用 Spark 的强大分布式计算能力来处理实时数据。
转载请注明出处:http://www.zyzy.cn/article/detail/9348/Spark