Using Spark Accumulators
Environment:
Ubuntu 16.04, 64-bit
Pseudo-distributed mode
Spark 2.3.1
Scala 2.11.8
Reference link:
https://blog.csdn.net/android_xue/article/details/79780463#commentsedit
Note: this post is a summary and condensation of the reference link above.
In one sentence, what is an accumulator for?
It collects counts (or other values) computed by the tasks running on the worker (slave) nodes and merges them back to the driver.
On to the code.
1. Code without an accumulator, AccumulatorTest1.scala:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    // Suppress the flood of log messages.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getRootLogger().setLevel(Level.ERROR)
    val spark = SparkSession.builder
      .appName("Intro").config("spark.master", "local")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    val linesRDD = sc.textFile("hdfs://master:9000/test/word.txt")
    // A plain driver-side variable, captured by the closure below.
    var i = 0
    val result = linesRDD.map { s =>
      i += 1 // each task increments its own copy of i, not the driver's
    }
    result.collect()
    println("word lines is:" + i) // does not show the real line count
    sc.stop()
  }
}
1. Start the Hadoop HDFS service.
2. scalac AccumulatorTest1.scala
3. scala AccumulatorTest
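When you run it, the final println does not report the real line count: the closure that increments i is serialized and shipped with each task, so every task increments its own copy and the driver's i is left at 0. A minimal standalone sketch of the same effect (assuming an existing local SparkSession named spark; parallelize(1 to 100) is just illustrative data):

val sc = spark.sparkContext
var counter = 0
// The closure captures a copy of `counter`; each task increments its own copy.
sc.parallelize(1 to 100).foreach(_ => counter += 1)
println(counter) // prints 0 on the driver, not 100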
2. Code with an accumulator, AccumulatorTest2.scala:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    // Suppress the flood of log messages.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getRootLogger().setLevel(Level.ERROR)
    val spark = SparkSession.builder
      .appName("Intro").config("spark.master", "local")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    val linesRDD = sc.textFile("hdfs://master:9000/test/word.txt")
    // Create an accumulator initialized to 0 (deprecated since Spark 2.0;
    // see the sc.longAccumulator sketch after the run steps below).
    val accumulator = sc.accumulator(0)
    val result = linesRDD.map { s =>
      accumulator.add(1) // add 1 for every record
      s // return s, so result holds the lines of word.txt (order may differ)
    }
    result.collect().foreach(println)
    println("words lines is :" + accumulator.value)
    sc.stop()
  }
}
1. Start the Hadoop HDFS service.
2. scalac AccumulatorTest2.scala
3. scala AccumulatorTest
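Here every accumulator.add(1) executed on the executors is merged back into the driver-side value, so accumulator.value printed after collect() is the real number of lines in word.txt. Note that sc.accumulator(0) is deprecated since Spark 2.0; a sketch of the same counting step with the newer AccumulatorV2 API (the accumulator name "lineCounter" is just illustrative):

val lineCounter = sc.longAccumulator("lineCounter")
val result = linesRDD.map { s =>
  lineCounter.add(1) // one increment per record, executed on the executors
  s
}
result.collect().foreach(println)
println("words lines is :" + lineCounter.value) // merged value, read on the driver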