消费生产模式扫表
背景
有一批数据需要导入到ElasticSearch中,但是写ElasticSearch的速度比较慢,需要采用多线程的方式,但是在每个线程中都扫表,会产生重复的数据段,所以采用生产消费的模型来解决该问题 (为什么不直接选择线程池?线程池提交是异步的,一般table中的数据量都比较大,很容易塞爆内存)
流程图
- 由生产者进行扫表,每次取出一批的数据(如:500条)
- 将500条数据放入java的Queue中
- 多个生产者来消费这个Queue
- 当生产者结束扫表,或者外部中断扫表的时候,中断消费者
中断消费者的方式,往Queue中扔入一个毒药对象,当消费者获取到毒药对象时,停止消费,并将毒药对象塞回Queue,用于停止其他消费者
功能点
- 开始扫表
- 暂停扫表
- 结束扫表
- 数据扫表状态
- 恢复扫表(支持指定offset)
实现
Producer
public class Producer implements Runnable {
private final SynchronousQueue<List<Long>> queue;
private volatile boolean running = true;
public Producer(SynchronousQueue<List<Long>> queue) {
this.queue = queue;
}
@Override
public void run() {
long lastId = 0L;
int batchSize = 500;
while (running) {
// select * from my_table where id > ${lastId} order by id asc limit ${batchSize};
List<Long> ids = new ArrayList<>(); // 自行实现上面的查询
if (CollectionUtils.isEmpty(ids)) {
putQueueQuite(Context.poison);
break;
}
putQueueQuite(ids);
lastId = Collections.max(ids);
if (ids.size() < batchSize) {
putQueueQuite(Context.poison);
break;
}
}
// throw poison
}
private void putQueueQuite(List<Long> pill) {
try {
queue.put(pill);
} catch (InterruptedException e) {
// ignore
}
}
}
Consumer
public class Consumer implements Runnable {
private final SynchronousQueue<List<Long>> queue;
public Consumer(SynchronousQueue<List<Long>> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
List<Long> ids = queue.take();
if (ids == Context.poison) {
queue.put(Context.poison);
break;
}
// do something
} catch (InterruptedException e) {
// ignore
}
}
}
}
Context
public class Context {
public static final List<Long> poison = new ArrayList<>();
}
Runner
public class Runner {
public static void main(String[] args) {
int maxThreads = 10;
int consumerThreads = 3;
ExecutorService executorService = Executors.newFixedThreadPool(maxThreads);
SynchronousQueue<List<Long>> queue = new SynchronousQueue<>();
executorService.submit(new Producer(queue));
for (int i = 0; i < consumerThreads; i++) {
executorService.submit(new Consumer(queue));
}
}
}
大佬好
可以加友链吗?哈哈