创建线程

Thread thread = new Thread("thread-1"){
    @Override
    public void run() {
        logger.info("hello world");
    }
};
thread.start();
Runnable runnable = new Runnable() {
    public void run() {
        logger.info("hello world");
    }
};
Thread thread = new Thread(runnable,"thread");
thread.start();
Runnable runnable = ()->{
    logger.info("hello world");
};
Thread thread = new Thread(runnable,"thread");
thread.start();
FutureTask<String> task = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        logger.info("running start...");
        Thread.sleep(1000);
        logger.info("running end...");
        return "ok";
    }
});
Thread thread = new Thread(task,"thread");
thread.start();

logger.info(task.get());//等待线程返回的结果

Thread常见方法

  • start():启动新的线程,线程处于就绪状态
  • run():线程启动调用的方法

    直接调用run方法时则由主线程进行执行

  • join():等待线程运行结束(调用线程加入当前线程执行)
  • setPriority():设置线程优先级
  • getState():获取线程状态
  • sleep(n):线程休眠n毫秒,结束后处于就绪状态
  • yield():让出CPU控制权
  • interrupt():线程中断状态设为true

    sleep/wait/join状态下,线程抛出InterruptedException,sleep清除打断标记

  • interrupted():返回当前线程的中断状态;将当前线程的中断状态设为false,即清除打断标记
  • isInterrupted():返回当前线程的中断状态,不清除打断标记
Thread thread = new Thread( new Runnable(){
    public void run(){
        // 若未发生中断,就正常执行任务
        while(!Thread.currentThread.isInterrupted()){
            // 正常任务代码……
        }

        // 中断的处理代码……
        doSomething();
    }
} ).start();
doSomething();
thread.interrupt();
  • setDaemon():设置为守护线程(非守护线程执行完毕,守护线程即停止)

synchronized

synchronized (对象){
    //临界区            
}

线程进入临界区需要获得锁,临界区的代码执行完才会释放锁,并唤醒阻塞线程

static int count = 0;
static Object lock = new Object();
public static void main(String[] args){
    Thread increment = new Thread(()->{
        synchronized (lock){
            count++;
        }
    });

    Thread decrement = new Thread(()->{
        synchronized (lock){
            count--;
        }
    });

    increment.start();
    decrement.start();
    increment.join();
    decrement.join();

    logger.info("count:"+count);//count始终为0
}
class Utils{
    private int count = 0;
    public void increment(){
        synchronized (this){
            count++;
        }
    }
    public void decrement(){
        synchronized (this){
            count--;
        }
    }
    public int getCount(){
        return this.count;
    }
}

public static void main(String[] args) {
    Utils utils = new Utils();
    Thread inc = new Thread(()->{
        utils.increment();
    });
    Thread dec = new Thread(()->{
        utils.decrement();
    });
    inc.start();
    dec.start();
    inc.join();
    dec.join();

    logger.info("count:"+utils.getCount());
}
public synchronized void increment(){
    //doSomething
}
public void increment(){
    synchronized (this){
        //doSomething
    }
}
//以上等价

public static synchronized void increment(){
    //doSomething
}
public void increment(){
    synchronized (Utils.class){
        //doSomething
    }
}
//以上等价

name.Class和this不是同一个对象,类对象只有一个,this为实例,可以有多个

Monitor(重量级锁)


加synchronized对象会和一个对应的Monitor对象关联(对象的MarkWord置为Monitor对象的指针),当线程成为Monitor的owner时才能执行临界区的代码,即获得锁,否则阻塞,添加到EntryList中,当owner为空时竞争锁。
WaitSet中的线程为已经获得过锁,但条件不满足放弃锁,进入waiting状态。owner线程调用notify()时唤醒WaitSet中的线程。

轻量级锁

线程访问临界区的时间错开,置锁对象的MarkWord为栈帧中琐记录值来上锁

锁重入:同一线程对同一对象再加锁

偏向锁:用threadId替换锁对象的markWord,若有锁重入,即再次加锁时检测锁对象的MarkWord是否等于threadId,一致则仅在栈帧中添加一条琐记录

public void increment(){
    synchronized (this){
                count++;
        decrement(); 
    }
}
public void decrement(){
    synchronized (this){
        count--;
    }
}

锁膨胀

Thread-1尝试进行轻量级加锁,但Thread-0已加锁,则Thread-1为Object对象申请Monitor锁对象,进行重量级加锁,Thread-1进入Monitor的EntryList

Thread-0进入轻量级锁流程,由于此时为重量级加锁,失败

执行重量级解锁流程,即置Monitor对象的owner为空,并唤醒EntryList中的阻塞线程

wait/notify

只有获得锁的线程才能调用wait/notify方法,即需在synchronized代码块中调用(此时为重量级锁)

wait()会释放锁,sleep()不会释放锁

static Logger logger = Logger.getLogger("log");
static Object lock = new Object();
public static void main(String[] args) throws ExecutionException, InterruptedException {

    Thread thread = new Thread(()->{
        synchronized (lock){
            logger.info("执行...");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("被唤醒,执行...");
        }
    });
    thread.start();
    Thread.sleep(1000);
    synchronized (lock){
        logger.info("主线程执行...");
        logger.info("唤醒子线程...");
        lock.notify();
    }
}

十二月 04, 2020 3:51:24 下午 Main lambda$main$0
信息: 执行...
十二月 04, 2020 3:51:25 下午 Main main
信息: 主线程执行...
十二月 04, 2020 3:51:25 下午 Main main
信息: 唤醒子线程...
十二月 04, 2020 3:51:25 下午 Main lambda$main$0
信息: 被唤醒,执行...

同步-保护性暂停

class GuardObject {
    private String response;
    public String get() {
        synchronized (this){
            while (response == null)
                try {
                    this.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
            return response;
        }
    }
    //等待超时方法
    public String get(long timeout) {
        synchronized (this){
            long begin = System.currentTimeMillis();
            long passedTime = 0;

            while (response == null){
                long waitTime = timeout - passedTime;
                if(waitTime <= 0) break;
                try {
                    this.wait(waitTime);
                }catch (Exception e){
                    e.printStackTrace();
                }
                passedTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    public void set(String response) {
        synchronized (this){
           this.response = response;
           this.notifyAll();
        }
    }
}

public static void main(String[] args){
    GuardObject guardObject = new GuardObject();
    new Thread(()->{
        String res = guardObject.get();
        //String res = guardObject.get(1000);
        logger.info("response:"+res.toString());
    }).start();

    new Thread(()->{
            //Thread.sleep(2000);
        guardObject.set("I set something.");
    }).start();
}
信息: response:I set something.

生产者-消费者

class MessageQueue {
    Logger logger = Logger.getLogger("log");
    private LinkedList<Message> queue = new LinkedList<>();
    private int capcity;
    public  MessageQueue(int capcity){
        this.capcity = capcity;
    }
    public Message take(){
        synchronized (queue){
            while (queue.isEmpty()){
                try {
                    logger.info("queue is empty,waiting...");
                    queue.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }

            Message message = queue.removeFirst();
            logger.info("consume message,"+message);
            queue.notifyAll();
            return  message;
        }
    }

    public void put(Message message){
        synchronized (queue){
            while (queue.size() == capcity){
                try {
                    logger.info("queue is full,waiting...");
                    queue.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            logger.info("add message,"+message);
            queue.notifyAll();
        }
    }

    @Override
    public String toString() {
        return "MessageQueue{" +
                "queue=" + queue +
                ", capcity=" + capcity +
                '}';
    }
}

final class Message{
    private int id;
    private String value;
    public Message(int id,String value){
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public String getValue() {
        return value;
    }
}

static Logger logger = Logger.getLogger("log");
public static void main(String[] args){
    final MessageQueue messageQueue = new MessageQueue(5);
    //三个生产者线程
    for (int i = 0; i < 3; i++) {
        int id = i;
        new Thread(()->{
            messageQueue.put(new Message(id,"value:"+id));
        },"生产者"+i).start();
    }

    //消费者线程
    new Thread(()->{
        while (true){
            Message message = messageQueue.take();
            logger.info(message.toString());
        }
    }).start();
}

十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@483b79f5
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@483b79f5
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@483b79f5
十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@6a6daa45
十二月 10, 2020 3:46:38 下午 MessageQueue put
信息: add message,Message@3555b188
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@6a6daa45
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@6a6daa45
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: consume message,Message@3555b188
十二月 10, 2020 3:46:38 下午 Main lambda$main$1
信息: Message@3555b188
十二月 10, 2020 3:46:38 下午 MessageQueue take
信息: queue is empty,waiting...

Park/Unpark

LockSupport.park();//暂停当前线程
LockSupport.unpark(Thread thread);//恢复某个线程的运行
static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(()->{
        logger.info("start...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("park...");
        LockSupport.park();
        logger.info("resume...");
    });

    thread.start();
    Thread.sleep(2000);
    logger.info("unpark...");
    LockSupport.unpark(thread);
}

十二月 10, 2020 4:55:48 下午 Main lambda$main$0
信息: start...
十二月 10, 2020 4:55:49 下午 Main lambda$main$0
信息: park...
十二月 10, 2020 4:55:50 下午 Main main
信息: unpark...
十二月 10, 2020 4:55:50 下午 Main lambda$main$0
信息: resume...

死锁

产生死锁的必要条件:互斥条件、请求和保持条件、不剥夺条件、环路等待条件

#哲学家进餐问题
semaphore chopstick[5]={1,1,1,1,1};//五根筷子
semaphore mutex =  1;//设置取筷子的信号量
//i号哲学家进程
Pi(){
    do{
        wait(mutex);
        wait(chopstick[i]);//取左边筷子
        wait(chopstick[(i+1]%5);//取右边筷子
        signal(mutex);
        eat;
        signal(chopstick[i]);
        signal(chopstick[(i+1]%5);
        think;
    }
}

ReentrantLock

ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();//获得锁
try {
    //临界区
}finally {
    reentrantLock.unlock();//释放锁
}

ReentrantLock 可重入

static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {

    reentrantLock.lock();
    try {
        logger.info("start");
        enter();
    }finally {
        reentrantLock.unlock();
    }
}

public static void enter(){
    reentrantLock.lock();
    try {
        logger.info("enter");
        exit();
    }finally {
        reentrantLock.unlock();
    }
}

public static void exit(){
    reentrantLock.lock();
    try {
        logger.info("exit");
    }finally {
        reentrantLock.unlock();
    }
}

十二月 11, 2020 3:29:48 下午 Main main
信息: start
十二月 11, 2020 3:29:48 下午 Main enter
信息: enter
十二月 11, 2020 3:29:48 下午 Main exit
信息: exit

ReentrantLock 可中断

 static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(()->{
        try {
            logger.info("尝试获得锁...");
            reentrantLock.lockInterruptibly();//可打断
        } catch (InterruptedException e) {
            logger.warning("被打断.。.");
            e.printStackTrace();
            return;
        }

        try {
            logger.info("获得锁...");
        }finally {
            reentrantLock.unlock();
        }
    },"thread");


    thread.start();
    thread.interrupt();
}

十二月 11, 2020 3:28:52 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:28:52 下午 Main lambda$main$0
警告: 被打断.。.
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at Main.lambda$main$0(Main.java:21)
    at java.lang.Thread.run(Thread.java:748)

ReentrantLock 锁超时

static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(()->{
        logger.info("尝试获得锁...");
        if(!reentrantLock.tryLock()){
            logger.info("获得锁失败");
            return;
        }

        try {
            logger.info("获得锁...");
        }finally {
            reentrantLock.unlock();
        }
    },"thread");

    reentrantLock.lock();//主线程先获得锁
    thread.start();
}
十二月 11, 2020 3:36:11 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:36:11 下午 Main lambda$main$0
信息: 获得锁失败

static Logger logger = Logger.getLogger("log");
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(()->{
        logger.info("尝试获得锁...");
        try {
            if(!reentrantLock.tryLock(1, TimeUnit.SECONDS)){
                logger.info("获得锁失败");
                return;
            }
        } catch (InterruptedException e) {
            logger.info("获得锁失败");
            e.printStackTrace();
            return;
        }

        try {
            logger.info("获得锁...");
        }finally {
            reentrantLock.unlock();
        }
    },"thread");

    reentrantLock.lock();//主线程先获得锁
    thread.start();
}

十二月 11, 2020 3:38:11 下午 Main lambda$main$0
信息: 尝试获得锁...
十二月 11, 2020 3:38:12 下午 Main lambda$main$0
信息: 获得锁失败

ReentrantLock 条件变量

static Logger logger = Logger.getLogger("log");
static ReentrantLock reentrantLock = new ReentrantLock();
static Condition condition_s = reentrantLock.newCondition();
static Condition condition_g = reentrantLock.newCondition();
static boolean has_s = false;
static boolean has_g = false;
public static void main(String[] args) throws InterruptedException {
    new Thread(()->{
        reentrantLock.lock();
        try {
            logger.info("is there a s ?");
            while (!has_s){
                condition_s.await();//等待s条件
            }
            if(has_s){
                logger.info("get s");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }).start();

    new Thread(()->{
        reentrantLock.lock();
        try {
            logger.info("is there a g ?");
            while (!has_s){
                condition_g.await();//等待g条件
            }
            if(has_s){
                logger.info("get g");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }).start();

    Thread.sleep(2000);

    new Thread(()->{
        reentrantLock.lock();
        try {
            logger.info("add s");
            has_s = true;
            condition_s.signal();
        }finally {
            reentrantLock.unlock();
        }
    }).start();

    Thread.sleep(1000);
    new Thread(()->{
        reentrantLock.lock();
        try {
            logger.info("add g");
            has_g = true;
            condition_g.signal();
        }finally {
            reentrantLock.unlock();
        }
    }).start();

}
十二月 11, 2020 4:30:39 下午 Main lambda$main$0
信息: is there a s ?
十二月 11, 2020 4:30:39 下午 Main lambda$main$1
信息: is there a g ?
十二月 11, 2020 4:30:41 下午 Main lambda$main$2
信息: add s
十二月 11, 2020 4:30:41 下午 Main lambda$main$0
信息: get s
十二月 11, 2020 4:30:42 下午 Main lambda$main$3
信息: add g
十二月 11, 2020 4:30:42 下午 Main lambda$main$1
信息: get g

volatile 可见性

可见性:某个线程修改了共享变量,其他线程能够实时的知晓其改变

1.适用于一个写线程、多个读线程;禁止volatile共享变量前指令的重排序

2.共享变量易变,多个线程每次从主存中而非缓存中读取(读屏障),并同步对共享变量的改动至主存(写屏障)

3.保证多个线程对其的可见性,防止某线程改变了其值,其他线程仍从缓存中读取旧值

4.synchronized只能保证完全交由synchronized保护的共享变量的原子性、有序性(单例模式-双重锁机制:instance共享变量需设为volatile,阻止指令重排序)

volatile static boolean flag = false;

happens-before原则

加锁
volatile变量
线程start()方法调用前的所有操作对新创建的线程可见
调用join()方法线程的操作对因其阻塞的线程可见
线程操作对其被调用interrupt的线程可见

无锁模式

适合线程数小且多核cpu场景

class AccountCas{
    private AtomicInteger balance;

    public AccountCas(int balance){
        this.balance = new AtomicInteger(balance);
    }
    public int getBalance() {
        return balance.get();
    }

    public void withdraw(int amount){
        while (true){
            int prev = getBalance();
            int next = prev - amount;
            //prev和balance比较,查看是否有其他线程修改其值,被修改则进入下一次循环
            if(balance.compareAndSet(prev,next)){
                break;
            }
            //balance.getAndAdd(-1*amount);等价以上
        }
    }
}

原子整数

//AtomicInteger
AtomicInteger i = new AtomicInteger(0);
System.out.println("i:"+i.get());
System.out.println("++i:"+i.incrementAndGet());
System.out.println("--i:"+i.decrementAndGet());
System.out.println("i++:"+i.getAndIncrement());
System.out.println("i--:"+i.getAndDecrement());
System.out.println("i:"+i.get());

System.out.println("i+(2):"+i.addAndGet(2));

System.out.println("i * 10:"+i.updateAndGet(value->value * 10));

i:0
++i:1
--i:0
i++:0
i--:1
i:0
i+(2):2
i * 10:20

原子引用

class AccountCas{
    private AtomicReference<BigDecimal> balance;
    public AccountCas(BigDecimal balance){
        this.balance = new AtomicReference<BigDecimal>(balance);
    }
    public BigDecimal getBalance() {
        return balance.get();
    }

    public void withdraw(BigDecimal amount){
        while (true){
            BigDecimal prev = getBalance();
            BigDecimal next = prev.subtract(amount);
            if(balance.compareAndSet(prev,next)){
                break;
            }
        }
    }
}

//AtomicStampedReference
static Logger logger = Logger.getLogger("log");
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
    logger.info("main start");
    String prev = ref.getReference();
    new Thread(()->{
        int stamp = ref.getStamp();
        logger.info("change A -> B");
        ref.compareAndSet(ref.getReference(),"B",stamp,stamp++);
        logger.info("ref.getReference():"+ref.getReference());
    }).start();
    Thread.sleep(1000);
    new Thread(()->{
        int stamp = ref.getStamp();
        logger.info("change B -> A");
        ref.compareAndSet(ref.getReference(),"A",stamp,stamp++);
        logger.info("ref.getReference():"+ref.getReference());
    }).start();

    Thread.sleep(1000);
    int stamp = ref.getStamp();
    ref.compareAndSet(prev,"C",stamp,stamp++);

    logger.info("ref.getReference():"+ref.getReference());
}

十二月 17, 2020 5:08:24 下午 Main main
信息: main start
十二月 17, 2020 5:08:24 下午 Main lambda$main$0
信息: change A -> B
十二月 17, 2020 5:08:24 下午 Main lambda$main$0
信息: ref.getReference():B
十二月 17, 2020 5:08:25 下午 Main lambda$main$1
信息: change B -> A
十二月 17, 2020 5:08:25 下午 Main lambda$main$1
信息: ref.getReference():A
十二月 17, 2020 5:08:26 下午 Main main
信息: ref.getReference():C

自定义线程池

//拒绝策略
@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockingQueue<T> queue,T task);
}

class ThreadPool{
    Logger logger = Logger.getLogger("log");
    private BlockingQueue<Runnable> taskQueue;//任务队列
    private HashSet<Worker> workers = new HashSet<>();
    private int coreSize;//线程数量
    private long timeout;//获取任务超时时间
    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize,long timeout,TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }
    //执行任务
    public void execute(Runnable task){
        synchronized (workers){ //保证works集合的线程安全
            if(workers.size() < coreSize){
                Worker worker = new Worker(task);
                logger.info("创建worker:"+worker);
                workers.add(worker);
                worker.start();
            }
            else {
                logger.info("任务加入队列:"+task);
                //taskQueue.put(task);替换为自定义处理方式
                taskQueue.tryPut(rejectPolicy,task);
            }
        }
    }
    class Worker extends Thread{
        private  Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            //while (task != null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.take(timeout,timeUnit)) != null){
                try {
                    logger.info("执行任务:"+task);
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers){
                logger.info("worker被移除:"+this);
                workers.remove(this);
            }
        }
    }
}

//任务阻塞队列
class BlockingQueue<T>{
    private Deque<T> queue = new ArrayDeque<>(); //任务队列
    private ReentrantLock lock = new ReentrantLock();//队列任务锁
    private Condition fullWaitSet = lock.newCondition();//队列满
    private Condition emptyWaitSet = lock.newCondition();//队列空
    private int capacity;//队列容量

    public BlockingQueue(int queueCapacity) {
        this.capacity = queueCapacity;
    }

    public T take(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    };
    public T take(long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
            long nanos = timeUnit.toNanos(timeout);
            while (queue.isEmpty()){
                try {
                    if(nanos <= 0) return null;
                    nanos = emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    };

    public void put(T element){
        lock.lock();
        try {
            while (queue.size() == capacity){
                try{
                    fullWaitSet.await();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }
    //超时
    public boolean offer(T element,long timeout,TimeUnit timeUnit){
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity){
                try{
                    if(nanos <= 0) return false;
                    nanos = emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();
            return  true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity){
                rejectPolicy.reject(this,task);
            }
            else {
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }


    public int size(){
        lock.lock();
        try {
            return capacity;
        }finally {
            lock.unlock();
        }

    }
}


static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10,
            ((queue, task)->{
                //queue.put(task);//死等
                //queue.offer(task,5,TimeUnit.SECONDS);//超时等待
                //throw new RuntimeException("任务执行失败.");//抛出异常
                task.run(); //主线程执行任务
            }));
    for (int i = 0; i < 5; i++){
        int index = i;
        threadPool.execute(()->{
            logger.info(""+index);
        });
    }
}

java.util.concurrent

自定义锁

//自定义锁(不可重入锁)
class MyLock implements Lock{
    //同步器类 独占锁 AQS
    class MySync extends AbstractQueuedSynchronizer{
        @Override //尝试获得锁
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());//设置锁的持有者线程
                return true;
            }
            return false;
        }

        @Override //尝试释放锁
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override //是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public  Condition newCondition(){
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    @Override //加锁 不成功进入等待队列等待
    public void lock() {
        sync.acquire(1);
    }

    @Override //可打断的加锁
    public void lockInterruptibly() throws InterruptedException {
        sync.tryAcquire(1);
    }

    @Override //尝试加锁 仅尝试一次
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

static Logger logger = Logger.getLogger("log");

public static void main(String[] args) throws InterruptedException {
    MyLock lock = new MyLock();
    new Thread(()->{
        lock.lock();
        try{
            logger.info("locking");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            logger.info("unlocking");
            lock.unlock();
        }
    }).start();

    new Thread(()->{
        lock.lock();
        try{
            logger.info("locking");
        }
        finally {
            logger.info("unlocking");
            lock.unlock();
        }
    }).start();
}

一月 05, 2021 4:05:27 下午 Main lambda$main$0
信息: locking
一月 05, 2021 4:05:28 下午 Main lambda$main$0
信息: unlocking
一月 05, 2021 4:05:28 下午 Main lambda$main$1
信息: locking
一月 05, 2021 4:05:28 下午 Main lambda$main$1
信息: unlocking

读写锁

并发的读操作不互斥,并发读操作和写操作互斥。

class DataContainer{
    static Logger logger = Logger.getLogger("log");
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock= rw.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock= rw.writeLock();

    public Object read(){
        logger.info("get readlock...");
        readLock.lock();
        try {
            logger.info("reading...");
            return data;
        }finally {
            logger.info("release readlock...");
            readLock.unlock();
        }
    }

    public void write(){
        logger.info("get writelock...");
        writeLock.lock();
        try {
            logger.info("writing...");
        }finally {
            logger.info("release writelock...");
            writeLock.unlock();
        }
    }
}

static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    DataContainer dataContainer = new DataContainer();
    /*
    new Thread(()->{
        dataContainer.read();
    }).start();

    new Thread(()->{
        dataContainer.read();
    }).start();
    */

    new Thread(()->{
        dataContainer.write();
    }).start();

    new Thread(()->{
        dataContainer.read();
    }).start();
}

一月 06, 2021 11:29:39 上午 DataContainer write
信息: get writelock...
一月 06, 2021 11:29:39 上午 DataContainer read
信息: get readlock...
一月 06, 2021 11:29:40 上午 DataContainer write
信息: writing...
一月 06, 2021 11:29:40 上午 DataContainer write
信息: release writelock...
一月 06, 2021 11:29:40 上午 DataContainer read
信息: reading...
一月 06, 2021 11:29:40 上午 DataContainer read
信息: release readlock...

StampedLock

class DataContainer{

    static Logger logger = Logger.getLogger("log");
    private int data;
    private final StampedLock lock = new StampedLock();

    DataContainer(int data){
        this.data = data;
    }

    public int read(int readTime) throws InterruptedException {
        logger.info("tryOptimisticRead...");
        long stamp = lock.tryOptimisticRead();
        logger.info("stamp:"+stamp);
        Thread.sleep(readTime);
        if(lock.validate(stamp)){ //戳是否未被更新
            logger.info("read finish(OptimisticRead)...");
            return data;
        }
        logger.info("get readlock...");
        try {
            stamp = lock.readLock();
            logger.info("stamp:"+stamp);
            logger.info("reading...");
            Thread.sleep(readTime);
            logger.info("read finish...");
            return data;
        }finally {
            logger.info("release readlock...");
            lock.unlockRead(stamp);
        }
    }

    public void write(int data){
        logger.info("get writelock...");
        long stamp = lock.writeLock();
        logger.info("stamp:"+stamp);
        try {
            logger.info("writing...");
            this.data = data;
        }finally {
            logger.info("release writelock...");
            lock.unlockWrite(stamp);
        }
    }
}

static Logger logger = Logger.getLogger("log");

public static void main(String[] args) throws InterruptedException {
    DataContainer dataContainer = new DataContainer(0);
    new Thread(()->{
        try {
            dataContainer.read(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    Thread.sleep(1000);
    new Thread(()->{
        try {
            dataContainer.read(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

一月 08, 2021 4:08:01 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 4:08:01 下午 DataContainer read
信息: stamp:256
一月 08, 2021 4:08:02 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 4:08:02 下午 DataContainer read
信息: stamp:256
一月 08, 2021 4:08:02 下午 DataContainer read
信息: read finish(OptimisticRead)...
一月 08, 2021 4:08:04 下午 DataContainer read
信息: read finish(OptimisticRead)...

static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    DataContainer dataContainer = new DataContainer(0);
    new Thread(()->{
        try {
            dataContainer.read(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(()->{
        dataContainer.write(10);
    }).start();
}

一月 08, 2021 5:22:44 下午 DataContainer read
信息: tryOptimisticRead...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: get writelock...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: stamp:384
一月 08, 2021 5:22:44 下午 DataContainer read
信息: stamp:256
一月 08, 2021 5:22:44 下午 DataContainer write
信息: writing...
一月 08, 2021 5:22:44 下午 DataContainer write
信息: release writelock...
一月 08, 2021 5:22:46 下午 DataContainer read
信息: get readlock...
一月 08, 2021 5:22:46 下午 DataContainer read
信息: stamp:513
一月 08, 2021 5:22:46 下午 DataContainer read
信息: reading...
一月 08, 2021 5:22:48 下午 DataContainer read
信息: read finish...
一月 08, 2021 5:22:48 下午 DataContainer read
信息: release readlock...

Semaphore 信号量

static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(3);
    for (int i = 0; i < 10; i++) {
        new Thread(()->{
            try {
                semaphore.acquire();
                logger.info("running..");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                logger.info("end...");
                semaphore.release();
            }

        }).start();
    }
}

一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: running..
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...
一月 11, 2021 9:44:32 上午 Main lambda$main$0
信息: end...

CountDownLatch

static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch =  new CountDownLatch(2);
    new Thread(()->{
        logger.info("running");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown();
        logger.info("end");
    }).start();
    new Thread(()->{
        logger.info("running");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown();
        logger.info("end");
    }).start();
    logger.info("main thread waiting");
    countDownLatch.await();//主线程等待其他所有线程执行完毕再恢复运行
    logger.info("main thread restart");
}

一月 11, 2021 4:29:53 下午 Main lambda$main$0
信息: running
一月 11, 2021 4:29:53 下午 Main lambda$main$1
信息: running
一月 11, 2021 4:29:53 下午 Main main
信息: main thread waiting
一月 11, 2021 4:29:54 下午 Main lambda$main$0
信息: end
一月 11, 2021 4:29:55 下午 Main lambda$main$1
信息: end
一月 11, 2021 4:29:55 下午 Main main
信息: main thread restart

CyclicBarrier

static Logger logger = Logger.getLogger("log");
public static void main(String[] args) throws InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(2);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
        logger.info("all tasks finished");
    });

    service.submit(()->{
        logger.info("task1 running");
        try {
            Thread.sleep(1000);
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    });

    service.submit(()->{
        logger.info("task2 running");
        try {
            Thread.sleep(1000);
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    service.shutdown();
}

一月 11, 2021 4:44:32 下午 Main lambda$main$1
信息: task1 running
一月 11, 2021 4:44:32 下午 Main lambda$main$2
信息: task2 running
一月 11, 2021 4:44:33 下午 Main lambda$main$0
信息: all tasks finished


Java      并发

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!