实现Runnable中不能抛异常只能捕获异常原因

近日,小伙伴发现使用线程池,如果Runnable中存在异常且没有catch后会造成某一个线程的阻塞。

以下代码是根据具体业务改编。(不要喷 (•◡•) /)

public class RunnableTest implements Runnable{
    private int i;

    @Override
    public void run() {
        User xxx = new User(i);
        System.out.println(Thread.currentThread().getName() + " Task " + i + "t start");
        xxx = null;
        xxx.getAge();
        System.out.println(Thread.currentThread().getName() + " Task " + i + "t end");
    }

    public RunnableTest(int i) {
        this.i = i;
    }

    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());

        /*执行10个任务*/
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(new RunnableTest(1));
        }

    }
}

执行结果:

pool-1-thread-1 Task 1   start
Exception in thread "pool-1-thread-1" pool-1-thread-2 Task 1     start
java.lang.NullPointerException
    at com.mashibing.practice.runnable_t1.RunnableTest.run(RunnableTest.java:26)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" java.lang.NullPointerException
    at com.mashibing.practice.runnable_t1.RunnableTest.run(RunnableTest.java:26)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

看到这个结果,内心有这样几个问题

  • 为什么xxx.getAge();之后的语句没有执行呢?

    很明显是因为xxx = null;之后执行xxx.getAge();就有NullPointerException问题,所以下边代码没有执行。

  • 为什么就只有两个Task执行呢?

    这就要从ThreadPoolExecutor中的各个参数说起了,先看看这个参数的具体含义。

ThreadPoolExecutor构造函数
  • corePoolSize 线程池中最少的工作线程,不允许销毁,除非设置了 allowCoreThreadTimeOut 参数
  • maximumPoolSize 线程池中最多工作线程数(最大可以是2^29-1个)
  • workQueue 任务队列,用于保存任务和执行任务之间切换
  • keepAliveTime: 如果 当前线程数量 > corePoolSize,多出来的线程会在keepAliveTime之后就被释放掉
  • unit: keepAliveTime的时间单位,比如分钟,小时等
  • threadFactory: 创建线程的工厂
  • handler: 就是说当线程,队列都满了,之后采取的策略,比如抛出异常等策略

参数的具体含义明白后,再看看代码,发现队列慢了之后的策略是new ThreadPoolExecutor.DiscardPolicy(),那明白了,也就是说,我一下在创建了10个任务同时启动,队列满了,触发DiscardPolicy之后进来的任务都抛弃了。那我们再测试一下。

        /*执行10个任务*/
        for (int i = 0; i < 10; i++) {
            threadPoolExecutor.execute(new RunnableTest(i));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

我向线程池扔一个任务后,睡1s,然后再扔,验证上边所说问题。

执行结果

image.png

结果显而易见,线程没有被阻塞,仍然正常的执行之后的任务,线程没有阻塞。

那接下来就有问题了,线上出现了这个问题,我们改怎么优化呢?
分析一下具体有哪些问题

  • 问题1: 线程提交任务后,由于队列满了,再向线程池中扔Runnable,我们应该采取什么策略呢?
    1. 使用submit提交子任务,一定要获取返回值Future,通过get方法获取可能出现的异常,并且可以进行捕获(推荐),然后进一步处理。
    2. 扩大线程池中等待队列(请求处理队列)。这样看起来似乎可以,估计也是目前最简单粗暴的方案了。但是当并发较高时,系统内存显然是稀缺资源,可能会耗费非常大的内存,甚至OOM。那在不增加机器,不增加内存的情况下,如何让系统有更高的并发呢?
    3. 扩大线程池中maximumPoolSize,显然也有一些缺点,比如并发非常高时,可能会创建数量非常多的线程,甚至OOM。
    4. 还有一种方案就是调整拒绝策略。默认的拒绝策略看来够呛了。原因是默认的拒绝策略中DiscardPolicy()DiscardOldestPolicy直接把任务扔掉,这是不是有点粗鲁,在特定场景下,直接扔掉会造成业务数据异常。那我们看看AbortPolicy()这个呢,还好点,最起码向主线程抛出异常,但这个姿势还是不好,根本问题没有解决,业务数据还是可能有异常的情况。那还有最后一个默认策略CallerRunsPolicy()大概意思就是哪个线程向线程池中扔任务,那这个线程执行该Runnable。这个似乎还可以。但是这样如果队列满了,发起执行任务的线程就要执行Runnable,这样会大大降低发起执行任务的线程的并发。既然这样,我们不如自定义拒绝策略吧,当线程池中队列满了,我们将多出的Runnable扔到中间件(例如:KafKa)中,起几个消费者帮忙执行一下任务。根据业务场景如果数据可以有一定的延迟时,可以先把Runnable先存下来,等到队列中空闲时再把保存的Runnable扔到等待队列中。
    static class MyHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//            System.out.println();("r rejected")
//            save r kafka mysql redis
//            try 3 times


            if (executor.getQueue().size() < 4) {
                //try put again();

            }
        }
    }
  1. 既然这样,我们为什么直接用中间件之间处理呢?这就得根据项目具体业务来确定了,来我们聊聊使用线程池和使用中间件的区别吧!

用线程池ExecutorService异步处理:我理解ExecutorService其实也是内部使用了队列(如LinkedBlockingQueue),所以从设计上,其实和使用中间价的消息队列是差不多一致的。只是这里应用服务器既充当生产者又充当消费者,也是消息队列中间价的实现者。这种应该适合非分布式的架构,比如简单的只有一台服务器

使用消息队列:消息队列(指activeMQ,rabbitMQ,kafaKa,Redis等)因为一般都是中间件,部署在其他机器,需要一定的网络消耗。
本着解耦的目的,使用后者更合理,因为应用服务器一般内存也不会太多,队列长度不易太长。让应用服务器只处理逻辑比较合理。适合分布式架构。
MQ也可以更加有扩展性, 支持的场景更多, 而且支持消息自动的持久化, 建议看看 RabbitMQ 和 AMQP 协议, JMS 可以学但是没 AMQP 更加通用, redis的MQ还是不要用了, 那只是一个附带的功能, kafka 是大数据领域的不适合做核心业务功能, 只适合数据统计类应用的发送数据, 因为他不确保消息100%不丢失, 如此大的数据量丢一条无所谓的, 不会对统计结果造成影响, 但速度和吞吐量高很多.

但是线程池就不一样了, 目前执行状态你无法知道, msg的消费率是多少都不知道, 消息转发啊, 消息拒绝啊, 都的自己实现, 而且是单机版的, 我目前用他来做一级转发, 就是用他来将 event 异步发送出去, 而不是让他异步做一些很繁重的工作, 举例:
注册用户service方法, 当事务结束后, 发送 RegisterUserEvent, 这个发送就是用java线程池(如spring的), 然后 RegisterUserListener 监听到了这个 event 就发送 msg 到 Rabbit MQ, 之后对注册用户这个Topic感兴趣的应用都可以订阅, 比如送积分的服务, 送优惠券的服务, 开辟云盘空间的服务等等。

– 问题2: 如何能捕获到快速捕获到线程池中线程的异常呢

1.异常统一捕获
  1. 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常
  2. 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理

3.重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常

4.为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常 (不推荐)

2.当业务量比较复杂时,项目中使用的线程池比较多,能够快速定位问题显得格外重要,那就给线程池定义一个业务名称吧。

通过实现ThreadFactory接口,可以实现

如果线上机器突然宕机,线程池中阻塞队列中的请求怎么办?

必然会导致线程池里的积压的任务实际上来说都是会丢失的

如果说你要提交一个任务到线程池里去,在提交之前,麻烦你先在数据库里插入这个任务的信息,更新他的状态:未提交、已提交、已完成。提交成功之后,更新他的状态是已提交状态

系统重启,后台线程去扫描数据库里的未提交和已提交状态的任务,可以把任务的信息读取出来,重新提交到线程池里去,继续进行执行

由于本人水平有限,文章中如果有不严谨的地方还请提出来,愿闻其详。

还应学习的资料

  • 源码 https://www.cnblogs.com/wang-meng/p/10588637.html
  • ThreadPoolExcutor 线程池 异常处理 (下篇)
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.dandroid.cn/12725,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?