这里主要说一下Spark的SQL操作。
1、dataframe操作数据
// Load the JSON data into a DataFrame.
val df = spark.read.format("json").load("/usr/hadoop/person.json")
// Alternative: load CSV data instead.
// val df = spark.read.csv("/usr/hadoop/person.csv")
// Print the first 20 rows.
df.show()
// Print the inferred schema.
df.printSchema()
// Project a single column.
df.select("NAME").show()
// Keep rows whose BALANCE_COST lies in the open interval (1, 10).
df.filter($"BALANCE_COST" > 1 && $"BALANCE_COST" < 10).show()
// Row count per SEX_CODE value.
df.groupBy("SEX_CODE").count().show()
2、sql操作数据
#创建视图
// Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("person")
// The query result is a lazy DataFrame; reuse it for both actions below.
val everyone = spark.sql("SELECT * FROM person")
// Display the rows.
everyone.show()
// Count the rows.
everyone.count()
// Filter and sort through SQL.
spark.sql("SELECT * FROM person WHERE BALANCE_COST<10 and BALANCE_COST>1 order by BALANCE_COST").show()
3、DataFrame转为Dataset（DS）
// Convert the DataFrame to a typed Dataset.
// FIX: the field was previously named BALANCE_CODE, but the JSON column used
// throughout this file (filters, SQL, the map/reduce below) is BALANCE_COST;
// `as[PERSONC]` resolves fields by name against the schema, so the mismatched
// name would fail to bind. All fields are String here — presumably the JSON
// stores values as quoted strings (cf. the quote-stripping at the text-file
// section); confirm against the actual person.json schema.
case class PERSONC(PATIENT_NO : String,NAME : String,SEX_CODE : String,BIRTHDATE : String,BALANCE_COST : String)
// NOTE(review): this binding is never reassigned in view — could be a val.
var personDS = spark.read.json("/usr/hadoop/person.json").as[PERSONC]
4、SQL与map/reduce算子混用
// Mix the SQL-style select with map/reduce operators: compute the maximum
// BALANCE_COST, treating null cells as 0.0. Works identically on the typed
// Dataset and on a DataFrame produced by spark.sql.
personDS.select("BALANCE_COST").map(r => Option(r(0)).fold(0.0)(_.toString.toDouble)).reduce((x, y) => if (x > y) x else y)
spark.sql("select BALANCE_COST from person").map(r => Option(r(0)).fold(0.0)(_.toString.toDouble)).reduce((x, y) => if (x > y) x else y)
5、数据映射为Class对象
// Read the raw text file as an RDD of comma-separated lines.
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")
// Split each line and build one Person per record (the 5th column is numeric).
// NOTE(review): Person is a case class defined elsewhere — presumably
// (String, String, String, String, Double); confirm its definition.
val persons = personRDD
  .map(line => line.split(","))
  .map(cols => Person(cols(0), cols(1), cols(2), cols(3), cols(4).toDouble))
6、自定义schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
#加载数据
// Load the raw text data.
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")
// Convert each record to an org.apache.spark.sql.Row, stripping the quote
// characters around the numeric last column before parsing it as Double.
val rowRDD = personRDD.map(_.split(",")).map { f =>
  Row(f(0), f(1), f(2), f(3), f(4).replace("\"", "").toDouble)
}
// Define a custom schema with renamed columns.
val personSchema = StructType(Seq(
  StructField("PatientNum", StringType, nullable = true),
  StructField("Name", StringType, nullable = true),
  StructField("SexCode", StringType, nullable = true),
  StructField("BirthDate", StringType, nullable = true),
  StructField("BalanceCode", DoubleType, nullable = true)
))
// Build a DataFrame from the Row RDD plus the explicit schema.
val personDF = spark.createDataFrame(rowRDD, personSchema)
// Use the DataFrame through the new column names.
personDF.select("PatientNum").show()