Source: Xiamen University Big Data Lab, Big Data course series
Creating RDDs
Creating an RDD from a local file
from pyspark import SparkContext

# 1. Initialize the SparkContext, the entry point of a Spark program
sc = SparkContext('local', 'sapp')
# A text-file RDD is created with SparkContext's textFile method. It takes a file URI
# (a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines.
# 2. Read the local file at URI /root/wordcount.txt
rdd = sc.textFile('/root/wordcount.txt')
# 3. Collect the RDD's contents with rdd.collect(). collect() is a Spark action operator,
#    explained in detail later; its main job is to gather the RDD's data.
result = rdd.collect()
# 4. Print the RDD's contents
print(result)
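Since the file is named wordcount.txt, a natural follow-up is a word count. Below is a minimal sketch reusing the sc and rdd above; splitting on whitespace is an assumption about the file's contents.

# Minimal word-count sketch (assumes whitespace-separated words in /root/wordcount.txt)
counts = (rdd
          .flatMap(lambda line: line.split())   # split each line into words
          .map(lambda word: (word, 1))          # pair each word with a count of 1
          .reduceByKey(lambda a, b: a + b))     # sum the counts per word
print(counts.collect())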
Creating an RDD from an array
data = [("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]
rdd = sc.parallelize(data)
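parallelize also accepts an optional number of partitions. A small sketch (the partition count 4 and the variable name rdd4 are illustrative, not from the source):

rdd4 = sc.parallelize(data, 4)     # distribute the list across 4 partitions
print(rdd4.getNumPartitions())     # number of partitions
print(rdd4.count())                # number of elements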
Basic RDD operations
data = [("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)]
rdd = sc.parallelize(data)
rdd.collect()
Out[102]: [('spark', 2), ('hadoop', 6), ('hadoop', 4), ('spark', 6)]

rdd_1 = rdd.mapValues(lambda x: (x, 1))
rdd_1.collect()
Out[103]: [('spark', (2, 1)), ('hadoop', (6, 1)), ('hadoop', (4, 1)), ('spark', (6, 1))]

rdd_2 = rdd_1.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
rdd_2.collect()
Out[104]: [('hadoop', (10, 2)), ('spark', (8, 2))]

rdd_3 = rdd_2.mapValues(lambda x: (x[0] / x[1]))
rdd_3.collect()
Out[105]: [('hadoop', 5.0), ('spark', 4.0)]

rdd \
    .mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: (x[0] / x[1])) \
    .collect()
Out[106]: [('hadoop', 5.0), ('spark', 4.0)]
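The chain above computes the per-key average: mapValues attaches a count of 1, reduceByKey sums values and counts, and the final mapValues divides. As an alternative sketch (not from the source), the same result can be obtained with aggregateByKey on the same rdd:

# Per-key average via aggregateByKey: the accumulator is a (sum, count) pair
avg = (rdd
       .aggregateByKey((0, 0),
                       lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold a value into the accumulator
                       lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge two accumulators
       .mapValues(lambda p: p[0] / p[1]))
avg.collect()   # [('hadoop', 5.0), ('spark', 4.0)]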
SparkSQL
Reading a JSON file
df = spark.read.json('data/people.json')
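A couple of calls commonly used after loading JSON, as a sketch; the explicit schema is an assumption about people.json's fields (a name string and an age number, matching the df.show() output later in this section):

df.printSchema()   # inspect the inferred schema
df.show()

# Reading with an explicit schema instead of schema inference (field names assumed)
from pyspark.sql.types import StructType, StructField, StringType, LongType
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", LongType(), True)])
df = spark.read.schema(schema).json('data/people.json')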
Reading a CSV file
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

data = spark.read.option("header", True).option("inferSchema", True) \
            .csv("/data/shixunfiles/297b8e9689496aed3d0e9145de9b6884_1606802130422.csv")
data.printSchema()
data.createOrReplaceTempView("counties")
data.groupby('date').agg({'cases': 'sum', 'deaths': 'sum'}).orderBy('date').show()
# df = spark.read.csv('idenid.csv', sep=",", inferSchema="true", header="true")
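The dictionary form of agg produces columns named sum(cases) and sum(deaths). An equivalent sketch using pyspark.sql.functions with explicit aliases, on the same data DataFrame:

data.groupBy('date') \
    .agg(f.sum('cases').alias('total_cases'),
         f.sum('deaths').alias('total_deaths')) \
    .orderBy('date') \
    .show()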
spark.read.text("hdfs://192.168.40.51:9000/user/test/cxb/aa/aa.txt",
                "hdfs://192.168.40.51:9000/user/test/cxb/bb/bb.txt")
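spark.read.text returns a DataFrame with a single string column named value, one row per input line. A small sketch of inspecting the result (the same call as above, assigned to a variable):

lines = spark.read.text("hdfs://192.168.40.51:9000/user/test/cxb/aa/aa.txt",
                        "hdfs://192.168.40.51:9000/user/test/cxb/bb/bb.txt")
lines.printSchema()    # a single string column named "value"
print(lines.count())   # total number of lines across both files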
DataFrame
Basic DataFrame operations
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df.select(df.name, df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

df.filter(df.age > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

df.createOrReplaceTempView("people")
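createOrReplaceTempView("people") registers the DataFrame so it can be queried with SQL; a minimal follow-up sketch:

sqlDF = spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL")
sqlDF.show()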
## YELP data
## Unzip the data
!unzip /data/shixunfiles/cee58c4b8fdab229f0b24cd136a7d30f_1606187962797.zip

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
business = spark.read.json('yelp_academic_dataset_business.json')

# Split the categories string on ',' so it becomes a list
split_col = f.split(business['categories'], ',')
# Replace categories with the array, drop rows with an empty city, and drop nulls
business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
business.select("categories").show()
business.createOrReplaceTempView("business")

attributes = spark.sql("select attributes.RestaurantsTakeOut from business") \
                  .groupBy("RestaurantsTakeOut").count().dropna()
attributes.show()

categories = spark.sql("SELECT state, city, stars, review_count, explode(categories) AS category FROM business").cache()
categories.createOrReplaceTempView("categories_1")
avg_stars = spark.sql("SELECT category, AVG(stars) as avg_stars FROM categories_1 GROUP BY category ORDER BY avg_stars DESC")
bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
avg_city = spark.sql("SELECT category, AVG(review_count) as avg_review_count FROM categories_1 GROUP BY category ORDER BY avg_review_count DESC")
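avg_stars, bus_city and avg_city above are only defined, not displayed; because Spark transformations are lazy, nothing runs until an action is called. A small sketch to trigger and inspect them:

avg_stars.show(10)   # categories by average star rating
bus_city.show(10)    # cities with the most businesses
avg_city.show(10)    # categories by average review count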
YELP
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


def data_process(raw_data_path):
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    business = spark.read.json(raw_data_path)
    # Split the categories string into an array, drop empty cities and nulls
    split_col = f.split(business['categories'], ',')
    business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
    business.createOrReplaceTempView("business")
    b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, review_count, is_open, categories, attributes FROM business").cache()
    b_etl.createOrReplaceTempView("b_etl")
    # Distance of each business from its state's average location, used to spot outliers
    outlier = spark.sql(
        "SELECT b1.business_id, SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) \
        as dist FROM b_etl b1 INNER JOIN (SELECT state, AVG(latitude) as avg_lat, AVG(longitude) as avg_long \
        FROM b_etl GROUP BY state) b2 ON b1.state = b2.state ORDER BY dist DESC")
    outlier.createOrReplaceTempView("outlier")
    # Keep only businesses within distance 10 of their state's centroid
    joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o ON b.business_id = o.business_id WHERE o.dist < 10")
    joined.write.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl", mode="overwrite")


if __name__ == "__main__":
    raw_hdfs_path = 'file:///home/hadoop/wangyingmin/yelp_academic_dataset_business.json'
    print("Start cleaning raw data!")
    data_process(raw_hdfs_path)
    print("Successfully done")

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os


def attribute_score(spark, attribute):
    # Average stars grouped by the value of a single business attribute
    # (the SparkSession is passed in explicitly so the function does not rely on a global)
    att = spark.sql("SELECT attributes.{attr} as {attr}, category, stars FROM for_att".format(attr=attribute)).dropna()
    att.createOrReplaceTempView("att")
    att_group = spark.sql("SELECT {attr}, AVG(stars) AS stars FROM att GROUP BY {attr} ORDER BY stars".format(attr=attribute))
    att_group.show()
    att_group.write.json("file:///usr/local/spark/yelp/analysis/{attr}".format(attr=attribute), mode='overwrite')


def analysis(data_path):
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    business = spark.read.parquet(data_path).cache()
    business.createOrReplaceTempView("business")

    # Explode the categories array so each (business, category) pair becomes a row
    part_business = spark.sql("SELECT state, city, stars, review_count, explode(categories) AS category FROM business").cache()
    part_business.show()
    part_business.createOrReplaceTempView('part_business_1')
    # Strip spaces from category names
    part_business = spark.sql("SELECT state, city, stars, review_count, REPLACE(category, ' ', '') as new_category FROM part_business_1")
    part_business.createOrReplaceTempView('part_business')

    print("## All distinct categories")
    all_categories = spark.sql("SELECT business_id, explode(categories) AS category FROM business")
    all_categories.createOrReplaceTempView('all_categories')
    distinct = spark.sql("SELECT COUNT(DISTINCT(new_category)) FROM part_business")
    distinct.show()

    print("## Top 10 business categories")
    top_cat = spark.sql("SELECT new_category, COUNT(*) as freq FROM part_business GROUP BY new_category ORDER BY freq DESC")
    top_cat.show(10)
    top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category", mode='overwrite')

    print("## Top business categories - in every city")
    top_cat_city = spark.sql("SELECT city, new_category, COUNT(*) as freq FROM part_business GROUP BY city, new_category ORDER BY freq DESC")
    top_cat_city.show()
    top_cat_city.write.json("file:///usr/local/spark/yelp/analysis/top_category_city", mode='overwrite')

    print("## Cities with most businesses")
    bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
    bus_city.show(10)
    bus_city.write.json("file:///usr/local/spark/yelp/analysis/top_business_city", mode='overwrite')

    print("## Average review count by category")
    avg_city = spark.sql(
        "SELECT new_category, AVG(review_count) as avg_review_count FROM part_business GROUP BY new_category ORDER BY avg_review_count DESC")
    avg_city.show()
    avg_city.write.json("file:///usr/local/spark/yelp/analysis/average_review_category", mode='overwrite')

    print("## Average stars by category")
    avg_state = spark.sql(
        "SELECT new_category, AVG(stars) as avg_stars FROM part_business GROUP BY new_category ORDER BY avg_stars DESC")
    avg_state.show()
    avg_state.write.json("file:///usr/local/spark/yelp/analysis/average_stars_category", mode='overwrite')

    print("## Data based on Attribute")
    for_att = spark.sql("SELECT attributes, stars, explode(categories) AS category FROM business")
    for_att.createOrReplaceTempView("for_att")
    attribute = 'RestaurantsTakeout'
    attribute_score(spark, attribute)


if __name__ == "__main__":
    business_data_path = 'file:///home/hadoop/wangyingmin/yelp-etl/business_etl'
    print("Start analyzing data!")
    analysis(business_data_path)
    print("Analysis done")
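As a quick sanity check (a sketch, not from the source), the JSON results written above can be read back with the same SparkSession, for example at the end of analysis():

check = spark.read.json("file:///usr/local/spark/yelp/analysis/top_category")
check.show(10)   # should match the top_cat output printed above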
COVID-19 epidemic data
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func


def toDate(inputStr):
    # Normalize dates such as 2020-1-1 / 2020-1-15 to zero-padded yyyy-MM-dd
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1 + "-" + "0" + s2 + "-" + "0" + s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1 + "-" + "0" + s2 + "-" + s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date


# Main program:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
          StructField("state", StringType(), False), StructField("cases", IntegerType(), False),
          StructField("deaths", IntegerType(), False)]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))

schemaUsInfo = spark.createDataFrame(rdd1, schema)
schemaUsInfo.createOrReplaceTempView("usInfo")

# 1. Compute the cumulative confirmed cases and deaths for each day
df = schemaUsInfo.groupBy("date").agg(func.sum("cases"), func.sum("deaths")).sort(schemaUsInfo["date"].asc())

# Rename the aggregate columns
df1 = df.withColumnRenamed("sum(cases)", "cases").withColumnRenamed("sum(deaths)", "deaths")
df1.repartition(1).write.json("result1.json")   # write to HDFS

# Register as a temp view for the next step
df1.createOrReplaceTempView("ustotal")

# 2. Compute the daily increase in confirmed cases and deaths over the previous day
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")   # write to HDFS

# 3. Cumulative confirmed cases and deaths in each US state as of May 19
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json")   # write to HDFS
df3.createOrReplaceTempView("eachStateInfo")

# 4. The 10 US states with the most confirmed cases
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")

# 5. The 10 US states with the most deaths
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")

# 6. The 10 US states with the fewest confirmed cases
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")

# 7. The 10 US states with the fewest deaths
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")

# 8. Case-fatality rate for the whole US and for each state as of May 19
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(), df8["deathRate"].desc()).repartition(1).write.json("result8.json")
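The Python helper toDate pads single-digit months and days before parsing. As an alternative sketch (an assumption, not the source's method), the same parsing can stay inside Spark with to_date, provided the raw dates follow the yyyy-M-d pattern the helper expects:

from pyspark.sql import functions as F

# Read the tab-separated file as a DataFrame and cast/parse the columns in Spark SQL
raw = spark.read.csv("/user/hadoop/us-counties.txt", sep="\t") \
           .toDF("date", "county", "state", "cases", "deaths")
usInfo = raw.select(F.to_date("date", "yyyy-M-d").alias("date"),
                    "county", "state",
                    F.col("cases").cast("int").alias("cases"),
                    F.col("deaths").cast("int").alias("deaths"))
usInfo.createOrReplaceTempView("usInfo")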