文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Spark刷爆磁盘与Java弱引用的关系

2024-12-03 17:01

关注

 

一 引用基本概念

 

如下面,定义两个变量num,str,存储模型大致如下图:

  1. int num = 6; 
  2. String str = “浪尖聊大数据”; 

 

变量num值直接从6修改为了8;变量str只是修改了其保存的地址,从0x88修改为0x86,对象 “浪尖聊大数据 ”本身还在内存中,并没有被修改。只是内存中新增了对象 “浪尖是帅哥”。

二 值传递&引用传递

 

举例说明引用传递和值传递:

  1. 第一个栗子:基本类型 
  2. void foo(int value) { 
  3.     value = 88; 
  4. foo(num); // num 没有被改变 
  5.  
  6. 第二个栗子:没有提供改变自身方法的引用类型 
  7. void foo(String text) { 
  8.     text = "mac"
  9. foo(str); // str 也没有被改变 
  10.  
  11. 第三个栗子:提供了改变自身方法的引用类型 
  12. StringBuilder sb = new StringBuilder("vivo"); 
  13. void foo(StringBuilder builder) { 
  14.     builder.append("5"); 
  15. foo(sb); // sb 被改变了,变成了"vivo5"。 
  16.  
  17. 第四个栗子:提供了改变自身方法的引用类型,但是不使用,而是使用赋值运算符。 
  18. StringBuilder sb = new StringBuilder("oppo"); 
  19. void foo(StringBuilder builder) { 
  20.     builder = new StringBuilder("vivo"); 
  21. foo(sb); // sb 没有被改变,还是 "oppo"。 

三 引用的类型

  1. 单纯的申明一个软引用,指向一个person对象 
  2. 1 SoftReference pSoftReference=new SoftReference(new Person(“张三”,12)); 
  3.  
  4. 声明一个引用队列 
  5. ReferenceQueue queue = new ReferenceQueue<>(); 
  6.  
  7. 声明一个person对象,李四,obj是其强引用 
  8. Person obj = new Person(“李四”,13); 
  9.  
  10. 使软引用softRef指向李四对应的对象,并且将该软引用关联到引用队列 
  11. 2 SoftReference softRef = new SoftReference(obj,queue); 
  12.  
  13. 声明一个person对象,名叫王酒,并保证其仅含软引用,且将软引用关联到引用队列queue 
  14. 3 SoftReference softRef = new SoftReference(new Person(“王酒”,15),queue); 
  15.  
  16. 使用很简单softRef.get即可获取对应的value。 
    1. WeakReference weakReference = new WeakReference<>(new Person(“浪尖”,18)); 
    2.  
    3. 声明一个引用队列 
    4. ReferenceQueue queue = new ReferenceQueue<>(); 
    5.  
    6. 声明一个person对象,李四,obj是其强引用 
    7. Person obj = new Person(“李四”,13); 
    8.  
    9. 声明一个弱引用,指向强引用obj所指向的对象,同时该引用绑定到引用队列queue。 
    10. WeakReference weakRef = new WeakReference(obj,queue); 
    11.  
    12. 使用弱引用也很简单,weakRef.get 
      1. 声明引用队列 
      2. ReferenceQueue queue = new ReferenceQueue(); 
      3.  
      4. 声明一个虚引用 
      5. PhantomReference reference = new PhantomReference(new Person(“浪尖”,18), queue); 
      6.  
      7. 获取虚引用的值,直接为null,因为无法通过虚引用获取引用对象。 
      8. System.out.println(reference.get()); 

       

       

       


       

       

      四 Threadlocal如何使用弱引用


       

       

      五 spark如何使用弱引用进行数据清理

       

       


       

       

      shuffle相关的引用,实际上是在ShuffleDependency内部实现了,shuffle状态注册到ContextCleaner过程:

      1. _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) 

      然后,我们翻开registerShuffleForCleanup函数源码可以看到,注释的大致意思是注册ShuffleDependency目的是在垃圾回收的时候清除掉它对应的数据:

      1.  
      2.   def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = { 
      3.     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId)) 
      4.   } 

      其中,registerForCleanup函数如下:

      1.  
      2.   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { 
      3.     referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) 
      4.   } 

      referenceBuffer主要作用保存CleanupTaskWeakReference弱引用,确保在引用队列没处理前,弱引用不会被垃圾回收。

      1.  
      2.   private val referenceBuffer = 
      3.     Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) 

      ContextCleaner内部有一个线程,循环从引用队列里取被垃圾回收的RDD等相关弱引用,然后完成对应的数据清除工作。

      1. private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() } 

      其中,keepCleaning函数,如下:

      1.  
      2.   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { 
      3.     while (!stopped) { 
      4.       try { 
      5.         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) 
      6.           .map(_.asInstanceOf[CleanupTaskWeakReference]) 
      7.         // Synchronize here to avoid being interrupted on stop() 
      8.         synchronized { 
      9.           reference.foreach { ref => 
      10.             logDebug("Got cleaning task " + ref.task) 
      11.             referenceBuffer.remove(ref) 
      12.             ref.task match { 
      13.               case CleanRDD(rddId) => 
      14.                 doCleanupRDD(rddId, blocking = blockOnCleanupTasks) 
      15.               case CleanShuffle(shuffleId) => 
      16.                 doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) 
      17.               case CleanBroadcast(broadcastId) => 
      18.                 doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) 
      19.               case CleanAccum(accId) => 
      20.                 doCleanupAccum(accId, blocking = blockOnCleanupTasks) 
      21.               case CleanCheckpoint(rddId) => 
      22.                 doCleanCheckpoint(rddId) 
      23.             } 
      24.           } 
      25.         } 
      26.       } catch { 
      27.         case ie: InterruptedException if stopped => // ignore 
      28.         case e: Exception => logError("Error in cleaning thread", e) 
      29.       } 
      30.     } 
      31.   } 

      shuffle数据清除的函数是doCleanupShuffle,具体内容如下:

      1.  
      2.   def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { 
      3.     try { 
      4.       logDebug("Cleaning shuffle " + shuffleId) 
      5.       mapOutputTrackerMaster.unregisterShuffle(shuffleId) 
      6.       shuffleDriverComponents.removeShuffle(shuffleId, blocking) 
      7.       listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) 
      8.       logDebug("Cleaned shuffle " + shuffleId) 
      9.     } catch { 
      10.       case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) 
      11.     } 
      12.   } 

      细节就不细展开了。

       

      ContextCleaner的start函数被调用后,实际上启动了一个调度线程,每隔30min主动调用了一次System.gc(),来触发垃圾回收。

      1.  
      2.   def start(): Unit = { 
      3.     cleaningThread.setDaemon(true
      4.     cleaningThread.setName("Spark Context Cleaner"
      5.     cleaningThread.start() 
      6.     periodicGCService.scheduleAtFixedRate(() => System.gc(), 
      7.       periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) 
      8.   } 

      具体参数是:

      1. spark.cleaner.periodicGC.interval 

      本文转载自微信公众号「浪尖聊大数据」,可以通过以下二维码关注。转载本文请联系浪尖聊大数据公众号。

       

       

      免责声明:

      ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

      ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

      软考中级精品资料免费领

      • 历年真题答案解析
      • 备考技巧名师总结
      • 高频考点精准押题
      • 资料下载
      • 历年真题
      • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

        难度     813人已做
        查看
      • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

        难度     354人已做
        查看
      • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

        难度     318人已做
        查看
      • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

        难度     435人已做
        查看
      • 2024年上半年系统架构设计师考试综合知识真题

        难度     224人已做
        查看

      相关文章

      发现更多好内容
      咦!没有更多了?去看看其它编程学习网 内容吧