博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发编程实战学习(3)--基础构建模块
阅读量:5997 次
发布时间:2019-06-20

本文共 7406 字,大约阅读时间需要 24 分钟。

转自:java并发编程实战

5.3阻塞队列和生产者-消费者模式

BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到空间可用;如果队列为空,那么take方法将阻塞直到有元素可用。队列可以是有界的也可以是无界的。

如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存。同样,put方法的阻塞特性也极大地简化了生产者的编码。如果使用有界队列,当队列充满时,生产者将阻塞并不能继续生产工作,而消费者就有时间来赶上工作的进度。阻塞队列同样提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败的状态。这样你就能创建更多灵活的策略来处理负荷过载的情况。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:他们能一直并防止产生过多的工作项,使应用程序在负荷过载的情况下边的更加健壮。

/** * java并发编程实战 * 5.3.1桌面搜索 * 爬虫查找所有文件并放入队列 * Created by mrf on 2016/3/7. */public class FileCrawler implements Runnable {    private final BlockingQueue
fileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue
fileQueue, FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } @Override public void run() { try { crawl(root); } catch (InterruptedException e) { //恢复中断 Thread.currentThread().interrupt(); e.printStackTrace(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries!=null){ for (File entry : entries) { if (entry.isDirectory()){ crawl(entry); }else if (!alreadyIndexed(entry)){ fileQueue.put(entry); } } } } private boolean alreadyIndexed(File entry){ //检查是否加入索引 return false; }}/** * 消费者 * 将爬虫结果队列取出并加入索引 */class Indexer implements Runnable{ private static final int BOUND = 100; private static final int N_CONSUMERS = 2; private final BlockingQueue
queue; Indexer(BlockingQueue
queue) { this.queue = queue; } @Override public void run() { try { while (true){ indexFile(queue.take()); } }catch (InterruptedException e){ Thread.currentThread().interrupt(); } } private void indexFile(File take) { //将文件加入索引 } public static void startIndexing(File[] roots){ BlockingQueue
queue = new LinkedBlockingDeque<>(BOUND); FileFilter fileFilter = new FileFilter() { @Override public boolean accept(File pathname) { return true; } }; for (File root:roots) { new Thread(new FileCrawler(queue,fileFilter,root)).start(); } for (int i = 0; i < N_CONSUMERS; i++) { new Thread(new Indexer(queue)).start(); } }}

  5.5信号量

Semaphore中管理着一组虚拟的许可(permit)。许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

/** * java 并发编程实战 * 5-14使用Semaphore做容器设置边界 * 信号量 * Created by mrf on 2016/3/8. */public class BoundedHashSet
{ private final Set
set; private final Semaphore sem;// public BoundedHashSet(Set
set, Semaphore sem) {// this.set = set;// this.sem = sem;// } public BoundedHashSet(int bound){ this.set = Collections.synchronizedSet(new HashSet
()); sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; }finally { if (!wasAdded){ sem.release(); } } } public boolean remove(Object o){ boolean wasRemoved = set.remove(o); if (wasRemoved){ sem.release(); } return wasRemoved; }}

  5.6构建高效且可伸缩的结果缓存

/** * java并发编程实战 * 5-16使用HashMap和不同机制来初始化缓存 * 实现将曾经计算过的命令缓存起来,方便相同的计算直接出结果而不用重复计算 * Created by mrf on 2016/3/8. */public interface Computable
{ V compute(A arg) throws InterruptedException;}class ExpensiveFunction implements Computable
{ @Override public BigInteger compute(String arg) throws InterruptedException { //在经过长时间的计算后 return new BigInteger(arg); }}/** * 保守上锁办法 * 每次只有一个线程能执行compute,性能差 * @param
* @param
*/class Memoizer1
implements Computable
{ @GuardedBy("this") private final Map
cache = new HashMap<>(); private final Computable
c; public Memoizer1(Computable
c) { this.c = c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result==null){ result = c.compute(arg); cache.put(arg,result); } return result; }}/** * 5-17 * 改用ConcurrentHashMap增强并发性 * 但还有个问题,就是只有计算完的结果才能缓存,正在计算的没有缓存, * 这将导致一个长时间的计算没有放入缓存,另一个又开始重复计算。 * @param
* @param
*/class Memoizer2
implements Computable
{ private final Map
cache = new ConcurrentHashMap<>(); private final Computable
c; Memoizer2(Computable
c) { this.c = c; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result ==null){ result = c.compute(arg); cache.put(arg,result); } return result; }}/** * 几乎完美:非常好的并发性,缓存正在计算中的结果 * 但compute模块中if代码块是非原子性的,这样可能导致两个相同的计算 * @param
* @param
*/class Memoizer3
implements Computable
{ private final Map
> cache = new ConcurrentHashMap<>(); private final Computable
c; Memoizer3(Computable
c) { this.c = c; } @Override public V compute(final A arg) throws InterruptedException { Future
f = cache.get(arg); if (f==null){ Callable
eval = new Callable
() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask
ft = new FutureTask
(eval); f = ft; cache.put(arg,ft); ft.run(); } try { return f.get(); } catch (ExecutionException e) { //抛出正在计算 e.printStackTrace(); } return null; }}/** * 使用ConcurrentHashMap的putIfAbsent解决原子问题 * 若计算取消则移除 * @param
* @param
*/class Memoizer
implements Computable
{ private final ConcurrentHashMap
> cache = new ConcurrentHashMap<>(); private final Computable
c; Memoizer(Computable
c) { this.c = c; } @Override public V compute(final A arg) throws InterruptedException { while (true){ Future
f = cache.get(arg); if (f==null){ Callable
eval = new Callable
() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask
ft = new FutureTask
(eval); f = cache.putIfAbsent(arg,ft); if (f==null){ f = ft;ft.run(); } } try { return f.get(); } catch (CancellationException e){ cache.remove(arg,f); } catch(ExecutionException e) { //抛出正在计算 e.printStackTrace(); } return null; } }}

  小结:

  • 可变状态是直观重要的(It's the mutable state,stupid)。所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程的安全性。
  • 尽量将域声明为final类型,除非需要他们是可变的。
  • 不可变对象一定是线程安全的。不可变对象能极大地降低并发编程的复杂性。他们更为简单而且可以任意共享而无须使用加锁或保护性复制等机制。
  • 封装有助于管理复杂性。在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁来保护每个可变变量。
  • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
  • 在执行复合操作期间,要持有锁。
  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。
  • 不要故作聪明地腿短出不需要使用同步。
  • 在设计过程中考虑线程安全,或者在文档中明确地指出他不是线程安全的。
  • 将同步策略文档化。

  

唯有不断学习方能改变! --
Ryan Miao

转载地址:http://ephlx.baihongyu.com/

你可能感兴趣的文章
initcall_debug简要说明【转】
查看>>
SQL Server查看Sql语句执行的耗时和IO消耗
查看>>
评审的艺术——谈谈现实中的代码评审 专题
查看>>
Vue框架引入JS库的正确姿势
查看>>
windows 7 64位出现Oracle中文乱码
查看>>
OnCheckedChangeListener和setChecked之间冲突问题解决
查看>>
DPDK 分析
查看>>
PhotoShop 图像处理 算法 汇总
查看>>
017-通过govendor管理依赖包
查看>>
通通玩blend美工(5)——旋转木马,交互性设计
查看>>
java并发线程池---了解ThreadPoolExecutor就够了
查看>>
Json转list,二层解析转换
查看>>
【mysql】mysql中varcher属性最大值能存多长
查看>>
Java程序监控指标
查看>>
【设计模式】桥接模式 Bridge Pattern
查看>>
Spring之Bean的配置方式
查看>>
slice全解析
查看>>
阿里云线上ROS静态路由转发,有大坑。
查看>>
基于SpringBoot的项目管理后台
查看>>
Logstash使用grok插件解析Nginx日志
查看>>