java线程池实例(java线程池的使用例子)
华为云服务器特价优惠火热进行中! 2核2G2兆仅需 38 元;4核4G3兆仅需 79 元。购买时间越长越优惠!更多配置及优惠价格请咨询客服。
合作流程: |
今天给各位分享java线程池实例的知识,其中也会对java线程池的使用例子进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
微信号:cloud7591如需了解更多,欢迎添加客服微信咨询。
复制微信号
本文目录一览:
- 1、Java 中几种常用的线程池
- 2、Java实现通用线程池
- 3、典型Java线程池的代码及其各部分功能介绍
- 4、java常用的几种线程池实例讲解
- 5、合理使用线程池以及线程变量
- 6、java线程池(一) 简述线程池的几种使用方式
Java 中几种常用的线程池
一:newCachedThreadPool
(1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;
(2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;
(3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
(4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止
二:newFixedThreadPool
(1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
(2)其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
(3)和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
(4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
三:ScheduledThreadPool
(1)调度型线程池
(2)这个池子里的线程可以按schedule依次delay执行,或周期执行
四:SingleThreadExecutor
(1)单例线程,任意时间池中只能有一个线程
(2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

Java实现通用线程池
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThread java
package polarman threadpool;
import java util Collection; import java util Vector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool) { this pool = pool;
}
public void putTask(ThreadTask task) { tasks add(task);
}
public void putTasks(ThreadTask[] tasks) { for(int i= ; itasks length; i++) this tasks add(tasks[i]);
}
public void putTasks(Collection tasks) { this tasks addAll(tasks);
}
protected ThreadTask popTask() { if(tasks size() ) return (ThreadTask)tasks remove( );
else
return null;
}
public boolean isRunning() {
return running;
}
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while(isAlive()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true;
this notify();
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空闲 ); this wait(); }else {
ThreadTask task;
while((task = popTask()) != null) { task run(); if(stopped) {
stopped = false;
if(tasks size() ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );
break;
}
}
if(paused) {
paused = false;
if(tasks size() ) { System out println(Thread currentThread() getId() + : Tasks are paused );
break;
}
}
}
running = false;
}
if(killed) {
killed = false;
break;
}
}
}catch(InterruptedException e) {
return;
}
//System out println(Thread currentThread() getId() + : Killed );
}
}
//ThreadPool java
package polarman threadpool;
import java util Collection; import java util Iterator; import java util Vector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;
}
public void init() {
initialized = true;
for(int i= ; iinitPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize); this maxPoolSize = maxPoolSize;
if(maxPoolSize getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size;
return;
}else if(size getPoolSize()) { for(int i=getPoolSize(); isize imaxPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
}else if(size getPoolSize()) { while(getPoolSize() size) { PooledThread th = (PooledThread)threads remove( ); th kill();
}
}
//System out println( 重设线程数 线程数= + threads size());
}
public int getPoolSize() { return threads size();
}
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while(!hasIdleThread getPoolSize() = maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())
return th;
}
if(getPoolSize() maxPoolSize) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
return thread;
}
//System out println( 线程池已满 等待 );
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread();
if(th != null) { th putTask(task); th startTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReader; import java io IOException; import java io InputStreamReader;
import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 启动任务A 时长为 秒 ); System out println( size 设置当前线程池大小为 ); System out println( max 设置线程池最大线程数为 ); System out println();
final ThreadPool pool = new ThreadPool( ); pool init();
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in));
while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) words length = ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) words length = ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) words length = ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace();
}
}
}
};
cmdThread start();
/**//*
for(int i= ; i ; i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + );
try { Thread sleep(timeLen); } catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); pool init();
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixinzhi/Article/program/Java/hx/201311/27203
典型Java线程池的代码及其各部分功能介绍
( )根据xml文件来管理线程池的最大最小线程数( )对线程池通过Timer定期扫描以防止线程未激活 ( )通过某一个变量(本程序中是freeThreadCount)来得到空闲线程的数目 一 配置xml(listen xml)是 ?xml version= encoding= UTF ?configConsumeThreadPoolminPools /minPools ! 线程池最小线程 maxPools /maxPools! 线程池最大线程 checkThreadPeriod /checkThreadPeriod ! 检查线程池中线程的周期 分钟 /ConsumeThreadPool/config 二 对于ConsumeThreadPoolPara的javabean: import java io *;public class ConsumeThreadPoolPara implements Serializable{private int minPools;private int maxPools;private int checkThreadPeriod;public int getMinPools(){return minPools;}public int getMaxPools(){return maxPools;}public int getCheckThreadPeriod(){return checkThreadPeriod;}public void setMinPools(int minPools){this minPools = minPools;}public void setMaxPools(int maxPools){this maxPools = maxPools;}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod;}public String toString(){return minPools+ + maxPools+ +checkThreadPeriod;}public ConsumeThreadPoolPara() {}public static void main(String[] args) {ConsumeThreadPoolPara consumeThreadPool = new ConsumeThreadPoolPara();}} 三 解析xml程序代码(生成ConsumeThreadPoolPara) 使用jdom解析 import jdom *;import jdom input SAXBuilder;import java io *;import java util *;public class ParseConfig {static Hashtable Listens = null;static ConnPara connpara = null;static ConsumeThreadPoolPara consumeThreadPoolPara = null;private static String configxml = listen xml ;static{getConsumeThreadPoolPara(); //得到消费的线程池的参数}/*** 装载文档* @return 返回根结点* @throws JDOMException*/public static Element loadDocument() throws JDOMException{SAXBuilder parser = new SAXBuilder(); // 新建立构造器try {Document document = parser build(configxml);Element root = document getRootElement();return root;}catch(JDOMException e){logger error( listen xml文件格式非法! );throw new JDOMException();}}public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){if(consumeThreadPoolPara ==null){try {Element root = loadDocument();Element consumeThreadPool = root getChild( ConsumeThreadPool );if (consumeThreadPool != null) { //代表有数据库配置consumeThreadPoolPara = new ConsumeThreadPoolPara();Element minPools = consumeThreadPool getChild( minPools );consumeThreadPoolPara setMinPools(Integer parseInt(minPools getTextTrim()));Element maxPools = consumeThreadPool getChild( maxPools );consumeThreadPoolPara setMaxPools(Integer parseInt(maxPools getTextTrim()));Element checkThreadPeriod = consumeThreadPool getChild( checkThreadPeriod );consumeThreadPoolPara setCheckThreadPeriod(Integer parseInt(checkThreadPeriod getTextTrim()));}}catch (JDOMException e) {}}return consumeThreadPoolPara;}} 四 线程池源代码 import java util *;/*** pTitle: 线程池/p* pDescription: 采集消费模块/p* pCopyright: Copyright (c) /p* pCompany: /p* @author 张荣斌* @version */public class ThreadPool {private static int minPools = ; //最小连接池数目private static int maxPools = ; //最大连接池数目private static int checkThreadPeriod = ; //检查连接池的周期ArrayList m_ThreadList; //工作线程列表LinkedList m_RunList = null; //工作任务列表int totalThread = ; //总线程数static int freeThreadCount = ; //未被使用的线程数目private java util Timer timer = null; //定时器static Object o = new Object();static{ //先初始化线程池的参数ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig getConsumeThreadPoolPara();if(consumeThreadPoolPara!=null){minPools = consumeThreadPoolPara getMinPools();maxPools = consumeThreadPoolPara getMaxPools();checkThreadPeriod = consumeThreadPoolPara getCheckThreadPeriod()* * ;}}public void setMinPools(int minPools){this minPools = minPools;}public void setMaxPools(int maxPools){this maxPools = maxPools;}public void setCheckThreadPeriod(int checkThreadPeriod){this checkThreadPeriod = checkThreadPeriod;}public ThreadPool() {m_ThreadList=new ArrayList();m_RunList=new LinkedList();for(int i= ;iminPools;i++){WorkerThread temp=new WorkerThread();totalThread = totalThread + ;m_ThreadList add(temp);temp start();try{Thread sleep( );}catch(Exception e){}}timer = new Timer(true); //启动定时器timer schedule(new CheckThreadTask(this) checkThreadPeriod);}/*** 当有一个工作来的时候启动线程池的线程* 当空闲线程数为 的时候 看总线程是否小于最大线程池的数目 就new一个新的线程 否则sleep 直到有空闲线程为止;* 当空闲线程不为 则将任务丢给空闲线程去完成* @param work*/public synchronized void run(String work){if (freeThreadCount == ) {if(totalThreadmaxPools){WorkerThread temp = new WorkerThread();totalThread = totalThread + ;m_ThreadList add(temp);temp start();synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}else{while (freeThreadCount == ) {try {Thread sleep( );}catch (InterruptedException e) {}}synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}} else {synchronized(m_RunList){m_RunList add(work);m_RunList notify();}}}/*** 检查所有的线程的有效性*/public synchronized void checkAllThreads() {Iterator lThreadIterator = erator();while (lThreadIterator hasNext()) { //逐个遍厉WorkerThread lTestThread = (WorkerThread) lThreadIterator next();if (! (lTestThread isAlive())) { //如果处在非活动状态时lTestThread = new WorkerThread(); //重新生成个线程lTestThread start(); //启动}}}/*** 打印调试信息*/public void printDebugInfo(){System out println( totalThread= +totalThread);System out println( m_ThreadList size()= +m_ThreadList size());}/**** pTitle: 工作线程类/p* @author 张荣斌* @version */class WorkerThread extends Thread{boolean running = true;String work;public void run(){while(running){synchronized(o){freeThreadCount++;}synchronized(m_RunList){while(m_RunList size() == ){try{m_RunList wait();if(!running) return;}catch(InterruptedException e){} lishixinzhi/Article/program/Java/gj/201311/27379
java常用的几种线程池实例讲解
下面给你介绍4种线程池:
1、newCachedThreadPool:
底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)
通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
适用:执行很多短期异步的小程序或者负载较轻的服务器
2、newFixedThreadPool:
底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueueRunnable() 无解阻塞队列
通俗:创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
适用:执行长期的任务,性能好很多
3、newSingleThreadExecutor
底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例,corePoolSize为1;maximumPoolSize为1;keepAliveTime为0L;unit为:TimeUnit.MILLISECONDS;workQueue为:new LinkedBlockingQueueRunnable() 无解阻塞队列
通俗:创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)
适用:一个任务一个任务执行的场景
4、NewScheduledThreadPool:
底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
适用:周期性执行任务的场景
最后给你说一下线程池任务执行流程:
当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
当workQueue已满,且maximumPoolSizecorePoolSize时,新提交任务会创建新线程执行任务
当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
合理使用线程池以及线程变量
背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
线程池概述
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
总体来说,线程池有如下的优势:
线程池的使用
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
ThreadPoolExecutor线程池有如下几种状态:
线程池提交一个任务时任务调度的主要步骤如下:
核心代码如下:
Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200, 为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
Executors常用方法有以下几个:
Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端 :
使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。
使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。
在工程实践中,通常使用下述公式来计算核心线程数:
nThreads=(w+c)/c*n*u=(w/c+1)*n*u
其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。
上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。
为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:
为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。
如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。
ThreadLocal线程变量概述
ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。
ThreadLocal的原理与实践
对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。
众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:
Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。
从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。
EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:
在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:
EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:
在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。
在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/dubbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:
【问题1:权益领取失败分析】
在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxModule形式,Module间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的module 的init方法先执行)。
在应用中,权益领取接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionModule的init方法会优先执行;而开发同学在CommonXXApplyModule类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权益领取失败。
【问题2:脏数据分析】
权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。
【解决方案】
在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。
若使用强引用类型,则threadlocal的引用链为:Thread - ThreadLocal.ThreadLocalMap - Entry[] - Entry - key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。
若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:
在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。
如果不执行清理操作,则可能会出现:
建议使用try...finally 进行清理。
我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏()。
在应用中,谨慎使用ThreadLocal.withInitial(Supplier? extends S supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:
总结
在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。
java线程池(一) 简述线程池的几种使用方式
首先说明下java线程是如何实现线程重用的
1. 线程执行完一个Runnable的run()方法后,不会被杀死
2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12
java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明
newFixedThreadPool
创建一个固定大小的线程池
添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。
@Test
public void newFixedThreadPool() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build());
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}12345678
newSingleThreadExecutor
仅支持单线程顺序处理任务
@Test
public void newSingleThreadExecutor() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newCachedThreadPool
这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s
@Test
public void newCachedThreadPool() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newWorkStealingPool
支持给定的并行级别,并且可以使用多个队列来减少争用。
@Test
public void newWorkStealingPool() throws Exception {
ExecutorService executorService = Executors.newWorkStealingPool();
executorService = Executors.newWorkStealingPool(1);
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}123456789
newScheduledThreadPool
看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用
支持定时周期执行任务(还没有看完)
@Test
public void newScheduledThreadPool() throws Exception {
ExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}
关于java线程池实例和java线程池的使用例子的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
