PySpark Tutorial (II): Word Count Lab

Purpose: Word Count

Analyze the text of Shakespeare's complete works (with extraneous annotations removed) to find the words Shakespeare used most often.

Method I: DataFrame

Load a text file

# Load a local txt file
>> filename = "file:///home/****/Desktop/100.txt"
>> sDF = sqlContext.read.text(filename)
>> sDF.show(15,truncate=False)
+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1609                                                   |
|                                                       |
|THE SONNETS                                            |
|                                                       |
|by William Shakespeare                                 |
|                                                       |
|                                                       |
|                                                       |
|                     1                                 |
|  From fairest creatures we desire increase,           |
|  That thereby beauty's rose might never die,          |
|  But as the riper should by time decease,             |
|  His tender heir might bear his memory:               |
|  But thou contracted to thine own bright eyes,        |
|  Feed'st thy light's flame with self-substantial fuel,|
+-------------------------------------------------------+
only showing top 15 rows

Remove Punctuation

>> from pyspark.sql.functions import regexp_replace, trim, col, lower
>> def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  

    Args:
        column (Column): A Column containing a sentence.

    Returns:
        Column: A Column named 'sentence' with clean-up operations applied.
    """
    return lower(trim(regexp_replace(column,'\\p{Punct}',''))).alias('sentence')

# col: returns a Column based on the given column name
>> shakespeareDF = sqlContext.read.text(filename).select(removePunctuation(col('value')))
>> shakespeareDF.show(15,truncate=False)
+-------------------------------------------------+
|sentence                                         |
+-------------------------------------------------+
|1609                                             |
|                                                 |
|the sonnets                                      |
|                                                 |
|by william shakespeare                           |
|                                                 |
|                                                 |
|                                                 |
|1                                                |
|from fairest creatures we desire increase        |
|that thereby beautys rose might never die        |
|but as the riper should by time decease          |
|his tender heir might bear his memory            |
|but thou contracted to thine own bright eyes     |
|feedst thy lights flame with selfsubstantial fuel|
+-------------------------------------------------+
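Note that `regexp_replace` here takes a Java regex, where `\p{Punct}` matches the 32 ASCII punctuation characters. Python's `re` module has no `\p{Punct}`, but `string.punctuation` covers the same character set, so the cleanup can be sketched locally in plain Python (function and variable names below are illustrative, not part of the Spark API):

```python
import re
import string

# string.punctuation holds the same 32 ASCII characters that
# the Java pattern \p{Punct} matches
PUNCT_RE = re.compile('[%s]' % re.escape(string.punctuation))

def remove_punctuation(line):
    """Local mirror of the DataFrame version: drop punctuation, lowercase, strip."""
    return PUNCT_RE.sub('', line).lower().strip()

print(remove_punctuation("  That thereby beauty's rose might never die,"))
# that thereby beautys rose might never die
```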

Words from lines

Split each line into its individual words.

>> from pyspark.sql.functions import split, explode
# split on whitespace, so each line becomes an array; explode turns each
# array element into its own row; where is an alias for filter
>> shakeWordsDF =  (shakespeareDF
                    .select(explode(split(shakespeareDF.sentence,'[\s]+')) 
                    .alias('word'))
                    .where("word!=''"))  
>> shakeWordsDF.show(15)
+-----------+
|       word|
+-----------+
|       1609|
|        the|
|    sonnets|
|         by|
|    william|
|shakespeare|
|          1|
|       from|
|    fairest|
|  creatures|
|         we|
|     desire|
|   increase|
|       that|
|    thereby|
+-----------+
>> shakeWordsDF.count()
882996
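The `split`/`explode`/`where` pipeline has a direct plain-Python analogue: split each line on whitespace, flatten the per-line lists into one sequence, and drop the empty strings that blank lines produce (the sample `lines` below are illustrative):

```python
import re

lines = ["from fairest creatures we desire increase", "", "1609"]

# split('[\s]+') per line, then "explode" by flattening;
# where("word != ''") drops the empty strings blank lines yield
words = [w for line in lines for w in re.split(r'\s+', line) if w != '']
print(words)
# ['from', 'fairest', 'creatures', 'we', 'desire', 'increase', '1609']
```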

Word Count

>> def wordCount(wordListDF):
    """Creates a DataFrame with word counts.

    Args:
        wordListDF (DataFrame): A DataFrame consisting of one string column called 'word'.

    Returns:
        DataFrame of (str, int): A DataFrame containing 'word' and 'count' columns.
    """
    return wordListDF.groupBy('word').count()

# orderBy is a DataFrame transformation
>> topWordsAndCountsDF = wordCount(shakeWordsDF).orderBy(['count'],ascending=False)  
>> topWordsAndCountsDF.show(10)
+----+-----+                                                                    
|word|count|
+----+-----+
| the|27361|
| and|26028|
|   i|20681|
|  to|19150|
|  of|17463|
|   a|14593|
| you|13615|
|  my|12481|
|  in|10956|
|that|10890|
+----+-----+
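The `groupBy('word').count()` plus descending `orderBy` combination has the same shape as `collections.Counter.most_common()` in plain Python (the sample `words` list below is illustrative):

```python
from collections import Counter

words = ["the", "and", "the", "i", "the", "and"]

# groupBy('word').count() followed by orderBy('count', ascending=False)
# corresponds to counting occurrences and sorting by count, descending
counts = Counter(words).most_common()
print(counts)
# [('the', 3), ('and', 2), ('i', 1)]
```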

Method II: RDD

Load a text file

# Load the local txt file as an RDD with at least 8 partitions
>> filename = "file:///home/****/Desktop/100.txt"
>> sRDD = sc.textFile(filename, 8)
>> sRDD.take(15)
[u'1609',
 u'',
 u'THE SONNETS',
 u'',
 u'by William Shakespeare',
 u'',
 u'',
 u'                     1',
 u'  From fairest creatures we desire increase,',
 u"  That thereby beauty's rose might never die,",
 u'  But as the riper should by time decease,',
 u'  His tender heir might bear his memory:',
 u'  But thou contracted to thine own bright eyes,',
 u"  Feed'st thy light's flame with self-substantial fuel,",
 u'  Making a famine where abundance lies,']

>> len(sRDD.collect())
122656  # 122,656 lines in total

Remove Punctuation

>> import re
>> def removePunctuation(text):
       """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

       Note: 
           Only spaces, letters, and numbers should be retained.  

       Args:
           text (str): A string.

       Returns:
           str: The cleaned up string.
        """
       #re.sub替换后,返回一个新的strs
       return re.sub(re.compile(r'[^a-zA-Z0-9\s]'),"",text).lower().strip()

#map(func):Return a new distributed dataset formed by passing each element of the source through a function func.
>> shakespeareRDD = sRDD.map(removePunctuation)
>> print '\n'.join(shakespeareRDD
                   .zipWithIndex()  # to (line, lineNum)
                   .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                   .take(15))
0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 1
8: from fairest creatures we desire increase
9: that thereby beautys rose might never die
10: but as the riper should by time decease
11: his tender heir might bear his memory
12: but thou contracted to thine own bright eyes
13: feedst thy lights flame with selfsubstantial fuel
14: making a famine where abundance lies
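`zipWithIndex` pairs each element with its position; the local Python analogue is `enumerate`, with the caveat that the RDD yields `(line, index)` while `enumerate` yields `(index, line)` (sample `lines` below are illustrative):

```python
lines = ["1609", "", "the sonnets"]

# enumerate plays the role of zipWithIndex here; the format string
# matches the one used in the transcript above
numbered = '\n'.join('{0}: {1}'.format(i, line) for i, line in enumerate(lines))
print(numbered)
```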

Words from lines

>> shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
>> shakespeareWordCount = shakespeareWordsRDD.count()
>> print shakespeareWordsRDD.top(5)
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
>> print shakespeareWordCount
927631
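`top(5)` returns the 5 largest elements under the natural (here lexicographic) string ordering, which is why the result is dominated by z-words. `heapq.nlargest` behaves the same way on a local list (sample `words` below are illustrative):

```python
import heapq

words = ["zounds", "abate", "zwaggerd", "zounds", "banish", "zounds"]

# like rdd.top(5): the 5 lexicographically largest strings, descending
top5 = heapq.nlargest(5, words)
print(top5)
# ['zwaggerd', 'zounds', 'zounds', 'zounds', 'banish']
```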

Remove Blank Space

>> shakeWordsRDD = shakespeareWordsRDD.filter(lambda s:len(s)>0)
>> shakeWordCount = shakeWordsRDD.count()
>> print shakeWordCount
882996

Word Count

>> from operator import add
>> def wordCount(wordListRDD):
       """Creates a pair RDD with word counts from an RDD of words.

       Args:
           wordListRDD (RDD of str): An RDD consisting of words.

       Returns:
           RDD of (str, int): An RDD consisting of (word, count) tuples.
       """
       return wordListRDD.map(lambda x: (x, 1)).reduceByKey(add)

# takeOrdered is an action
>> top10WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(10, key=lambda x: -x[1])
>> top10WordsAndCounts
[(u'the', 27361),
 (u'and', 26028),
 (u'i', 20681),
 (u'to', 19150),
 (u'of', 17463),
 (u'a', 14593),
 (u'you', 13615),
 (u'my', 12481),
 (u'in', 10956),
 (u'that', 10890)]

>> print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top10WordsAndCounts))
the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
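`reduceByKey(add)` merges the values of identical keys with the given function, and `takeOrdered(10, key=lambda x: -x[1])` returns the ten pairs with the highest counts. On a single machine the same logic can be sketched with a dict accumulator and `sorted` (the sample `pairs` below are illustrative):

```python
from operator import add

pairs = [("the", 1), ("and", 1), ("the", 1), ("i", 1), ("the", 1)]

# reduceByKey(add): fold each key's values together with add
counts = {}
for word, n in pairs:
    counts[word] = add(counts.get(word, 0), n)

# takeOrdered(10, key=lambda x: -x[1]): ten pairs with the largest counts
top = sorted(counts.items(), key=lambda x: -x[1])[:10]
print(top)
# [('the', 3), ('and', 1), ('i', 1)]
```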
