5个并发处理技巧代码示例

【译者注】在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。 以下为译文:

1.捕获InterruptedException错误

请检查下面的代码片段:

public class Task implements Runnable {
	private final BlockingQueue queue = ...;
	@Override
	 public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			String result = getOrDefault(() -> queue.poll(1L, TimeUnit.MINUTES), "default");
			//do smth with the result
		}
	}
	T getOrDefault(Callable supplier, T defaultValue) {
		try {
			return supplier.call();
		}
		catch (Exception e) {
			logger.error("Got exception while retrieving value.", e);
			return defaultValue;
		}
	}
}

代码的问题是,在等待队列中的新元素时,是不可能终止线程的,因为中断的标志永远不会被恢复:

1.运行代码的线程被中断。
2.BlockingQueue # poll()方法抛出InterruptedException异常,并清除了中断的标志。
3.while中的循环条件 (!Thread.currentThread().isInterrupted())的判断是true,因为标记已被清除。

为了防止这种行为,当一个方法被显式抛出(通过声明抛出InterruptedException)或隐式抛出(通过声明/抛出一个原始异常)时,总是捕获InterruptedException异常,并恢复中断的标志。

T getOrDefault(Callable supplier, T defaultValue) {
	try {
		return supplier.call();
	}
	catch (InterruptedException e) {
		logger.error("Got interrupted while retrieving value.", e);
		Thread.currentThread().interrupt();
		return defaultValue;
	}
	catch (Exception e) {
		logger.error("Got exception while retrieving value.", e);
		return defaultValue;
	}
}

2.使用特定的执行程序来阻止操作

因为一个缓慢的操作而使整个服务器变得无响应,这通常不是开发人员想要的。不幸的是,对于RPC,响应时间通常是不可预测的。

假设服务器有100个工作线程,有一个端点,称为100 RPS。在内部,它发出一个RPC调用,通常需要10毫秒。在某个时间点,此RPC的响应时间变为2秒,在峰值期间服务器能够做的惟一的一件事就是等待这些调用,而其他端点则无法访问。

@GET
@Path("/genre/{name}")
@Produces(MediaType.APPLICATION_JSON)
public Response getGenre(@PathParam("name") String genreName) {
	Genre genre = potentiallyVerySlowSynchronousCall(genreName);
	return Response.ok(genre).build();
}

解决这个问题最简单的方法是提交代码,它将阻塞调用变成一个线程池:

@GET
@Path("/genre/{name}")
@Produces(MediaType.APPLICATION_JSON)
public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {
	response.setTimeout(1L, TimeUnit.SECONDS);
	executorService.submit(() -> {
		Genre genre = potentiallyVerySlowSynchronousCall(genreName);
		return response.resume(Response.ok(genre).build());
	}
	);
}

3.传MDC的值

MDC(Mapped Diagnostic Context)通常用于存储单个任务的特定值。例如,在web应用程序中,它可能为每个请求存储一个请求id和一个用户id,因此MDC查找与单个请求或整个用户活动相关的日志记录变得更加容易。

2017-08-27 14:38:30,893 INFO [server-thread-0] [requestId=060d8c7f, userId=2928ea66] c.g.s.web.Controller - Message.

可是如果代码的某些部分是在专用线程池中执行的,则线程(提交任务的线程)中MDC就不会被继续传值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志则没有:

@GET
@Path("/genre/{name}")
@Produces(MediaType.APPLICATION_JSON)
public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {
	try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) {
		String genreId = getGenreIdbyName(genreName);
		//Sync call
		logger.trace("Submitting task to find genre with id '{}'.", genreId);
		//'requestId' is logged
		executorService.submit(() -> {
			logger.trace("Starting task to find genre with id '{}'.", genreId);
			//'requestId' is not logged
			Response result = getGenre(genreId) //Async call
			.map(artist -> Response.ok(artist).build())
			   .orElseGet(() -> Response.status(Response.Status.NOT_FOUND).build());
			response.resume(result);
		}
		);
	}
}

这可以通过MDC#getCopyOfContextMap()方法来解决:

...
public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {
 try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) {
 ...
 logger.trace("Submitting task to find genre with id '{}'.", genreId); //'requestId' is logged
 withCopyingMdc(executorService, () -> {
  logger.trace("Starting task to find genre with id '{}'.", genreId); //'requestId' is logged
  ...
 });
 }
}
private void withCopyingMdc(ExecutorService executorService, Runnable function) {
 Map

4.更改线程名称

为了简化日志读取和线程转储,可以自定义线程的名称。这可以通过创建ExecutorService时用一个ThreadFactory来完成。在流行的实用程序库中有许多ThreadFactory接口的实现:

com.google.common.util.concurrent.ThreadFactoryBuilde+r in Guava.
org.springframework.scheduling.concurrent.CustomizableThreadFactory in Spring.
org.apache.commons.lang3.concurrent.BasicThreadFactory in Apache Commons Lang 3.
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
 .namingPattern("computation-thread-%d")
 .build();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, threadFactory);

尽管ForkJoinPool不使用ThreadFactory接口,但也支持对线程的重命名:

ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinThreadFactory = pool -> {
 ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
 thread.setName("computation-thread-" + thread.getPoolIndex());
 return thread;
};
ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads, forkJoinThreadFactory, null, false);

将线程转储与默认命名进行比较:

"pool-1-thread-3" #14 prio=5 os_prio=31 tid=0x00007fc06b19f000 nid=0x5703 runnable [0x0000700001ff9000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
...
"pool-2-thread-3" #15 prio=5 os_prio=31 tid=0x00007fc06aa10800 nid=0x5903 runnable [0x00007000020fc000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)
...
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fc06aa10000 nid=0x5303 runnable [0x0000700001df3000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
 ...

与自定义命名进行比较:

"task-handler-thread-1" #14 prio=5 os_prio=31 tid=0x00007fb49c9df000 nid=0x5703 runnable [0x000070000334a000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
...
"authentication-service-ping-thread-0" #15 prio=5 os_prio=31 tid=0x00007fb49c9de000 nid=0x5903 runnable [0x0000700003247000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)
...
"task-handler-thread-0" #12 prio=5 os_prio=31 tid=0x00007fb49b9b5000 nid=0x5303 runnable [0x0000700003144000]
 java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
 ...

想象一下,可能会不止3个线程。

5.使用LongAdder计数器

在高竞争的情况下,会采用java.util.concurrent.atomic.LongAdder进行计数,而不会采用AtomicLong/AtomicInteger。

LongAdder可以跨越多个单元间仍保持值不变,但是如果需要的话,也可以增加它们的值,但与父类AtomicXX比较,这会导致更高的吞吐量,也会增加内存消耗。

LongAdder counter = new LongAdder();
counter.increment();
...
long currentValue = counter.sum();

总结

以上就是本文关于5个并发处理技巧代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程Semaphore计数信号量详解、优化Tomcat配置(内存、并发、缓存等方面)方法详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

时间: 2017-10-29

python高并发异步服务器核心库forkcore使用方法

1 拷贝下面的代码到一个文件,并命名为forkcore.py 复制代码 代码如下: import osimport threadingimport selectimport socket class ds_forkcore(object): #async IO(epoll)    def ds_epoll(self):        epoll=select.epoll()        epoll.register(self.s.fileno(),select.EPOLLIN|select.E

Java Socket编程(四) 重复和并发服务器

文章来源:aspcn 作者:孙雯 重复和并发服务器 这个应用程序被当作一个重复的服务器.因为它只有在处理完一个进程以后才会接受另一个连接.更多的复杂服务器是并发的.它为每一个请求分配一个线程,而不是来一个处理一个.所以看起来它在同时处理多人请求.所有的商业的服务器都是并发的服务器. Java数据报类 不像面向连接的类,数据报的客户端和服务器端的类在表面上是一样的.下面的程序建立了一个客户和服务器商的数据报sockets: DatagramSocket serverSocket = new Dat

c#实现服务器性能监控并发送邮件保存日志

客户端代码 复制代码 代码如下: using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Diagnostics;using System.ServiceProcess;using System.Text;using System.Threading;using System.Management;using System.Configurat

Windows服务器应对高并发和DDOS攻击的配置方法

windows系统本身就有很多机制可以用来提高性能和安全,其中有不少可以用来应对高并发请求和DDOS攻击的情况. 通过以下配置可以改善windows服务器性能: 一.应对高并发请求: 1.TCP连接延迟等待时间 TcpTimedWaitDelay: 这是设定TCP/IP 可释放已关闭连接并重用其资源前,必须经过的时间.关闭和释放之间的此时间间隔通称 TIME_WAIT状态或两倍最大段生命周期(2MSL)状态.在此时间内,重新打开到客户机和服务器的连接的成本少于建立新连接.减少此条目的值允许 TC

Nginx+PHP(FastCGI)搭建高并发WEB服务器(自动安装脚本)第二版

本文是依照张宴的 Nginx 0.7.x + PHP 5.2.10(FastCGI)搭建胜过Apache十倍的Web服务器(第5版) 编写 原文地址 http://blog.s135.com/nginx_php_v5/ 因为编译过程和等待时间繁琐,于是就自己写了个全自动安装的shell脚本,此脚本可以随意修改,转载请注明出处. 这篇文章为这个系列的第二版,在第一版的基础上加入 1.日志切割 2.智能选择yum或者rpm安装 下载地址 注意:如果不能使用yum源,请放入系统光盘,单张dvd的,如果

IIS Web服务器支持高并发设置方法详解

适用的IIS版本:IIS 7.0, IIS 7.5, IIS 8.0 适用的Windows版本:Windows Server 2008, Windows Server 2008 R2, Windows Server 2012 1.应用程序池(Application Pool)的设置: General->Queue Length设置为65535(队列长度所支持的最大值)Process Model->Idle Time-out设置为0(不让应用程序池因为没有请求而回收)Recycling->

Java系统的高并发解决方法详解

一个小型的网站,比如个人网站,可以使用最简单的html静态页面就实现了,配合一些图片达到美化效果,所有的页面均存放在一个目录下,这样的网站对系统架构.性能的要求都很简单,随着互联网业务的不断丰富,网站相关的技术经过这些年的发展,已经细分到很细的方方面面,尤其对于大型网站来说,所采用的技术更是涉及面非常广,从硬件到软件.编程语言.mysql" target="_blank" title="MySQL知识库">数据库.WebServer.防火墙等各个领域

Linux环境下Oracle安装参数设置方法详解

前面讲了虚拟机的设置和OracleLinux的安装,接下来我们来说下Oracle安装前的准备工作. 1.系统信息查看 系统信息查看 首先服务器ip:192.168.8.120 服务器系统:Oracle Linux Server release 6.5 服务器主机名:oracle-learn 查看磁盘空间情况: [root@oracle-learn ~]# df -h Filesystem Size Used Avail Use% Mounted on /dev/sda1 32G 4.8G 26G

浅谈Nginx10m+高并发内核优化详解

何为高并发 默认的Linux内核参数考虑的是最通用场景,不符合用于支持高并发访问的Web服务器,所以需要修改Linux内核参数,这样可以让Nginx拥有更高的性能: 在优化内核时,可以做的事情很多,不过,我们通常会根据业务特点来进行调整,当Nginx作为静态web内容服务器.反向代理或者提供压缩服务器的服务器时,期内核参数的调整都是不同的,这里针对最通用的.使Nginx支持更多并发请求的TCP网络参数做简单的配置: 这些需要修改/etc/sysctl.conf来更改内核参数. 配置方法 配置详析

MySQL服务器线程数的查看方法详解

本文实例讲述了MySQL服务器线程数的查看方法.分享给大家供大家参考,具体如下: mysql重启命令: /etc/init.d/mysql restart MySQL服务器的线程数需要在一个合理的范围之内,这样才能保证MySQL服务器健康平稳地运行.Threads_created表示创建过的线程数,通过查看Threads_created就可以查看MySQL服务器的进程状态. mysql> show global status like 'Thread%'; +------------------

destoon之URL Rewrite(伪静态)设置方法详解

1.如果您的服务器支持.htaccess,则无需设置,网站根目录下的.htaccess已经设置好规则. 规则(参考http://download.destoon.com/rewrite/htaccess.txt)为: # Destoon B2B Rewrite Rules ErrorDocument 404 /404.php RewriteEngine On RewriteBase / RewriteRule ^(.*)\.(asp|aspx|asa|asax|dll|jsp|cgi|fcgi|

thinkPHP多语言切换设置方法详解

本文实例讲述了thinkPHP多语言切换设置方法.分享给大家供大家参考,具体如下: thinkphp多语言设置有点'高大上',为什么说它有点'高大上'呢?因为本人设置了好久才弄好,而本人之所以弄了好久的原因,竟然是因为'开启语言设置必须得先开启初始化系统的行为类',所以,在这里,因为本人的经验有限,姑且认为tp的多语言设置必须的先初始化tp的CheckLangBehavior.class.php 怎么初始化CheckLangBehavior.class.php呢?下面进行讲解. tp框架下面,所

在MySQL中修改密码及访问限制的设置方法详解

由于其源码的开放性及稳定性,且与网站流行编 挥镅 PHP的完美结合,现在很多站点都利用其当作后端数据库,使其获得了广泛应用.处于安全方面的考虑,需要为每一用户赋于对不同数据库的访问限制,以满足不同用户的要求.下面就分别讨论,供大家参考.    一.MySQL修改密码方法总结  首先要说明一点的是:一般情况下,修改MySQL密码是需要有mysql里的root权限的,这样一般用户是无法更改密码的,除非请求管理员帮助修改.    方法一    使用phpMyAdmin  (图形化管理MySql数据库的

Centos下IP与DNS设置方法详解

本文较为详细的讲述了Centos下IP与DNS设置方法.分享给大家供大家参考,具体如下: 1.CentOS 修改DNS 修改对应网卡的DNS的配置文件 # vi /etc/resolv.conf 修改以下内容 nameserver 8.8.8.8 #google域名服务器 nameserver 8.8.4.4 #google域名服务器 2.CentOS 修改网关 修改对应网卡的网关的配置文件 [root@centos]# vi /etc/sysconfig/network 修改以下内容 NETW

Java Web使用POI导出Excel的方法详解

本文实例讲述了Java Web使用POI导出Excel的方法.分享给大家供大家参考,具体如下: 采用Spring mvc架构: Controller层代码如下 @Controller public class StudentExportController{ @Autowired private StudentExportService studentExportService; @RequestMapping(value = "/excel/export") public void