启动 Spark Streaming 需要至少两个 CPU 处理器内核。
接下来我们要完成以下操作:
1. 读取 streaming 数据到 Spark 中
2. 基于时间窗口的数据获取和处理
import re def parse(line): match = re.search("Dm=(\d+", line) if match: val = match.group(1) return [int(val)] return []
from pyspark.streming import StreamingContext ssc = StreamingCOntext(sc, 1)4.
lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)5.
vals = lines.flatmap(parse)6.
window = vals.window(10,5)7.
def stats(rdd): print(rdd.collect()) if rdd.count() > 0: print("max = {}, min = {}".format(rdd.max(), rdd.min())) window.foreachRDD(lambda rdd:stats(rdd))8.
ssc.start()9.
ssc.stop()