首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  Java

消费者怎么确保能取到数据,又能正确退出线程

  •  2
     
  •   shayang888 · 44 天前 · 1533 次点击
    这是一个创建于 44 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在是这样的

    1.首先我最外面有个 quartz 定时器,每隔 N 秒执行一次

    2.定时器里执行的内容是这样,里面有个线程池,线程池大小是 2 个线程,coresize,maxsize 都是 2

    3.线程池里的 2 个线程,分别一个去执行生产者方法,一个去执行消费者方法

    4.生产者和消费者中间用消息队列来临时存数据

    现在有个问题,就是消费者这边,怎么能保证取到数据,又能正确的退出线程,进行到下一次定时器的执行

    在这之前我做的蠢办法是消费者那边加了个 while(true),结果定时器执行了 2 次后,线程池就满了,然后拒绝

    所以不知道有啥好办法

    代码:

    //线程池的
    public class TestTask extends QuartzJobBean {
    
        private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                2,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(4));
    
        @Override
        protected void executeInternal(JobExecutionContext jobExecutionContext) {
            threadPoolExecutor.execute(() -> {
                //生产者代码
                
                //数据放入消息队列
            });
            threadPoolExecutor.execute(() -> {
                while (true) {
                    //取队列里的数据
                    //消费者代码
                }
            });
        }
    }
    
    第 1 条附言  ·  44 天前
    可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
        1
    jinksw   44 天前
    看样子你定时器的意思是 每两秒生产一次 然后消费一次

    为啥要 2 个线程 你每次运行一个线程 先生产再消费不行吗
        2
    sarlanori   44 天前
    在 C#里,我一般是用信号量来等待和通知。
        3
    shayang888   44 天前
    @jinksw 那样不就是同步了吗?我是想生产和消费分隔开来
        4
    shayang888   44 天前
    @jinksw 另外定时器并不是 2 秒执行一次啊 定时器的执行时间随便设置的
        5
    passerbytiny   44 天前
    消费者在线程之上,而不是之内,拿到数据后再从线程池里开线程去执行后续处理。消费者不能用多线程+死循环来取数据,而应该是单线程异步监听+同步取值,再具体我也不知道了,因为基本都是直接调用 API。
        6
    Counter   44 天前
    机制是不是不太对,改成这样如何呢?
    生产后的数据加锁,生产者方法和消费者方法排队存取
        7
    shayang888   44 天前
    @passerbytiny 拿到数据后再从线程池里开线程去执行后续处理吗 好像有点思路
        8
    shayang888   44 天前
    @passerbytiny 可是消费者怎么知道它啥时候能拿到数据
        9
    shayang888   44 天前
    @Counter 没有懂你的意思
        10
    limhiaoing   44 天前 via iPhone
    生产者、消费者线程一般用条件变量 Condition Variable 来通信。
        11
    shayang888   44 天前
    @passerbytiny 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
        12
    shayang888   44 天前
    @Counter 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
        13
    passerbytiny   44 天前   ♥ 1
    @shayang888 #12 消费者也不要放在定时器里,它应该是一个常驻的、独立的单线程。我不知道你的消息队列是什么队列,但一般的消息队列都是提供消费者 API 的,可以直接使用,自己做消费者太难了。

    如果用线程去看消费者 /监听,那么是类似 while(true) {if(收到数据) {……} else { Thread.sleep(0.0001);} },这要是用线程去做,要么系统受不了,要么延迟时间受不了。消费者 /监听的轮询,回采用操作系统层次的东西,高级程序员都没必要知道的太深入,自己设计是肯定设计不来的。

    你可以参考下 java.net.ServerSocket#accept()。
        14
    autogen   44 天前
    生产者发送的 msg 封装一下,加个 ctrl 字段,消费者接收到 msg.ctrl=exit 就退出
        15
    NieKing   44 天前
    我想起了 Android 里的 RxJava
        16
    linjiayu   44 天前
    实现 callable
        17
    linjiayu   44 天前   ♥ 1
    public void offer(Event event)
    {
    synchronized (eventQueue)
    {
    while (eventQueue.size() >= max)
    {
    try
    {
    console(" the queue is full.");
    eventQueue.wait();
    } catch (InterruptedException e)
    {
    e.printStackTrace();
    }
    }

    console(" the new event is submitted");
    eventQueue.addLast(event);
    eventQueue.notifyAll();
    }
    }

    public Event take()
    {
    synchronized (eventQueue)
    {
    while (eventQueue.isEmpty())
    {
    try
    {
    console(" the queue is empty.");
    eventQueue.wait();
    } catch (InterruptedException e)
    {
    e.printStackTrace();
    }
    }


    Event event = eventQueue.removeFirst();
    this.eventQueue.notifyAll();
    console(" the event " + event + " is handled.");
    return event;
    }
    }
        18
    shayang888   44 天前
    @autogen 我现在是多个生产者同时生产数据然后往队列里 push,然后只有一个消费者在从队列里消费 加字段的话 我给哪个生产者加这个字段呢
        19
    jingxyy   44 天前
    先不管怎么实现合理的问题
    你是不是消费线程消费完了没退出啊?这样每一个 interval 之后你就有一个 while(true)的消费线程在跑,于是第 2 个周期后无法创建新的消费线程
        20
    ratel   44 天前
    使用消息中间件啊,消费者单独订阅消息消费,生产者用定时器就行了。
        21
    micean   44 天前
    quartz 为什么要玩 while(true)
        22
    shayang888   44 天前
    @jingxyy 对呀 我就是不知道怎么合适的退出
        23
    shayang888   44 天前
    @ratel 我现在就是没有用到中间件 想问问如果自己来弄的话咋做的好
        24
    woscaizi   44 天前 via iPhone
    简单的数据结构入栈出栈吧。
        25
    ratel   44 天前
    @shayang888 不用中间件,那也是用一样的设计模式,消费者和生产者是分开的,只依赖消息。
        26
    shayang888   44 天前
    @ratel 我不明白 为什么消费者要独立开来 我的消费只需要在这里进行消费呀 其它的地方都用不到它
        27
    pusidun   44 天前
    可以生产者定时生产消息,放入消息队列;消费者可以用线程池常驻,每个消费者线程轮询消息队列是否空,不为空处理,为空阻塞一段时间,就不用退出了。不过消息队列要保证是线程安全的
        28
    shayang888   44 天前
    @pusidun 你的意思和楼上他们说的是一个样吧 就是应该把消费者独立出来是吗?不应该放在定时器里
        29
    codingKingKong   44 天前
    大概是这样么?
    ```java
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TT {

    public static void main(String[] args) throws Exception{
    threadPoolExecutor.execute(() -> {
    try {
    String a = "";
    while (a != null){
    a = blockingDeque.poll(10, TimeUnit.SECONDS);
    System.out.println(a);
    }
    System.out.println("consumer exit.");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    Thread.sleep(2000);
    executeInternal();
    Thread.sleep(2000);
    executeInternal();
    }

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    2,
    2,
    60,
    TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(4));


    private static BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>();

    private static void executeInternal() {
    threadPoolExecutor.execute(() -> {
    try {
    blockingDeque.put("123");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }

    }
    ```
        30
    Jrue0011   44 天前
    额,想问下定时器里为什么还要再通过线程池调度线程来执行任务...?
        31
    ratel   43 天前
    @shayang888 你去看下生产者消费者设计模式
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2075 人在线   最高记录 5043   ·  
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 23ms · UTC 16:10 · PVG 00:10 · LAX 09:10 · JFK 12:10
    ♥ Do have faith in what you're doing.
    沪ICP备16043287号-1