1. map(func): 对DStream中的每个元素应用给定的函数。
# 将每个单词转为大写
transformed_stream = input_stream.map(lambda word: word.upper())
2. flatMap(func): 类似于map,但每个输入元素都可以映射到零个或多个输出元素。
# 将每行文本拆分为单词
transformed_stream = input_stream.flatMap(lambda line: line.split(" "))
3. filter(func): 保留满足给定条件的元素。
# 保留长度大于等于5的单词
transformed_stream = input_stream.filter(lambda word: len(word) >= 5)
4. reduceByKey(func): 对具有相同键的元素进行归约操作。
# 计算每个单词的总次数
word_counts = input_stream.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
5. window(windowLength, slideInterval): 创建一个窗口,每个窗口包含指定长度的数据,并在每个指定间隔滑动一次。
# 每10秒统计过去30秒的单词计数
windowed_stream = input_stream.window(30, 10)
6. updateStateByKey(updateFunc): 在给定键的状态上应用给定的更新函数。用于维护键的状态信息。
# 通过累加器更新每个单词的全局计数
def update_count(new_values, current_count):
return sum(new_values) + (current_count or 0)
word_counts = input_stream.map(lambda word: (word, 1)).updateStateByKey(update_count)
这些转换操作允许你根据实际需求对实时数据进行灵活处理。你可以将它们组合在一起,构建复杂的实时数据处理流水线。请注意,这些操作是惰性求值的,只有在调用行动操作时才会触发实际的计算。
转载请注明出处:http://www.zyzy.cn/article/detail/9350/Spark