Apache Spark

Spark API

File Input pattern


You can use commas to separate multiple path patterns:


This is the same as:


Spark SQL

There seems to be a newly documented page: https://docs.databricks.com/spark/latest/spark-sql/index.html

Spark SQL's query language is based on HiveQL.


-- basics
select col1 from t1 where col1 > 10
select * from t1 limit 5

-- plan: show the query plan instead of running the query
explain select * from t1

-- group by
select col1, count(*) from t1 group by col1
select col1, sum(col2) from t1 group by col1

-- 'group by' can be specified with position numbers (1-indexed from selected columns)
-- (controlled by spark.sql.groupByOrdinal, enabled by default)
select col1, sum(col2) from t1 group by 1

-- distinct
select col1, col2 from t1
-- result:
-- 1 3
-- 1 3
-- 1 4
-- 2 5

select distinct col1, col2 from t1
-- result (duplicate rows removed):
-- 1 3
-- 1 4
-- 2 5

select distinct col1 from t1

-- distinct can be used within 'count'
select col1, count(distinct col2) from t1 group by col1

-- having: filters groups after aggregation
select col1 from t1 group by col1 having sum(col2) > 10
-- same as above, using a subquery and where
select col1 from (select col1, sum(col2) as col2sum from t1 group by col1) t2 where t2.col2sum > 10

-- order by
select col1 from t1 order by col1 desc

-- join
select a.* from a join b on (a.id = b.id and a.department = b.department)
select a.* from a left outer join b on (a.id <> b.id)
select a.val, b.val, c.val from a join b on (a.key = b.key1) join c on (c.key = b.key1)

-- joins are evaluated before where clauses. Filtering the right side (b) of a
-- left outer join in WHERE drops the unmatched rows (where b.ds is null),
-- which silently turns the outer join into an inner join. This is a bad way:
select a.val, b.val from a left outer join b on (a.key=b.key)
where a.ds='2009-07-07' and b.ds='2009-07-07'

-- following is better: filter b in the ON clause so unmatched rows of a are kept.
-- Keep the filter on a in WHERE: a condition on the left side in the ON clause of
-- a left outer join does not remove rows of a, it only prevents them from matching.
select a.val, b.val from a left outer join b
on (a.key=b.key and b.ds='2009-07-07')
where a.ds='2009-07-07'

-- union
-- every column referenced outside the subquery (uid, date) must be selected by
-- each branch of the union
select u.id, actions.date
from (
  select av.uid as uid, av.date as date
  from action_video av
  where av.date = '2008-06-03'
  union all
  select ac.uid as uid, ac.date as date
  from action_comment ac
  where ac.date = '2008-06-03'
) actions join users u on (u.id = actions.uid)

-- if, case
select if(field in (0, 1), 'ab', 'c') from tbl

select
  case field
    when 0 then 'a'
    when 1 then 'b'
    else 'c'
  end
from tbl

-- subqueries
-- a subquery in FROM must be given an alias (t2 here)
select t2.col
from (select a+b as col from t1) t2

-- IN subquery
select *
from a
where a.a in (
  select foo
  from b
);

-- correlated EXISTS subquery: the inner query refers to t1 from the outer query
select a
from t1
where exists (
  select b
  from t2
  where t1.x = t2.y
)

-- common table expression
with q1 as (
  select key from src where key = '5'
)
select * from q1;

-- several CTEs are separated by commas after a single 'with'
with
  q1 as (select * from src where key = '5'),
  q2 as (select * from src s2 where key = '4')
select * from q1
union all
select * from q2;

-- create table as select example
create table s2 as
with q1 as (
  select key from src where key = '4'
)
select * from q1;

-- create or replace temporary view is recommended instead of just 'create table'
create or replace temporary view foo as
select * from t1 limit 1

-- view example
create view v1 as
with q1 as (
  select key from src where key = '5'
)
select * from q1;

-- lateral view
-- explode() turns each element of an array (or each entry of a map) into its own row
select adid, count(1)
from pageads lateral view explode(adid_list) adtable as adid
group by adid

-- exploding a map yields two columns (key, value); every selected column that is
-- not in 'group by' must be aggregated
select k, collect_list(v)
from tbl lateral view explode(kvmap) kvs as k, v
group by k

select mycol1, mycol2 from basetable
lateral view explode(col1) mytable1 as mycol1
lateral view explode(col2) mytable2 as mycol2;

-- 'outer' keeps the source row (with nulls) even when explode produces no rows
select * from src lateral view outer explode(array()) c as a limit 10;

-- time range (t is of timestamp type)
-- converting from "UTC" to UTC shifts nothing; this just turns the string into a timestamp
select t from table1
where t > to_utc_timestamp("2016-12-25", "UTC")
and t < to_utc_timestamp("2016-12-25 12:00", "UTC")

-- timestamp to string
-- use 'yyyy' (calendar year). 'YYYY' is the week-based year, which differs around
-- new year and is rejected by Spark 3.0+.
select date_format(t, 'yyyy-MM-dd') from tbl

-- select field with special characters(use backtick)
select `@time` from t1

-- concat_ws to make an array as a string
-- map_values to make a map as an array
-- <array of structtype>.<field> goes into an <array of field>
select concat_ws(", ", map_values(items).price)
from Items


  1. Referencing query results as a DataFrame in a Spark application

How schema merging works

There is no clear documentation on how schema merging across files works. There are only some general guidelines for evolving a schema: append only, no modification.

But I was curious: if appending is OK, can I also reorder the StructFields? Since StructType takes them as a List, order might matter. On the other hand, the data source can be JSON, which doesn't care about field order, so it might not. So I tested it.

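The conclusion, from the experiments below:

- With `mergeSchema` enabled, fields are matched by name. Inserting fields in the middle, reordering them, or omitting them all merge correctly, and missing values become null.
- A field whose type differs between files cannot be merged.
- Without `mergeSchema`, only one file's schema is used, and columns absent from it are not shown.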

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

// Builds a schema of StringType fields from a space-separated list of field names,
// e.g. createSchema("name addr") has the fields `name: string` and `addr: string`.
// Runs of whitespace (and leading/trailing spaces) are treated as a single
// separator so stray spaces don't produce fields with empty names.
def createSchema(schema: String): StructType = {
    val fields = schema.trim.split("\\s+").filter(_.nonEmpty).map { name =>
        StructField(name, StringType)
    }
    StructType(fields)
}
// Each DataFrame is written to its own parquet directory (data1..data4) so the
// reads below can pick combinations with a glob such as "data{1,2}".
// mode("overwrite") lets the snippet be re-run in the same working directory.

// A schema of 'name' and 'addr'
val schema1 = createSchema("name addr")
val data1   = List(Row("yeonghoey", "jamsil"))
val rdd1    = spark.sparkContext.parallelize(data1)
val df1     = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")

// Add 'sex' field in between the fields of schema1.
val schema2 = createSchema("name sex addr")
val data2   = List(Row("cwkim", "male", "unjung"))
val rdd2    = spark.sparkContext.parallelize(data2)
val df2     = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")

// Append 'sex' field to the schema1.
val schema3 = createSchema("name addr sex")
val data3   = List(Row("sub", "yangjae", "male"))
val rdd3    = spark.sparkContext.parallelize(data3)
val df3     = spark.createDataFrame(rdd3, schema3)
df3.write.mode("overwrite").parquet("data3")

// A schema of 'name' and 'sex'
val schema4 = createSchema("name sex")
val data4   = List(Row("suminb", "male"))
val rdd4    = spark.sparkContext.parallelize(data4)
val df4     = spark.createDataFrame(rdd4, schema4)
df4.write.mode("overwrite").parquet("data4")

// ----------------------------------------------------------------------
// Legend for each experiment:
//   "+" : read without schema merging (mergeSchema = false, the default)
//   "*" : read with mergeSchema = true
// Without merging, the schema of a single file is used (which file is picked is
// not something to rely on), so columns missing from that file are not shown.

//   name addr
// + name sex addr
spark.read.option("mergeSchema", false).parquet("data{1,2}").show()
// +---------+------+
// |     name|  addr|
// +---------+------+
// |    cwkim|unjung|
// |yeonghoey|jamsil|
// +---------+------+

//   name addr
// * name sex addr
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// +---------+------+----+
// |     name|  addr| sex|
// +---------+------+----+
// |    cwkim|unjung|male|
// |yeonghoey|jamsil|null|
// +---------+------+----+

//   name addr
// + name addr sex
spark.read.option("mergeSchema", false).parquet("data{1,3}").show()
// +---------+-------+
// |     name|   addr|
// +---------+-------+
// |      sub|yangjae|
// |yeonghoey| jamsil|
// +---------+-------+

//   name addr
// * name addr sex
spark.read.option("mergeSchema", true).parquet("data{1,3}").show()
// +---------+-------+----+
// |     name|   addr| sex|
// +---------+-------+----+
// |      sub|yangjae|male|
// |yeonghoey| jamsil|null|
// +---------+-------+----+

//   name sex addr
// + name addr sex
spark.read.option("mergeSchema", false).parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex|   addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// |  sub|male|yangjae|
// +-----+----+-------+

//   name sex addr
// * name addr sex
// (reordered fields are matched by name, not by position)
spark.read.option("mergeSchema", true).parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex|   addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// |  sub|male|yangjae|
// +-----+----+-------+

//   name addr sex
// + name sex
spark.read.option("mergeSchema", false).parquet("data{3,4}").show()
// +------+-------+----+
// |  name|   addr| sex|
// +------+-------+----+
// |   sub|yangjae|male|
// |suminb|   null|male|
// +------+-------+----+

//   name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{3,4}").show()
// +------+-------+----+
// |  name|   addr| sex|
// +------+-------+----+
// |   sub|yangjae|male|
// |suminb|   null|male|
// +------+-------+----+

//   name addr
// + name sex
spark.read.option("mergeSchema", false).parquet("data{1,4}").show()
// +---------+------+
// |     name|  addr|
// +---------+------+
// |yeonghoey|jamsil|
// |   suminb|  null|
// +---------+------+

//   name addr
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,4}").show()
// +---------+------+----+
// |     name|  addr| sex|
// +---------+------+----+
// |yeonghoey|jamsil|null|
// |   suminb|  null|male|
// +---------+------+----+

//   name addr
//   name sex addr
//   name addr sex
// + name sex
spark.read.option("mergeSchema", false).parquet("data{1,2,3,4}").show()
// +---------+-------+
// |     name|   addr|
// +---------+-------+
// |    cwkim| unjung|
// |      sub|yangjae|
// |yeonghoey| jamsil|
// |   suminb|   null|
// +---------+-------+

//   name addr
//   name sex addr
//   name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,2,3,4}").show()
// +---------+-------+----+
// |     name|   addr| sex|
// +---------+-------+----+
// |    cwkim| unjung|male|
// |      sub|yangjae|male|
// |yeonghoey| jamsil|null|
// |   suminb|   null|male|
// +---------+-------+----+
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

// Same field name ("x'") with incompatible types: StringType in data1,
// IntegerType in data2. This reuses (overwrites) the data1/data2 directories of
// the previous experiment.
val schema1 = StructType(List(StructField("x'", StringType)))
val data1   = List(Row("yeonghoey"))
val rdd1    = spark.sparkContext.parallelize(data1)
val df1     = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")

val schema2 = StructType(List(StructField("x'", IntegerType)))
val data2   = List(Row(31))
val rdd2    = spark.sparkContext.parallelize(data2)
val df2     = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")

// ----------------------------------------------------------------------

// Without mergeSchema: a single file's schema is applied to both files, and the
// read fails once it hits the file whose column type does not match.
spark.read.parquet("data{1,2}").show()
// Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType

// With mergeSchema: the two types cannot be merged, so the read fails.
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and IntegerType

