Using Spark Accumulators

Environment:

Ubuntu 16.04, 64-bit

Hadoop in pseudo-distributed mode

Spark 2.3.1

Scala 2.11.8

Reference link:

https://blog.csdn.net/android_xue/article/details/79780463#commentsedit

Note: this post is a summary of the reference link above.

In one sentence, what is an accumulator for?

It totals up counts of the data sitting on the slave (worker) nodes and makes the total available on the driver.
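Concretely, the contract is: tasks on the executors may only add to an accumulator, and only the driver may read its value. A minimal sketch of that contract using the current Spark 2.x longAccumulator API (assumes an existing SparkContext sc and some RDD rdd):

    val acc = sc.longAccumulator("records") // created and registered on the driver
    rdd.foreach(_ => acc.add(1))            // tasks on the executors only ever add
    println(acc.value)                      // the driver reads the merged total

The two programs below motivate this: first a broken attempt with a plain var, then the working accumulator version.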

On to the code.

1. Without an accumulator, AccumulatorTest.scala:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object AccumulatorTest {

  def main(args: Array[String]): Unit = {
    // Suppress the flood of log output.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getRootLogger().setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Intro").config("spark.master", "local")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val linesRDD = sc.textFile("hdfs://master:9000/test/word.txt")

    var i = 0
    val result = linesRDD.map(s => {
      i += 1 // runs inside the task closure, i.e. on the executors
    })
    result.collect()
    // Prints 0: each task received its own serialized copy of i,
    // so the driver's i is never updated.
    println("word lines is:" + i)
    sc.stop()
  }
}

1. Start Hadoop's HDFS.

2. scalac AccumulatorTest.scala (the Spark jars must be on the compile classpath)

3. scala AccumulatorTest

This prints "word lines is:0". The var i lives on the driver; Spark serializes the closure for each task, so every task increments its own copy and the driver's i never changes.
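As an aside, for a plain line count no shared variable is needed at all: the count() action already sums the per-partition counts on the driver. A minimal sketch against the same linesRDD:

    val lineCount = linesRDD.count() // executors count their partitions, driver sums
    println("word lines is:" + lineCount)

The accumulator below is still the right tool when you want a count or sum as a side effect of some other transformation.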

2. With an accumulator, AccumulatorTest2.scala:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object AccumulatorTest {

  def main(args: Array[String]): Unit = {
    // Suppress the flood of log output.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getRootLogger().setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName("Intro").config("spark.master", "local")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val linesRDD = sc.textFile("hdfs://master:9000/test/word.txt")

    // Create the accumulator, initialized to 0. (sc.accumulator is
    // deprecated since Spark 2.0; see the longAccumulator sketch
    // after the run steps below.)
    val accumulator = sc.accumulator(0)
    val result = linesRDD.map(s => {
      accumulator.add(1) // add 1 for every record
      s // return s, so result holds the lines of word.txt (order not guaranteed)
    })
    result.collect().foreach(println)
    println("words lines is :" + accumulator.value)
    sc.stop()
  }
}

1. Start Hadoop's HDFS.

2. scalac AccumulatorTest2.scala (again with the Spark jars on the classpath)

3. scala AccumulatorTest

This time the printed count is correct: each task's accumulator updates are sent back with its results and merged into the driver's copy.
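A note on the API: sc.accumulator(0) still compiles on Spark 2.3.1 but has been deprecated since Spark 2.0 in favor of the AccumulatorV2 classes. A minimal sketch of the same counting logic with sc.longAccumulator (only the accumulator lines change; the rest of AccumulatorTest2.scala stays as above, and the name "lineCount" is just illustrative):

    // Replaces: val accumulator = sc.accumulator(0)
    val accumulator = sc.longAccumulator("lineCount") // named, so it shows up in the web UI
    val result = linesRDD.map(s => {
      accumulator.add(1) // one per record, exactly as before
      s
    })
    result.collect().foreach(println)
    println("words lines is :" + accumulator.value) // merged total, read on the driver

One caveat that applies to both versions: Spark only guarantees exactly-once accumulator updates inside actions (e.g. foreach); inside a transformation such as map, a retried task or re-executed stage can apply its updates more than once.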
