`

Java之 java.util.concurrent 包之概述

阅读更多
一、概述:java.util.concurrent (Java 并发编程工具包 ) Ref- jenkov

从 JDK1.5 开始新增了一个包:java.util.concurrent,这个包包含了一组Java类,它们使多线程开发变得容易。在新增这个包之前,你需要自己写这些工具类。

在本文中,我(用JDK1.6)将与你介绍一下这个包(java.util.concurrent)里面的类,挨个介绍。
本文不会涉及并讨论【Java并发编程】的原理。如果你对此感兴趣,请看这里:Ref- jenkov

1、 java.util.concurrent.atomic.AtomicInteger

在Java语言中,++i 和 i++ 操作并不是线程安全的,在多线程并发情况下,不可避免的会用到synchronized关键字。而AtomicInteger则提供了一种线程安全的加减操作。

/**
来看看AtomicInteger提供的接口。

- public final int get()                    //获取当前的值
- public final int getAndIncrement()        //获取当前的值,并自增
- public final int getAndDecrement()        //获取当前的值,并自减
- public final int getAndAdd(int delta)     //获取当前的值,并加上一个(正负)值
- public final int getAndSet(int newValue)  //取当前的值,并设置新的值

*/


2、java.util.concurrent.BlockingQueue 接口 Ref- jenkov



BlockingQueue 的典型应用场景是:一个线程生产对象,另一个线程则消费这个对象。

生产线程保持生产对象到队列中,直到队列满时,生产线程被阻塞。
消费线程保持消费队列中的对象,直到队列空时,消费线程被阻塞。

方法介绍:
Throws ExceptionSpecial ValueBlocksTimes Out
Insertadd(o)offer(o)put(o)offer(o, timeout, timeunit)
Removeremove(o)poll()take()poll(timeout, timeunit)
Examineelement()peek()


实现类:
        - ArrayBlockingQueue
        - DelayQueue
        - LinkedBlockingQueue
        - PriorityBlockingQueue
        - SynchronousQueue

例子:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockedQueueTest {  
	  
    public static void main(String[] args) throws Exception { 
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);  
  
        Producer producer = new Producer(queue);  
        Consumer consumer = new Consumer(queue);  
  
        new Thread(producer).start();  
        new Thread(consumer).start();  
  
        System.out.println("done!");
         
    }  
}  
  
class Producer implements Runnable{
    protected BlockingQueue<String> queue ;
    public Producer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }
    public void run() {  
        try {  
            queue.put("1");  
            Thread.sleep(1000);  
            queue.put("2");  
            Thread.sleep(1000);  
            queue.put("3");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
  
  
class Consumer implements Runnable{
    protected BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }
    public void run() {  
        try {  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  





3、java.util.concurrent.ConcurrentMap 接口 Ref- jenkov

java.util.concurrent.ConcurrentMap 接口 extends java.util.Map 接口 ,具有处理并发 put 和 get 的能力。

实现类:java.util.concurrent.ConcurrentHashMap
ConcurrentHashMap 与 java.util.HashTable 比较相似,但性能更高:读取数据时不锁Map;写数据时不会锁整个Map,只锁写的那部分。

另一个不同是:ConcurrentHashMap 不会抛 ConcurrentModificationException(当使用iterator 且 Map 数据发生改变时)。因为 Iterator 被设计为只允许一个线程使用。

例子:
      ConcurrentMap concurrentMap = new ConcurrentHashMap();
      concurrentMap.put("key", "value");
      Object value = concurrentMap.get("key");



4、Executors 框架 Ref- journaldev

JDK1.5以前,创建过多的线程可能会引起内存耗尽。所以使用线程池(ThreadPool)是一个很好的解决方案。

Executors 框架,可以创建线程池线程池(ThreadPool),并提供了对异步线程的 调用(invocation),调度(scheduling),执行(execution)的功能。

问题:如何重复使用一个线程?
解决:线程对象一旦运行结束,不可以再被从新执行。但使用阻塞消息队列,可以不断接收和处理 Runnable 或 Callable 对象。


java.util.concurrent.ExecutorService

java.util.concurrent.ExecutorService 接口是一个线程池(Thread Pool),可以异步执行线程。


import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;


public class TestExecutors {
	
	static class MyRunnable implements Runnable{
		@Override
		public void run() {
			System.out.println("Run, run, run!");
		}
		
	}
	
	static class MyCallable implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("Call, call, call!");
			return null;
		}
		
	}
	
	static class MyBlockedRunnale implements Runnable{
		@Override
		public void run() {
			try {
				for(int i = 1; i <= 5; i++){
					System.out.println("Sleeping " + i + "s.");
					Thread.sleep(1000);
					
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	
	
	
    /**
     *  test01_createServices
     *============================================================
     *  This ExecutorService contains the follow 5 methods 
     *  for passing it task for execution:
     *  
     * 
     *      - execute(Runnable)
     *      
     *      - submit(Runnable)
     *      - submit(Callable)
     *      
     *      - invokeAny(...)
     *      - invokeAll(...)
     *      
     *      
     */
	public void test01_createServices() throws Exception{
		
	    ExecutorService singlService    = Executors.newSingleThreadExecutor();
	    ExecutorService fixedService    = Executors.newFixedThreadPool(10);
	    ExecutorService schduService    = Executors.newScheduledThreadPool(10);
	    ExecutorService cacheService    = Executors.newCachedThreadPool();
	    
	    singlService.shutdown();
	    fixedService.shutdownNow();
	    schduService.awaitTermination(10, TimeUnit.DAYS);
	    cacheService.awaitTermination(10, TimeUnit.SECONDS);
	}
    
	
    /**
     *  test02_Runnable_ExeuteSubmit
     *  ===========================================================
     */
    public void test02_Runnable_ExeuteSubmit() throws Exception{
        
    	ExecutorService singleService    = Executors.newSingleThreadExecutor();
    
    	/*
	     * service.execute() - void
	     */
	    singleService.execute(new MyRunnable());
	    
	    /*
	     * service.submit(Runnable) - Future
	     *  - future.get(): this method is blocked, until result is returned.
	     *  - returns null if the task has finished correctly - for Runnable.
	     */
	    Future<?> future =  singleService.submit(new MyRunnable());
	    future.get(); 
	    
    }
	    
    
    /**
     *  test03_Callable_Submit
     *  ===========================================================
     */
	public void test03_Callable_Submit() throws Exception{  
	    
		ExecutorService singleService    = Executors.newSingleThreadExecutor();
		
		/*
		 * service.submit(Callable) - Future
		 * - future.get(): this method is blocked, until result is returned.
		 * - returns result if successful completion.
		 */
		Future<String> future = singleService.submit(new MyCallable());
		System.out.println("future.get() = " + future.get());
    
	}
	
	

    /**
     * test04_invokeAny()
     * ================================================================
        The invokeAny() method takes a collection of Callable objects
        
        Invoking this method does not return a Future, 
        but returns the result of one of the Callable objects. 
        
        You have no guarantee about which of the Callable's results you get. 
        Just one of the ones that finish.

        If one of the tasks complete (or throws an exception), 
        the rest of the Callable's are cancelled.
     */
	public void test04_invokeAny() throws Exception{  
    
		ExecutorService cacheService    = Executors.newCachedThreadPool();
    
		Set<Callable<String>> callables = new HashSet<Callable<String>>();
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 1";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 2";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 3";
	        }
	    });

	    String result = cacheService.invokeAny(callables);
	    System.out.println("result = " + result);
    
	}
	    

    /**
     * test05_invokeAll
     ===========================================================================
     The invokeAll() method invokes all of the Callable objects you pass to it 
     in the collection passed as parameter. 
     
     The invokeAll() returns a list of Future objects via which you can obtain 
     the results of the executions of each Callable.


     Keep in mind that a task might finish due to an exception, 
     so it may not have "succeeded". 
     
     There is no way on a Future to tell the difference.
     
     */
	public void test05_invokeAll() throws Exception{
		ExecutorService cacheService    = Executors.newCachedThreadPool();
    
		Set<Callable<String>> callables = new HashSet<Callable<String>>();
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 1";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 2";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 3";
	        }
	    });

	    List<Future<String>> futures = cacheService.invokeAll(callables);
	    for(Future<String> future : futures){
	        System.out.println("future.get = " + future.get());
	    }
    
	}


	
	public void test06_Shutdown_AwaitTermination() throws Exception{
	
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		
		/*
	     * This method does not wait for previously 
	     * submitted tasks to complete execution,
	     * and it will no longer accept new tasks.
	     * 
	     * The ExecutorService will shut down immediately.
		 */
		cacheService.shutdown();
		
		
		/*
		 * Blocks until: 
		 * 
		 * - all tasks have completed execution after a shutdown request, or 
		 * - the timeout occurs, or 
		 * - the current thread is interrupted, 
		 * 
		 * whichever happens first.
		 */
		cacheService.awaitTermination(50, TimeUnit.MINUTES);
    
    }
	
	
	

	
	@Test
	public void testShutdown(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		cacheService.shutdown();
		System.out.println("shut down...");
	}
	
	@Test
	public void testAwaitTermination(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		try {
			cacheService.awaitTermination(50, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("shut down...");
		
	}
	
	/**
	 * Best Practice
	 */
	@Test
	public void testShutdownAndAwaitTermination(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		try {
			cacheService.shutdown();
			cacheService.awaitTermination(50, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("shut down...");
		
	}

}







5、java.util.concurrent.locks 包

java.util.concurrent.locks 包提供了线性同步机制,跟 synchronized 关键字一样,但是功能更多。


5.1 java.util.concurrent.locks.Lock 接口 Ref- jenkov
方法概述:

      - lock()
      - lockInterruptibly()
      - tryLock()
      - tryLock(long timeout, TimeUnit timeUnit)
      - unlock()

实现类:
java.util.concurrent.locks.ReentrantLock




5.2 java.util.concurrent.locks.ReadWriteLock 接口 Ref- jenkov

提供了更高级的锁机制:可以有多个线程同时读,但只能有一个线程写。

Read Lock
取得读锁的条件:
- 没有线程在(或已经)获取写锁。

Write Lock 
取得写锁的条件:
- 没有线程在读
- 没有线程在写

实现类:
java.util.concurrent.locks.ReentrantReadWriteLock

例子:


import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestConcurrent_Lock {
    
    public void test(){
        
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();


        readWriteLock.readLock().lock();

            // multiple readers can enter this section,
            // if not locked for writing, and no writers waiting
            // to lock for writing.

        readWriteLock.readLock().unlock();


        readWriteLock.writeLock().lock();

            // only one writer can enter this section,
            // and only if no threads are currently reading.

        readWriteLock.writeLock().unlock();
    }

}

/*


readLock.lock();

This means that if any other thread is writing (i.e. holds a write lock) 
then stop here until no other thread is writing.




writeLock.lock();

This means that if any other thread is reading or writing, 
then stop here and wait until no other thread is reading or writing.



*/







java.util.concurrent包之Execuotor系列文章

00_Java之 java.util.concurrent 包之概述

01_Java之java.util.concurrent包之Executor与ExecutorService

02_Java之 java.util.concurrent 包之ExecutorService之submit () 之 Future

03_Java之多线程之Callable与Future

04_Java之多线程之Lock




转载请注明,
原文出处:http://lixh1986.iteye.com/blog/2341898












-


  • 大小: 9 KB
分享到:
评论

相关推荐

    JAVA_API1.6文档(中文)

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    [Java参考文档].JDK_API 1.6

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    Java基础知识点总结.docx

    无论是工作学习,不断的总结是必不可少的。只有不断的总结,发现问题,弥补不足,才能长久的...java.util.concurrent.locks包下常用的类 326 NIO(New IO) 327 volatile详解 337 Java 8新特性 347 Java 性能优化 362

    Java 1.6 API 中文 New

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    JavaAPI中文chm文档 part2

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    JavaAPI1.6中文chm文档 part1

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    java api最新7.0

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    java jdk-api-1.6 中文 chmd

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    [Java参考文档]

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    JDK_1_6 API

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR...

    java并发编程综合讲解

    这份资源为您提供了关于 Java 并发编程的全面讲解,着重介绍了 JUC(java.util.concurrent)库中的核心概念、工具和最佳实践。通过深入学习,您将能够更好地理解并发编程的挑战,掌握构建高性能、高可伸缩性的并发...

    bouncer:分布式专属调度库

    集成基于 java.util.concurrent.ScheduledThreadPoolExecutor 的自定义实现,因此应该可用于各种应用程序。 用法 服务器组件应该作为一个独立的 Java 应用程序使用如下命令运行: java -jarbouncer-1.0.0.jar --...

    java面试宝典

    77、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 18 78、abstract class Name { private String name; public abstract boolean isStupidName(String name) {}}这有何错误? 18 79、public class ...

    千方百计笔试题大全

    77、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 18 78、abstract class Name { private String name; public abstract boolean isStupidName(String name) {}}这有何错误? 18 79、public class ...

    OPhone应用开发权威指南(黄晓庆)

    9.2.2 java.util.concurrent框架 359 9.2.3 AsyncTask 369 9.3 网络编程接口 373 9.3.1 HttpClient API介绍 373 9.3.2 GET方法的使用和限制 378 9.3.3 使用POST方法上传附件 382 9.3.4 从服务器端下载图片 390 9.4 ...

Global site tag (gtag.js) - Google Analytics