--

Spark streaming



启动 Spark Streaming 需要至少两个 CPU 处理器内核。

接下来我们要完成以下操作:
1. 读取 streaming 数据到 Spark 中
2. 基于时间窗口的数据获取和处理



1. 启动 PySpark 和 streaming

pyspark,启动 浏览器,输入
localhost:8889/tree/coursera/big-data=3/spark-streaming
打开 Spark-Streaming.ipynb 文件

2. 定义一个辅助函数
  import re
  def parse(line):
    match = re.search("Dm=(\d+", line)
    if match:
      val = match.group(1)
      return [int(val)]
    return []


3.
  from pyspark.streming import StreamingContext
  ssc = StreamingCOntext(sc, 1)
4.
  lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)
5.
  vals = lines.flatmap(parse)
6.
  window = vals.window(10,5)
7.
  def stats(rdd):
  print(rdd.collect())
  if rdd.count() > 0:
    print("max = {}, min = {}".format(rdd.max(), rdd.min()))

  window.foreachRDD(lambda rdd:stats(rdd))
8.
  ssc.start()
9.
  ssc.stop()