'打造自己的專屬java鎖,巧妙規避不穩定的第三方'

Java Sync BigDataKer 2019-07-18
"

java juc 包下面已經提供了很多併發鎖工具供我們使用,但在日常開發中,為了各種原因我們總是會用多線程來併發處理一些問題,然而並不是所有的場景都可以使用juc 或者java本身提供的鎖來方便的幫助我們控制多線程帶來的併發問題,這個時候就需要我們根據自己的業務場景來子實現定製一把我們自己專屬的鎖,來滿足我們的需要。假設系統對接了很多第三方公司,來幫助我們完成業務,但這些第三方的服務接口穩定性參差不齊,以往的過程中我們可能會做一些監控措施來幫助我們監控接口的穩定性,但這會存在一個問題,就是當我們監控到操作失敗的時候其實已經會有用戶產生操作失敗的結果了,這對重視用戶體驗的互聯網公司肯定是不能忍的,為此我們可以每個用戶來訪問時都同時調用多個第三方,只要有一個返回結果,我們就可以給用戶做相應的展示,這樣即使有一兩個第三方出現故障對用戶也是無感知的,但另一個問題來了,同時併發調用第三方我怎麼選哪個結果呢,很簡單,當然是返回最快的了!具體如何選用最快的返回結果就用到我們今天的主題了,定製自己的鎖。

上面所說的大致流程可以描述為這樣:接受用戶請求 → 多線程組裝報文調用第三方 → 阻塞等待 → 任意結果返回喚醒主線程繼續處理。基於此流程很自然想到這個阻塞其實就可以用多線程中的鎖來實現,主線程在將任務提交給線程池多線程處理後,去獲取一個鎖,而這個鎖需要在線程中第一個第三方返回結果時才能獲取到,這樣就讓主線程繼續執行,有了大概思路,我們來看下具體如何實現。

"

java juc 包下面已經提供了很多併發鎖工具供我們使用,但在日常開發中,為了各種原因我們總是會用多線程來併發處理一些問題,然而並不是所有的場景都可以使用juc 或者java本身提供的鎖來方便的幫助我們控制多線程帶來的併發問題,這個時候就需要我們根據自己的業務場景來子實現定製一把我們自己專屬的鎖,來滿足我們的需要。假設系統對接了很多第三方公司,來幫助我們完成業務,但這些第三方的服務接口穩定性參差不齊,以往的過程中我們可能會做一些監控措施來幫助我們監控接口的穩定性,但這會存在一個問題,就是當我們監控到操作失敗的時候其實已經會有用戶產生操作失敗的結果了,這對重視用戶體驗的互聯網公司肯定是不能忍的,為此我們可以每個用戶來訪問時都同時調用多個第三方,只要有一個返回結果,我們就可以給用戶做相應的展示,這樣即使有一兩個第三方出現故障對用戶也是無感知的,但另一個問題來了,同時併發調用第三方我怎麼選哪個結果呢,很簡單,當然是返回最快的了!具體如何選用最快的返回結果就用到我們今天的主題了,定製自己的鎖。

上面所說的大致流程可以描述為這樣:接受用戶請求 → 多線程組裝報文調用第三方 → 阻塞等待 → 任意結果返回喚醒主線程繼續處理。基於此流程很自然想到這個阻塞其實就可以用多線程中的鎖來實現,主線程在將任務提交給線程池多線程處理後,去獲取一個鎖,而這個鎖需要在線程中第一個第三方返回結果時才能獲取到,這樣就讓主線程繼續執行,有了大概思路,我們來看下具體如何實現。

打造自己的專屬java鎖,巧妙規避不穩定的第三方

看過java 源碼的同學對AbstractQueuedSynchronizer一定不會陌生,java中的很多鎖ReentrantLock 、ReadWriteLock 、ReentrantReadWriteLock 和一些其他的併發工具CountDownLatch、 Semaphore等都基於此抽象類實現,AbstractQueuedSynchronizer中通過一個FIFO隊列來管理等待加鎖的線程,通過一個state的int變量控制線程加鎖狀態,其內部也幫我實現了線程獲得鎖和掛起的方法,我們這裡參考CountDownLatch來實現,因為我們的需求和CountDownLatch正好相反,CountDownLatch是多個線程都處理完才能繼續,而我們是隻要有一個處理完就能繼續,簡單來說就是主線程喚醒的判斷條件不一致。先來看下CountDownLatch的使用:

public static void main(String[] args) throws InterruptedException {
CountDownLatch await = new CountDownLatch(5);
// 依次創建並啟動線程
for (int i = 0; i < 5; ++i) {
new Thread(new MyRunnable(await)).start();
}
await.await();
System.out.println("over!");
}
class MyRunnable implements Runnable {
private final CountDownLatch await;
public MyRunnable(CountDownLatch await) {
this.await = await;
}
public void run() {
try {
//業務處理
await.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

熟悉套路之後為了更好的說明,我們引入其部分源碼(jdk 1.8中部分方法)

"

java juc 包下面已經提供了很多併發鎖工具供我們使用,但在日常開發中,為了各種原因我們總是會用多線程來併發處理一些問題,然而並不是所有的場景都可以使用juc 或者java本身提供的鎖來方便的幫助我們控制多線程帶來的併發問題,這個時候就需要我們根據自己的業務場景來子實現定製一把我們自己專屬的鎖,來滿足我們的需要。假設系統對接了很多第三方公司,來幫助我們完成業務,但這些第三方的服務接口穩定性參差不齊,以往的過程中我們可能會做一些監控措施來幫助我們監控接口的穩定性,但這會存在一個問題,就是當我們監控到操作失敗的時候其實已經會有用戶產生操作失敗的結果了,這對重視用戶體驗的互聯網公司肯定是不能忍的,為此我們可以每個用戶來訪問時都同時調用多個第三方,只要有一個返回結果,我們就可以給用戶做相應的展示,這樣即使有一兩個第三方出現故障對用戶也是無感知的,但另一個問題來了,同時併發調用第三方我怎麼選哪個結果呢,很簡單,當然是返回最快的了!具體如何選用最快的返回結果就用到我們今天的主題了,定製自己的鎖。

上面所說的大致流程可以描述為這樣:接受用戶請求 → 多線程組裝報文調用第三方 → 阻塞等待 → 任意結果返回喚醒主線程繼續處理。基於此流程很自然想到這個阻塞其實就可以用多線程中的鎖來實現,主線程在將任務提交給線程池多線程處理後,去獲取一個鎖,而這個鎖需要在線程中第一個第三方返回結果時才能獲取到,這樣就讓主線程繼續執行,有了大概思路,我們來看下具體如何實現。

打造自己的專屬java鎖,巧妙規避不穩定的第三方

看過java 源碼的同學對AbstractQueuedSynchronizer一定不會陌生,java中的很多鎖ReentrantLock 、ReadWriteLock 、ReentrantReadWriteLock 和一些其他的併發工具CountDownLatch、 Semaphore等都基於此抽象類實現,AbstractQueuedSynchronizer中通過一個FIFO隊列來管理等待加鎖的線程,通過一個state的int變量控制線程加鎖狀態,其內部也幫我實現了線程獲得鎖和掛起的方法,我們這裡參考CountDownLatch來實現,因為我們的需求和CountDownLatch正好相反,CountDownLatch是多個線程都處理完才能繼續,而我們是隻要有一個處理完就能繼續,簡單來說就是主線程喚醒的判斷條件不一致。先來看下CountDownLatch的使用:

public static void main(String[] args) throws InterruptedException {
CountDownLatch await = new CountDownLatch(5);
// 依次創建並啟動線程
for (int i = 0; i < 5; ++i) {
new Thread(new MyRunnable(await)).start();
}
await.await();
System.out.println("over!");
}
class MyRunnable implements Runnable {
private final CountDownLatch await;
public MyRunnable(CountDownLatch await) {
this.await = await;
}
public void run() {
try {
//業務處理
await.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

熟悉套路之後為了更好的說明,我們引入其部分源碼(jdk 1.8中部分方法)

打造自己的專屬java鎖,巧妙規避不穩定的第三方

 private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}
}

代碼中可以看到,CountDownLatch主要就是倚靠一個內部類Sync來實現,而Sync實現了AbstractQueuedSynchronizer的tryAcquireShared和tryReleaseShared方法,這兩方法的主要目的是:tryAcquireShared就是在調用await方法後來判斷是否需要阻塞還是執行,tryReleaseShared 就是用來釋放state的狀態,而state的狀態又影響了tryAcquireShared的返回結果,決定了線程是阻塞還是會被喚起繼續執行,具體的判斷邏輯是在AbstractQueuedSynchronizer中的acquireSharedInterruptibly方法中

"

java juc 包下面已經提供了很多併發鎖工具供我們使用,但在日常開發中,為了各種原因我們總是會用多線程來併發處理一些問題,然而並不是所有的場景都可以使用juc 或者java本身提供的鎖來方便的幫助我們控制多線程帶來的併發問題,這個時候就需要我們根據自己的業務場景來子實現定製一把我們自己專屬的鎖,來滿足我們的需要。假設系統對接了很多第三方公司,來幫助我們完成業務,但這些第三方的服務接口穩定性參差不齊,以往的過程中我們可能會做一些監控措施來幫助我們監控接口的穩定性,但這會存在一個問題,就是當我們監控到操作失敗的時候其實已經會有用戶產生操作失敗的結果了,這對重視用戶體驗的互聯網公司肯定是不能忍的,為此我們可以每個用戶來訪問時都同時調用多個第三方,只要有一個返回結果,我們就可以給用戶做相應的展示,這樣即使有一兩個第三方出現故障對用戶也是無感知的,但另一個問題來了,同時併發調用第三方我怎麼選哪個結果呢,很簡單,當然是返回最快的了!具體如何選用最快的返回結果就用到我們今天的主題了,定製自己的鎖。

上面所說的大致流程可以描述為這樣:接受用戶請求 → 多線程組裝報文調用第三方 → 阻塞等待 → 任意結果返回喚醒主線程繼續處理。基於此流程很自然想到這個阻塞其實就可以用多線程中的鎖來實現,主線程在將任務提交給線程池多線程處理後,去獲取一個鎖,而這個鎖需要在線程中第一個第三方返回結果時才能獲取到,這樣就讓主線程繼續執行,有了大概思路,我們來看下具體如何實現。

打造自己的專屬java鎖,巧妙規避不穩定的第三方

看過java 源碼的同學對AbstractQueuedSynchronizer一定不會陌生,java中的很多鎖ReentrantLock 、ReadWriteLock 、ReentrantReadWriteLock 和一些其他的併發工具CountDownLatch、 Semaphore等都基於此抽象類實現,AbstractQueuedSynchronizer中通過一個FIFO隊列來管理等待加鎖的線程,通過一個state的int變量控制線程加鎖狀態,其內部也幫我實現了線程獲得鎖和掛起的方法,我們這裡參考CountDownLatch來實現,因為我們的需求和CountDownLatch正好相反,CountDownLatch是多個線程都處理完才能繼續,而我們是隻要有一個處理完就能繼續,簡單來說就是主線程喚醒的判斷條件不一致。先來看下CountDownLatch的使用:

public static void main(String[] args) throws InterruptedException {
CountDownLatch await = new CountDownLatch(5);
// 依次創建並啟動線程
for (int i = 0; i < 5; ++i) {
new Thread(new MyRunnable(await)).start();
}
await.await();
System.out.println("over!");
}
class MyRunnable implements Runnable {
private final CountDownLatch await;
public MyRunnable(CountDownLatch await) {
this.await = await;
}
public void run() {
try {
//業務處理
await.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

熟悉套路之後為了更好的說明,我們引入其部分源碼(jdk 1.8中部分方法)

打造自己的專屬java鎖,巧妙規避不穩定的第三方

 private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}
}

代碼中可以看到,CountDownLatch主要就是倚靠一個內部類Sync來實現,而Sync實現了AbstractQueuedSynchronizer的tryAcquireShared和tryReleaseShared方法,這兩方法的主要目的是:tryAcquireShared就是在調用await方法後來判斷是否需要阻塞還是執行,tryReleaseShared 就是用來釋放state的狀態,而state的狀態又影響了tryAcquireShared的返回結果,決定了線程是阻塞還是會被喚起繼續執行,具體的判斷邏輯是在AbstractQueuedSynchronizer中的acquireSharedInterruptibly方法中

打造自己的專屬java鎖,巧妙規避不穩定的第三方

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

基於此我們的大概思路就是,利用自己的類實現tryAcquireShared和tryReleaseShared方法來幫助我們管理state狀態,決定主線程什麼時候可以獲得鎖繼續運行,什麼時候需要阻塞。依靠AbstractQueuedSynchronizer的內部機制幫助我們及時獲取子線程的處理信息,最快的回到主線程來處理我們業務邏輯。實現代碼如下:

package com.chengxiansheng.common;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class ReqBraker {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
    
    protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
int c = getState();
c = 0;
return true;
}
}
private final Sync sync;
private ResultDto resultDto;//處理結果
public ReqBraker(){
this.sync = new Sync(1);
}
/**
* 請求返回
* 此處要考慮接口調用失敗的情況,如果失敗要等待其他線程則不調用此方法
*/
public void reqReuturn(ResultDto resultDto){
this.resultDto = resultDto;
sync.tryReleaseShared(1);
}
/**
* 主線程等待指定最長等待時間
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 獲取處理結果
* @return
*/
public ResultDto getResultDto(){
return resultDto;
}
  
}
package com.chengxiansheng.common;
public class RequsetWorker implements Runnable {

private RequestDto requestDto;//請求信息

private ReqBraker reqBraker;

public RequsetWorker(RequestDto requestDto, ReqBraker reqBraker){
this.requestDto = requestDto;
this.reqBraker = reqBraker;
}

public void run() {
//請求第三方
//判斷返回結果
if(requestDto.isSeccess()){
reqBraker.reqReuturn(resultDto);
}
}
}

具體的使用方法如下:

public void requestService(RequestDto request, List<Service> serviceList) {
try {
ReqBraker braker = new ReqBraker();
for (Service service : serviceList) {
executorService.submit(new RequsetWorker(braker, request);
}
braker.await(5, TimeUnit.SECONDS); //請求處理完成 最長時間5S
ResultDto result = braker.getResultDto();
if(result == null){ //可能超過了最終等待時間
//返回處理失敗;
}
//返回處理結果;
  } catch (Exception e) {
//異常處理
}

當然實際的使用中可能會比這複雜,因為我們要有各種業務處理情況去要考慮,本文只是一個範例來幫助大家介紹一個新的思路來解決問題,合理利用Java中的工具的同事我們也要理解其實現原理,來定製符合我們使用場景的方法,才能寫出更高效的代碼,實現更高效的系統。AbstractQueuedSynchronizer的作用也遠不止此,但我們掌握了它就可以更好的玩轉多線程,玩轉併發,來創新的實現各種複雜處理和邏輯。

本文來源於博客園 作者:chengxiansheng

原文:http://www.cnblogs.com/chengxiansheng/

"

相關推薦

推薦中...