spark第二阶段

本章节主要是介绍了我在学习spark阶段的几个小知识点,分享给大家。

spark在window下运行

  • 在window下加载数据整理及打印
    • 注意window下运行spark需要下载一个hadoop的winutils.exe文件,并代码引入否则会报io异常,见附件
    • 注意切割的时候最好用单引号,如果用双引号则有的字符需要转义例如竖线
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def main(args: Array[String]): Unit = {
//ID,url,host,权重,tag,App
System.setProperty("hadoop.home.dir", "F:/hadoop")
//初始化并配置spark
val conf = new SparkConf()
conf.setAppName("DPI")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("F:/infinibase/DPIDATA/input01.txt")
//对每条数据切分,在rdd中以数组形式保存
val out = lines.map(line => line.split(','))
out .foreach(o=>{println(o.(0)+"---"+o.(1)+"---"+o.(2)+"---"+o.(3)+"---"+o.(4)+"---"+o.(5)+"---"+o.(6))})
}

aggregateByKey

  • spark RDD算子:aggregateByKey
    • 不同于reduceByKey可以返回不同数据结构的RDD
    • 例如可以把RDD(String,(Double,String))转化成RDD(String,Map(String:Double))如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//union的意思是纵向集合在一块
//把union在一起的数据集转换成(String,Map[String:double]),并把key相同的放在一起
def getQuantity(quantityUnion:RDD[(String,(Double,String))]): RDD[(String,Map[String,Double])] ={
//aggregateByKey函数的初始值
val initialCount :Map[String,Double] = Map()
//聚合在一起以(String,Map(String:Double)) 的方式返回
val quantity = quantityUnion.aggregateByKey(initialCount)(tupleToMap, distinctMap)
quantity
}
//不同分区传入的map合并在一起
val distinctMap = (line1:Map[String,Double],line2:Map[String,Double]) =>{
line1 ++ line2
}
//把元组转换成map
val tupleToMap = (line2:Map[String,Double],line1:(Double,String)) => {
line2 + (line1._2 -> line1._1)
}
  • aggregateByKey(_),(_,_)
    • 以这种方式传入参数,其中第一个是相对空值:int => 0 , map 就是一个空的map。。。
    • 第二个传入一个函数里面有两个参数,一个是转换后的类型,一个是转换前的类型,在里面实现类型的转化及相加
    • 第三个表示传入两个转换后的参数,使在分布式时不同分区的结果加在一起。

scala编写的spark程序打成jar到linux运行脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/bin/sh
baseDirForScriptSelf=$(cd "$(dirname "$0")"; pwd)
appPath="$baseDirForScriptSelf/../app"
config="$baseDirForScriptSelf/../config/config.json"
url_config="$baseDirForScriptSelf/../config/UrlConfig.json"
masterIP="node1"
# --conf spark.default.parallelism=200 \
spark-submit \
--master spark://$masterIP:7077 \
--class IFB.TelComDpiExtensionWorker \
--executor-memory 3G \
--total-executor-cores 20 \
--executor-cores 2 \
--driver-memory 2G \
## 此配置表示引入外部的log4j配置文件
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///home/soft/app/userApp/ifb_dpi_marketing/config/log4j.properties \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
$appPath/dpi-marketing-1.0.jar $config $url_config
  • 运行的时候,如果想在外部用textFile 函数读取数据,则需要在前面加上File:/// 否则默认在hdfs中读取。
张冲 wechat
欢迎扫一扫上面的微信关注我,一起交流!
坚持原创技术分享,您的支持将鼓励我继续创,点击打赏!