消费生产模式扫表

背景

有一批数据需要导入到ElasticSearch中,但是写ElasticSearch的速度比较慢,需要采用多线程的方式,但是在每个线程中都扫表,会产生重复的数据段,所以采用生产消费的模型来解决该问题 (为什么不直接选择线程池?线程池提交是异步的,一般table中的数据量都比较大,很容易塞爆内存)

流程图

produce-consume.png

  1. 由生产者进行扫表,每次取出一批的数据(如:500条)

  2. 将500条数据放入java的Queue中

  3. 多个生产者来消费这个Queue

  4. 当生产者结束扫表,或者外部中断扫表的时候,中断消费者

中断消费者的方式,往Queue中扔入一个毒药对象,当消费者获取到毒药对象时,停止消费,并将毒药对象塞回Queue,用于停止其他消费者

功能点

  1. 开始扫表

  2. 暂停扫表

  3. 结束扫表

  4. 数据扫表状态

  5. 恢复扫表(支持指定offset)

实现

Producer

public class Producer implements Runnable {

    private final SynchronousQueue<List<Long>> queue;

    private volatile boolean running = true;

    public Producer(SynchronousQueue<List<Long>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        long lastId = 0L;
        int batchSize = 500;
        while (running) {
            // select * from my_table where id > ${lastId} order by id asc limit ${batchSize};
            List<Long> ids = new ArrayList<>(); // 自行实现上面的查询
            if (CollectionUtils.isEmpty(ids)) {
                putQueueQuite(Context.poison);
                break;
            }
            putQueueQuite(ids);
            lastId = Collections.max(ids);
            if (ids.size() < batchSize) {
                putQueueQuite(Context.poison);
                break;
            }
        }
        // throw poison
    }

    private void putQueueQuite(List<Long> pill) {
        try {
            queue.put(pill);
        } catch (InterruptedException e) {
            // ignore
        }
    }

}

Consumer

public class Consumer implements Runnable {

    private final SynchronousQueue<List<Long>> queue;

    public Consumer(SynchronousQueue<List<Long>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                List<Long> ids = queue.take();
                if (ids == Context.poison) {
                    queue.put(Context.poison);
                    break;
                }
                // do something
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}

Context

public class Context {

    public static final List<Long> poison = new ArrayList<>();

}

Runner

public class Runner {
    public static void main(String[] args) {
        int maxThreads = 10;
        int consumerThreads = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(maxThreads);
        SynchronousQueue<List<Long>> queue = new SynchronousQueue<>();
        executorService.submit(new Producer(queue));
        for (int i = 0; i < consumerThreads; i++) {
            executorService.submit(new Consumer(queue));
        }
    }
}

功能点的控制,自己实现就好了

标签: none

添加新评论