侧边栏壁纸
博主头像
爱探索

行动起来,活在当下

  • 累计撰写 42 篇文章
  • 累计创建 11 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Java-锁(lock)

jelly
2024-07-31 / 0 评论 / 0 点赞 / 35 阅读 / 0 字

Java中的锁(lock)

1. 概述

在java se 5以及之前java锁都是通缩synchronized的实现的,在java se 5之后java提供了Lock接口以及相关实现类来实现锁。它提供了与synchronized相似的功能,只是在使用时需要显示的去获取、释放锁。虽然它缺少了synchronized隐式的获取、释放锁的便捷性,但是它拥有了锁的释放与获取的可操作性、可中断的获取锁以及超时获取锁等多种synchronized所不具备的同步特性

2. Lock接口

2.1 Lock接口提供的synchronized关键字所不具备的主要特性如表

特性描述
尝试非阻塞的获取锁当前线程程获取锁时,如果该锁没有被其他线程持有,则立即获取锁,如果已被线程持有则立即返回结果并不会阻塞当前线程
能被中断的获取锁与synchronized不同的是,获取锁的线程可以响应中断,当前获取锁的线程如果被中断时中断异常会被抛出并释放锁
超时获取锁在指定时间之前获取锁,如果在截至时间到了任然没有获取到锁,则返回。

2.2 Lock的API

方法名称描述
void lock()获取锁。调用该方法后会获取锁并返回,如果锁被其他线程获取则阻塞
void lockInterruptibly() throws InterruptedException;与lock()相同,但是可以响应中断
boolean tryLock();非阻塞的获取锁,调用该函数获取锁不论成功或失败都会立即返回,获取成功返回true,获取失败返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;超时的获取锁,在一下几种情况下会返回: 1. 当前线程在超时时间被获取到了锁。2.当前线程在超时时间内被中断。3.当前线程获取锁超时。
void unlock();释放锁
Condition newCondition();获取等待通知组件,该组件与当前锁绑定,只有获取到锁才可以操作await等函数

3. 队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO(先进先出)队列来完成资源获 取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器是实现锁(也可以是任意同步组件)的关键。可以这样理解同步器与锁的关系,锁是面向使用者的,它定义了使用者与锁交 互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者, 它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

3.1 队列同步器重要方式

修改同步状态方法:

  • getState():获取当前同步状态。
  • ·setState(int newState):设置当前同步状态。
  • ·compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性。

可被重写的方法:

方法名称描述
boolean tryAcquire(int arg)独占是获取同步状态
boolean tryRelease(int arg)独占是释放同步状态
int tryAcquireShared(int arg)共享式获取同步状态
boolean tryReleaseShared(int arg)共享式释放同步状态
boolean isHeldExclusively()判断同步器是否独享是被线程持有

同步器提供的模板方法:

方法名称描述
void acquire(int arg)独占式获取同步状态,如果当前线程获取同步状态成功则返回,否则将会进入同同步队列
void acquireInterruptibly(int arg)与acquire(int arg)相同,但是该函数会响应中断
boolean tryAcquireNanos(int arg, long nanosTimeout)在acquireInterruptibly基础上增加了超时限制
boolean release(int arg)独占式释放同步
void acquireShared(int arg)共享式获取同步状态
void acquireSharedInterruptibly(int arg)与acquireShared相同增加了响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
boolean releaseShared(int arg)共享式释放锁
boolean hasQueuedThreads()获取同步队列上的线程集合

同步器提供的模板方法基本分为三类:独占式获取与释放锁共享式获取与释放锁查询同步队列中等待线程情况

3.1.1 示例

下面我们实现一个简单的独占式锁的同步组件


public class Mutex implements Lock {
    private final Sync sync = new Sync();

    //获取锁设置状态为1,锁获取失败会被加入同步队列,具体可以看public final void acquire(int arg)实现
    
    @Override
    public void lock() { sync.acquire(1); }
    @Override
    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}
    @Override
    public boolean tryLock() {  return sync.tryAcquire(1);}
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
    //// 释放锁,将状态设置为0
    @Override
    public void unlock() {    sync.acquire(0); }
    @Override
    public Condition newCondition() {  return sync.newCondition(); }
    public boolean isLocked() {   return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() {    return sync.hasQueuedThreads();}

    private class Sync extends AbstractQueuedSynchronizer {
        /**
         * 判断锁是否处于占用状态
         *
         * @return
         */
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        /**
         * 获取锁
         *
         * @param acquires
         * @return
         */
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {//通过CAS替换state值为1 如果替换成功则返回true表示并获取到锁
                setExclusiveOwnerThread(Thread.currentThread());//设置当前拥有独占访问权
                return true;
            }
            return false;
        }

        /**
         * 释放锁
         *
         * @param releases
         * @return
         */
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new IllegalMonitorStateException();//如果state==0则表示当前线程没有锁,则抛出异常
            setExclusiveOwnerThread(null);//空参数表示没有线程拥有访问权限
            setState(0);//状态设置为0 表示没有获取到锁,及释放锁
            return true;
        }
        
        
        /**
         * 返回一个Condition,每个condition都包含了一个condition队列
         */
        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

上面实例中Mutex有一个内部类Sync继承了AbstractQueuedSynchronizer同步器并实现了独占式获取和释放同步 状态,具体看上面注释

3.2 同步器的实现分析

下面我们将通过同步队列、独占式同 步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据 结构与模板方法,来分析同步队列的实现

3.2.1 同步队列

同步器内部依赖一个同步队列(FIFO队列)来完成同步状态的管理,当一个线程获取同步状态失败时,该线程的状态信息会被构造成一个节点(Node)并将其加入同步队列中,并阻塞当前线程。当某一个线程同步状态结束时,它会将同步队列中首节点中的线程环境,使其再次尝试获取同步状态

节点的属性类型与名称以及描述:

方法名称描述
volatile int waitStatus;等待状态
1. CANCELLED:值1,等待队列中的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态,并且不会在发生变化
2. SIGNAL:值-1,当前节点的后继节点处于阻塞状态(当前节点拿到了同步状态,后继节点还在同步队列中进行等待),如果当前节点释放,同步状态会通知后继节点,使后继节点继续运行。
3. CONDITION:值-2,当前节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用signal()(相当于notify),该节点会从等待队列转移到同步队列中。
4. PROPAGATE:值-3,共享式同步状态会无条件的传播
volatile Node prev;前驱节点
volatile Node next;后继几点
Node nextWaiter;等待队列中的后继节点,如果该节点是共享的,那么他将是一个SHARED常量
volatile Thread thread;获取同步状态的线程

同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部

结构如下图:

3.2.2 独占式获取、释放锁代码分析

ReentrantLock为例获取锁:


    //java.util.concurrent.locks.ReentrantLock.class

    public void lock() {
        sync.lock();//调用同步器中的lock
    }

    //java.util.concurrent.locks.ReentrantLock.class ReentrantLock.NonfairSync .class
    final void lock() {
        acquire(1);//这里值为1 表示获取锁(注意这里实际可以传入任何东西,但是在ReentrantLock实现中这里1表示获取同步状态)
    }
    //java.util.concurrent.locks.AbstractQueuedSynchronizer.class 
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&     /* tryAcquire返回true表示获取获取到锁,也就是说**这里只有在如果获取到锁就会直接返回**,如果没有获取锁则调用 acquireQueued将该线程信息构造成 Node节点并加入同步队列*/
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))式)
    }

    ////将当前线程信息构造成Node节点加入到同步队列(使用CAS方
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
    }

    //java.util.concurrent.locks.AbstractQueuedSynchronizer.class 
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
            
                if (p == head && tryAcquire(arg)) {//自旋获取锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
               
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//parkAndCheckInterrupt阻塞当前线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

ReentrantLock为例释放锁:


    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)//将其状态置为0(初始状态)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);//设置状态,这里修改时已经获取去了锁,所以不在需要使用CAS方式设置状态
        return free;
    }

        private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒阻塞状态
    }

3.2.2 共享式获取、释放锁代码分析

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。例如读写锁中的读锁就是一个共享锁。

ReentrantReadWriteLock例释放锁:


    public void lock() {
            sync.acquireShared(1);//共享式获取锁
    }

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//返回小于表示获取失败大于等于0表示获取成功
            doAcquireShared(arg);
    }

    //法尝试获取同步状 态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同 步状态
    protected final int tryAcquireShared(int unused) {

        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() && r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//自旋获取同步状态
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
0

评论区