启动 Spark Streaming 需要至少两个 CPU 处理器内核。
接下来我们要完成以下操作:
1. 读取 streaming 数据到 Spark 中
2. 基于时间窗口的数据获取和处理
import re
def parse(line):
match = re.search("Dm=(\d+", line)
if match:
val = match.group(1)
return [int(val)]
return []
from pyspark.streming import StreamingContext ssc = StreamingCOntext(sc, 1)4.
lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)
5.
vals = lines.flatmap(parse)6.
window = vals.window(10,5)7.
def stats(rdd):
print(rdd.collect())
if rdd.count() > 0:
print("max = {}, min = {}".format(rdd.max(), rdd.min()))
window.foreachRDD(lambda rdd:stats(rdd))
8.
ssc.start()9.
ssc.stop()