Apache Spark
Overview
Reference
Spark API
File Input pattern
*       match 0 or more characters
?       match a single character
[ab]    character class
[^ab]   negated character class
[a-b]   character range
{a,b}   alternation
\c      escape character
You can pass multiple patterns separated by commas:
sc.textFile("/user/Orders/2015072[7-9]*,/user/Orders/2015073[0-1]*")
which is the same as:
sc.textFile("/user/Orders/201507{2[7-9],3[0-1]}*")
Spark SQL
Reference Links
There seems to be a newly added documentation page: https://docs.databricks.com/spark/latest/spark-sql/index.html
Spark SQL's query language is based on HiveQL.
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
- http://spark.apache.org/docs/latest/sql-programming-guide.html#supported-hive-features
Examples
-- basics
select col1 from t1 where col1 > 10
select * from t1 limit 5
-- plan
explain select * from t1
-- group by
select col1, count(*) from t1 group by col1
select col1, sum(col2) from t1 group by col1
-- 'group by' can be specified with position numbers (1-indexed, referring to the select list)
select col1, sum(col2) from t1 group by 1
-- distinct
select col1, col2 from t1
-- 1 3
-- 1 3
-- 1 4
-- 2 5
select distinct col1, col2 from t1
-- 1 3
-- 1 4
-- 2 5
select distinct col1 from t1
-- 1
-- 2
-- distinct can be used within 'count'
select col1, count(distinct col2) from t1 group by col1
-- having
select col1 from t1 group by col1 having sum(col2) > 10
-- same as above
select col1 from (select col1, sum(col2) as col2sum from t1 group by col1) t2 where t2.col2sum > 10
-- order by
select col1 from t1 order by col1 desc
-- join
select a.* from a join b on (a.id = b.id and a.department = b.department)
select a.* from a left outer join b on (a.id <> b.id)
select a.val, b.val, c.val from a join b on (a.key = b.key1) join c on (c.key = b.key1)
-- joins occur before where clauses; with an outer join, a where-clause filter on b also drops rows where b has no match, so this is a bad way:
select a.val, b.val from a left outer join b on (a.key=b.key)
where a.ds='2009-07-07' and b.ds='2009-07-07'
-- the following is better:
select a.val, b.val from a left outer join b
on (a.key=b.key and b.ds='2009-07-07' and a.ds='2009-07-07')
-- union
select u.id, actions.date
from (
  select av.uid as uid, av.date as date
  from action_video av
  where av.date = '2008-06-03'
  union all
  select ac.uid as uid, ac.date as date
  from action_comment ac
  where ac.date = '2008-06-03'
) actions join users u on (u.id = actions.uid)
-- if, case
select if(field in (0, 1), 'ab', 'c') from tbl
select
  case field
    when 0 then 'a'
    when 1 then 'b'
    else 'c'
  end
from tbl
-- subqueries
select col
from (
select a+b as col
from t1
) t2
select *
from a
where a.a in (select foo from b);
select a
from t1
where exists (select b from t2 where t1.x = t2.y)
-- common table expression
with q1 as (select key from src where key = '5')
select *
from q1;
with q1 as (select * from src where key = '5'),
q2 as (select * from src s2 where key = '4')
select * from q1 union all select * from q2;
-- create table as select example
create table s2 as
with q1 as ( select key from src where key = '4')
select * from q1;
-- 'create or replace temporary view' is recommended over a plain 'create table' (it is session-scoped and writes no data)
create or replace temporary view foo as select * from t1 limit 1
-- view example
create view v1 as
with q1 as ( select key from src where key = '5')
select * from q1;
-- lateral view
select adid, count(1)
from pageads lateral view explode(adid_list) adtable as adid
group by adid
select k, v
from tbl lateral view explode(kvmap) kvs as k, v
group by k
select mycol1, mycol2 from basetable
lateral view explode(col1) mytable1 as mycol1
lateral view explode(col2) mytable2 as mycol2;
select * from src lateral view outer explode(array()) c as a limit 10;
-- time range (t is of timestamp type)
select t from table1
where t > to_utc_timestamp("2016-12-25", "UTC")
and t < to_utc_timestamp("2016-12-25 12:00", "UTC")
-- timestamp to string (use lowercase 'yyyy'; uppercase 'YYYY' is the week-based year and misbehaves around year boundaries)
select date_format(t, 'yyyy-MM-dd') from tbl
-- select a field whose name contains special characters (use backticks)
select `@time` from t1
-- concat_ws joins an array into a single string
-- map_values turns a map into an array of its values
-- <array of struct>.<field> yields an array of that field's values
select concat_ws(", ", map_values(items).price)
from Items
Recipes
Referencing query results as DataFrame in a Spark application
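spark.sql returns its result as a DataFrame, so query results can be passed around in application code. A minimal sketch, assuming a SparkSession named spark and an existing table or view t1 (both names are assumptions, not from the original notes):
// `spark` and `t1` are assumed to exist.
val df = spark.sql("select col1, count(*) as cnt from t1 group by col1")
df.show()
// The result can itself be registered as a view for later queries.
df.createOrReplaceTempView("t1_counts")
spark.sql("select * from t1_counts where cnt > 10").show()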
Example code:
https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/sql
Terminology
Topics
How schema merging works
There is no clear documentation about how schema merging across files works; there are only general guidelines for growing a schema: append only, no modification.
But I was curious: if appending is OK, can I also reorder the StructFields? Since StructType takes them as a List, the order might matter. On the other hand, a data source like json doesn't care about field order, so it might not. So I tested it.
The conclusions are as follows:
- The order of StructField doesn't matter; only the field name matters.
- If mergeSchema is true, all fields are merged.
- If mergeSchema is false, which is the default, the schema of the first file in alphabetical order has priority.
- The nullable flag in StructType doesn't matter. It seems to matter only when processing raw data, not when merging files.
- If there are fields with the same name but different types, schema merging causes runtime errors (demonstrated at the end of this section).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
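// Builds a StructType of StringType fields from a space-separated list of names.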
def createSchema(schema: String): StructType = {
val fieldNames = schema.split(" ")
val fields = fieldNames.map { name =>
StructField(name, StringType)
}
StructType(fields)
}
// A schema of 'name' and 'addr'
val schema1 = createSchema("name addr")
val data1 = List(Row("yeonghoey", "jamsil"))
val rdd1 = spark.sparkContext.parallelize(data1)
val df1 = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")
// Add 'sex' field in between the fields of schema1.
val schema2 = createSchema("name sex addr")
val data2 = List(Row("cwkim", "male", "unjung"))
val rdd2 = spark.sparkContext.parallelize(data2)
val df2 = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")
// Append 'sex' field to the schema1.
val schema3 = createSchema("name addr sex")
val data3 = List(Row("sub", "yangjae", "male"))
val rdd3 = spark.sparkContext.parallelize(data3)
val df3 = spark.createDataFrame(rdd3, schema3)
df3.write.mode("overwrite").parquet("data3")
// A schema of 'name' and 'sex'
val schema4 = createSchema("name sex")
val data4 = List(Row("suminb", "male"))
val rdd4 = spark.sparkContext.parallelize(data4)
val df4 = spark.createDataFrame(rdd4, schema4)
df4.write.mode("overwrite").parquet("data4")
// ----------------------------------------------------------------------
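// Legend: each commented line below lists one input file's schema;
// '+' marks a plain read (mergeSchema defaults to false),
// '*' marks a read with mergeSchema set to true.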
// name addr
// + name sex addr
spark.read.parquet("data{1,2}").show()
// +---------+------+
// | name| addr|
// +---------+------+
// | cwkim|unjung|
// |yeonghoey|jamsil|
// +---------+------+
// name addr
// * name sex addr
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// +---------+------+----+
// | name| addr| sex|
// +---------+------+----+
// | cwkim|unjung|male|
// |yeonghoey|jamsil|null|
// +---------+------+----+
// name addr
// + name addr sex
spark.read.parquet("data{1,3}").show()
// +---------+-------+
// | name| addr|
// +---------+-------+
// | sub|yangjae|
// |yeonghoey| jamsil|
// +---------+-------+
// name addr
// * name addr sex
spark.read.option("mergeSchema", true).parquet("data{1,3}").show()
// +---------+-------+----+
// | name| addr| sex|
// +---------+-------+----+
// | sub|yangjae|male|
// |yeonghoey| jamsil|null|
// +---------+-------+----+
// name sex addr
// + name addr sex
spark.read.parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex| addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// | sub|male|yangjae|
// +-----+----+-------+
// name sex addr
// * name addr sex
spark.read.option("mergeSchema", true).parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex| addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// | sub|male|yangjae|
// +-----+----+-------+
// name addr sex
// + name sex
spark.read.parquet("data{3,4}").show()
// +------+-------+----+
// | name| addr| sex|
// +------+-------+----+
// | sub|yangjae|male|
// |suminb| null|male|
// +------+-------+----+
// name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{3,4}").show()
// +------+-------+----+
// | name| addr| sex|
// +------+-------+----+
// | sub|yangjae|male|
// |suminb| null|male|
// +------+-------+----+
// name addr
// + name sex
spark.read.parquet("data{1,4}").show()
// +---------+------+
// | name| addr|
// +---------+------+
// |yeonghoey|jamsil|
// | suminb| null|
// +---------+------+
// name addr
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,4}").show()
// +---------+------+----+
// | name| addr| sex|
// +---------+------+----+
// |yeonghoey|jamsil|null|
// | suminb| null|male|
// +---------+------+----+
// name addr
// name sex addr
// name addr sex
// + name sex
spark.read.parquet("data{1,2,3,4}").show()
// +---------+-------+
// | name| addr|
// +---------+-------+
// | cwkim| unjung|
// | sub|yangjae|
// |yeonghoey| jamsil|
// | suminb| null|
// +---------+-------+
// name addr
// name sex addr
// name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,2,3,4}").show()
// +---------+-------+----+
// | name| addr| sex|
// +---------+-------+----+
// | cwkim| unjung|male|
// | sub|yangjae|male|
// |yeonghoey| jamsil|null|
// | suminb| null|male|
// +---------+-------+----+
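Finally, the following experiment demonstrates the runtime errors caused by two files having the same field name with different types: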
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
val schema1 = StructType(List(StructField("x", StringType)))
val data1 = List(Row("yeonghoey"))
val rdd1 = spark.sparkContext.parallelize(data1)
val df1 = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")
val schema2 = StructType(List(StructField("x", IntegerType)))
val data2 = List(Row(31))
val rdd2 = spark.sparkContext.parallelize(data2)
val df2 = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")
// ----------------------------------------------------------------------
spark.read.parquet("data{1,2}").show()
// Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and IntegerType
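A possible workaround (my own sketch, not part of the experiment above) is to rewrite one side with a cast so both files agree on a single type:
// Hypothetical fix: store 'x' as StringType in both files before merging.
val df2fixed = df2.withColumn("x", df2("x").cast(StringType))
df2fixed.write.mode("overwrite").parquet("data2")
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()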