HiveSql优化

发布于:2021-05-07 18:15:50

Hive SQL的各种优化方法基本 都和数据倾斜密切相关。
  Hive的优化分为join相关的优化和join无关的优化,从项目的实际来说,join相关的优化占了Hive优化的大部分内容,而join相关的优化又分为mapjoin可以解决的join优化和mapjoin无法解决的join优化。



1、Hive优化(这是重点)


在实际的Hive SQL开发的过程中,Hive SQL 性能的问题上实际上只有一小部分和数据倾斜有关,很多时候,Hive SQL运行慢是由于开发人员对于使用的数据了解不够以及一些不良的*惯引起的。


开发人员需要确定以下几点:


1、指标是否一定要通过明细表得到,可否通过其他量级比较小的表得到,重复代码是否可以使用中间表来替代


2、开发过程中是否真的需要扫描那么多分区,举个例子你要做一张月维度表的报表,月维度数据很大,如果在测试demo中使用where月作为限制条件是不是要花很多时间,仅仅从调通代码测试样本数据的角度出发,使用where日条件量级比较小的限制条件会不会更好呢?(这个是重点,where条件的使用)


3、尽量不要使用select * from your_table这样的方式,用到哪些列就指定哪些列,另外WHERE条件中尽量添加过滤条件,以去掉无关的行,从而减少整个MapReduce任务需要处理、分发的数据量。


4、输入文件不要是大量的小文件,Hive默认的Input Split是128MB(可配置),小文件可先合并成大文件。


我们知道文件数目小,容易在文件存储端造成瓶颈,给 HDFS 带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。

用于设置合并属性的参数有:

调优参数:

是否合并Map输出文件:hive.merge.mapfiles=true(默认值为真)
是否合并Reduce 端输出文件:hive.merge.mapredfiles=false(默认值为假)
合并文件的大小:hive.merge.size.per.task=25610001000(默认值为 256000000)


1.1、join无关的优化

Hive SQL性能问题基本上大部分都是和JOIN相关,对于和join无关的问题主要有group by相关的倾斜和count distinct相关的优化


1.11、group by引起的倾斜优化(若无法避免则设置调优参数进行hive map端的负载均衡)
  group by引起的倾斜主要是输入数据行按照group by列分别布均匀引起的,比如,假设按照供应商对销售明细事实表来统计订单数,那么部分大供应商的订单量显然非常大,而多数供应商的订单量就一般,


由于group by 的时候是按照供应商的ID分发到每个Reduce Task,那么此时分配到大供应商的Reduce task就分配了更多的订单,从而导致数据倾斜。


对应group by引起的数据倾斜,优化措施非常简单,只需要设置下面参数即可:


set hive.map.aggr = true


set hive.groupby.skewindata = true


此时,Hive在数据倾斜的时候回进行负载均衡。


注:


hive和其它关系数据库一样,支持count(distinct)操作,但是对于大数据量中,如果出现数据倾斜时,会使得性能非常差,解决办法为设置数据负载均衡,其设置方法为设置hive.groupby.skewindata参数
hive (default)> set hive.groupby.skewindata;
hive.groupby.skewindata=false
默认该参数的值为false,表示不启用,要启用时,可以set hive.groupby.skewindata=ture;进行启用。
当启用时,能够解决数据倾斜的问题,但如果要在查询语句中对多个字段进行去重统计时会报错。
hive> set hive.groupby.skewindata=true;
hive> select count(distinct id),count(distinct x) from test;
FAILED: SemanticException [Error 10022]: DISTINCT on different columns not supported with skew in data
下面这种方式是可以正常查询
hive>select count(distinct id, x) from test;


1.12、count distinct优化
  在Hive开发过程中,应该小心使用count distinct,因为很容易引起性能问题,比如下面的SQL:


select count(distinct user) from some_table;


由于必须去重,因此Hive将会把Map阶段的输出全部分布到一个Reduce Task上,此时很容易引起性能问题,对于这种情况,可以通过先group by再count的方式优化,优化后的SQL如下:


select count(*)


from (select user from some_table group by user) temp;


其原理为:利用group by去重,再统计group by 的行数目。


1.2大小表join优化



1.21默认hivesql大小表谁在前合适呢:答案遵循小表在前原则

原因:底层mr中 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,载入条目较少的表 可以有效减少 OOM(out of memory)即内存溢出。所以对于同一个 key 来说,对应的 value 值小的放前,大的放后,这便是“小表放前”原则。 若一条语句中有多个 Join,依据 Join 的条件相同与否,有不同的处理方法。具体如下:

key相同:

如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce
一个 Map-Reduce 任务,而不是 ‘n’ 个
在做 OUTER JOIN 的时候也是一样


key不同:


如果 Join 的条件不相同,比如:

INSERT OVERWRITE TABLE pv_users

SELECT pv.pageid, u.age FROM page_view p

JOIN user u ON (pv.userid = u.userid)

JOIN newuser x on (u.age = x.age);

Map-Reduce 的任务数目和 Join 操作的数目是对应的,上述查询和以下查询是等价的:

INSERT OVERWRITE TABLE tmptable

SELECT * FROM page_view p JOIN user u

ON (pv.userid = u.userid);

INSERT OVERWRITE TABLE pv_users

SELECT x.pageid, x.age FROM tmptable x

JOIN newuser y ON (x.age = y.age);

1.22大表join大表()


使用map join 操作 hive默认是开启set hive.auto.convent.join=true;
格式为/*+ MAPJOIN(pv) / 多表可表现在为/+ MAPJOIN(b,c,d) */
Join 操作在 Map 阶段完成,不再需要Reduce,前提条件是需要的数据在 Map 的过程中可以访问到。比如查询:


INSERT OVERWRITE TABLE pv_users


SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age


FROM page_view pv


JOIN user u ON (pv.userid = u.userid);


可以在 Map 阶段完成 Join.


进阶相关的参数为:


hive.join.emit.interval = 1000
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000
注:这个方法的前提是小表不能太大,一般不能超过1g,要不hive会报错,此外还需注意的一点是hdfs上的文件一般是压缩过的,实际可能会膨胀10倍


1.23大表join大表(假如小表大小超过1g就变成大表)


一、问题场景
      问题场景如下:
      A表为一个汇总表,汇总的是卖家买家最*N天交易汇总信息,即对于每个卖家最*N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。
      A表的字段有:buyer_id、seller_id、pay_cnt_90day。
      B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。
      要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:
      某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。
      正如mapjoin中的例子一样,第一反应是直接join两表并统计:
      select
         m.buyer_id,
        sum(pay_cnt_90day) as pay_cnt_90day,
        sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
      from (
        select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a
        join
         (select seller_id, sale_level from table_B) b
        on a.seller_id = b.seller_id
        ) m
      group by m.buyer_id
      但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,
    ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。
      但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。
   二、优化方案
优化方案1
      一个很正常的想法是,尽管B表无法直接mapjoin, 但是是否可以间接mapjoin它呢?
      实际上此思路有两种途径:限制行和限制列。
      限制行的思路是不需要join B全表,而只需要join其在A表中存在的,对于本问题场景,就是过滤掉90天内没有成交的卖家。
      限制列的思路是只取需要的字段。
      加上如上的限制后,检查过滤后的B表是否满足了Hive mapjoin的条件,如果能满足,那么添加过滤条件生成一个临时B表,然后mapjoin该表即可。采用此思路的语句如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day) as pay_cnt_90day,
        sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
      from (
        select /+mapjoin(b)/
          a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a
        join
         (
           select seller_id, sale_level from table_B b0
           join
           (select seller_id from table_A group by seller_id) a0
           on b0.seller_id = a0.selller_id
          ) b
        on a.seller_id = b.seller_id
        ) m
      group by m.buyer_id
      此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。


优化方案2
      此种解决方案应用场景是:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值concat随机数,
    从而达到随机分发的目的。此方案的核心逻辑如下:
       select a.user_id, a.order_id, b.user_id
      from table_a a join table_b b
      on (case when a.user_is is null then concat(‘hive’, rand()) else a.user_id end) = b.user_id
      Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:
      set hive.optimize.skewinfo=table_B:(selleer_id) [ ( “0”) (“1”) ) ]
      set hive.optimize.skewjoin = true;
      但是方案2因为无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。
  
   优化方案3:倍数B表,在取模join     1、通用方案
      此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day) as pay_cnt_90day,
        sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
      from (
        select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a
        join
         (
          select /+mapjoin(members)/
            seller_id, sale_level ,member
          from table_B
         join members
          ) b
        on a.seller_id = b.seller_id
          and mod(a.pay_cnt_90day,10)+1 = b.number
        ) m
      group by m.buyer_id
        此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,
      但这样做的一个弊端是B表也会膨胀N倍。
     专用方案
        通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(
      比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。
        在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day) as pay_cnt_90day,
        sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
      from (
        select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from (
          select /+mapjoin(big)/
             buyer_id, seller_id, pay_cnt_90day,
             if(big.seller_id is not null, concat( table_A.seller_id, ‘rnd’, cast( rand() * 1000 as bigint ), table_A.seller_id) as seller_id_joinkey
             from table_A
             left outer join
             --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变
             (select seller_id from dim_big_seller group by seller_id)big
             on table_A.seller_id = big.seller_id
        ) a
        join
         (
          select /+mapjoin(big)/
            seller_id, sale_level ,
            --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样
            coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey
          from table_B
         left out join
          --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变
          (select seller_id, seller_id_joinkey from dim_big_seller) big
          on table_B.seller_id= big.seller_id
          ) b
        on a.seller_id_joinkey= b.seller_id_joinkey
          and mod(a.pay_cnt_90day,10)+1 = b.number
        ) m
      group by m.buyer_id
      相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。
   动态一分为二(终极方案)
      实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,
    倾斜的把他们找出来做mapjoin,最后union all其结果即可。
      但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:
      --由于数据倾斜,先找出90天买家超过10000的卖家
      insert overwrite table temp_table_B
      select
        m.seller_id, n.sale_level
      from (
        select seller_id
        from (
          select seller_id,count(buyer_id) as byr_cnt
          from table_A
          group by seller_id
          ) a
        where a.byr_cnt >10000
        ) m
      left join
      (
       select seller_id, sale_level from table_B
      ) n
      on m.seller_id = n.seller_id;
      
      --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。
      
      select
         m.buyer_id,
        sum(pay_cnt_90day) as pay_cnt_90day,
        sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5
      from (
        select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a
        join
         (
          select seller_id, a.sale_level
           from table_A a
           left join temp_table_B b
          on a.seller_id = b.seller_id
          where b.seller_id is not null
          ) b
        on a.seller_id = b.seller_id
       union all
       
       select /+mapjoin(b)/
          a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day
        from (
           select buyer_id, seller_id, pay_cnt_90day
          from table_A
          ) a
        join
         (
           select seller_id, sale_level from table_B
          ) b
        on a.seller_id = b.seller_id
     ) m group by m.buyer_id
     ) m
     group by m.buyer_id


总结:方案1、2以及方案3中的同用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定使用场景。而方案3的专用方案和方案4是推荐的优化方案,但是它们都需要新建一个临时表
       来存储每日动态变化的大卖家。相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是是维度表,不然统*峁崾谴砦蟮摹7桨4最通用,自由度最高,
       但是对代码的更改也最大,甚至修改更难代码框架,可以作为终极方案使用。


优化总结


1.提高数据敏感度和培养一个好的*惯是至关重要的,善用where,创建必要的中间表会大大提高的你工作效率


2.优化时 hive sql底层mr是hive优化的根本


长期观察hadoop处理数据的过程,有几个显著的特征:
不怕数据多,就怕数据倾斜。
对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,没半小时是跑不完的。map reduce作业初始化的时间是比较长的。
对sum,count来说,不存在数据倾斜问题。
对count(distinct ),效率较低,数据量一多,准出问题,如果是多count(distinct )效率更低。
优化可以从几个方面着手:
好的模型设计事半功倍。
解决数据倾斜问题。
减少job数。
设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
自己动手写sql解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化总是漠视业务,*惯性提供通用的解决方法。 Etl开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效。
对count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
优化时把握整体,单个作业最优不如整体最优。


优化的常用手段


主要由三个属性来决定:
hive.exec.reducers.bytes.per.reducer #这个参数控制一个job会有多少个reducer来处理,依据的是输入文件的总大小。默认1GB。
hive.exec.reducers.max #这个参数控制最大的reducer的数量, 如果 input / bytes per reduce > max 则会启动这个参数所指定的reduce个数。 这个并不会影响mapre.reduce.tasks参数的设置。默认的max是999。
mapred.reduce.tasks #这个参数如果指定了,hive就不会用它的estimation函数来自动计算reduce的个数,而是用这个参数来启动reducer。默认是-1。


参数设置的影响
如果reduce太少:如果数据量很大,会导致这个reduce异常的慢,从而导致这个任务不能结束,也有可能会OOM 2、如果reduce太多: 产生的小文件太多,合并起来代价太高,namenode的内存占用也会增大。如果我们不指定mapred.reduce.tasks, hive会自动计算需要多少个reducer。


参考资料
①《离线和实时大数据开发实战》 | 朱松岭
② :https://mp.weixin.qq.com/s? __biz=MzA3MDY0NTMxOQ==&mid=2247485140&idx=1&sn=dd8d05309b8e2e86b3bde6728c6932ec&chksm=9f38e5fca84f6ceae8eb4791337ccfe81fc6764890100bb2cb7f7aec2ad23b1a78e1e25f56c4&mpshare=1&scene=23&srcid=07317XgGFtD10j3e3t28uP6U#rd


③https://blog.csdn.net/tp15868352616/article/details/81295450
④https://www.iteye.com/blog/daizj-2283332

相关推荐

猜你喜欢