PySpark Tutorial (I) : Basic Procedure & Concept
Spark Basic Workflow
A Spark application consists of two parts:
- Task scheduling (the driver program)
- Task execution (the worker programs): worker programs run on cluster nodes or as local threads
- DataFrames and RDDs are distributed across the workers
- Distributed (cluster) mode:
  - The SparkContext is the entry point of the application and is started by the user program (the driver program)
  - Executors carry out the tasks; a machine that runs executors is called a worker node
- Local mode:
  - The internal logic is similar, just simplified: the cluster manager is replaced by a thread pool (local threads) inside the driver process, as sketched below
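A minimal sketch of how the two modes differ when a SparkContext is built in a standalone script. The master URL spark://master-host:7077 and the app name are placeholders; in the interactive shell used below, sc is created automatically, so you would not run this there.

from pyspark import SparkConf, SparkContext

# Local mode: tasks run as threads inside the driver process ("local[4]" = 4 threads)
conf = SparkConf().setMaster("local[4]").setAppName("demo")

# Cluster mode: point the master URL at a cluster manager instead (placeholder host/port)
# conf = SparkConf().setMaster("spark://master-host:7077").setAppName("demo")

sc = SparkContext(conf=conf)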
Starting IPython with PySpark
In a terminal, run:
ipython --profile=pyspark
The startup banner shows that a SparkContext and a SQLContext have already been created automatically:
- SparkContext is available as sc
  - The SparkContext tells Spark how to access the cluster
- HiveContext is available as sqlContext
  - HiveContext includes all of the functionality provided by SQLContext;
  - Additional HiveContext features: writing queries with the HiveQL parser, Hive UDFs (user-defined functions), and reading data from Hive tables (a small query sketch follows).
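A minimal sketch of issuing a SQL query through sqlContext; the DataFrame people_df and the temporary table name people are made up for illustration, and with a real Hive installation the same sql() call also accepts HiveQL against Hive tables.

>> people_df = sqlContext.createDataFrame([('Alice',1),('Bob',2)], ['name','age'])
>> people_df.registerTempTable("people")    #expose the DataFrame as a queryable table
>> sqlContext.sql("SELECT name FROM people WHERE age > 1").collect()
[Row(name=u'Bob')]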
SparkContext is lower-level than SQLContext, and RDD is lower-level than DataFrame
Because:
- RDDs are typically created from a SparkContext
- DataFrames are typically created from a SQLContext
- The SQLContext is itself created from the lower-level SparkContext
Therefore:
- SparkContext is lower-level than SQLContext, and RDD is lower-level than DataFrame (a small sketch of this relationship follows)
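A sketch of that dependency chain; demo_rdd and demo_df are illustrative names only, and in the pyspark shell sc and sqlContext already exist, so the explicit construction is shown purely to make the relationship visible.

>> from pyspark.sql import SQLContext
>> sqlContext = SQLContext(sc)                          #SQLContext is built on top of the SparkContext
>> demo_rdd = sc.parallelize([('Alice',1),('Bob',2)])   #RDDs come from the SparkContext
>> demo_df = sqlContext.createDataFrame(demo_rdd, ['name','age'])   #DataFrames come from the SQLContext
>> demo_df
DataFrame[name: string, age: bigint]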
RDD
RDD stands for Resilient Distributed Dataset, Spark's core distributed collection abstraction.
Creating an RDD
#Method 1: create an RDD from a list, specifying the number of partitions
>> data = [1,2,3,4]
>> rDD = sc.parallelize(data,4)
>> rDD
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
>> rDD.getNumPartitions()
4
#Method 2: read from a file
>> distFile = sc.textFile("file:///usr/local/spark-1.6.2/README.md")
>> distFile
file:///usr/local/spark-1.6.2/README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
>> distFile.getNumPartitions()
2
RDD Partitions
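A short sketch of inspecting and changing the partitioning (the exact split of elements across partitions may differ in your environment):

>> nums = sc.parallelize(range(8), 4)
>> nums.getNumPartitions()
4
>> nums.glom().collect()                    #glom() gathers the elements of each partition into a list
[[0, 1], [2, 3], [4, 5], [6, 7]]
>> nums.repartition(2).getNumPartitions()   #repartition() reshuffles the data into the given number of partitions
2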
RDD Operations
RDDs support two types of operations:
- actions: run a computation on the dataset and return a value to the driver
- transformations: create a new dataset from an existing one
RDD Lazy Evaluation (lazy computing)
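Transformations do not execute immediately: Spark only records them in the RDD's lineage, and the actual computation happens when an action is called. A minimal illustration, assuming the same interactive session:

>> rdd = sc.parallelize([2,3,4])
>> mapped = rdd.map(lambda x: x*10)   #returns instantly; no Spark job has run yet
>> mapped.collect()                   #the action triggers the actual computation
[20, 30, 40]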
Transformation
Note: a transformation (map, filter, join, ...) turns an RDD into another RDD; the result is still an RDD.
- collect(), take(): these are actions that return the contents as a list for display
- toDF(): converts the RDD into a DataFrame for display
>> rdd=sc.parallelize([2,3,4])
#map
>> rdd.map(lambda x:[x,x+5]) #still an RDD after map
PythonRDD[18] at RDD at PythonRDD.scala:43
>> rdd.map(lambda x:[x,x+5]).toDF() #toDF() gives a DataFrame
DataFrame[_1: bigint, _2: bigint]
>> rdd.map(lambda x:[x,x+5]).collect() #collect() gives a list
[[2, 7], [3, 8], [4, 9]]
#From here on, results are shown without collect()/toDF(), using the shorthand:
RDD:[2,3,4]->[[2, 7], [3, 8], [4, 9]]
#flatMap
>> rdd.flatMap(lambda x:[x,x+5])
RDD:[2,3,4]->[2, 7, 3, 8, 4, 9]
#filter
>> rdd.filter(lambda x:x%2==0)
RDD:[2,3,4]->[2,4]
#(K,V) Transformation
#reduceByKey
>> rdd2 = sc.parallelize([(1,2),(1,5),(3,4),(3,5),(3,10)])
>> rdd2.reduceByKey(lambda a,b:a+b)
RDD:[(1, 7), (3, 19)]
#sortByKey
>> rdd3=sc.parallelize([(1,'a'),(2,'d'),(1,'b')])
>> rdd3.sortByKey()
RDD:[(1,'a'),(1,'b'),(2,'d')]
#groupByKey
#Return (K,Iterable<V>)
>> rdd3.groupByKey()
RDD:[(1,['a','b']),(2,['d'])]
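Note that groupByKey() really returns an iterable per key; to display the groups as plain lists you can append mapValues(list), e.g. (the order of keys and of values within a group is not guaranteed):

>> rdd3.groupByKey().mapValues(list).collect()   #materialise each group as a list
[(1, ['a', 'b']), (2, ['d'])]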
Action
>> rdd=sc.parallelize([2,3,4])
#reduce
>> rdd.reduce(lambda a,b:a*b) #cumulative product: 2*3*4
24
#collect
>> rdd.collect()
[2,3,4] #as list
#take
>> rdd.take(2)
[2,3] #as list
#takeOrdered
>> rdd.takeOrdered(3,lambda s:-1*s) #return the elements in descending order
[4,3,2]
DataFrame
Creating a DataFrame
- sqlContext.createDataFrame(data)
  - from an RDD of tuples or lists
  - from a Python list
  - from a pandas.DataFrame
- sqlContext.read.text/json("/path/doc")
  - Supported sources include: HDFS, text files, JSON files, Apache Parquet, Hypertable, Amazon S3, Apache HBase, SequenceFiles, and any other Hadoop InputFormat.
#Method 1: convert existing data directly into a DataFrame
>> data=[('Alice',1),('Bob',2)]
>> df = sqlContext.createDataFrame(data)
>> df
DataFrame[_1: string, _2: bigint]
>> df.show()
+-----+---+
| _1| _2|
+-----+---+
|Alice| 1|
| Bob| 2|
+-----+---+
#collect(): return the rows as a list; each row is a Row object
>> df.collect()
[Row(_1=u'Alice', _2=1), Row(_1=u'Bob', _2=2)]
#set the column names when creating the DataFrame
>> df2 = sqlContext.createDataFrame(data,['name','age'])
>> df2.collect()
[Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]
#Method 2: read from a file
>> df3=sqlContext.read.text("/path/doc")
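In Spark 1.6, read.text() should produce a DataFrame with a single string column named value, one row per line of the file; a sketch, assuming the path points at a real text file:

>> df3
DataFrame[value: string]
>> df3.show(3, truncate=False)   #print the first three lines of the file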
Transformation
Note: applying a transformation to a DataFrame yields another DataFrame.
# Column Transformation
# select
>> df.select('*') #select all the columns
>> df.select('name',df.age) #select name and age column
>> new_df = df.select(df.name,df.age+10)
>> new_df.show()
+-----+----------+
| name|(age + 10)|
+-----+----------+
|Alice| 11|
| Bob| 12|
+-----+----------+
# User Defined Function
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import IntegerType
>> strLen = udf(lambda s:len(s),IntegerType())
>> slen_df = df.select(strLen(df.name))
>> slen_df.show()
+------------------------+
|PythonUDF#<lambda>(name)|
+------------------------+
| 5|
| 3|
+------------------------+
# filter == where
>> filter_df = df.filter(df.age>1)
>> filter_df.collect()
[Row(name=u'Bob',age=2)]
>> where_df = df.where("age > 1") #a quoted string acts as a SQL-style predicate, equivalent to filter
# explode
# returns a new row for each element in the given array or map (i.e. the input column must be an array or a map)
>> from pyspark.sql import Row
>> from pyspark.sql.functions import explode
>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>> eDF.show()
+---+---------+-----------+
|  a|  intlist|   mapfield|
+---+---------+-----------+
|  1|[1, 2, 3]|Map(a -> b)|
+---+---------+-----------+
>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
| a| b|
+---+-----+
# sample: choose whether to sample with replacement and what fraction of rows to keep
>> df.sample(withReplacement=False,fraction=0.1)
A special transformation: groupBy():
- groupBy() turns the DataFrame into a GroupedData object, which provides aggregation functions such as count(), sum(), max(), avg(), mean().
- Only after applying one of these aggregation functions do you get a DataFrame back.
#groupBy()
#returns a GroupedData object that provides various aggregation functions, e.g. count(), sum(), max(), avg()
>> data = [('Alice',1,6),('Bob',2,8),('Alice',3,9),('Bob',4,7)]
>> gDF=sqlContext.createDataFrame(data,['name','age','grade'])
>> gDF.groupBy(gDF.name)
<pyspark.sql.group.GroupedData at 0x7f9ee2367ad0>
#the two approaches give essentially the same result, with slightly different column names
>> df1 = gDF.groupBy(gDF.name).agg({"*":"count"})
>> df2 = gDF.groupBy(gDF.name).count()
>> df1.show()
+-----+--------+
| name|count(1)|
+-----+--------+
|Alice| 2|
| Bob| 2|
+-----+--------+
>> df2.show()
+-----+-----+
| name|count|
+-----+-----+
|Alice| 2|
| Bob| 2|
+-----+-----+
>> gDF.groupBy().avg().show()
+--------+----------+
|avg(age)|avg(grade)|
+--------+----------+
| 2.5| 7.5|
+--------+----------+
>> gDF.groupBy('name').avg('age','grade').show()
+-----+--------+----------+
| name|avg(age)|avg(grade)|
+-----+--------+----------+
|Alice| 2.0| 7.5|
| Bob| 3.0| 7.5|
+-----+--------+----------+
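agg() also accepts a dict that maps column names to aggregate function names, so several aggregations can be computed in one pass; a sketch on the same gDF:

>> gDF.groupBy('name').agg({'age':'max', 'grade':'min'}).show()
#result columns: name, max(age), min(grade) -> Alice: 3, 6; Bob: 4, 7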
Action
DataFrames are also evaluated lazily. Common actions (a quick sketch follows the list):
- show(n,truncate)
- collect()
- take(n)
- describe()
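A quick sketch of these actions on the gDF built above (name/age/grade):

>> gDF.show(2)                   #print the first 2 rows as a formatted table
>> gDF.take(1)                   #return the first row(s) as a list of Row objects
[Row(name=u'Alice', age=1, grade=6)]
>> gDF.describe('age').show()    #summary statistics: count, mean, stddev, min, max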
Caching
- Cache a DataFrame: cache()
- Remove it from the cache: unpersist()
>> df.cache()
>> df.is_cached
True
>> df.unpersist()
>> df.is_cached
False
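Besides cache(), persist() lets you choose a storage level explicitly; a sketch (MEMORY_AND_DISK is just one of the available levels):

>> from pyspark import StorageLevel
>> df.persist(StorageLevel.MEMORY_AND_DISK)   #keep the data in memory, spilling to disk if needed
>> df.unpersist()                             #drop it from the cache again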
Feedback & Suggestions
- Weibo: @Girl_AI