这里主要说一下Spark的SQL操作。
1、dataframe操作数据
// Load JSON data into a DataFrame (schema is inferred from the JSON keys).
val df = spark.read.json("/usr/hadoop/person.json")
// Alternatively, load CSV data:
// val df = spark.read.csv("/usr/hadoop/person.csv")

// Show the first 20 rows.
df.show()
// Print the inferred schema.
df.printSchema()
// Select a single column.
df.select("NAME").show()
// Filter rows by condition.
// NOTE: the $"col" syntax requires `import spark.implicits._` in the session.
df.filter($"BALANCE_COST" < 10 && $"BALANCE_COST" > 1).show()
// Group by a column and count rows per group.
df.groupBy("SEX_CODE").count().show()
2、sql操作数据
// Register the DataFrame as a temporary view so it can be queried with SQL.
val _viewName = "person" // lifetime is tied to this SparkSession
df.createOrReplaceTempView("person")
// Query all rows.
spark.sql("SELECT * FROM person").show()
// Count rows (an action: triggers a job and returns a Long to the driver).
spark.sql("SELECT * FROM person").count()
// Conditional select with ordering.
spark.sql("SELECT * FROM person WHERE BALANCE_COST<10 and BALANCE_COST>1 order by BALANCE_COST").show()
3、转为DS
// Convert the DataFrame to a strongly-typed Dataset via a case class.
// Field names must match the JSON keys exactly (case-sensitive).
// NOTE(review): other examples in this file filter on BALANCE_COST, so the
// BALANCE_CODE field here may be a typo — confirm against the actual JSON schema.
case class PERSONC(PATIENT_NO: String, NAME: String, SEX_CODE: String, BIRTHDATE: String, BALANCE_CODE: String)
// `val` instead of `var`: the reference is never reassigned.
// `as[PERSONC]` needs `import spark.implicits._` for the implicit Encoder.
val personDS = spark.read.json("/usr/hadoop/person.json").as[PERSONC]
4、sql与map reduce混用
// Maximum BALANCE_COST, treating NULL as 0.0.
// `select` yields a Dataset[Row]; `isNullAt(0)` / `get(0).toString` is the
// cleaner equivalent of the original `row(0)==null` / `(row(0)+"")` pattern.
// NOTE(review): `reduce` throws on an empty Dataset — acceptable for a demo,
// but production code should guard with count() or use an aggregate.
personDS.select("BALANCE_COST")
  .map(row => if (row.isNullAt(0)) 0.0 else row.get(0).toString.toDouble)
  .reduce((a, b) => math.max(a, b))

// Same computation via SQL (assumes the `person` temp view was registered).
spark.sql("select BALANCE_COST from person")
  .map(row => if (row.isNullAt(0)) 0.0 else row.get(0).toString.toDouble)
  .reduce((a, b) => math.max(a, b))
5、数据映射为Class对象
// Load the raw text file and map each comma-separated line to a Person.
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")
// NOTE(review): `Person` is defined elsewhere (not in this file); assumes each
// line has at least 5 comma-separated fields with the 5th parseable as Double —
// malformed lines will fail the job at action time with a runtime exception.
val persons = personRDD
  .map(_.split(","))
  .map(attributes =>
    Person(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).toDouble))
6、自定义schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Load the raw text data.
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")

// Convert each line to an org.apache.spark.sql.Row.
// The 5th field has surrounding quotes stripped before parsing as Double;
// malformed values will surface as runtime exceptions at action time.
val rowRDD = personRDD
  .map(_.split(","))
  .map(attributes =>
    Row(attributes(0), attributes(1), attributes(2), attributes(3),
      attributes(4).replace("\"", "").toDouble))

// Define an explicit schema instead of relying on inference.
// Column order and types must line up with the Row fields above.
val personSchema = StructType(List(
  StructField("PatientNum", StringType, nullable = true),
  StructField("Name", StringType, nullable = true),
  StructField("SexCode", StringType, nullable = true),
  StructField("BirthDate", StringType, nullable = true),
  StructField("BalanceCode", DoubleType, nullable = true)))

// Build a DataFrame from the RDD[Row] and the schema.
val personDF = spark.createDataFrame(rowRDD, personSchema)
// Use the DataFrame.
personDF.select("PatientNum").show()