mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-11 03:46:50 +08:00
fix: 上传文档增加限流处理
This commit is contained in:
@@ -17,6 +17,11 @@ public class CaseAiDocumentAsyncHandler {
|
|||||||
|
|
||||||
private final AtomicInteger currentTaskCount = new AtomicInteger(0);
|
private final AtomicInteger currentTaskCount = new AtomicInteger(0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 限流,默认QPS 40
|
||||||
|
*/
|
||||||
|
private final TokenBucketRateLimiter rateLimiter = new TokenBucketRateLimiter(40);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("aiDocExecutor")
|
@Qualifier("aiDocExecutor")
|
||||||
private ThreadPoolTaskExecutor aiDocExecutor;
|
private ThreadPoolTaskExecutor aiDocExecutor;
|
||||||
@@ -39,8 +44,13 @@ public class CaseAiDocumentAsyncHandler {
|
|||||||
currentTaskCount.incrementAndGet();
|
currentTaskCount.incrementAndGet();
|
||||||
|
|
||||||
aiDocExecutor.submit(() -> {
|
aiDocExecutor.submit(() -> {
|
||||||
processCases(cases, optTypeEnum);
|
try {
|
||||||
currentTaskCount.decrementAndGet();
|
// 限流
|
||||||
|
rateLimiter.acquire();
|
||||||
|
processCases(cases, optTypeEnum);
|
||||||
|
} finally {
|
||||||
|
currentTaskCount.decrementAndGet();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package com.xboe.module.boecase.async;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 令牌桶限流算法实现
|
||||||
|
*/
|
||||||
|
public class TokenBucketRateLimiter {
|
||||||
|
|
||||||
|
private final double permitsPerSecond; // 每秒生成的令牌数(即 TPS)
|
||||||
|
private final AtomicLong nextFreeTicketMicros = new AtomicLong(0); // 下一个令牌可用的时间(微秒)
|
||||||
|
private final AtomicLong storedPermits = new AtomicLong(0); // 当前桶中存储的令牌数(本简化版不支持突发,可省略)
|
||||||
|
private static final long MICROSECONDS_PER_SECOND = 1_000_000L;
|
||||||
|
|
||||||
|
public TokenBucketRateLimiter(double permitsPerSecond) {
|
||||||
|
this.permitsPerSecond = permitsPerSecond;
|
||||||
|
this.nextFreeTicketMicros.set(System.nanoTime() / 1000); // 初始化为当前时间(微秒)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取一个令牌,阻塞直到可用
|
||||||
|
*/
|
||||||
|
public void acquire() {
|
||||||
|
long waitMicros = reserve(1);
|
||||||
|
if (waitMicros > 0) {
|
||||||
|
try {
|
||||||
|
long waitNanos = waitMicros * 1000; // 转为纳秒
|
||||||
|
TimeUnit.NANOSECONDS.sleep(waitNanos);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 预留 1 个令牌,返回需要等待的微秒数
|
||||||
|
*/
|
||||||
|
private long reserve(int permits) {
|
||||||
|
long nowMicros = System.nanoTime() / 1000;
|
||||||
|
long nextFreeTicket = nextFreeTicketMicros.get();
|
||||||
|
long waitMicros = Math.max(0, nextFreeTicket - nowMicros);
|
||||||
|
|
||||||
|
long newNextFreeTicket = nowMicros + waitMicros + (long) (permits * MICROSECONDS_PER_SECOND / permitsPerSecond);
|
||||||
|
while (!nextFreeTicketMicros.compareAndSet(nextFreeTicket, newNextFreeTicket)) {
|
||||||
|
// CAS 失败,说明其他线程修改了时间,重试
|
||||||
|
nowMicros = System.nanoTime() / 1000;
|
||||||
|
nextFreeTicket = nextFreeTicketMicros.get();
|
||||||
|
waitMicros = Math.max(0, nextFreeTicket - nowMicros);
|
||||||
|
newNextFreeTicket = nowMicros + waitMicros + (long) (permits * MICROSECONDS_PER_SECOND / permitsPerSecond);
|
||||||
|
}
|
||||||
|
|
||||||
|
return waitMicros;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user