线程同步

多个线程同时工作,经常会需要步调一致。

通过 join() 方法等待线程结束

例,两个线程一个执行不同的查询,最终比较结果后写入差异:

while(存在未对账订单){

    // 查询未对账订单
    Thread T1 = new Thread(()->{
        pos = getPOrders();
    });
    T1.start();

    // 查询派送单
    Thread T2 = new Thread(()->{
        dos = getDOrders();
    });
    T2.start();

    // 等待T1、T2结束
    T1.join();
    T2.join();

    // 执行对账操作
    diff = check(pos, dos);

    // 差异写入差异库
    save(diff);
}

CountDownLatch 可以等待计数器归零

由于每次循环都会创建新的线程,效率太低,建议使用线程池。但线程池里的线程不会退出,无法使用 join() 方法,可以使用 CountDownLatch 在每个线程结束工作的时候执行
countDown() 方法让计数器减 11,并在调用 await() 方法处等待计数器归 00 时,继续执行。


// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
    // 计数器初始化为2
    CountDownLatch latch = new CountDownLatch(2);
    // 查询未对账订单
    executor.execute(()-> {
        pos = getPOrders();
        latch.countDown();
    });
    // 查询派送单
    executor.execute(()-> {
        dos = getDOrders();
        latch.countDown();
    });
    
    // 等待两个查询操作结束
    latch.await();
    
    // 执行对账操作
    diff = check(pos, dos);
    // 差异写入差异库
    save(diff);
}

CyclicBarrier使线程之间互相等待

CountDownLatch 类似,CyclicBarrierawait() 方法使计数器减 11 。不同的是,当计数器归 00 ,会调用回调函数,并将计数器重新设为初始值。


// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池 
Executor executor = 
  Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
  
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
  
void checkAll(){
    // 循环查询订单库
    Thread T1 = new Thread(()->{
        while(存在未对账订单){
            // 查询订单库
            pos.add(getPOrders());
            // 等待
            barrier.await();
        }
    });
    T1.start();  

    // 循环查询运单库
    Thread T2 = new Thread(()->{
        while(存在未对账订单){
            // 查询运单库
            dos.add(getDOrders());
            // 等待
            barrier.await();
        }
    });
    T2.start();
}