Spark调优多线程并行处理任务实现方式

方式1:

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job

所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行

而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:

有延时发生了,batch无法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源

Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。

方式2:

场景1:

程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。

一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。

场景2:

  • 程序需要处理的相似job数随着业务的增长越来越多
  • 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
  • spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务

DStream.foreachRDD{
   rdd =>
    //创建线程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //将规则放入线程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //执行规则
       runRule(ru,spark)
      }
     })
    }
    //每次创建的线程池执行完所有规则后shutdown
    executors.shutdown()
  }

注意点

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
  • 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
  • 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。

2.可不可以将创建线程池放到foreachRDD外面?

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。

3.线程池executor崩溃了就会导致数据丢失

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • PyCharm搭建Spark开发环境的实现步骤

    1.安装好JDK 下载并安装好jdk-12.0.1_windows-x64_bin.exe,配置环境变量: 新建系统变量JAVA_HOME,值为Java安装路径 新建系统变量CLASSPATH,值为 .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;(注意最前面的圆点) 配置系统变量PATH,添加 %JAVA_HOME%bin;%JAVA_HOME%jrebin 在CMD中输入:java或者java -version,不显示不是内部命令等,说明

  • 从0开始学习大数据之java spark编程入门与项目实践

    本文实例讲述了大数据java spark编程.分享给大家供大家参考,具体如下: 上节搭建好了eclipse spark编程环境 在测试运行scala 或java 编写spark程序 ,在eclipse平台都可以运行,但打包导出jar,提交 spark-submit运行,都不能执行,最后确定是版本问题,就是你在eclipse调试的spark版本需和spark-submit 提交spark的运行版本一致,还有就是scala版本一致,才能正常运行. 以下是java spark程序运行 1.新建mave

  • 如何将PySpark导入Python的放实现(2种)

    方法一 使用findspark 使用pip安装findspark: pip install findspark 在py文件中引入findspark: >>> import findspark >>> findspark.init() 导入你要使用的pyspark库 >>> from pyspark import * 优点:简单快捷 缺点:治标不治本,每次写一个新的Application都要加载一遍findspark 方法二 把预编译包中的Python库

  • Python如何把Spark数据写入ElasticSearch

    这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中. 实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下. 如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持.所以首先你需要去这里下载依赖的ES官方开发的依赖包包. 下载完成后,放在本地目录,以下面命令方式启动pyspark: pyspark --jars elasticsearch-ha

  • spark通过kafka-appender指定日志输出到kafka引发的死锁问题

    在采用log4j的kafka-appender收集spark任务运行日志时,发现提交到yarn上的任务始终ACCEPTED状态,无法进入RUNNING状态,并且会重试两次后超时.期初认为是yarn资源不足导致,但在确认yarn资源充裕的时候问题依旧,而且基本上能稳定复现. 起初是这么配置spark日志输出到kafka的: log4j.rootCategory=INFO, console, kafka log4j.appender.console=org.apache.log4j.ConsoleA

  • pyspark 随机森林的实现

    随机森林是由许多决策树构成,是一种有监督机器学习方法,可以用于分类和回归,通过合并汇总来自个体决策树的结果来进行预测,采用多数选票作为分类结果,采用预测结果平均值作为回归结果. "森林"的概念很好理解,"随机"是针对森林中的每一颗决策树,有两种含义:第一种随机是数据采样随机,构建决策树的训练数据集通过有放回的随机采样,并且只会选择一定百分比的样本,这样可以在数据集合存在噪声点.异常点的情况下,有些决策树的构造过程中不会选择到这些噪声点.异常点从而达到一定的泛化作用在

  • 在IntelliJ IDEA中创建和运行java/scala/spark程序的方法

    本文将分两部分来介绍如何在IntelliJ IDEA中运行Java/Scala/Spark程序: 基本概念介绍 在IntelliJ IDEA中创建和运行java/scala/spark程序 基本概念介绍 IntelliJ IDEA 本文使用版本为: ideaIC-2020.1 IDEA 全称 IntelliJ IDEA,是java编程语言开发的集成环境.IntelliJ在业界被公认为最好的java开发工具,它的旗舰版本还支持HTML,CSS,PHP,MySQL,Python等,免费版只支持Jav

  • pyspark给dataframe增加新的一列的实现示例

    熟悉pandas的pythoner 应该知道给dataframe增加一列很容易,直接以字典形式指定就好了,pyspark中就不同了,摸索了一下,可以使用如下方式增加 from pyspark import SparkContext from pyspark import SparkConf from pypsark.sql import SparkSession from pyspark.sql import functions spark = SparkSession.builder.conf

  • Spark调优多线程并行处理任务实现方式

    方式1: 1. 明确 Spark中Job 与 Streaming中 Job 的区别 1.1 Spark Core 一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作) 一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算 Job在spark里应用里是一个被调度的单位 1.2 Streaming 一个 batch 的数据对应一个 DStreamGraph 而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作 每一个输出对应

  • Logback与Log4j2日志框架性能对比与调优方式

    目录 前言 性能测试 logback 同步日志 异步日志(队列扩容) 异步日志(半队列扩容) log4j2 同步日志 异步日志(队列扩容) 异步日志(日志淘汰策略) 异步日志(半队列扩容) 异步日志(等待策略) 性能调优 异步日志 日志可靠性 Logback Log4j2 日志抛弃策略 Log4j2 Logback 日志等待策略 TimeoutWaitStrategy YieldWaitStrategy 队列容量 Logback Log4j2 长度计算公式 消费瓶颈 消费TPS 请求TPS 消费

  • 浅谈C#多线程下的调优

    目录 一.原子操作 1.基于Lock实现 2.基于CAS实现 3.自旋锁SpinLock 4.读写锁ReaderWriterLockSlim 二.线程安全 1.线程安全集合 2.线程安全字典 三.线程池 1.通过QueueUserWorkItem启动工作者线程 2.线程池等待(信号量) 3.Task 4.线程池调度原理 四.并行 五.异步IO 1.异步IO于同步IO比较 2.异步读写文件 一.原子操作 先看一段问题代码 /// <summary> /// 获取自增 /// </summa

  • 数据库SQL调优的几种方式汇总

    目录 char  vs varchar 开启慢查询日志来定位查询慢的语句 合理使用关键字 优化查询缓存 适当使用索引 分割数据表 非规范化的方式 总结 最近在复习SQL调优,总结了下主要有以下几种方式: char  vs varchar 1.如果文本字段始终是固定长度的(例如,US 邮编,其始终具有“XXXXX-XXXX”形式的规范表示),那么推荐使用char.varchar 类型的长度是可变的,而 char 类型是一个定长的字段,以 char(10) 为例,不管真实的存储内容多大或者是占了多少

  • SpringBoot JVM参数调优方式

    目录 SpringBoot JVM参数调优 各种参数 SpringBoot jar包启动设置JVM参数 配置初始化堆和最大堆的大小 SpringBoot JVM参数调优 各种参数 参数名称 含义 默认值 说明 -Xms 初始堆大小 物理内存的1/64(<1GB) 默认(MinHeapFreeRatio参数可以调整)空余堆内存小于40%时,JVM就会增大堆直到-Xmx的最大限制. -Xmx 最大堆大小 物理内存的1/4(<1GB) 默认(MaxHeapFreeRatio参数可以调整)空余堆内存大

  • Apache Hive 通用调优featch抓取机制 mr本地模式

    目录 Apache Hive-通用优化-featch抓取机制 mr本地模式 Fetch抓取机制 mapreduce本地模式 切换Hive的执行引擎 Apache Hive-通用优化-join优化 - reduce端join -map端join reduce 端 join 优化 map 端 join 优化 Apache Hive--通用调优--数据倾斜优化 group by数据倾斜 join数据倾斜 Apache Hive--通用调优--MR程序task个数调整 maptask个数 reducet

  • Java JVM原理与调优_动力节点Java学院整理

    JVM是一种用于计算设备的规范,它是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟各种计算机功能来实现的.Java虚拟机包括一套字节码指令集.一组寄存器.一个栈.一个垃圾回收堆和一个存储方法域. JVM屏蔽了与具体操作系统平台相关的信息,使Java程序只需生成在Java虚拟机上运行的目标代码(字节码),就可以在多种平台上不加修改地运行.是运行Java应用最底层部分. JDK(Java Development kit) 整个Java的核心,包括了Java运行环境(Java Runtime E

  • sqlserver性能调优经验总结

    相信不少的朋友,无论是做开发.架构的,还是DBA等,都经常听说"调优"这个词.说起"调优",可能会让很多技术人员心头激情澎湃,也可能会让很多人感觉苦恼.当然,也有很多人对此不屑一顾,因为并不是每个人接触到的项目都很大,也不是每个人做的项目都对性能要求很高. 在主流的企业级开发和互联网应用中,数据库的重要性是不言而喻的,而数据库的性能对于整个系统的性能而言也是至关重要的,这里无庸赘述. sqlserver的性能调优,其实是个很宽广的话题.坦白讲,想从概念到实践的完全讲

  • 优化Java虚拟机总结(jvm调优)

    堆设置 -Xmx3550m:设置JVM最大堆内存为3550M. -Xms3550m:设置JVM初始堆内存为3550M.此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存. -Xss128k:设置每个线程的栈大小.JDK5.0以后每个线程栈大小为1M,之前每个线程栈大小为256K.应当根据应用的线程所需内存大小进行调整.在相同物理内存下,减小这个值能生成更多的线程.但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右. -Xmn2g:设置堆

  • 分析MySQL复制以及调优原理和方法

    一. 简介 MySQL自带复制方案,带来好处有: 数据备份. 负载均衡. 分布式数据. 概念介绍: 主机(master):被复制的数据库. 从机(slave):复制主机数据的数据库. 复制步骤: (1). master记录更改的明细,存入到二进制日志(binary log). (2). master发送同步消息给slave. (3). slave收到消息后,将master的二进制日志复制到本地的中继日志(relay log). (4). slave重现中继日志中的消息,从而改变数据库的数据. 下

随机推荐