PySpark Notes

Source: the Big Data course series from the Xiamen University Big Data Lab (厦门大学大数据实验室)

Creating RDDs

Creating an RDD from a local file

from pyspark import SparkContext

# 1. Initialize the SparkContext, the entry point of a Spark program
sc = SparkContext('local', 'sapp')

# A text-file RDD is created with SparkContext's textFile method. It takes a file URI
# (a local path on the machine, or an hdfs://, s3a:// etc. URI) and reads the file as a
# collection of lines.
# 2. Read the local file at URI /root/wordcount.txt
rdd = sc.textFile('/root/wordcount.txt')

# 3. Use rdd.collect() to gather the RDD's contents. collect() is a Spark action operator,
#    explained in detail later; it brings the RDD's data back to the driver.
result = rdd.collect()

# 4. Print the RDD's contents
print(result)
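
Since the file is named wordcount.txt, a minimal word-count sketch over the same rdd might look like the following (splitting each line on whitespace is an assumption about the file's contents):

# Split each line into words, pair every word with 1, then sum the counts per word
counts = rdd.flatMap(lambda line: line.split()) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
print(counts.collect())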

Creating an RDD from a list

data = [("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]
rdd = sc.parallelize(data)

Simple RDD operations

data = [("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]
rdd = sc.parallelize(data)
rdd.collect()

Out[102]: [('spark', 2), ('hadoop', 6), ('hadoop', 4), ('spark', 6)]

rdd_1 = rdd.mapValues(lambda x: (x, 1))
rdd_1.collect()

Out[103]: [('spark', (2, 1)), ('hadoop', (6, 1)), ('hadoop', (4, 1)), ('spark', (6, 1))]

rdd_2 = rdd_1.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
rdd_2.collect()

Out[104]: [('hadoop', (10, 2)), ('spark', (8, 2))]

rdd_3 = rdd_2.mapValues(lambda x: x[0] / x[1])
rdd_3.collect()

Out[105]: [('hadoop', 5.0), ('spark', 4.0)]

rdd \
  .mapValues(lambda x: (x, 1)) \
  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
  .mapValues(lambda x: x[0] / x[1]) \
  .collect()

Out[106]: [('hadoop', 5.0), ('spark', 4.0)]
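
The chained pipeline above computes the per-key average. The same result can also be obtained with aggregateByKey, which accumulates the (sum, count) pair directly instead of building it with mapValues first; a sketch over the same rdd:

# Per-key average via aggregateByKey: the zero value is a (sum, count) pair
rdd.aggregateByKey((0, 0),
                   lambda acc, v: (acc[0] + v, acc[1] + 1),        # fold one value into the pair
                   lambda a, b: (a[0] + b[0], a[1] + b[1])) \
   .mapValues(lambda p: p[0] / p[1]) \
   .collect()
# [('hadoop', 5.0), ('spark', 4.0)]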

SparkSQL

Reading a JSON file

df = spark.read.json('data/people.json')
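
Note that spark here is a SparkSession; a minimal setup for this snippet (mirroring the CSV example below), with printSchema() added to inspect the inferred schema, could be:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; spark.read.json expects one JSON object per line
spark = SparkSession.builder.getOrCreate()
df = spark.read.json('data/people.json')
df.printSchema()   # schema is inferred from the JSON fields (here: age, name)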

Reading a CSV file

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

# Read the CSV with a header row and let Spark infer the column types
data = spark.read.option("header", True).option("inferSchema", True).csv("/data/shixunfiles/297b8e9689496aed3d0e9145de9b6884_1606802130422.csv")
data.printSchema()
data.createOrReplaceTempView("counties")

# Daily totals of cases and deaths, ordered by date
data.groupby('date').agg({'cases': 'sum', 'deaths': 'sum'}).orderBy('date').show()

# df = spark.read.csv('idenid.csv', sep=",", inferSchema="true", header="true")

# Reading several text files at once: pass the paths as a list
spark.read.text(["hdfs://192.168.40.51:9000/user/test/cxb/aa/aa.txt",
                 "hdfs://192.168.40.51:9000/user/test/cxb/bb/bb.txt"])
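
The counties temp view registered above can also be queried with SQL; this sketch reproduces the daily aggregation (the column names date, cases and deaths are taken from the agg call above):

# SQL equivalent of the groupby/agg above, run against the "counties" temp view
spark.sql("""
    SELECT date, SUM(cases) AS cases, SUM(deaths) AS deaths
    FROM counties
    GROUP BY date
    ORDER BY date
""").show()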

DataFrame

Simple DataFrame operations

df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df.select(df.name, df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

df.filter(df.age > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

df.createOrReplaceTempView("people")
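
With the people view registered, the same filter can also be expressed through Spark SQL, for example:

# SQL equivalent of df.filter(df.age > 20) on the "people" temp view;
# returns the same single row (age=30, name=Andy)
spark.sql("SELECT * FROM people WHERE age > 20").show()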

Yelp data

# Unzip the data
!unzip /data/shixunfiles/cee58c4b8fdab229f0b24cd136a7d30f_1606187962797.zip
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
business = spark.read.json('yelp_academic_dataset_business.json')
#Split the categories string on ',' into a list
split_col = f.split(business['categories'], ',')
#Replace categories with the split array, drop rows with an empty city, and drop nulls
business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
business.select("categories").show()
business.createOrReplaceTempView("business")
attributes = spark.sql("select attributes.RestaurantsTakeOut from business").groupBy("RestaurantsTakeOut").count().dropna()
attributes.show()
categories = spark.sql("SELECT state, city, stars, review_count, explode(categories) AS category FROM business").cache()
categories.createOrReplaceTempView("categories_1")
avg_stars = spark.sql("SELECT category, AVG(stars) as avg_stars FROM categories_1 GROUP BY category ORDER BY avg_stars DESC")
bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
avg_city = spark.sql("SELECT category, AVG(review_count) as avg_review_count FROM categories_1 GROUP BY category ORDER BY avg_review_count DESC")
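
The three aggregations above are defined but never displayed; calling .show() on them prints the top rows, e.g.:

# Inspect the aggregation results (showing 10 rows each is an arbitrary choice)
avg_stars.show(10)
bus_city.show(10)
avg_city.show(10)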

Yelp ETL and analysis

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
 
def data_process(raw_data_path):
 
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    business = spark.read.json(raw_data_path)
    split_col = f.split(business['categories'], ',')
    business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
    business.createOrReplaceTempView("business")
 
    b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, review_count, is_open, categories, attributes FROM business").cache()
    b_etl.createOrReplaceTempView("b_etl")
    # Distance of each business from its state's average (latitude, longitude); used to drop location outliers
    outlier = spark.sql(
        "SELECT b1.business_id, SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) \
        as dist FROM b_etl b1 INNER JOIN (SELECT state, AVG(latitude) as avg_lat, AVG(longitude) as avg_long \
        FROM b_etl GROUP BY state) b2 ON b1.state = b2.state ORDER BY dist DESC")
    outlier.createOrReplaceTempView("outlier")
    # Keep only businesses within distance 10 of their state's average location, then write to parquet
    joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o ON b.business_id = o.business_id WHERE o.dist<10")
    joined.write.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl", mode="overwrite")
 
 
if __name__ == "__main__":
    raw_hdfs_path = 'file:///home/hadoop/wangyingmin/yelp_academic_dataset_business.json'
    print("Start cleaning raw data!")
    data_process(raw_hdfs_path)
    print("Successfully done")
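
A quick way to sanity-check the ETL output is to read the parquet directory back and inspect it; a sketch, using the same path the script writes to:

from pyspark.sql import SparkSession

# Read the cleaned business data back and check the row count and schema
spark = SparkSession.builder.getOrCreate()
check = spark.read.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl")
print(check.count())
check.printSchema()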


from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os
 
def attribute_score(spark, attribute):
    # Average stars grouped by the given attribute column (spark is passed in from analysis() below)
    att = spark.sql("SELECT attributes.{attr} as {attr}, category, stars FROM for_att".format(attr=attribute)).dropna()
    att.createOrReplaceTempView("att")
    att_group = spark.sql("SELECT {attr}, AVG(stars) AS stars FROM att GROUP BY {attr} ORDER BY stars".format(attr=attribute))
    att_group.show()
    att_group.write.json("file:///usr/local/spark/yelp/analysis/{attr}".format(attr=attribute), mode='overwrite')
 
 
 
def analysis(data_path):
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    business = spark.read.parquet(data_path).cache()
    business.createOrReplaceTempView("business")
 
    part_business = spark.sql("SELECT state, city, stars, review_count, explode(categories) AS category FROM business").cache()
    part_business.show()
    part_business.createOrReplaceTempView('part_business_1')
    part_business = spark.sql("SELECT state, city, stars, review_count, REPLACE(category, ' ', '') as new_category FROM part_business_1")
    part_business.createOrReplaceTempView('part_business')
 
 
    print("## All distinct categories")
    all_categories = spark.sql("SELECT business_id, explode(categories) AS category FROM business")
    all_categories.createOrReplaceTempView('all_categories')
 
    distinct = spark.sql("SELECT COUNT(DISTINCT(new_category)) FROM part_business")
    distinct.show()
 
    print("## Top 10 business categories")
    top_cat = spark.sql("SELECT new_category, COUNT(*) as freq FROM part_business GROUP BY new_category ORDER BY freq DESC")
    top_cat.show(10)   
    top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category", mode='overwrite')
 
    print("## Top business categories - in every city")
    top_cat_city = spark.sql("SELECT city, new_category, COUNT(*) as freq FROM part_business GROUP BY city, new_category ORDER BY freq DESC")
    top_cat_city.show()  
    top_cat_city.write.json("file:///usr/local/spark/yelp/analysis/top_category_city", mode='overwrite')
 
    print("## Cities with most businesses")
    bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
    bus_city.show(10)   
    bus_city.write.json("file:///usr/local/spark/yelp/analysis/top_business_city", mode='overwrite')
 
    print("## Average review count by category")
    avg_city = spark.sql(
        "SELECT new_category, AVG(review_count) as avg_review_count FROM part_business GROUP BY new_category ORDER BY avg_review_count DESC")
    avg_city.show()  
    avg_city.write.json("file:///usr/local/spark/yelp/analysis/average_review_category", mode='overwrite')
 
 
    print("## Average stars by category")
    avg_state = spark.sql(
        "SELECT new_category, AVG(stars) as avg_stars FROM part_business GROUP BY new_category ORDER BY avg_stars DESC")
    avg_state.show()   
    avg_state.write.json("file:///usr/local/spark/yelp/analysis/average_stars_category", mode='overwrite')
 
    print("## Data based on Attribute")
    for_att = spark.sql("SELECT attributes, stars, explode(categories) AS category FROM business")
    for_att.createOrReplaceTempView("for_att")
    attribute = 'RestaurantsTakeOut'
    attribute_score(spark, attribute)
 
 
if __name__ == "__main__":
    business_data_path = 'file:///home/hadoop/wangyingmin/yelp-etl/business_etl' 
    print("Start analysis data!")
    analysis(business_data_path)
    print("Analysis done")

COVID-19 data

    from pyspark import SparkConf,SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from datetime import datetime
    import pyspark.sql.functions as func
     
    # Normalize a date string such as 2020-1-5 into the ISO form 2020-01-05 and parse it.
    # Assumes a single-digit month, which holds for this dataset (it runs through 2020-05-19).
    def toDate(inputStr):
        newStr = ""
        if len(inputStr) == 8:
            s1 = inputStr[0:4]
            s2 = inputStr[5:6]
            s3 = inputStr[7]
            newStr = s1+"-"+"0"+s2+"-"+"0"+s3
        else:
            s1 = inputStr[0:4]
            s2 = inputStr[5:6]
            s3 = inputStr[7:]
            newStr = s1+"-"+"0"+s2+"-"+s3
        date = datetime.strptime(newStr, "%Y-%m-%d")
        return date
     
     
     
    # Main program
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
     
    fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                        StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
    schema = StructType(fields)
     
    # Each line of us-counties.txt: date \t county \t state \t cases \t deaths
    rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
    rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))
     
     
    shemaUsInfo = spark.createDataFrame(rdd1,schema)
     
    shemaUsInfo.createOrReplaceTempView("usInfo")
     
    # 1. Cumulative confirmed cases and deaths per day
    df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
     
    # Rename the aggregated columns
    df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
    df1.repartition(1).write.json("result1.json")    # write to HDFS
     
    # Register as a temp view for the next step
    df1.createOrReplaceTempView("ustotal")
     
    # 2. Daily new confirmed cases and deaths relative to the previous day
    df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
     
    df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")    # write to HDFS
     
    # 3. Cumulative confirmed cases and deaths for each US state as of 2020-05-19
    df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
     
    df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json")    # write to HDFS
     
    df3.createOrReplaceTempView("eachStateInfo")
     
    # 4. The 10 US states with the most confirmed cases
    df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
    df4.repartition(1).write.json("result4.json")
     
    # 5. The 10 US states with the most deaths
    df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
    df5.repartition(1).write.json("result5.json")
     
    # 6. The 10 US states with the fewest confirmed cases
    df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
    df6.repartition(1).write.json("result6.json")
     
    # 7. The 10 US states with the fewest deaths
    df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
    df7.repartition(1).write.json("result7.json")
     
    # 8. Case fatality rate for the whole US and for each state as of 2020-05-19
    df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
    df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
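
To check one of the outputs, a result file can be read back into a DataFrame, e.g. result1.json (the daily cumulative totals written above):

    # Read result1.json back from HDFS and show the first days
    spark.read.json("result1.json").orderBy("date").show(10)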
