JUC
# 概念
在高内聚低耦合的前提下,线程 操作(对外暴露的方法) 资源类
/**
* 资源类
*/
class Phone {
// 对外暴露的方法 sendEmail
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendEmail~~");
}
// 对外暴露的方法 sendSMS
public static synchronized void sendSMS() {
System.out.println("sendSMS~~");
}
}
public class Lock8 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
// 线程1
new Thread(phone::sendEmail).start();
TimeUnit.MILLISECONDS.sleep(1);
// 线程2
new Thread(Phone::sendSMS).start();
}
}
# 线程状态
/**
* 新建
*/
NEW,
/**
* 就绪、运行
*/
RUNNABLE,
/**
* 阻塞
*/
BLOCKED,
/**
* 等待
* 不会被分配CPU执行时间,如果不被唤醒则会无限期等待
*/
WAITING,
/**
* 超时等待
* 不会被分配CPU执行时间,如果不被唤醒则时间到了会自动唤醒
*/
TIMED_WAITING,
/**
* 终止、销毁
*/
TERMINATED;
# 线程间通信
判断 干活 通知
多线程交互必须防止多线程的虚假唤醒,即(判断只能用while不能用if)
class Plusminus {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
// 1判断
while (number != 0) {
condition.await();
}
// 3干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ThreadNotifyDemo {
public static void main(String[] args) {
Plusminus plusminus = new Plusminus();
new Thread(() -> {
for (int i = 0; i < 9; i++) plusminus.increment();
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 9; i++) plusminus.decrement();
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 9; i++) plusminus.increment();
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 9; i++) plusminus.decrement();
}, "D").start();
}
}
# 多线程锁的机制
一个资源类里面如果有多个synchronized方法,那么如果一个线程调用了当前锁对象this的其中一个synchronized方法了,那么就会锁住this对象的所有synchronized方法,其他线程就只能等待不能访问当前对象的其他synchronized方法。
锁的表现形式:
- 对于普通同步方法,锁是当前实例对象
- 对于静态同步方法,锁是当前类的class对象
- 对于同步方法块,锁是synchronized(...)里配置的对象
说明:
当一个线程试图访问同步代码块时,它必须先得到锁。退出或抛出异常时必须释放锁
当一个实例对象的非静态同步方法取得锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁。其他实例对象的锁与该实例对象的锁不一样所以无须等待释放锁
所有静态同步方法用的是同一把锁(类对象本身)。静态同步方法与非静态同步方法的锁是两个不同的对象所以不存在锁的竞争
# ArrayList线程不安全说明
不安全场景
在数据同时进行读写时会产生数据不一致
public class NotSafeDemo { public static void main(String[] args) { ArrayList<String> list = new ArrayList<>(); for (int i = 1; i <= 5; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
解决方法
- Vector
- Collections.synchronizedList()
- CopyOnWriteArrayList
说明
CopyOnWriteArrayList:
CopyOnWriteArrayList是一个在写时复制的容器。先将当前容器进行复制,再往新容器写入数据,写入完成后将原容器指向新容器。所以CopyOnWriteArrayList实际采用了一种读写分离的思想,读和写分别在不同的容器中进行
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
# ReentrantReadWriteLock锁
ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问
class MyCache {
private Map<String,Object> map = new HashMap<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, String value) throws InterruptedException {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t" + "开始写入数据");
TimeUnit.MILLISECONDS.sleep(300);
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "\t" + "写入数据完成");
} finally {
lock.writeLock().unlock();
}
}
public void get(String key) throws InterruptedException {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t" + "开始读取数据" + "\t" + key);
TimeUnit.MILLISECONDS.sleep(300);
System.out.println(Thread.currentThread().getName() + "\t" + "读取数据完成" + "\t" + map.get(key));
} finally {
lock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
int tempInt = i;
new Thread(() -> {
try {
myCache.put(tempInt+"",tempInt+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
},tempInt+"").start();
}
for (int i = 1; i <= 5; i++) {
int tempInt = i;
new Thread(() -> {
try {
myCache.get(tempInt+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
},tempInt+"").start();
}
}
}
# 阻塞队列
当队列为空时,从队列中获取元素将阻塞。 当队列为满时,从队列中添加元素将阻塞。 因为是队列,所以我们理应想到先进先出。
# 线程池
线程池优势:
线程池的任务只是控制运行的线程数量,处理过程中将任务放入队列,然后线程创建后启动这些任务,如果线程数量超过最大线程数,超出的线程将排队等候,等其他线程执行完任务,再从队列中取出任务执行
特点:
- 线程复用
- 降低资源消耗,通过重复利用已经创建的线程可以降低线程创建和销毁的资源消耗
- 提高响应速度,当任务到达时,可以不用等待线程的创建就能立即执行
- 控制最大并发数
- 管理线程
- 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以统一的分配、调优和监控
线程池类图:
Executors创建线程的三种方法:
// 固定容量
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 单例的、单个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 缓存的 即超出就自动创建线程的
ExecutorService threadPool = Executors.newCachedThreadPool();
ThreadPoolExecutor:
/**
*corePoolSize:指定了线程池中的核心线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
*maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
*keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
*unit:keepAliveTime的单位
*workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
*threadFactory:线程工厂,用于创建线程,一般用默认即可;
*handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
**/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
拒绝策略
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下:
1、AbortPolicy:该策略会直接抛出异常,阻止系统正常工作;
2、CallerRunsPolicy:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
3、DiscardOldestPolicy:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个 任务,并尝试再次提交;
4、DiscardPolicy:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
线程池工作流程: 执行任务、放入队列,扩容线程,拒绝策略
- 创建线程池后,开始等待请求任务
- 当调用execute()方法添加一个任务时,线程池开始判断:
- 如果当前线程运行数量 < corePoolSize,立即创建一个核心线程执行当前任务
- 如果当前线程运行数量 >= corePoolSize,将任务放入workQueue
- 如果workQueue已经满了但是当前线程运行数量 < maximumPoolSize,立即创建一个非核心线程执行当前任务
- 如果workQueue已经满了且当前线程运行数量 >= maximumPoolSize,那么线程池会执行handler拒绝任务
- 当一个线程执行完一个任务时,会从workQueue中取出一个任务执行
- 当一个线程空闲时间 > keepAliveTime 时,线程池开始判断:
- 如果当前线程数量 > corePoolSize,销毁当前空闲线程
- 如果线程池完成所以任务后,销毁所有非核心线程,保持到corePoolSize数量