Spark Basics Notes

1. Constructing a DataFrame from a Seq

val df = Seq(
  (1, 12345678, "this is a test"),
  (1, 23456789, "another test"),
  (2, 2345678, "2nd test"),
  (2, 1234567, "2nd another test")
).toDF("id", "timestamp", "data")

The result is:

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+
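
A side note: toDF on a local Seq relies on Spark's SQL implicits. In spark-shell they are already in scope; in a standalone application you would need something along the lines of this sketch (app name and master are placeholder values):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-notes").master("local[*]").getOrCreate()
import spark.implicits._   // brings .toDF on local Seqs into scope

val df = Seq((1, 12345678, "this is a test")).toDF("id", "timestamp", "data")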

2. DataFrame joins

Spark DataFrames provide powerful JOIN operations, but in practice you often run into the problem of duplicated columns. For example, create two DataFrames as follows:

val df = sc.parallelize(Array(
  ("one", "A", 1), ("one", "B", 2), ("two", "A", 3), ("two", "B", 4)
)).toDF("key1", "key2", "value")
df.show()

+----+----+-----+
|key1|key2|value|
+----+----+-----+
| one|   A|    1|
| one|   B|    2|
| two|   A|    3|
| two|   B|    4|
+----+----+-----+

val df2 = sc.parallelize(Array(
  ("one", "A", 5), ("two", "A", 6)
)).toDF("key1", "key2", "value2")
df2.show()
+----+----+------+
|key1|key2|value2|
+----+----+------+
| one|   A|     5|
| two|   A|     6|
+----+----+------+

After joining them, the result contains an extra pair of key1 and key2 columns:

val joined = df.join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")
joined.show()

+----+----+-----+----+----+------+
|key1|key2|value|key1|key2|value2|
+----+----+-----+----+----+------+
| two|   A|    3| two|   A|     6|
| one|   A|    1| one|   A|     5|
| two|   B|    4|null|null|  null|
| one|   B|    2|null|null|  null|
+----+----+-----+----+----+------+

If you then refer to one of these duplicated columns by name, Spark reports an error such as: org.apache.spark.sql.AnalysisException: Reference 'key2' is ambiguous.
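
For example (the exact message text varies with the Spark version), an unqualified reference to key2 cannot be resolved, while qualifying it with the source DataFrame still works:

// joined.select("key2")            // fails: Reference 'key2' is ambiguous
joined.select(df("key2")).show()    // OK: qualified by the DataFrame it came from
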
Many posts online therefore show how to drop columns after the JOIN, but after some digging it turns out that simply changing the join expression avoids the problem entirely, and it is very easy: pass the join keys as a Seq.

df.join(df2, Seq[String]("key1", "key2"), "left_outer").show()

+----+----+-----+------+
|key1|key2|value|value2|
+----+----+-----+------+
| two|   A|    3|     6|
| one|   A|    1|     5|
| two|   B|    4|  null|
| one|   B|    2|  null|
+----+----+-----+------+
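
With the Seq form the join keys appear only once in the output, so they can now be referenced by name without ambiguity, for example:

val joined2 = df.join(df2, Seq("key1", "key2"), "left_outer")
joined2.select("key2").show()   // only one key2 column remains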

3. Selecting N columns from a DataFrame

Sometimes we need to select N columns from a DataFrame, with the names of the desired columns held in a list, as in the following pseudocode:

var columnNames = getColumns(x) // returns the column names as a List[String]
df.select(columnNames)          // what we would like to write, but this does not compile

One workable approach is:

val result = df.select(columnNames.head, columnNames.tail: _*)

A more concise approach:

import org.apache.spark.sql.functions.col
df.select(columnNames.map(col): _*)
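
As a concrete usage sketch, reusing the df with columns key1, key2, value from section 2 and assuming the names are given as a plain List[String]:

import org.apache.spark.sql.functions.col

val columnNames = List("key1", "value")
val selectedA = df.select(columnNames.head, columnNames.tail: _*)   // String varargs overload of select
val selectedB = df.select(columnNames.map(col): _*)                 // Column varargs overload of select
selectedA.show()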

4. Splitting one DataFrame column into many columns

Background: for a work requirement I needed to expand a single column into more than 1,900 columns, where each new column name corresponds one-to-one to a position in that column's data.
Approach: split the column first, then take each value by its positional index and expose it under the corresponding column name.
We start with the following data, feature_df:
bill_feature uses "," as its separator; splitting it yields a list of roughly 1,900 values, which need to be expanded into columns.

+--------+--------+--------------------+
|order_id|math_num|        bill_feature|
+--------+--------+--------------------+
|282101x3|     847|0.105263157894736...|
|383107b1|     525|0.0,0.0,0.0,0.0,0...|
|493408c3|     630|0.208333333333333...|
|413001d9|     490|0.166666666666666...|
|563711e2|     602|0.148148148148148...|
|539906f2|     259|0.030303030303030...|
|466924g2|    1071|0.238095238095238...|
|258430f9|    1071|0.229729729729729...|
|370711a6|     959|0.246753246753246...|
|645012c2|     931|0.395833333333333...|
|574415t9|     588|0.153846153846153...|
|402916y9|    1022|0.220930232558139...|
|430205z9|     469|0.111111111111111...|
|266225g7|     693|0.090909090909090...|
|614527o5|     532|0.0625,1.57697780...|
|180837m9|     350|0.090909090909090...|
|381138k5|     854|0.285714285714285...|
|549847a4|     602|0.0,0.0,0.0,0.0,0...|
|491850p7|     637|0.44,0.3512465171...|
|380000i0|     833|0.120689655172413...|
+--------+--------+--------------------+

There are two ways to do this:

  • Using withColumn (extremely slow with 1,900+ columns, not recommended; only worth it when adding a small number of columns that need more complex per-column logic)

    import org.apache.spark.sql.{DataFrame, SparkSession, functions}

    // Split the comma-separated bill_feature string into an array column first
    var result_df = feature_df.withColumn("feature_list", functions.split(functions.col("bill_feature"), ","))
    // Then add the feature columns one at a time (slow: every iteration stacks
    // another projection on top of the query plan)
    for (feature_name <- feature_name_id_map.keySet) {
      val add_col_udf = functions.udf((feature_list: Seq[String]) => {
        val index = feature_name_id_map(feature_name)
        feature_list(index).toDouble
      })
      result_df = result_df.withColumn(feature_name, add_col_udf(result_df.col("feature_list")))
    }
  • Using select (fast even with 1,900+ columns; the preferred approach, and a self-contained sketch follows at the end of this section)

    import org.apache.spark.sql.{DataFrame, SparkSession, functions}
    import org.apache.spark.sql.types._

    // Invert the name -> index map so each position can be looked up by name
    val feature_id_name_map = feature_name_id_map.map(x => (x._2, x._1))
    // Split the comma-separated bill_feature string into an array column
    var result_df = feature_df.withColumn("feature_list", functions.split(functions.col("bill_feature"), ","))
    // Build all feature columns up front and project them with a single select
    result_df = result_df.select(
      functions.col("order_id")
        +: functions.col("math_num")
        +: (0 until feature_name_id_map.size).map(i =>
             functions.col("feature_list")(i).alias(feature_id_name_map(i)).cast(DoubleType)): _*
    )
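
For reference, here is a minimal end-to-end sketch of the select-based expansion. The three-entry feature_name_id_map and the sample rows are made up for illustration (the real map has roughly 1,900 entries); everything else follows the pattern above.

import org.apache.spark.sql.{functions, SparkSession}
import org.apache.spark.sql.types.DoubleType

val spark = SparkSession.builder().appName("expand-features").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical 3-entry map standing in for the real ~1900-entry one
val feature_name_id_map = Map("f_a" -> 0, "f_b" -> 1, "f_c" -> 2)
val feature_id_name_map = feature_name_id_map.map(x => (x._2, x._1))

// Tiny made-up sample in the same shape as feature_df above
val feature_df = Seq(
  ("282101x3", 847, "0.1,0.2,0.3"),
  ("383107b1", 525, "0.0,0.0,0.0")
).toDF("order_id", "math_num", "bill_feature")

val expanded = feature_df
  .withColumn("feature_list", functions.split(functions.col("bill_feature"), ","))
  .select(
    functions.col("order_id")
      +: functions.col("math_num")
      +: (0 until feature_name_id_map.size).map(i =>
           functions.col("feature_list")(i).cast(DoubleType).alias(feature_id_name_map(i))): _*
  )
expanded.show()   // columns: order_id, math_num, f_a, f_b, f_c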