Apache Spark

Overview

Reference

Spark API

File Input pattern

http://stackoverflow.com/questions/31782763/how-to-use-regex-to-include-exclude-some-input-files-in-sc-textfile

You can use commas to delimit multiple patterns:

sc.textFile("/user/Orders/2015072[7-9]*,/user/Orders/2015073[0-1]*")

Which is the same as:

sc.textFile("/user/Orders/201507{2[7-9],3[0-1]}*")

Spark SQL

There is now a dedicated documentation page: https://docs.databricks.com/spark/latest/spark-sql/index.html

Spark SQL's query language is based on HiveQL.
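
The examples below can be tried from a Spark application or spark-shell by registering a DataFrame as a temporary view and passing the query string to spark.sql. A minimal sketch (someDF and t1 are placeholder names):

// Register any existing DataFrame under the name 't1', then query it with HiveQL.
someDF.createOrReplaceTempView("t1")
spark.sql("select col1 from t1 where col1 > 10").show()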

Examples

-- basics
select col1 from t1 where col1 > 10
select * from t1 limit 5

-- plan
explain select * from t1

-- group by
select col1, count(*) from t1 group by col1
select col1, sum(col2) from t1 group by col1

-- 'group by' can be specified with position numbers (1-indexed, referring to the selected columns)
select col1, sum(col2) from t1 group by 1

-- distinct (result rows shown as comments)
select col1, col2 from t1
-- 1 3
-- 1 3
-- 1 4
-- 2 5

select distinct col1, col2 from t1
-- 1 3
-- 1 4
-- 2 5

select distinct col1 from t1
-- 1
-- 2

-- distinct can be used within 'count'
select col1, count(distinct col2) from t1 group by col1

-- having
select col1 from t1 group by col1 having sum(col2) > 10
-- same as above
select col1 from (select col1, sum(col2) as col2sum from t1 group by col1) t2 where t2.col2sum > 10

-- order by
select col1 from t1 order by col1 desc

-- join
select a.* from a join b on (a.id = b.id and a.department = b.department)
select a.* from a left outer join b on (a.id <> b.id)
select a.val, b.val, c.val from a join b on (a.key = b.key1) join c on (c.key = b.key1)

-- joins occur before where clauses, so filtering an outer join in the where clause is a bad way:
select a.val, b.val from a left outer join b on (a.key=b.key)
where a.ds='2009-07-07' and b.ds='2009-07-07'

-- the following, with the filter in the join condition, is better:
select a.val, b.val from a left outer join b
on (a.key=b.key and b.ds='2009-07-07' and a.ds='2009-07-07')

-- union
select u.id, actions.date
from (
  select av.uid as uid, av.date as date
  from action_video av
  where av.date = '2008-06-03'
  union all
  select ac.uid as uid, ac.date as date
  from action_comment ac
  where ac.date = '2008-06-03'
) actions join users u on (u.id = actions.uid)

-- if, case
select if(field in (0, 1), 'ab', 'c') from tbl

select
  case field
  when 0 then 'a'
  when 1 then 'b'
  else 'c'
  end
from tbl

-- subqueries
select col
from (
  select a+b as col
  from t1
) t2

select *
from a
where a.a in (select foo from b);

select a
from t1
where exists (select b from t2 where t1.x = t2.y)

-- common table expression
with q1 as (select key from src where key = '5')
select *
from q1;

with q1 as (select * from src where key= '5'),
     q2 as (select * from src s2 where key = '4')
select * from q1 union all select * from q2;

-- create table as select example
create table s2 as
with q1 as ( select key from src where key = '4')
select * from q1;

-- create or replace temporary view is recommended instead of just 'create table'
create or replace temporary view foo as select * from t1 limit 1

-- view example
create view v1 as
with q1 as ( select key from src where key = '5')
select * from q1;

-- lateral view
select adid, count(1)
from pageads lateral view explode(adid_list) adtable as adid
group by adid

select k, v
from tbl lateral view explode(kvmap) kvs as k, v
group by k, v

select mycol1, mycol2 from basetable
lateral view explode(col1) mytable1 as mycol1
lateral view explode(col2) mytable2 as mycol2;

select * from src lateral view outer explode(array()) c as a limit 10;

-- time range (t is of timestamp type)
select t from table1
where t > to_utc_timestamp("2016-12-25", "UTC")
and t < to_utc_timestamp("2016-12-25 12:00", "UTC")

-- timestamp to string (use 'yyyy'; 'YYYY' is the week-based year)
select date_format(t, 'yyyy-MM-dd') from tbl

-- select a field with special characters (use backticks)
select `@time` from t1

-- concat_ws joins an array into a single string
-- map_values turns a map into an array of its values
-- <array of struct>.<field> becomes an <array of field>
select concat_ws(", ", map_values(items).price)
from Items
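
A worked sketch of the same chain with made-up data (the Item fields and values are hypothetical):

import spark.implicits._

case class Item(name: String, price: String)

// 'items' is a map<string, struct<name, price>>, like the Items table above.
val df = Seq(
  Map("a" -> Item("apple", "100"), "b" -> Item("banana", "200"))
).toDF("items")

// map_values(items)       -> array of Item structs
// map_values(items).price -> array of the price strings
// concat_ws(', ', ...)    -> one string, e.g. "100, 200"
df.selectExpr("concat_ws(', ', map_values(items).price) as prices").show(false)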

Recipes

  1. Referencing query results as a DataFrame in a Spark application (see the sketch below)
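
A minimal sketch of recipe 1, assuming a view named t1 is already registered (the names are placeholders). spark.sql returns a DataFrame, so the query result can keep being used with the DataFrame API or be registered as another view:

import spark.implicits._

// The query result is an ordinary DataFrame.
val counts = spark.sql("select col1, count(*) as cnt from t1 group by col1")

counts.filter($"cnt" > 10).show()           // keep working with the DataFrame API
counts.createOrReplaceTempView("t1_counts") // or expose it to further SQL queries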

Example code

https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/sql

Terminology

Topics

How schema merging works

There is no clear documentation about how schema merging across files works; there are only general guidelines for growing a schema: append only, no modification.

But I was curious: if appending is OK, can I also reorder the StructFields? Since StructType takes them as a List, the order might matter; but since the data source can be JSON, which does not care about field order, it might not. So I tested it.

The results are as follows. In short, Parquet matches fields by name: both appending and reordering work when mergeSchema is enabled, while without it Spark picks the schema of one of the files and the other fields are dropped.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

def createSchema(schema: String): StructType = {
    val fieldNames = schema.split(" ")
    val fields = fieldNames.map { name =>
        StructField(name, StringType)
    }
    StructType(fields)
}

// A schema of 'name' and 'addr'
val schema1 = createSchema("name addr")
val data1   = List(Row("yeonghoey", "jamsil"))
val rdd1    = spark.sparkContext.parallelize(data1)
val df1     = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")

// Add 'sex' field in between the fields of schema1.
val schema2 = createSchema("name sex addr")
val data2   = List(Row("cwkim", "male", "unjung"))
val rdd2    = spark.sparkContext.parallelize(data2)
val df2     = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")

// Append 'sex' field to the schema1.
val schema3 = createSchema("name addr sex")
val data3   = List(Row("sub", "yangjae", "male"))
val rdd3    = spark.sparkContext.parallelize(data3)
val df3     = spark.createDataFrame(rdd3, schema3)
df3.write.mode("overwrite").parquet("data3")

// A schema of 'name' and 'sex'
val schema4 = createSchema("name sex")
val data4   = List(Row("suminb", "male"))
val rdd4    = spark.sparkContext.parallelize(data4)
val df4     = spark.createDataFrame(rdd4, schema4)
df4.write.mode("overwrite").parquet("data4")

// ----------------------------------------------------------------------

//   name addr
// + name sex addr
spark.read.parquet("data{1,2}").show()
// +---------+------+
// |     name|  addr|
// +---------+------+
// |    cwkim|unjung|
// |yeonghoey|jamsil|
// +---------+------+

//   name addr
// * name sex addr
spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// +---------+------+----+
// |     name|  addr| sex|
// +---------+------+----+
// |    cwkim|unjung|male|
// |yeonghoey|jamsil|null|
// +---------+------+----+

//   name addr
// + name addr sex
spark.read.parquet("data{1,3}").show()
// +---------+-------+
// |     name|   addr|
// +---------+-------+
// |      sub|yangjae|
// |yeonghoey| jamsil|
// +---------+-------+

//   name addr
// * name addr sex
spark.read.option("mergeSchema", true).parquet("data{1,3}").show()
// +---------+-------+----+
// |     name|   addr| sex|
// +---------+-------+----+
// |      sub|yangjae|male|
// |yeonghoey| jamsil|null|
// +---------+-------+----+

//   name sex addr
// + name addr sex
spark.read.parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex|   addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// |  sub|male|yangjae|
// +-----+----+-------+

//   name sex addr
// * name addr sex
spark.read.option("mergeSchema", true).parquet("data{2,3}").show()
// +-----+----+-------+
// | name| sex|   addr|
// +-----+----+-------+
// |cwkim|male| unjung|
// |  sub|male|yangjae|
// +-----+----+-------+

//   name addr sex
// + name sex
spark.read.parquet("data{3,4}").show()
// +------+-------+----+
// |  name|   addr| sex|
// +------+-------+----+
// |   sub|yangjae|male|
// |suminb|   null|male|
// +------+-------+----+

//   name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{3,4}").show()
// +------+-------+----+
// |  name|   addr| sex|
// +------+-------+----+
// |   sub|yangjae|male|
// |suminb|   null|male|
// +------+-------+----+

//   name addr
// + name sex
spark.read.parquet("data{1,4}").show()
// +---------+------+
// |     name|  addr|
// +---------+------+
// |yeonghoey|jamsil|
// |   suminb|  null|
// +---------+------+

//   name addr
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,4}").show()
// +---------+------+----+
// |     name|  addr| sex|
// +---------+------+----+
// |yeonghoey|jamsil|null|
// |   suminb|  null|male|
// +---------+------+----+

//   name addr
//   name sex addr
//   name addr sex
// + name sex
spark.read.parquet("data{1,2,3,4}").show()
// +---------+-------+
// |     name|   addr|
// +---------+-------+
// |    cwkim| unjung|
// |      sub|yangjae|
// |yeonghoey| jamsil|
// |   suminb|   null|
// +---------+-------+

//   name addr
//   name sex addr
//   name addr sex
// * name sex
spark.read.option("mergeSchema", true).parquet("data{1,2,3,4}").show()
// +---------+-------+----+
// |     name|   addr| sex|
// +---------+-------+----+
// |    cwkim| unjung|male|
// |      sub|yangjae|male|
// |yeonghoey| jamsil|null|
// |   suminb|   null|male|
// +---------+-------+----+

When two files share a field name but with incompatible types, reading fails whether or not mergeSchema is enabled:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._

val schema1 = StructType(List(StructField("x", StringType)))
val data1   = List(Row("yeonghoey"))
val rdd1    = spark.sparkContext.parallelize(data1)
val df1     = spark.createDataFrame(rdd1, schema1)
df1.write.mode("overwrite").parquet("data1")

val schema2 = StructType(List(StructField("x", IntegerType)))
val data2   = List(Row(31))
val rdd2    = spark.sparkContext.parallelize(data2)
val df2     = spark.createDataFrame(rdd2, schema2)
df2.write.mode("overwrite").parquet("data2")

// ----------------------------------------------------------------------

spark.read.parquet("data{1,2}").show()
// Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType

spark.read.option("mergeSchema", true).parquet("data{1,2}").show()
// Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and IntegerType

How-to

Links