PySpark Tutorial (I) : Basic Procedure & Concept

Spark Basic Workflow

A Spark application consists of two parts:

  1. Task scheduling (the driver program)
  2. Task execution (the worker programs): worker programs run on cluster nodes or in local threads
    • DataFrames and RDDs are both distributed across the workers

  • Distributed mode:
    • SparkContext is the main entry point of the application and is started by the user's driver program
    • Executors are responsible for running tasks; a machine that runs Executors is called a Worker node.
  • Local mode:
    • The internal logic is the same, just simplified: the cluster manager is replaced by a thread pool of local threads inside a single process (see the sketch after this list)
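A minimal sketch of the two modes in a standalone script (the master URLs and application name are illustrative assumptions; in the IPython shell introduced below, sc is already created for you):

from pyspark import SparkConf, SparkContext

# Local mode: executors are simulated by a thread pool inside one process ("local[4]" = 4 threads)
conf = SparkConf().setMaster("local[4]").setAppName("workflow-demo")  # illustrative app name

# Distributed mode (hypothetical standalone-cluster master URL):
# conf = SparkConf().setMaster("spark://master-host:7077").setAppName("workflow-demo")

sc = SparkContext(conf=conf)        # entry point, lives in the driver program
rdd = sc.parallelize(range(8), 4)   # the data is split into 4 partitions across workers/threads
print(rdd.count())                  # the action is executed by the executors
sc.stop()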

Starting IPython with the pyspark profile

In a terminal, run:

ipython --profile=pyspark

After it starts, you will see the PySpark banner; a SparkContext and a SQLContext have already been created automatically (a sketch of creating them by hand follows this list):

  • SparkContext is available as sc
    • SparkContext tells Spark how to access the cluster
  • HiveContext is available as sqlContext
    • HiveContext provides a superset of the functionality of SQLContext;
    • Additional HiveContext features: writing queries with the HiveQL parser, Hive UDFs (user-defined functions), and reading data from Hive tables.
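Outside this preconfigured profile (for example in a standalone script), both contexts can be created by hand; a minimal sketch, with an illustrative application name:

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext("local[2]", "context-demo")  # illustrative app name; tells Spark how to access the cluster
sqlContext = HiveContext(sc)                   # all SQLContext features plus HiveQL, Hive UDFs, Hive tables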

SparkContext is lower-level than SQLContext, and RDD is lower-level than DataFrame

Because:

  1. RDDs are usually created from a SparkContext
  2. DataFrames are usually created from a SQLContext
  3. A SQLContext is itself created from the lower-level SparkContext

Therefore:

  • SparkContext is lower-level than SQLContext, and RDD is lower-level than DataFrame (the sketch below shows the dependency chain)
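A minimal sketch of that dependency chain, reusing the sc provided by the shell:

from pyspark.sql import SQLContext

ctx = SQLContext(sc)                              # the SQLContext is built on top of the SparkContext
rdd = sc.parallelize([('Alice', 1), ('Bob', 2)])  # the RDD comes from the SparkContext
df = ctx.createDataFrame(rdd, ['name', 'age'])    # the DataFrame comes from the SQLContext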

RDD

RDD (Resilient Distributed Dataset): Spark's fundamental abstraction for a distributed collection of elements

Creating an RDD

#Method 1: create from a list and partition it
>> data = [1,2,3,4]
>> rDD = sc.parallelize(data,4)
>> rDD
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
>> rDD.getNumPartitions()
4

#Method 2: read from a file
>> distFile = sc.textFile("file:///usr/local/spark-1.6.2/README.md")
>> distFile
file:///usr/local/spark-1.6.2/README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
>> distFile.getNumPartitions()
2
RDD Partitions
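A quick way to see how the elements of the rDD created above are spread over its 4 partitions is glom(), which groups each partition's elements into a list:

>> rDD.glom().collect()
[[1], [2], [3], [4]]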

RDD Operations

RDDs support two types of operations:

  • actions: run a computation on the dataset and return a value
  • transformations: create a new dataset from an existing one
RDD Lazy Evaluation (lazy computation)
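Transformations are only recorded; nothing is computed until an action is called. A minimal sketch:

>> lazy = sc.parallelize(range(4)).map(lambda x: x * 10)   #returns immediately, no computation yet
>> lazy                                                    #still just a description of the RDD
PythonRDD[...] at RDD at PythonRDD.scala:...
>> lazy.collect()                                          #the action triggers the actual computation
[0, 10, 20, 30]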

Transformation

A transformation (map, filter, join, ...) turns an RDD into another RDD; the result is still an RDD.

  • collect(), take(): both are actions; they return the result as a list for display
  • toDF(): converts the RDD into a DataFrame for display
>> rdd=sc.parallelize([2,3,4])

#map
>> rdd.map(lambda x:[x,x+5])   #still an RDD after map
PythonRDD[18] at RDD at PythonRDD.scala:43
>> rdd.map(lambda x:[x,x+5]).toDF()  #a DataFrame after toDF()
DataFrame[_1: bigint, _2: bigint]
>> rdd.map(lambda x:[x,x+5]).collect()  #a list after collect()
[[2, 7], [3, 8], [4, 9]]
#from here on, results are written without collect() or toDF(), abbreviated as:
RDD:[2,3,4]->[[2, 7], [3, 8], [4, 9]]

#flatMap
>> rdd.flatMap(lambda x:[x,x+5])
RDD:[2,3,4]->[2, 7, 3, 8, 4, 9]

#filter
>> rdd.filter(lambda x:x%2==0)
RDD:[2,3,4]->[2,4]

#(K,V) Transformation
#reduceByKey
>> rdd2 = sc.parallelize([(1,2),(1,5),(3,4),(3,5),(3,10)])
>> rdd2.reduceByKey(lambda a,b:a+b)
RDD:[(1, 7), (3, 19)]

#sortByKey
>> rdd3=sc.parallelize([(1,'a'),(2,'d'),(1,'b')])
>> rdd3.sortByKey()
RDD:[(1,'a'),(1,'b'),(2,'d')]

#groupByKey
#Return (K,Iterable<V>)
>> rdd3.groupByKey()
RDD:[(1,['a','b']),(2,['d'])]
Action
>> rdd=sc.parallelize([2,3,4])

#reduce 
>> rdd.reduce(lambda a,b:a*b)  #cumulative product: 2*3*4
24

#collect
>> rdd.collect()  
[2,3,4]  #as list

#take
>> rdd.take(2)
[2,3]    #as list

#takeOrdered
>> rdd.takeOrdered(3,lambda s:-1*s)  #output in descending order
[4,3,2]

DataFrame

Creating a DataFrame

  • sqlContext.createDataFrame(data)
    • from an RDD of tuples or lists (see the sketch after the examples below)
    • from a list
    • from a pandas.DataFrame
  • sqlContext.read.text/json("/path/doc")
    • Supported sources include: HDFS, text files, JSON files, Apache Parquet, Hypertable, Amazon S3, Apache HBase, SequenceFiles, and any other Hadoop InputFormat.
#Method 1: convert existing data directly into a DataFrame
>> data=[('Alice',1),('Bob',2)]
>> df = sqlContext.createDataFrame(data)
>> df
DataFrame[_1: string, _2: bigint]
>> df.show()
+-----+---+
|   _1| _2|
+-----+---+
|Alice|  1|
|  Bob|  2|
+-----+---+

#collect(): return as a list, each element is a Row object
>> df.collect()
[Row(_1=u'Alice', _2=1), Row(_1=u'Bob', _2=2)]

#set the column names at creation time
>> df2 = sqlContext.createDataFrame(data,['name','age'])
>> df2.collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]

#Method 2: read from a file
>> df3=sqlContext.read.text("/path/doc")
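The other two sources listed above, an RDD and a pandas.DataFrame, work the same way; a minimal sketch (the second part assumes pandas is installed):

>> pair_rdd = sc.parallelize([('Alice',1),('Bob',2)])
>> sqlContext.createDataFrame(pair_rdd,['name','age'])
DataFrame[name: string, age: bigint]

>> import pandas as pd
>> pdf = pd.DataFrame([('Alice',1),('Bob',2)], columns=['name','age'])
>> sqlContext.createDataFrame(pdf)
DataFrame[name: string, age: bigint]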

Transformation

A transformation applied to a DataFrame returns another DataFrame.

# Column Transformation
# select
>> df2.select('*')   #select all the columns
>> df2.select('name',df2.age)   #select the name and age columns
>> new_df = df2.select(df2.name,df2.age+10)
>> new_df.show()
+-----+----------+
| name|(age + 10)|
+-----+----------+
|Alice|        11|
|  Bob|        12|
+-----+----------+

# User Defined Function
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import IntegerType
>> strLen = udf(lambda s:len(s),IntegerType())
>> slen_df = df2.select(strLen(df2.name))
>> slen_df.show()
+------------------------+
|PythonUDF#<lambda>(name)|
+------------------------+
|                       5|
|                       3|
+------------------------+

# filter == where
>> filter_df = df2.filter(df2.age>1)
>> filter_df.collect()
[Row(name=u'Bob',age=2)]
>> where_df = df2.where("age > 1")   #a quoted string acts as an SQL-style condition clause


# explode
# returns a new row for each element in the given array or map
>> from pyspark.sql import Row
>> from pyspark.sql.functions import explode
>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>> eDF.show()
+---+---------+-----------+
|  a|  intlist|   mapfield|
+---+---------+-----------+
|  1|[1, 2, 3]|Map(a -> b)|
+---+---------+-----------+
>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+

# sample: withReplacement controls sampling with/without replacement, fraction is the sampling ratio
>> df2.sample(withReplacement=False,fraction=0.1)

A special case of transformation: groupBy():

  • After groupBy(), the DataFrame becomes a GroupedData object, which provides aggregation functions such as count(), sum(), max(), avg(), and mean().
  • Only after applying an aggregation function does the result become a DataFrame again.
#groupBy()
#Return GroupedData object that contain various aggregation functions, e.g. count(),sum(),max(),avg()
>> data = [('Alice',1,6),('Bob',2,8),('Alice',3,9),('Bob',4,7)]
>> gDF=sqlContext.createDataFrame(data,['name','age','grade'])
>> gDF.groupBy(gDF.name)
<pyspark.sql.group.GroupedData at 0x7f9ee2367ad0>

#The two approaches produce essentially the same result, with slightly different column names
>> df1 = gDF.groupBy(gDF.name).agg({"*":"count"})
>> df2 = gDF.groupBy(gDF.name).count()
>> df1.show()
+-----+--------+
| name|count(1)|
+-----+--------+
|Alice|       2|
|  Bob|       2|
+-----+--------+
>> df2.show()
+-----+-----+
| name|count|
+-----+-----+
|Alice|    2|
|  Bob|    2|
+-----+-----+

>> gDF.groupBy().avg().show()
+--------+----------+
|avg(age)|avg(grade)|
+--------+----------+
|     2.5|       7.5|
+--------+----------+
>> gDF.groupBy('name').avg('age','grade').show()
+-----+--------+----------+
| name|avg(age)|avg(grade)|
+-----+--------+----------+
|Alice|     2.0|       7.5|
|  Bob|     3.0|       7.5|
+-----+--------+----------+

Action

DataFrames are also lazily evaluated; nothing is computed until an action is called. Actions include (a minimal sketch follows this list):

  • show(n, truncate)
  • collect()
  • take(n)
  • describe()
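A minimal sketch of these actions on the gDF defined above:

>> gDF.show(2)             #print the first 2 rows
+-----+---+-----+
| name|age|grade|
+-----+---+-----+
|Alice|  1|    6|
|  Bob|  2|    8|
+-----+---+-----+
only showing top 2 rows

>> gDF.take(1)             #first n rows, as a list of Row objects
[Row(name=u'Alice', age=1, grade=6)]
>> gDF.describe('age').show()   #count, mean, stddev, min, max of the age column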

Caching

  • Cache a DataFrame: cache()
  • Remove it from the cache: unpersist()
    >> df.cache()
    >> df.is_cached
    True
    >> df.unpersist()
    >> df.is_cached
    False
    
