博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkSQL——用之惜之
阅读量:4580 次
发布时间:2019-06-09

本文共 6792 字,大约阅读时间需要 22 分钟。

  原文链接:

  SparkSql作为Spark的结构化数据处理模块,提供了非常强大的API,让分析人员用一次,就会为之倾倒,为之着迷,为之至死不渝。在内部,SparkSQL使用额外结构信息来执行额外的优化。在外部,可以使用SQL和DataSet 的API与之交互。本文笔者将带你走进SparkSql的世界,领略SparkSql之诸多妙处。

一、DataSet和DataFrame

  当使用编程语言对结构化数据进行操作时候,SparkSql中返回的数据类型是DataSet/DataFrame,因此开篇笔者就先对这两种数据类型进行简单的介绍。

  Dataset 是分布式的数据集合。是Spark 1.6中添加的一个新接口,是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作,数据集可以由JVM对象构造,然后使用函数转换(map、flatmap、filter等)进行操作。Dataset 支持Scala和javaAPI,不支持Python API。

  DataFrame是由列组成的数据集,它在概念上等同于关系数据库中的表或R/Python中的data frame,但在查询引擎上进行了丰富的优化。DataFrame可以由各种各样的源构建,例如:结构化数据文件、hive中的表、外部数据库或现有的RDD。

二、SparkSQL基于DataFrame的操作

 

import org.apache.spark.sql.SparkSession 2val spark = SparkSession 3  .builder() 4  .appName("Spark SQL basic example") 5  .getOrCreate() 6//引入Spark的隐式类型转换,如将RDD转换成 DataFrame 7import spark.implicits._ 8val df = spark.read.json("/data/tmp/SparkSQL/people.json") 9df.show() //将DataFrame的内容进行标准输出10//+---+-------+11//|age|   name|12//+---+-------+13//|   |Michael|14//| 19|   Andy|15//| 30| Justin|16//+---+-------+1718df.printSchema()  //打印出DataFrame的表结构19//root20// |-- age: string (nullable = true)21// |-- name: string (nullable = true)2223df.select("name").show() 24//类似于select name from DataFrame的SQL语句2526df.select($"name", $"age" + 1).show()27//类似于select name,age+1 from DataFrame的SQL语句28//此处注意,如果对列进行操作,所有列名前都必须加上$符号2930df.filter($"age" > 21).show()31//类似于select * from DataFrame where age>21 的SQL语句3233df.groupBy("age").count().show()34//类似于select age,count(age) from DataFrame group by age;3536//同时也可以直接写SQL进行DataFrame数据的分析37df.createOrReplaceTempView("people")38val sqlDF = spark.sql("SELECT * FROM people")39sqlDF.show()

  

 

三、SparkSQL基于DataSet的操作

  由于DataSet吸收了RDD和DataFrame的优点,所有可以同时向操作RDD和DataFrame一样来操作DataSet。看下边一个简单的例子。

1case class Person(name: String, age: Long) 2// 通过 case类创建DataSet 3val caseClassDS = Seq(Person("Andy", 32)).toDS() 4caseClassDS.show() 5// +----+---+ 6// |name|age| 7// +----+---+ 8// |Andy| 32| 9// +----+---+1011// 通过基本类型创建DataSet12importing spark.implicits._13val primitiveDS = Seq(1, 2, 3).toDS()14primitiveDS.map(_ + 1).collect() 15// Returns: Array(2, 3, 4)1617// 将DataFrames转换成DataSet18val path = "examples/src/main/resources/people.json"19val peopleDS = spark.read.json(path).as[Person]20peopleDS.show()21// +----+-------+22// | age|   name|23// +----+-------+24// |null|Michael|25// |  30|   Andy|26// |  19| Justin|27// +----+-------+

  在上边的例子中能够发现DataSet的创建是非常简单的,但是笔者需要强调一点,DataSet是强类型的,也就是说DataSet的每一列都有指定的列标识符和数据类型。下边的列子将进一步介绍DataSet与RDD的交互。

1import spark.implicits._ 2//将RDD转换成DataFrame 3val peopleDF = spark.sparkContext 4  .textFile("examples/src/main/resources/people.txt") 5  .map(_.split(",")) 6  .map(attributes=>Person(attributes(0),attributes(1).trim.toInt)) 7  .toDF() 8// 将RDD注册为一个临时视图 9peopleDF.createOrReplaceTempView("people")10//对临时视图进行Sql查询11val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")1213// 对teenagersDF 对应的DataFrame进行RDD的算子map操作14teenagersDF.map(teenager => "Name: " + teenager(0)).show()15// +------------+16// |       value|17// +------------+18// |Name: Justin|19// +------------+2021// 与上一条语句效果一样22teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()23// +------------+24// |       value|25// +------------+26// |Name: Justin|27// +------------+

  

 

四、SparkSQL操作HIve表

  Spark SQL支持读取和写入存储在Apache HIVE中的数据。然而,由于Hive具有大量的依赖关系,默认情况下这些依赖性不包含在Spark分布中。如果能在classpath路径找到Hive依赖文件,Spark将自动加载它们。另外需要注意的是,这些Hive依赖项须存在于所有Spark的Worker节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。

1import java.io.File  2import org.apache.spark.sql.{Row, SaveMode, SparkSession}  3  4case class Record(key: Int, value: String)  5  6// 设置hive数据库默认的路径  7val warehouseLocation = new File("spark-warehouse").getAbsolutePath  8  9val spark = SparkSession 10  .builder() 11  .appName("Spark Hive Example") 12  .config("spark.sql.warehouse.dir", warehouseLocation) 13  .enableHiveSupport() 14  .getOrCreate() 15 16import spark.implicits._ 17import spark.sql 18 19//创建hive表,导入数据,并且查询数据 20sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") 21sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") 22sql("SELECT * FROM src").show() 23 24// +---+-------+ 25// |key|  value| 26// +---+-------+ 27// |238|val_238| 28// | 86| val_86| 29// |311|val_311| 30// ... 31 32//对hive表数据进行聚合操作 33sql("SELECT COUNT(*) FROM src").show() 34// +--------+ 35// |count(1)| 36// +--------+ 37// |    500 | 38// +--------+ 39 40// sql执行的查询结果返回DataFrame类型数据,支持常用的RDD操作 41val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") 42val stringsDS = sqlDF.map { 43  case Row(key: Int, value: String) => s"Key: $key, Value: $value" 44} 45stringsDS.show() 46// +--------------------+ 47// |               value| 48// +--------------------+ 49// |Key: 0, Value: val_0| 50// |Key: 0, Value: val_0| 51// |Key: 0, Value: val_0| 52// ... 53 54// 通过DataFrames创建一个临时视图val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) 55recordsDF.createOrReplaceTempView("records") 56 57// 查询操作可以将临时的视图与HIve表中数据进行关联查询 58sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() 59// +---+------+---+------+ 60// |key| value|key| value| 61// +---+------+---+------+ 62// |  2| val_2|  2| val_2| 63// |  4| val_4|  4| val_4| 64// |  5| val_5|  5| val_5| 65// ... 66 67// 创建一个Hive表,并且以parquet格式存储数据 68sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET") 69// 讲DataFrame中数据保存到Hive表里 70val df = spark.table("src") 71df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records") 72sql("SELECT * FROM hive_records").show() 73// +---+-------+ 74// |key|  value| 75// +---+-------+ 76// |238|val_238| 77// | 86| val_86| 78// |311|val_311| 79// ... 80 81// 在指定路径创建一个Parquet文件并且写入数据 82val dataDir = "/tmp/parquet_data" 83spark.range(10).write.parquet(dataDir) 84// 创建HIve外部表 85sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'") 86sql("SELECT * FROM hive_ints").show() 87// +---+ 88// |key| 89// +---+ 90// |  0| 91// |  1| 92// |  2| 93// ... 94 95// Turn on flag for Hive Dynamic Partitioning 96spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 97spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 98// 通过DataFrame的API创建HIve分区表 99df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")100sql("SELECT * FROM hive_part_tbl").show()101// +-------+---+102// |  value|key|103// +-------+---+104// |val_238|238|105// | val_86| 86|106// |val_311|311|107// ...108109spark.stop()

  当然SparkSql的操作远不止这些,它可以直接对文件快执行Sql查询,也可以通过JDBC连接到关系型数据库,对关系型数据库中的数据进行一些运算分析操作。如果读者感觉不过瘾,可以留言与笔者交流,也可以通过Spark官网查阅相关例子进行学习。下一篇关于Spark的文章,笔者将详细的介绍Spark的常用算子,以满足渴望进行数据分析的小伙伴们的求知的欲望。

 

 

更多精彩内容,欢迎扫码关注以下微信公众号:大数据技术宅。大数据、AI从关注开始

 

 

转载于:https://www.cnblogs.com/followees/p/8909859.html

你可能感兴趣的文章
hdu 6435 CSGO(最大曼哈顿距离)
查看>>
logback框架之——日志分割所带来的潜在问题
查看>>
链路追踪工具之Zipkin学习小记
查看>>
iOS中通讯录的开发
查看>>
怎么让table中的<td>内容向上对齐
查看>>
[Java] 遍历HashMap和HashMap转换成List的两种方式
查看>>
mongodb
查看>>
LeetCode 46. Permutations
查看>>
jmeter- 性能测试3:聚合报告(Aggregate Report )
查看>>
JavaScript高级程序设计---学习笔记(二)
查看>>
vim 插件的学习
查看>>
Uncaught SyntaxError: Unexpected token ILLEGAL
查看>>
一个预处理定义的问题
查看>>
ANDROID L——Material Design综合应用(Demo)
查看>>
自我介绍以及关于软件工程的问题
查看>>
struts (一)
查看>>
【新番推荐】工作细胞
查看>>
NYOJ 16 矩形嵌套
查看>>
Leetcode中的SQL题目练习(二)
查看>>
dubbo 集群容错源码
查看>>