1. Constructing a DataFrame from a Seq
A DataFrame can be built directly from a Seq of tuples by calling toDF with the desired column names, as in the sketch below; the result follows.
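A minimal sketch of the construction, reconstructed to match the output shown underneath; it assumes an active SparkSession named spark so that spark.implicits._ provides toDF on a Seq:

import spark.implicits._  // assumes a SparkSession named spark is in scope

val df = Seq(
  (1, 12345678L, "this is a test"),
  (1, 23456789L, "another test"),
  (2, 2345678L, "2nd test"),
  (2, 1234567L, "2nd another test")
).toDF("id", "timestamp", "data")
df.show()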
+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+
2. Joining DataFrames
Spark DataFrames provide a powerful JOIN operation, but in practice it frequently runs into the duplicate-column problem. For example, create two DataFrames; their contents are shown below:
val df = sc.parallelize(Array(
("one", "A", 1), ("one", "B", 2), ("two", "A", 3), ("two", "B", 4)
)).toDF("key1", "key2", "value")
df.show()
+----+----+-----+
|key1|key2|value|
+----+----+-----+
| one|   A|    1|
| one|   B|    2|
| two|   A|    3|
| two|   B|    4|
+----+----+-----+
A second DataFrame, df2, is built the same way; it shares the key1 and key2 columns and carries its own value column, as sketched below.
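A minimal sketch of a df2 that fits the join below; the rows and the value2 column name are assumptions (only the key1 and key2 column names are fixed by the join expression):

val df2 = sc.parallelize(Array(
  ("one", "A", 5), ("two", "B", 6)   // illustrative rows, not the original data
)).toDF("key1", "key2", "value2")    // "value2" is an assumed name for df2's value column
df2.show()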
After joining them, the result carries the key1 and key2 fields twice, once from each side:
val joined = df.join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")
joined.show()
The output schema now repeats the key columns: key1, key2 and value from df, followed by key1, key2 and the value column from df2.
With both copies present, referencing either key column afterwards fails with:
org.apache.spark.sql.AnalysisException: Reference 'key2' is ambiguous
There are many posts online about how to drop columns after the JOIN, but after some careful digging it turns out the problem can be avoided entirely, and very simply, by changing the join expression: pass the join keys as a Seq.
df.join(df2, Seq[String]("key1", "key2"), "left_outer").show()
The joined output now contains key1 and key2 only once, together with the value columns from both sides.
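For comparison, a rough sketch of the drop-after-join workaround mentioned above; it has to reference the duplicate columns through df2, since referring to them by name alone is ambiguous:

val deduped = df
  .join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")
  .drop(df2("key1"))   // drop the copies coming from df2
  .drop(df2("key2"))
deduped.show()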
3. Selecting N columns from a DataFrame
Sometimes we need to select N columns from a DataFrame, with the column names held in a list, as the following pseudocode suggests:
var columnNames = getColumns(x) // returns the List[String] of column names we want
df.select(columnNames) // what we are trying to do
One workable approach uses the select(col: String, cols: String*) overload:
val result = df.select(columnNames.head, columnNames.tail: _*)
A more concise way is to map the names to Columns and expand them as varargs:
import org.apache.spark.sql.functions.col
df.select(columnNames.map(col): _*)
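To see both forms in one place, a small self-contained sketch; the example DataFrame, the column list, and the spark session name are made up for illustration:

import org.apache.spark.sql.functions.col
import spark.implicits._  // assumes a SparkSession named spark

val df = Seq((1, "a", 10.0), (2, "b", 20.0)).toDF("id", "name", "score")  // illustrative data
val columnNames = List("id", "score")                                     // names of the columns we want

// head/tail trick against the select(col: String, cols: String*) overload
val result1 = df.select(columnNames.head, columnNames.tail: _*)

// map every name to a Column and expand the whole list as varargs
val result2 = df.select(columnNames.map(col): _*)

result1.show()
result2.show()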
4. Splitting one DataFrame column into many columns
Background: a work task required expanding a single column into more than 1900 columns, where each new column name corresponds one-to-one with a position in that column's data.
Logic: split the column first, then take each value by its position index and expose it under the corresponding column name.
First, we have the following data, feature_df. The bill_feature column uses ',' as its separator; after splitting, it becomes a list of roughly 1900 values that needs to be expanded into separate columns.
+--------+--------+--------------------+
|order_id|math_num|        bill_feature|
+--------+--------+--------------------+
|282101x3|     847|0.105263157894736...|
|383107b1|     525|0.0,0.0,0.0,0.0,0...|
|493408c3|     630|0.208333333333333...|
|413001d9|     490|0.166666666666666...|
|563711e2|     602|0.148148148148148...|
|539906f2|     259|0.030303030303030...|
|466924g2|    1071|0.238095238095238...|
|258430f9|    1071|0.229729729729729...|
|370711a6|     959|0.246753246753246...|
|645012c2|     931|0.395833333333333...|
|574415t9|     588|0.153846153846153...|
|402916y9|    1022|0.220930232558139...|
|430205z9|     469|0.111111111111111...|
|266225g7|     693|0.090909090909090...|
|614527o5|     532|0.0625,1.57697780...|
|180837m9|     350|0.090909090909090...|
|381138k5|     854|0.285714285714285...|
|549847a4|     602|0.0,0.0,0.0,0.0,0...|
|491850p7|     637|0.44,0.3512465171...|
|380000i0|     833|0.120689655172413...|
+--------+--------+--------------------+
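Both snippets below rely on a feature_name_id_map from new column name to its position in the split list, and they work on a mutable reference called result_df; a minimal sketch of that setup, with purely illustrative map contents (the real map holds roughly 1900 entries):

import org.apache.spark.sql.DataFrame

val feature_name_id_map: Map[String, Int] = Map("feat_0" -> 0, "feat_1" -> 1, "feat_2" -> 2)  // hypothetical names
var result_df: DataFrame = feature_df  // start from the feature_df shown above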
There are two ways to do it:
Using withColumn (extremely slow with 1900+ columns, so not recommended here; use it only when adding a small number of columns that need more complex per-column logic)
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
result_df = result_df.withColumn("feature_list", functions.split(result_df.col("bill_feature"), ","))
for (feature_name <- feature_name_id_map.keySet) {
  // one UDF and one extra projection per feature column: this is what makes it so slow at 1900+ columns
  val add_col_udf = functions.udf((feature_list: Seq[String]) => {
    val index = feature_name_id_map(feature_name)
    feature_list(index).toDouble
  })
  result_df = result_df.withColumn(feature_name, add_col_udf(result_df.col("feature_list")))
}
Using select (fast even with 1900+ columns; the recommended first choice)
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.sql.types._
// invert the map: position index -> feature/column name
val feature_id_name_map = feature_name_id_map.map(x => (x._2, x._1))
result_df = result_df.withColumn("feature_list", functions.split(result_df.col("bill_feature"), ","))
// build the whole column list up front and issue a single select instead of 1900+ withColumn calls
result_df = result_df.select(
  functions.col("order_id")
    +: functions.col("math_num")
    +: (0 until feature_name_id_map.size).map(i =>
         functions.col("feature_list")(i).cast(DoubleType).alias(feature_id_name_map(i))): _*
)
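As a quick sanity check (using the hypothetical feat_* names from the setup sketch above), the expanded frame should now expose one double column per map entry:

result_df.printSchema()                         // order_id, math_num, then one double column per map entry
result_df.select("order_id", "feat_0").show(5)  // feat_0 is one of the hypothetical names sketched earlier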