前言

作为一位开发者,我们经常会接触到“线程”一词,线程意味着并发,但是并发编程是比较困难的。在并发编程中,我们比较关心的就是线程安全问题,解决线程安全问题常用的方法是加锁,可以是乐观锁或者悲观锁,但是我们知道锁技术是很慢的,而且加锁的过程中还很容易出现死锁的现象。Disruptor很好的解决了这些问题。

一、Disruptor究竟是什么

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单。

二、Disruptor为什么这么快

1.使用无锁算法来实现并发:

首先,Disruptor根本就不用锁。在需要确保操作是线程安全的地方,Disruptor使用CAS(Compare And Swap/Set)操作。这是一个CPU级别的指令,它的工作方式有点像乐观锁——CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其它操作先改变了这个值。CAS操作比锁消耗资源少的多,因为它们不牵涉操作系统,它们直接在CPU上操作。

2.让消息可以通过被多个消费者并行处理

一般我们在使用队列的时候,队列中的消息只会被一个消费者使用,但是在 Disruptor 中,同一个消息可以被多个消费者同时处理,多个消费者之间是并行的。

3.减少垃圾回收:

Disruptor为了在低延迟的系统中减少进行内存分配,减少垃圾回收所带来的停顿时间,Disruptor使用 RingBuffer 来达成这个目标,在 RingBuffer 中提前创建好对象,后续通过反复利用这些对象来避免垃圾回收,这个实现是线程安全的。

4.缓存行填充

这块比较复杂,大家自行百度,这里就不再赘述。。。。

三、Disruptor实战

maven依赖

com.lmax

disruptor

3.3.2

import com.lmax.disruptor.BlockingWaitStrategy;

import com.lmax.disruptor.dsl.Disruptor;

import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executor;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

/**

*基于Disruptor的Executor,比Java自带的Executor速度快2-3倍.

* @author syp

*/

public class DisruptorExecutor implements Executor {

/**

* 线程名称

*/

private static final String DEFAULT_EXECUTOR_NAME = "disruptor-executor";

/**

* Disruptor的ringBuffer缓存大小,必须是2的幂

*/

private static final int BUFFER_SIZE = 65536;

/**

* 实际执行task的executor

*/

private final ExecutorService executor;

private final Disruptor disruptor;

public DisruptorExecutor() {

this(DEFAULT_EXECUTOR_NAME);

}

/**

* 构造函数.

*

* @param name 名称.

*/

public DisruptorExecutor(String name) {

this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));

this.disruptor = new Disruptor<>(TaskEvent::new, BUFFER_SIZE, executor, ProducerType.MULTI,

new BlockingWaitStrategy());

}

/**

* 启动DisruptorExecutor.

*/

public void startUp() {

disruptor.handleExceptionsWith(new LogExceptionHandler());

disruptor.handleEventsWith((event, sequence, endOfBatch) -> event.getTask().run());

disruptor.start();

}

/**

* 停止任务调度(阻塞直到所有提交任务完成).

*

* @return 结果.

*/

public boolean awaitAndShutdown() {

return awaitAndShutdown(Integer.MAX_VALUE, TimeUnit.SECONDS);

}

/**

* 停止任务调度(阻塞直到所有提交任务完成).

*

* @param timeout . 超时时间.

* @param timeUnit . 时间单位.

* @return 结果.

*/

public boolean awaitAndShutdown(long timeout, TimeUnit timeUnit) {

shutdown();

try {

return executor.awaitTermination(timeout, timeUnit);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

return false;

}

/**

* 停止任务调度.

*/

void shutdown() {

disruptor.shutdown();

executor.shutdown();

}

/**

* 强制停止任务调度(正在执行的任务将被停止,未执行的任务将被丢弃).

*/

void halt() {

executor.shutdownNow();

disruptor.halt();

}

/**

* 执行任务.

*

* @param task . 任务.

*/

@Override

public void execute(Runnable task) {

disruptor.getRingBuffer().publishEvent((event, sequence, buffer) -> event.setTask(task), task);

}

}

更多内容请关注微信公众号