PySpark Tutorial (II) : Word Count Lab
Purpose: Word Count
Analyze the text of Shakespeare (with extraneous commentary removed) to count the words Shakespeare used most often.
Method I: DataFrame
Load a text file
# Load a local txt file
>> filename = "file:///home/****/Desktop/100.txt"
>> sDF = sqlContext.read.text(filename)
>> sDF.show(15,truncate=False)
+-------------------------------------------------------+
|value |
+-------------------------------------------------------+
|1609 |
| |
|THE SONNETS |
| |
|by William Shakespeare |
| |
| |
| |
| 1 |
| From fairest creatures we desire increase, |
| That thereby beauty's rose might never die, |
| But as the riper should by time decease, |
| His tender heir might bear his memory: |
| But thou contracted to thine own bright eyes, |
| Feed'st thy light's flame with self-substantial fuel,|
+-------------------------------------------------------+
only showing top 15 rows
Remove Punctuation
>> from pyspark.sql.functions import regexp_replace, trim, col, lower
>> def removePunctuation(column):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained.
Args:
column (Column): A Column containing a sentence.
Returns:
Column: A Column named 'sentence' with clean-up operations applied.
"""
return lower(trim(regexp_replace(column,'\\p{Punct}',''))).alias('sentence')
#col:Returns a Column based on the given column name.
>> shakespeareDF = sqlContext.read.text(filename).select(removePunctuation(col('value')))
>> shakespeareDF.show(15,truncate=False)
+-------------------------------------------------+
|sentence |
+-------------------------------------------------+
|1609 |
| |
|the sonnets |
| |
|by william shakespeare |
| |
| |
| |
|1 |
|from fairest creatures we desire increase |
|that thereby beautys rose might never die |
|but as the riper should by time decease |
|his tender heir might bear his memory |
|but thou contracted to thine own bright eyes |
|feedst thy lights flame with selfsubstantial fuel|
+-------------------------------------------------+
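The cleanup logic can be sanity-checked locally without Spark. This is a plain-Python sketch (the function name `remove_punctuation` and the sample line are illustrative, not part of the lab): Java's `\p{Punct}` matches ASCII punctuation, so for plain-ASCII text the character class below behaves the same way.

```python
import re

def remove_punctuation(text):
    # Remove everything except letters, digits, and whitespace,
    # then lower-case and trim -- mirroring the Spark column expression.
    return re.sub(r'[^a-zA-Z0-9\s]', '', text).lower().strip()

print(remove_punctuation("  That thereby beauty's rose might never die,  "))
# -> that thereby beautys rose might never die
```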
Words from lines
List every word on each line.
>> from pyspark.sql.functions import split, explode
# split on whitespace, so each line becomes an array; explode turns each array element into its own row; where == filter
>> shakeWordsDF = (shakespeareDF
.select(explode(split(shakespeareDF.sentence,'[\s]+'))
.alias('word'))
.where("word!=''"))
>> shakeWordsDF.show(15)
+-----------+
| word|
+-----------+
| 1609|
| the|
| sonnets|
| by|
| william|
|shakespeare|
| 1|
| from|
| fairest|
| creatures|
| we|
| desire|
| increase|
| that|
| thereby|
+-----------+
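The split/explode/filter pipeline has a simple local analogue (the sample `lines` below are made up for illustration): splitting flattens each line into words, and dropping empty strings plays the role of the `where` clause.

```python
# A few sample lines standing in for the cleaned DataFrame rows
lines = ['1609', '', 'the sonnets', 'from fairest creatures we desire increase']

# split each line on whitespace (explode flattens the arrays into rows),
# and skip empty strings (the where("word!=''") step)
words = [w for line in lines for w in line.split() if w]
print(words)
# -> ['1609', 'the', 'sonnets', 'from', 'fairest', 'creatures', 'we', 'desire', 'increase']
```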
>> shakeWordsDF.count()
882996
Word Count
>> def wordCount(wordListDF):
"""Creates a DataFrame with word counts.
Args:
wordListDF (DataFrame): A DataFrame consisting of one string column called 'word'.
Returns:
DataFrame of (str, int): A DataFrame containing 'word' and 'count' columns.
"""
return wordListDF.groupBy('word').count()
# orderBy is a DataFrame transformation
>> topWordsAndCountsDF = wordCount(shakeWordsDF).orderBy(['count'],ascending=False)
>> topWordsAndCountsDF.show(10)
+----+-----+
|word|count|
+----+-----+
| the|27361|
| and|26028|
| i|20681|
| to|19150|
| of|17463|
| a|14593|
| you|13615|
| my|12481|
| in|10956|
|that|10890|
+----+-----+
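Locally, `groupBy('word').count()` followed by a descending `orderBy` is exactly what `collections.Counter` does. A minimal sketch with made-up sample words:

```python
from collections import Counter

# Counting word occurrences and ranking by frequency,
# mirroring groupBy('word').count().orderBy('count', ascending=False)
words = ['the', 'and', 'the', 'i', 'the', 'and']
counts = Counter(words)
print(counts.most_common(2))
# -> [('the', 3), ('and', 2)]
```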
Method II: RDD
Load a text file
# Load a local txt file
>> filename = "file:///home/****/Desktop/100.txt"
>> sRDD = sc.textFile(filename,8)
>> sRDD.take(15)
[u'1609',
u'',
u'THE SONNETS',
u'',
u'by William Shakespeare',
u'',
u'',
u' 1',
u' From fairest creatures we desire increase,',
u" That thereby beauty's rose might never die,",
u' But as the riper should by time decease,',
u' His tender heir might bear his memory:',
u' But thou contracted to thine own bright eyes,',
u" Feed'st thy light's flame with self-substantial fuel,",
u' Making a famine where abundance lies,']
>> len(sRDD.collect())
122656  # 122656 lines in total
Remove Punctuation
>> import re
>> def removePunctuation(text):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained.
Args:
text (str): A string.
Returns:
str: The cleaned up string.
"""
#re.sub returns a new string with the replacements applied
return re.sub(re.compile(r'[^a-zA-Z0-9\s]'),"",text).lower().strip()
#map(func):Return a new distributed dataset formed by passing each element of the source through a function func.
>> shakespeareRDD = sRDD.map(removePunctuation)
>> print '\n'.join(shakespeareRDD
.zipWithIndex() # to (line, lineNum)
.map(lambda (l, num): '{0}: {1}'.format(num, l)) # to 'lineNum: line'
.take(15))
0:1609
1:
2:the sonnets
3:
4:by william shakespeare
5:
6:
7:1
8:from fairest creatures we desire increase
9:that thereby beautys rose might never die
10:but as the riper should by time decease
11:his tender heir might bear his memory
12:but thou contracted to thine own bright eyes
13:feedst thy lights flame with selfsubstantial fuel
14:making a famine where abundance lies
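`zipWithIndex` pairs each element with its position; in plain Python the same numbering comes from `enumerate`, except that `enumerate` yields `(index, element)` rather than `(element, index)`. A small sketch with sample lines:

```python
# Sample lines standing in for the cleaned RDD
lines = ['1609', '', 'the sonnets']

# enumerate gives (index, line); format as 'lineNum: line'
numbered = ['{0}: {1}'.format(i, line) for i, line in enumerate(lines)]
print('\n'.join(numbered))
```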
Words from lines
>> shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
>> shakespeareWordCount = shakespeareWordsRDD.count()
>> print shakespeareWordsRDD.top(5)
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
>> print shakespeareWordCount
927631
Remove Blank Space
>> shakeWordsRDD = shakespeareWordsRDD.filter(lambda s:len(s)>0)
>> shakeWordCount = shakeWordsRDD.count()
>> print shakeWordCount
882996
Word Count
>> from operator import add
>> def wordCount(wordListRDD):
"""Creates a pair RDD with word counts from an RDD of words.
Args:
wordListRDD (RDD of str): An RDD consisting of words.
Returns:
RDD of (str, int): An RDD consisting of (word, count) tuples.
"""
return wordListRDD.map(lambda x:(x,1)).reduceByKey(add)
# takeOrdered is an action
>> topWordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(10,key=lambda x:-x[1])
>> topWordsAndCounts
[(u'the', 27361),
(u'and', 26028),
(u'i', 20681),
(u'to', 19150),
(u'of', 17463),
(u'a', 14593),
(u'you', 13615),
(u'my', 12481),
(u'in', 10956),
(u'that', 10890)]
>> print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), topWordsAndCounts))
the:27361
and:26028
i:20681
to:19150
of:17463
a:14593
you:13615
my:12481
in:10956
that:10890
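`takeOrdered(10, key=lambda x: -x[1])` returns the ten pairs with the largest counts; locally the same ordering is a `sorted` with the same key. A sketch with a few sample pairs:

```python
# Sample (word, count) pairs standing in for the reduceByKey output
pairs = [('a', 14593), ('the', 27361), ('and', 26028)]

# Negating the count sorts descending, matching takeOrdered's key
top2 = sorted(pairs, key=lambda x: -x[1])[:2]
print(top2)
# -> [('the', 27361), ('and', 26028)]
```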
Feedback and Suggestions
- Weibo: @Girl_AI