Amino是无锁并行框架,线程安装,该框架封装了无锁算法,提供了可用于线程安全的一些数据结构,同时还内置了一些多线程调度模式。使用Amino进行软件开发有以下的优势:
1.对死锁的问题免疫
2.确保系统并发的整体进度
3.降低高并发下无锁竞争带来的性能开销
4.可以轻松使用一些成熟的无锁结构,而无需执行研发
有一种多线程同步的机制cas(compare and swap),他是基于操作系统的cas指令进行判断。
原理步骤:
1、先取出临界资源的值
2、接着将此值作为期望值,和临界值的最新值对比,如果相同,说明没有其他线程修改,直接更新;如果不相同,则说明被其他线程修改过了,回到步骤1继续
jdk内部已经实现了部分数据结构的cas无锁算法。比如AtomicInteger、AtomicIntegerArray、AtomicLong、AtomicLongArray、AtomicDouble、AtomicDoubleArray、AtomicBoolean。
但是没有对list、set、tree、Grap的实现。
Amino就是这样一款基于cas(compare and swap)无锁算法的框架,高并发,高性能。
提供了
list(LockFreeList、LockFreeVector)、
set(LockFreeSet)、
tree(LockFreeBSTree)、
Grap(UndirectedGrap)
2、Amino如何引入
不提供maven依赖,所以需要把源码下载,然后自己编译,最后把jar放到私服上即可引用。 源码下载地址:Concurrent Building Block - Browse /cbbs at SourceForge.net 编译放到maven私服,然后在pom.xml文件中引入即可融入执行项目进行整合开发,如下图所示:
3、如何使用
直接new 对应的类即可实现
list(LockFreeList、LockFreeVector)
set(LockFreeSet)
tree(LockFreeBSTree)
Grap(UndirectedGrap)
============================================================================
一.List
Amino提供了一组List的实现方式,其中最为重要的两种是LockFreeList和LockFreeVector,他们都实现了java.util.List接口;LockFreeList使用链表的作为底层的数据结构,实现了线程安全的无锁List,而LockFreeVector使用连续的数据作为底层数据结构,实现了线程安全的无锁Vector,LockFreeList和LockFreeVector的关系,就如同LinkedList和ArrayList一样;
下面我们是用LockFreeVector,LockFreeList,Vector,和实现线程安全的LinkedList进行了在高并发环境的性能进行对比。每一个测试线程AccessListThread对每一种List分别作1000次添加和删除操作:
===========================================================================
package org.jd.amino.concurrent.data.chat;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.amino.ds.lockfree.LockFreeSet;
import org.junit.Test;
public class TestLockFreeSet {
private static final int MAX_THREADS = 2000;
private static final int TASK_COUNT = 4000;
java.util.Random rand=new java.util.Random();
Set set;
public class AccessSetThread implements Runnable{
protected String name;
public AccessSetThread(){
}
public AccessSetThread(String name){
this.name=name;
}
@Override
public void run() {
try {
for(int i=0;i<500;i++)
handleSet(rand.nextInt(1000));
Thread.sleep(rand.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CounterPoolExecutor extends ThreadPoolExecutor{
private AtomicInteger count =new AtomicInteger(0);
public long startTime=0;
public String funcname="";
public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
protected void afterExecute(Runnable r, Throwable t) {
int l=count.addAndGet(1);
if(l==TASK_COUNT){
System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));
}
}
}
public Object handleSet(int index){
set.add(rand.nextInt(2000));
if(set.size()>10000)set.clear();
return null;
}
public void initSet(){
set=Collections.synchronizedSet(new HashSet());
}
public void initFreeLockSet(){
set=new LockFreeSet();
}
//@Test
public void testSet() throws InterruptedException {
initSet();
CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
long starttime=System.currentTimeMillis();
exe.startTime=starttime;
exe.funcname="testSet";
for(int i=0;i<TASK_COUNT;i++)
exe.submit(new AccessSetThread());
Thread.sleep(10000);
}
@Test
public void testLockFreeSet() throws InterruptedException {
initFreeLockSet();
CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
long starttime=System.currentTimeMillis();
exe.startTime=starttime;
exe.funcname="testLockFreeSet";
for(int i=0;i<TASK_COUNT;i++)
exe.submit(new AccessSetThread());
Thread.sleep(10000);
}
}
由测试结果来看,在高并发的情况下,Amino提供的List性能要远超出JDK内置的基于锁的List性能要高出六七倍;
在高并发环境下,使用无锁的集合可以有效的提升系统的吞吐量。通过Amino框架,可以让开发人员轻松使用这种技术;
======================================================================
Amino CBB (Concurrent Building Blocks) 类库将提供优化后的并发线程组件,适用于JDK6.0 及其以后的版本。
Amino Java 类库将涉及下面四个方面的内容:
1) 数据结构
该组件将提供一套免锁的集合类。因为这些数据结构采用免锁的运算法则来生成,所
以,它们将拥有基本的免锁组件的特性,如可以避免不同类型的死锁,不同类型的线程初始
化顺序等。
2) 并行模式
Amino 将为应用程序提供一个或几个大家熟知的并行计算模式。采用这些并行模式可
以使开发者起到事半功倍的效果,这些模式包括 Master-Worker、Map-reduce、Divide and
conquer, Pipeline 等,线程调度程序可以与这些模式类协同工作,提供了开发效率。
3) 并行计算中的一般功能
Amino 将为应用程序提供并行计算中常用的方法,例如:
a. String、Sequence 和Array 的处理方面。如Sort、Search、Merge、Rank、Compare、
Reverse、 Shuffle、Rotate 和Median 等
4)原子和STM(软件事务内存模型)
--------------------------------
在Amino 类库中,主要算法将使用锁无关的(Lock-Free)的数据结构。
原语Compare-and-swap(CAS) 是实现锁无关数据结构的通用原语。CAS 可以原子
地比较一个内存位置的内容及一个期望值,如果两者相同,则用一个指定值取替这个内存位
罝里的内容,并且提供结果指示这个操作是否成功。
CAS 操作过程是:当处理器要更新一个内存位置的值的时候,它首
先将目前内存位置的值与它所知道的修改前的值进行对比(要知道在多处理的时候,你要更
新的内存位置上的值有可能被其他处理更新过,而你全然不知),如果内存位置目前的值与
期望的原值相同(说明没有被其他处理更新过),那么就将新的值写入内存位置;而如果不
同(说明有其他处理在我不知情的情况下改过这的值咯),那么就什么也不做,不写入新的
值(现在最新的做法是定义内存值的版本号,根据版本号的改变来判断内存值是否被修改,
一般情况下,比较内存值的做法已经满足要求了)。CAS 的价值所在就在于它是在硬件级别
实现的,速度那是相当的快。
————————————————下面提供多份测试代码——————————————
import java.util.Collection;
import java.util.List;
import java.util.Vector;
import org.amino.pattern.internal.Doable;
import org.amino.pattern.internal.DynamicWorker;
import org.amino.pattern.internal.MasterWorker;
import org.amino.pattern.internal.MasterWorkerFactory;
import org.amino.pattern.internal.WorkQueue;
import org.junit.Test;
public class TestMasterWorker {
public class Pow3 implements Doable<Integer,Integer>{
@Override
public Integer run(Integer input) {
return input*input*input;
}
}
public class Pow3Dyn implements DynamicWorker<Integer,Integer>{
@Override
public Integer run(Integer w, WorkQueue<Integer> wq) {
return w*w*w;
}
}
@Test
public void testStatic() {
MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newStatic(new Pow3());
List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();
for(int i=0;i<100;i++){
keyList.add(mw.submit(i));
}
mw.execute();
int re=0;
while(keyList.size()>0){ //不等待全部执行完成,就开始求和
MasterWorker.ResultKey k=keyList.get(0);
Integer i=mw.result(k);
if(i!=null){
re+=i;
keyList.remove(0);
}
}
System.out.println(re);
}
@Test
public void testDynamic() {
MasterWorker<Integer,Integer> mw=MasterWorkerFactory.newDynamic(new Pow3Dyn());
List<MasterWorker.ResultKey> keyList=new Vector<MasterWorker.ResultKey>();
for(int i=0;i<50;i++)
keyList.add(mw.submit(i));
mw.execute(); //在已经开始执行的情况下,继续添加任务
for(int i=50;i<100;i++)
keyList.add(mw.submit(i));
int re=0;
while(keyList.size()>0){ //不等待全部执行完成,就开始求和
MasterWorker.ResultKey k=keyList.get(0);
Integer i=mw.result(k);
if(i!=null){
re+=i;
keyList.remove(0);
}
}
System.out.println(re);
}
}
===========================测试Dictionary===================================
import java.util.HashMap;
import java.util.Set;
import java.util.TreeMap;
import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;
public class TestLockFreeDictionaryDemo {
@Test
public void test(){
LockFreeDictionary<Integer ,Object> map=new LockFreeDictionary<Integer ,Object>();
for(int i=0;i<100;i++)
map.put(i, i);
Set<Integer> keys=map.keySet();
for(Integer i:keys)
System.out.println(i);
}
@Test
public void testTreeMap(){
TreeMap<Integer ,Object> map=new TreeMap<Integer ,Object>();
for(int i=0;i<100;i++)
map.put(i, i);
Set<Integer> keys=map.keySet();
for(Integer i:keys)
System.out.println(i);
}
//@Test
public void testHashMap(){
HashMap<Integer ,Object> map=new HashMap<Integer ,Object>();
for(int i=0;i<100;i++)
map.put(i, i);
Set<Integer> keys=map.keySet();
for(Integer i:keys)
System.out.println(i);
}
}
=================================测试Map====================================
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.amino.ds.lockfree.LockFreeDictionary;
import org.junit.Test;
public class TestLockFreeMap {
private static final int MAX_THREADS = 20;
private static final int TASK_COUNT = 40;
java.util.Random rand=new java.util.Random();
private static Object DUMMY=new Object();
Map map;
public class AccessMapThread implements Runnable{
protected String name;
public AccessMapThread(){
}
public AccessMapThread(String name){
this.name=name;
}
@Override
public void run() {
try {
for(int i=0;i<50000;i++)
handleMap(rand.nextInt(1000));
Thread.sleep(rand.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CounterPoolExecutor extends ThreadPoolExecutor{
private AtomicInteger count =new AtomicInteger(0);
public long startTime=0;
public String funcname="";
public CounterPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
protected void afterExecute(Runnable r, Throwable t) {
int l=count.addAndGet(1);
if(l==TASK_COUNT){
System.out.println(funcname+" spend time:"+(System.currentTimeMillis()-startTime));
}
}
}
public Object handleMap(int index){
map.put(rand.nextInt(2000), DUMMY);
return map.get(index);
}
public void initLockFreeMap(){
map=new LockFreeDictionary();
for(int i=0;i<1000;i++)
map.put(i, DUMMY);
}
public void initTreeMap(){
map=Collections.synchronizedMap(new TreeMap());
for(int i=0;i<1000;i++)
map.put(i, DUMMY);
}
@Test
public void testLockFreeMap() throws InterruptedException {
initLockFreeMap();
CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
long starttime=System.currentTimeMillis();
exe.startTime=starttime;
exe.funcname="testLockFreeMap";
Runnable r=new AccessMapThread();
for(int i=0;i<TASK_COUNT;i++)
exe.submit(r);
Thread.sleep(10000);
}
//@Test
public void testTreeMap() throws InterruptedException {
initTreeMap();
CounterPoolExecutor exe=new CounterPoolExecutor(MAX_THREADS, MAX_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
long starttime=System.currentTimeMillis();
exe.startTime=starttime;
exe.funcname="testTreeMap";
Runnable r=new AccessMapThread();
for(int i=0;i<TASK_COUNT;i++)
exe.submit(r);
Thread.sleep(10000);
}
}
======================================================================