创建线程
Thread thread = new Thread("thread-1"){
@Override
public void run() {
logger.info("hello world");
}
};
thread.start();
Runnable runnable = new Runnable() {
public void run() {
logger.info("hello world");
}
};
Thread thread = new Thread(runnable,"thread");
thread.start();
Runnable runnable = ()->{
logger.info("hello world");
};
Thread thread = new Thread(runnable,"thread");
thread.start();
FutureTask<String> task = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
logger.info("running start...");
Thread.sleep(1000);
logger.info("running end...");
return "ok";
}
});
Thread thread = new Thread(task,"thread");
thread.start();
logger.info(task.get());//等待线程返回的结果
Thread常见方法
- start():启动新的线程,线程处于就绪状态
- run():线程启动调用的方法
直接调用run方法时则由主线程进行执行
- join():等待线程运行结束(调用线程加入当前线程执行)
- setPriority():设置线程优先级
- getState():获取线程状态
- sleep(n):线程休眠n毫秒,结束后处于就绪状态
- yield():让出CPU控制权
- interrupt():线程中断状态设为true
sleep/wait/join状态下,线程抛出InterruptedException,sleep清除打断标记
- interrupted():返回当前线程的中断状态;将当前线程的中断状态设为false,即清除打断标记
- isInterrupted():返回当前线程的中断状态,不清除打断标记
Thread thread = new Thread( new Runnable(){
public void run(){
// 若未发生中断,就正常执行任务
while(!Thread.currentThread.isInterrupted()){
// 正常任务代码……
}
// 中断的处理代码……
doSomething();
}
} ).start();
doSomething();
thread.interrupt();
- setDaemon():设置为守护线程(非守护线程执行完毕,守护线程即停止)
synchronized
synchronized (对象){
//临界区
}
线程进入临界区需要获得锁,临界区的代码执行完才会释放锁,并唤醒阻塞线程
static int count = 0;
static Object lock = new Object();
public static void main(String[] args){
Thread increment = new Thread(()->{
synchronized (lock){
count++;
}
});
Thread decrement = new Thread(()->{
synchronized (lock){
count--;
}
});
increment.start();
decrement.start();
increment.join();
decrement.join();
logger.info("count:"+count);//count始终为0
}
class Utils{
private int count = 0;
public void increment(){
synchronized (this){
count++;
}
}
public void decrement(){
synchronized (this){
count--;
}
}
public int getCount(){
return this.count;
}
}
public static void main(String[] args) {
Utils utils = new Utils();
Thread inc = new Thread(()->{
utils.increment();
});
Thread dec = new Thread(()->{
utils.decrement();
});
inc.start();
dec.start();
inc.join();
dec.join();
logger.info("count:"+utils.getCount());
}
public synchronized void increment(){
//doSomething
}
public void increment(){
synchronized (this){
//doSomething
}
}
//以上等价
public static synchronized void increment(){
//doSomething
}
public void increment(){
synchronized (Utils.class){
//doSomething
}
}
//以上等价
name.Class和this不是同一个对象,类对象只有一个,this为实例,可以有多个
Monitor(重量级锁)
加synchronized对象会和一个对应的Monitor对象关联(对象的MarkWord置为Monitor对象的指针),当线程成为Monitor的owner时才能执行临界区的代码,即获得锁,否则阻塞,添加到EntryList中,当owner为空时竞争锁。
WaitSet中的线程为已经获得过锁,但条件不满足放弃锁,进入waiting状态。owner线程调用notify()时唤醒WaitSet中的线程。
轻量级锁
线程访问临界区的时间错开,置锁对象的MarkWord为栈帧中琐记录值来上锁
锁重入:同一线程对同一对象再加锁
偏向锁:用threadId替换锁对象的markWord,若有锁重入,即再次加锁时检测锁对象的MarkWord是否等于threadId,一致则仅在栈帧中添加一条琐记录
public void increment(){
synchronized (this){
count++;
decrement();
}
}
public void decrement(){
synchronized (this){
count--;
}
}
锁膨胀
Thread-1尝试进行轻量级加锁,但Thread-0已加锁,则Thread-1为Object对象申请Monitor锁对象,进行重量级加锁,Thread-1进入Monitor的EntryList
Thread-0进入轻量级锁流程,由于此时为重量级加锁,失败
执行重量级解锁流程,即置Monitor对象的owner为空,并唤醒EntryList中的阻塞线程
wait/notify
只有获得锁的线程才能调用wait/notify方法,即需在synchronized代码块中调用(此时为重量级锁)
wait()会释放锁,sleep()不会释放锁
static Logger logger = Logger.getLogger("log");
static Object lock = new Object();
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread thread = new Thread(()->{
synchronized (lock){
logger.info("执行...");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("被唤醒,执行...");
}
});
thread.start();
Thread.sleep(1000);
synchronized (lock){
logger.info("主线程执行...");
logger.info("唤醒子线程...");
lock.notify();
}
}
十二月 04, 2020 3:51:24 下午 Main lambda$main$0
信息: 执行...
十二月 04, 2020 3:51:25 下午 Main main
信息: 主线程执行...
十二月 04, 2020 3:51:25 下午 Main main
信息: 唤醒子线程...
十二月 04, 2020 3:51:25 下午 Main lambda$main$0
信息: 被唤醒,执行...
同步-保护性暂停
class GuardObject {
private String response;
public String get() {
synchronized (this){
while (response == null)
try {
this.wait();
}catch (Exception e){
e.printStackTrace();
}
return response;
}
}
//等待超时方法
public String get(long timeout) {
synchronized (this){
long begin = System.currentTimeMillis();
long passedTime = 0;
while (response == null){
long waitTime = timeout - passedTime;
if(waitTime <= 0) break;
try {
this.wait(waitTime);
}catch (Exception e){
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - begin;
}
return response;
}
}
public void set(String response) {
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}
public static void main(String[] args){
GuardObject guardObject = new GuardObject();
new Thread(()->{
String res = guardObject.get();
//String res = guardObject.get(1000);
logger.info("response:"+res.toString());
}).start();
new Thread(()->{
//Thread.sleep(2000);
guardObject.set("I set something.");
}).start();
}
信息: response:I set something.
生产者-消费者
class MessageQueue {
Logger logger = Logger.getLogger("log");
private LinkedList<Message> queue = new LinkedList<>();
private int capcity;
public MessageQueue(int capcity){
this.capcity = capcity;
}
public Message take(){
synchronized (queue){
while (queue.isEmpty()){
try {
logger.info("queue is empty,waiting...");
queue.wait();
}catch (Exception e){
e.printStackTrace();
}
}
Message message = queue.removeFirst();
logger.info("consume message,"+message);
queue.notifyAll();
return message;
}
}
public void put(Message message){
synchronized (queue){
while (queue.size() == capcity){
try {
logger.info("queue is full,waiting...");
queue.wait();
}catch (Exception e){
e.printStackTrace();
}
}
queue.addLast(message);
logger.info("add message,"+message);
queue.notifyAll();
}
}
@Override
public String toString() {
return "MessageQueue{" +
"queue=" + queue +
", capcity=" + capcity +
'}';
}
}
final class Message{
private int id;
private String value;
public Message(int id,String value){
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public String getValue() {
return value;
}
}
static Logger logger = Logger.getLogger("log");
public static void main(String[] args){
final MessageQueue messageQueue = new MessageQueue(5);
//三个生产者线程
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(()->{
messageQueue.put(new Message(id,"value:"+id));
},"生产者"+i).start();
}
//消费者线程
new Thread(()->{
while (true){
Message message = messageQueue.take();
logger.info(message.toString());
}
}).start();
}
十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@483b79f5
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@483b79f5
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@483b79f5
十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@6a6daa45
十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@3555b188
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@6a6daa45
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@6a6daa45
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@3555b188
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@3555b188
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: queue is empty,waiting...
Park/Unpark
LockSupport.park();//暂停当前线程
LockSupport.unpark(Thread thread);//恢复某个线程的运行
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
logger.info("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("park...");
LockSupport.park();
logger.info("resume...");
});
thread.start();
Thread.sleep(2000);
logger.info("unpark...");
LockSupport.unpark(thread);
}
十二月 10, 2020 4:55:48 下午 Main lambda$main$0
信息: start...
十二月 10, 2020 4:55:49 下午 Main lambda$main$0
信息: park...
十二月 10, 2020 4:55:50 下午 Main main
信息: unpark...
十二月 10, 2020 4:55:50 下午 Main lambda$main$0
信息: resume...
死锁
产生死锁的必要条件:互斥条件、请求和保持条件、不剥夺条件、环路等待条件
#哲学家进餐问题
semaphore chopstick[5]={1,1,1,1,1};//五根筷子
semaphore mutex = 1;//设置取筷子的信号量
//i号哲学家进程
Pi(){
do{
wait(mutex);
wait(chopstick[i]);//取左边筷子
wait(chopstick[(i+1]%5);//取右边筷子
signal(mutex);
eat;
signal(chopstick[i]);
signal(chopstick[(i+1]%5);
think;
}
}
ReentrantLock
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();//获得锁
try {
//临界区
}finally {
reentrantLock.unlock();//释放锁
}
ReentrantLock 可重入
static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
reentrantLock.lock();
try {
logger.info("start");
enter();
}finally {
reentrantLock.unlock();
}
}
public static void enter(){
reentrantLock.lock();
try {
logger.info("enter");
exit();
}finally {
reentrantLock.unlock();
}
}
public static void exit(){
reentrantLock.lock();
try {
logger.info("exit");
}finally {
reentrantLock.unlock();
}
}
十二月 11, 2020 3:29:48 下午 Main main
信息: start
十二月 11, 2020 3:29:48 下午 Main enter
信息: enter
十二月 11, 2020 3:29:48 下午 Main exit
信息: exit
ReentrantLock 可中断
static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
try {
logger.info("尝试获得锁...");
reentrantLock.lockInterruptibly();//可打断
} catch (InterruptedException e) {
logger.warning("被打断.。.");
e.printStackTrace();
return;
}
try {
logger.info("获得锁...");
}finally {
reentrantLock.unlock();
}
},"thread");
thread.start();
thread.interrupt();
}
十二月 11, 2020 3:28:52 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:28:52 下午 Main lambda$main$0
警告: 被打断.。.
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at Main.lambda$main$0(Main.java:21)
at java.lang.Thread.run(Thread.java:748)
ReentrantLock 锁超时
static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
logger.info("尝试获得锁...");
if(!reentrantLock.tryLock()){
logger.info("获得锁失败");
return;
}
try {
logger.info("获得锁...");
}finally {
reentrantLock.unlock();
}
},"thread");
reentrantLock.lock();//主线程先获得锁
thread.start();
}
十二月 11, 2020 3:36:11 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:36:11 下午 Main lambda$main$0
信息: 获得锁失败
static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()->{
logger.info("尝试获得锁...");
try {
if(!reentrantLock.tryLock(1, TimeUnit.SECONDS)){
logger.info("获得锁失败");
return;
}
} catch (InterruptedException e) {
logger.info("获得锁失败");
e.printStackTrace();
return;
}
try {
logger.info("获得锁...");
}finally {
reentrantLock.unlock();
}
},"thread");
reentrantLock.lock();//主线程先获得锁
thread.start();
}
十二月 11, 2020 3:38:11 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:38:12 下午 Main lambda$main$0
信息: 获得锁失败
ReentrantLock 条件变量
static Logger logger = Logger.getLogger("log");
static ReentrantLock reentrantLock = new ReentrantLock();
static Condition condition_s = reentrantLock.newCondition();
static Condition condition_g = reentrantLock.newCondition();
static boolean has_s = false;
static boolean has_g = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
reentrantLock.lock();
try {
logger.info("is there a s ?");
while (!has_s){
condition_s.await();//等待s条件
}
if(has_s){
logger.info("get s");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
new Thread(()->{
reentrantLock.lock();
try {
logger.info("is there a g ?");
while (!has_s){
condition_g.await();//等待g条件
}
if(has_s){
logger.info("get g");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
Thread.sleep(2000);
new Thread(()->{
reentrantLock.lock();
try {
logger.info("add s");
has_s = true;
condition_s.signal();
}finally {
reentrantLock.unlock();
}
}).start();
Thread.sleep(1000);
new Thread(()->{
reentrantLock.lock();
try {
logger.info("add g");
has_g = true;
condition_g.signal();
}finally {
reentrantLock.unlock();
}
}).start();
}
十二月 11, 2020 4:30:39 下午 Main lambda$main$0
信息: is there a s ?
十二月 11, 2020 4:30:39 下午 Main lambda$main$1
信息: is there a g ?
十二月 11, 2020 4:30:41 下午 Main lambda$main$2
信息: add s
十二月 11, 2020 4:30:41 下午 Main lambda$main$0
信息: get s
十二月 11, 2020 4:30:42 下午 Main lambda$main$3
信息: add g
十二月 11, 2020 4:30:42 下午 Main lambda$main$1
信息: get g
volatile 可见性
可见性:某个线程修改了共享变量,其他线程能够实时的知晓其改变
1.适用于一个写线程、多个读线程;禁止volatile共享变量前指令的重排序
2.共享变量易变,多个线程每次从主存中而非缓存中读取(读屏障),并同步对共享变量的改动至主存(写屏障)
3.保证多个线程对其的可见性,防止某线程改变了其值,其他线程仍从缓存中读取旧值
4.synchronized只能保证完全交由synchronized保护的共享变量的原子性、有序性(单例模式-双重锁机制:instance共享变量需设为volatile,阻止指令重排序)
volatile static boolean flag = false;
happens-before原则
加锁
volatile变量
线程start()方法调用前的所有操作对新创建的线程可见
调用join()方法线程的操作对因其阻塞的线程可见
线程操作对其被调用interrupt的线程可见
无锁模式
适合线程数小且多核cpu场景
class AccountCas{
private AtomicInteger balance;
public AccountCas(int balance){
this.balance = new AtomicInteger(balance);
}
public int getBalance() {
return balance.get();
}
public void withdraw(int amount){
while (true){
int prev = getBalance();
int next = prev - amount;
//prev和balance比较,查看是否有其他线程修改其值,被修改则进入下一次循环
if(balance.compareAndSet(prev,next)){
break;
}
//balance.getAndAdd(-1*amount);等价以上
}
}
}
原子整数
//AtomicInteger
AtomicInteger i = new AtomicInteger(0);
System.out.println("i:"+i.get());
System.out.println("++i:"+i.incrementAndGet());
System.out.println("--i:"+i.decrementAndGet());
System.out.println("i++:"+i.getAndIncrement());
System.out.println("i--:"+i.getAndDecrement());
System.out.println("i:"+i.get());
System.out.println("i+(2):"+i.addAndGet(2));
System.out.println("i * 10:"+i.updateAndGet(value->value * 10));
i:0
++i:1
--i:0
i++:0
i--:1
i:0
i+(2):2
i * 10:20
原子引用
class AccountCas{
private AtomicReference<BigDecimal> balance;
public AccountCas(BigDecimal balance){
this.balance = new AtomicReference<BigDecimal>(balance);
}
public BigDecimal getBalance() {
return balance.get();
}
public void withdraw(BigDecimal amount){
while (true){
BigDecimal prev = getBalance();
BigDecimal next = prev.subtract(amount);
if(balance.compareAndSet(prev,next)){
break;
}
}
}
}
//AtomicStampedReference
static Logger logger = Logger.getLogger("log");
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
logger.info("main start");
String prev = ref.getReference();
new Thread(()->{
int stamp = ref.getStamp();
logger.info("change A -> B");
ref.compareAndSet(ref.getReference(),"B",stamp,stamp++);
logger.info("ref.getReference():"+ref.getReference());
}).start();
Thread.sleep(1000);
new Thread(()->{
int stamp = ref.getStamp();
logger.info("change B -> A");
ref.compareAndSet(ref.getReference(),"A",stamp,stamp++);
logger.info("ref.getReference():"+ref.getReference());
}).start();
Thread.sleep(1000);
int stamp = ref.getStamp();
ref.compareAndSet(prev,"C",stamp,stamp++);
logger.info("ref.getReference():"+ref.getReference());
}
十二月 17, 2020 5:08:24 下午 Main main
信息: main start
十二月 17, 2020 5:08:24 下午 Main lambda$main$0
信息: change A -> B
十二月 17, 2020 5:08:24 下午 Main lambda$main$0
信息: ref.getReference():B
十二月 17, 2020 5:08:25 下午 Main lambda$main$1
信息: change B -> A
十二月 17, 2020 5:08:25 下午 Main lambda$main$1
信息: ref.getReference():A
十二月 17, 2020 5:08:26 下午 Main main
信息: ref.getReference():C
自定义线程池
//拒绝策略
@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
class ThreadPool{
Logger logger = Logger.getLogger("log");
private BlockingQueue<Runnable> taskQueue;//任务队列
private HashSet<Worker> workers = new HashSet<>();
private int coreSize;//线程数量
private long timeout;//获取任务超时时间
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize,long timeout,TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//执行任务
public void execute(Runnable task){
synchronized (workers){ //保证works集合的线程安全
if(workers.size() < coreSize){
Worker worker = new Worker(task);
logger.info("创建worker:"+worker);
workers.add(worker);
worker.start();
}
else {
logger.info("任务加入队列:"+task);
//taskQueue.put(task);替换为自定义处理方式
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
//while (task != null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.take(timeout,timeUnit)) != null){
try {
logger.info("执行任务:"+task);
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
logger.info("worker被移除:"+this);
workers.remove(this);
}
}
}
}
//任务阻塞队列
class BlockingQueue<T>{
private Deque<T> queue = new ArrayDeque<>(); //任务队列
private ReentrantLock lock = new ReentrantLock();//队列任务锁
private Condition fullWaitSet = lock.newCondition();//队列满
private Condition emptyWaitSet = lock.newCondition();//队列空
private int capacity;//队列容量
public BlockingQueue(int queueCapacity) {
this.capacity = queueCapacity;
}
public T take(){
lock.lock();
try{
while (queue.isEmpty()){
try {
emptyWaitSet.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
};
public T take(long timeout, TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while (queue.isEmpty()){
try {
if(nanos <= 0) return null;
nanos = emptyWaitSet.awaitNanos(nanos);
}catch (InterruptedException e){
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
};
public void put(T element){
lock.lock();
try {
while (queue.size() == capacity){
try{
fullWaitSet.await();
}catch (InterruptedException e){
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
//超时
public boolean offer(T element,long timeout,TimeUnit timeUnit){
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity){
try{
if(nanos <= 0) return false;
nanos = emptyWaitSet.awaitNanos(nanos);
}catch (InterruptedException e){
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity){
rejectPolicy.reject(this,task);
}
else {
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
public int size(){
lock.lock();
try {
return capacity;
}finally {
lock.unlock();
}
}
}
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10,
((queue, task)->{
//queue.put(task);//死等
//queue.offer(task,5,TimeUnit.SECONDS);//超时等待
//throw new RuntimeException("任务执行失败.");//抛出异常
task.run(); //主线程执行任务
}));
for (int i = 0; i < 5; i++){
int index = i;
threadPool.execute(()->{
logger.info(""+index);
});
}
}
java.util.concurrent
自定义锁
//自定义锁(不可重入锁)
class MyLock implements Lock{
//同步器类 独占锁 AQS
class MySync extends AbstractQueuedSynchronizer{
@Override //尝试获得锁
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());//设置锁的持有者线程
return true;
}
return false;
}
@Override //尝试释放锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition(){
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override //加锁 不成功进入等待队列等待
public void lock() {
sync.acquire(1);
}
@Override //可打断的加锁
public void lockInterruptibly() throws InterruptedException {
sync.tryAcquire(1);
}
@Override //尝试加锁 仅尝试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
try{
logger.info("locking");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
logger.info("unlocking");
lock.unlock();
}
}).start();
new Thread(()->{
lock.lock();
try{
logger.info("locking");
}
finally {
logger.info("unlocking");
lock.unlock();
}
}).start();
}
一月 05, 2021 4:05:27 下午 Main lambda$main$0
信息: locking
一月 05, 2021 4:05:28 下午 Main lambda$main$0
信息: unlocking
一月 05, 2021 4:05:28 下午 Main lambda$main$1
信息: locking
一月 05, 2021 4:05:28 下午 Main lambda$main$1
信息: unlocking
读写锁
并发的读操作不互斥,并发读操作和写操作互斥。
class DataContainer{
static Logger logger = Logger.getLogger("log");
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock= rw.readLock();
private ReentrantReadWriteLock.WriteLock writeLock= rw.writeLock();
public Object read(){
logger.info("get readlock...");
readLock.lock();
try {
logger.info("reading...");
return data;
}finally {
logger.info("release readlock...");
readLock.unlock();
}
}
public void write(){
logger.info("get writelock...");
writeLock.lock();
try {
logger.info("writing...");
}finally {
logger.info("release writelock...");
writeLock.unlock();
}
}
}
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer();
/*
new Thread(()->{
dataContainer.read();
}).start();
new Thread(()->{
dataContainer.read();
}).start();
*/
new Thread(()->{
dataContainer.write();
}).start();
new Thread(()->{
dataContainer.read();
}).start();
}
一月 06, 2021 11:29:39 上午 DataContainer write
信息: get writelock...
一月 06, 2021 11:29:39 上午 DataContainer read
信息: get readlock...
一月 06, 2021 11:29:40 上午 DataContainer write
信息: writing...
一月 06, 2021 11:29:40 上午 DataContainer write
信息: release writelock...
一月 06, 2021 11:29:40 上午 DataContainer read
信息: reading...
一月 06, 2021 11:29:40 上午 DataContainer read
信息: release readlock...
StampedLock
class DataContainer{
static Logger logger = Logger.getLogger("log");
private int data;
private final StampedLock lock = new StampedLock();
DataContainer(int data){
this.data = data;
}
public int read(int readTime) throws InterruptedException {
logger.info("tryOptimisticRead...");
long stamp = lock.tryOptimisticRead();
logger.info("stamp:"+stamp);
Thread.sleep(readTime);
if(lock.validate(stamp)){ //戳是否未被更新
logger.info("read finish(OptimisticRead)...");
return data;
}
logger.info("get readlock...");
try {
stamp = lock.readLock();
logger.info("stamp:"+stamp);
logger.info("reading...");
Thread.sleep(readTime);
logger.info("read finish...");
return data;
}finally {
logger.info("release readlock...");
lock.unlockRead(stamp);
}
}
public void write(int data){
logger.info("get writelock...");
long stamp = lock.writeLock();
logger.info("stamp:"+stamp);
try {
logger.info("writing...");
this.data = data;
}finally {
logger.info("release writelock...");
lock.unlockWrite(stamp);
}
}
}
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer(0);
new Thread(()->{
try {
dataContainer.read(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000);
new Thread(()->{
try {
dataContainer.read(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
一月 08, 2021 4:08:01 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 4:08:01 下午 DataContainer read
信息: stamp:256
一月 08, 2021 4:08:02 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 4:08:02 下午 DataContainer read
信息: stamp:256
一月 08, 2021 4:08:02 下午 DataContainer read
信息: read finish(OptimisticRead)...
一月 08, 2021 4:08:04 下午 DataContainer read
信息: read finish(OptimisticRead)...
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
DataContainer dataContainer = new DataContainer(0);
new Thread(()->{
try {
dataContainer.read(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
dataContainer.write(10);
}).start();
}
一月 08, 2021 5:22:44 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: get writelock...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: stamp:384
一月 08, 2021 5:22:44 下午 DataContainer read
信息: stamp:256
一月 08, 2021 5:22:44 下午 DataContainer write
信息: writing...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: release writelock...
一月 08, 2021 5:22:46 下午 DataContainer read
信息: get readlock...
一月 08, 2021 5:22:46 下午 DataContainer read
信息: stamp:513
一月 08, 2021 5:22:46 下午 DataContainer read
信息: reading...
一月 08, 2021 5:22:48 下午 DataContainer read
信息: read finish...
一月 08, 2021 5:22:48 下午 DataContainer read
信息: release readlock...
Semaphore 信号量
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
semaphore.acquire();
logger.info("running..");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
logger.info("end...");
semaphore.release();
}
}).start();
}
}
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
CountDownLatch
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(()->{
logger.info("running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
logger.info("end");
}).start();
new Thread(()->{
logger.info("running");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
logger.info("end");
}).start();
logger.info("main thread waiting");
countDownLatch.await();//主线程等待其他所有线程执行完毕再恢复运行
logger.info("main thread restart");
}
一月 11, 2021 4:29:53 下午 Main lambda$main$0
信息: running
一月 11, 2021 4:29:53 下午 Main lambda$main$1
信息: running
一月 11, 2021 4:29:53 下午 Main main
信息: main thread waiting
一月 11, 2021 4:29:54 下午 Main lambda$main$0
信息: end
一月 11, 2021 4:29:55 下午 Main lambda$main$1
信息: end
一月 11, 2021 4:29:55 下午 Main main
信息: main thread restart
CyclicBarrier
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
logger.info("all tasks finished");
});
service.submit(()->{
logger.info("task1 running");
try {
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(()->{
logger.info("task2 running");
try {
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
service.shutdown();
}
一月 11, 2021 4:44:32 下午 Main lambda$main$1
信息: task1 running
一月 11, 2021 4:44:32 下午 Main lambda$main$2
信息: task2 running
一月 11, 2021 4:44:33 下午 Main lambda$main$0
信息: all tasks finished
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!