当前位置: 首页 > news >正文

asp网站免费空间网络营销概念

asp网站免费空间,网络营销概念,唐山房产网站建设,中国北京出啥大事了1. 共享变量 Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。 累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象&#xff0c…

1. 共享变量

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。

累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象,相当于semijoin中的DistributedCache 。

共享变量出现的原因:

我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object TestAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test acc")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),3)val count = rdd.map(t=> 1).reduce(_+_)println(count)//    val acc = sc.longAccumulator("count")
//
//    rdd.foreach(t=>{
//      acc.add(1)
//    })
//
//    println(acc.value)//    println(rdd.count())}
}

原因总结:

对于executor端,driver端的变量是外部变量。

excutor端修改了变量count,根本不会让driver端跟着修改。如果想在driver端得到executor端修改的变量,需要用累加器实现。

当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低,也可能会造成内存溢出。使用广播变量以后,在每个Executor中只有一个Driver端变量副本,在一个executor中的并行执行的task任务会引用该一个变量副本即可,需要广播变量提高运行效率。

2. 累加器

累加器的执行流程:

通过SparkContext创建一个累加器并初始化。当driver端将任务分发给executor时,每个executor会接收一个任务和一个引用到该累加器的副本。每个executor上的任务可以调用累加器的add方法来增加累加器的值,这些操作是线程安全的,因为每个任务都会在自己的executor线程中执行。当每个任务完成,executor将累加器的更新值发送到driver端进行聚合过程,得到最终的聚合结果。

累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。

用法:

var acc: LongAccumulator = sc.longAccumulator // 创建累加器acc.add(1) // 累加器累加acc.value // 获取累加器的值

累加器的简单使用

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object WordCountWithAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test acc")conf.setMaster("local[*]")val sc = new SparkContext(conf)val acc = sc.longAccumulator("bad word")sc.textFile("data/a.txt").flatMap(_.split(" ")).filter(t=>{if(t.equals("shit")){acc.add(1)false}elsetrue}).map((_,1)).reduceByKey(_+_).foreach(println)println("invalid words:"+acc.value)}
}

3. 广播变量

ip转换工具

public class IpUtils {public static Long ip2Long(String ip) {String fragments[] = ip.split("[.]");Long ipNum = 0L;for(int i=0;i<fragments.length;i++) {ipNum = Long.parseLong(fragments[i]) | ipNum << 8L;}return ipNum;}
}

ip案例代码

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ip")conf.setMaster("local[*]")val sc = new SparkContext(conf)val accessRDD = sc.textFile("data/access.log").map(t=>{val strs = t.split("\\|")IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{val strs = t.split("\\|")(strs(2).toLong,strs(3).toLong,strs(6)+strs(7))}).collect()//    accessRDD.map(ip=>{
//      ipRDD.filter(t=>{
//        ip>= t._1 && ip<= t._2
//      })
//    }).foreach(println)accessRDD.map(ip=>{ipArr.find(t=>{t._1<= ip && t._2>=ip}) match {case Some(v) => (v._3,1)case None => ("unknow",1)}//option}).reduceByKey(_+_).foreach(println)}
}

使用广播变量可以使程序高效地将一个很大的只读数据发送到executor节点,会将广播变量放到executor的BlockManager中,而且对每个executor节点只需要传输一次,该executor节点的多个task可以共用这一个。

用法:

val broad: Broadcast[List[Int]] = sc.broadcast(list) // 把driver端的变量用广播变量包装broad.value // 从广播变量获取包装的数据,用于计算

我们可能遇到这样的问题:如果我们需要广播的数据为100M,如果需要driver端亲自向每个executor端发送100M的数据,在工作中executor节点的个数可能是很多的,比如是200个,这意味着driver端要发送20G的数据,这对于driver端的压力太大了。所以要用到比特洪流技术。

就是说driver端不必向每个executor发送一份完整的广播变量的数据,而是将一份广播变量切分成200份,发送给两百个executor,然后200个executor间通过BlockManager中的组件transferService与其他executor通信,进行完整的数据。

这样driver端只需要发送一份广播变量的数据,压力就会小很多,而且其他executor也都拿到了这一份广播变量的数据 。

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ip")conf.setMaster("local[*]")val sc = new SparkContext(conf)val accessRDD = sc.textFile("data/access.log").map(t=>{val strs = t.split("\\|")IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{val strs = t.split("\\|")(strs(2).toLong,strs(3).toLong,strs(6)+strs(7))}).collect()val bs = sc.broadcast(ipArr)//    accessRDD.map(ip=>{//      ipRDD.filter(t=>{//        ip>= t._1 && ip<= t._2//      })//    }).foreach(println)accessRDD.map(ip=>{bs.value.find(t=>{t._1<= ip && t._2>=ip}) match {case Some(v) => (v._3,1)case None => ("unknow",1)}//option}).reduceByKey(_+_).foreach(println)}
}

为了提高查找的效率,可以使用二分法查找代码。将时间复杂度由O(n)优化到了O(logn)。

      val start = System.currentTimeMillis()val res =  (binarySearch(ip,bs.value),1)
//      val res = bs.value.find(t=>{
//        t._1<= ip && t._2>=ip
//      }) match {
//        case Some(v) => (v._3,1)
//        case None => ("unknow",1)
//      }val end = System.currentTimeMillis()acc.add(end-start)

累加器实现运行时间的统计

http://www.wooajung.com/news/22438.html

相关文章:

  • 网站网站建设设计公司做百度关键词排名的公司
  • 刚做的网站为什么搜索不到搭建网站多少钱
  • 酒店网站开发aso优化的主要内容
  • 室内设计师联盟官网入口google搜索引擎优化
  • wordpress如何开启多站点关键词首页排名代做
  • php如何做视频网站优化seo公司哪家好
  • 网站建设核心六年级上册数学优化设计答案
  • 千里做他千百度网站app推广多少钱一单
  • 基层政府网站建设网络营销与直播电商学什么
  • WordPress适合建大型网站吗sem账户托管外包
  • asp网站开发教程北京seo优化公司
  • 厦门高端模板建站知乎关键词搜索排名
  • 工商企业查询网seo外链增加
  • 网站网站建设2021年网络热点舆论
  • 摄影作品网站app十大排名凡科建站快车
  • 塔城地区建设工程信息网站网站页面的优化
  • 乌鲁木齐做网站的公司北京seo如何排名
  • 做网站公司官网百度网盘云资源搜索引擎
  • 高端建设网站公司互联网营销师
  • 做视频直播网站需要办理什么资质免费推广
  • 海口网站排名推广营销网站建设方案
  • wordpress建博客网站吗推广普通话手抄报文字内容
  • 自己怎么做返利网站吗自媒体135网站免费下载安装
  • 我想网上做网站武汉seo认可搜点网络
  • 荥阳企业网站建设北京seo外包公司要靠谱的
  • 顺庆区城乡规划建设局门户网站嘉兴百度快照优化排名
  • 查权重工具重庆seo排名软件
  • 微信网站平台怎么建立韩国网站
  • 欧美真做的大尺寸电影网站手机百度网盘下载慢怎么解决
  • 把自己做的网页发布到网站宁波seo推广