import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Simple blocking rate limiter that hands out permits at a fixed pace.
 *
 * <p>The limiter keeps a single timestamp: the earliest instant (in microseconds, on the
 * {@link System#nanoTime()} clock) at which the next permit may be used. Each reservation
 * pushes that timestamp forward by one permit interval. Unused time is not accumulated, so
 * bursts are not supported.
 *
 * <p>The timestamp is updated with a compare-and-set retry loop, so a single instance may be
 * shared by several threads.
 *
 * <p>Note: the permit interval is truncated to whole microseconds, so rates above one million
 * permits per second yield a zero interval and are effectively unlimited.
 */
public class TokenBucketRateLimiter {

    private static final long MICROSECONDS_PER_SECOND = 1_000_000L;

    /** Number of permits issued per second. */
    private final double permitsPerSecond;

    /** Earliest time (microseconds, nanoTime-based) at which the next permit is available. */
    private final AtomicLong nextFreeTicketMicros = new AtomicLong(0);

    /**
     * Creates a limiter whose first permit is available immediately.
     *
     * @param permitsPerSecond number of permits to issue per second; must be greater than zero
     * @throws IllegalArgumentException if {@code permitsPerSecond} is zero, negative or NaN
     */
    public TokenBucketRateLimiter(double permitsPerSecond) {
        // The negated comparison also rejects NaN, which fails every comparison.
        if (!(permitsPerSecond > 0.0)) {
            throw new IllegalArgumentException(
                    "permitsPerSecond must be positive, got " + permitsPerSecond);
        }
        this.permitsPerSecond = permitsPerSecond;
        this.nextFreeTicketMicros.set(currentMicros());
    }

    /**
     * Acquires one permit, sleeping until it becomes available.
     *
     * <p>If the calling thread is interrupted while sleeping, the interrupt flag is restored
     * and the method returns early; the permit stays reserved.
     */
    public void acquire() {
        long waitMicros = reserve(1);
        if (waitMicros <= 0) {
            return;
        }
        try {
            TimeUnit.MICROSECONDS.sleep(waitMicros);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reserves the given number of permits and advances the next-free timestamp.
     *
     * @param permits number of permits to reserve
     * @return microseconds the caller must wait before using the permits; zero if none
     */
    private long reserve(int permits) {
        final long intervalMicros = (long) (permits * MICROSECONDS_PER_SECOND / permitsPerSecond);
        long nowMicros;
        long nextFreeTicket;
        long waitMicros;
        do {
            // Re-read both the clock and the shared timestamp on every attempt so a retry
            // after a lost race works from fresh values.
            nowMicros = currentMicros();
            nextFreeTicket = nextFreeTicketMicros.get();
            waitMicros = Math.max(0L, nextFreeTicket - nowMicros);
        } while (!nextFreeTicketMicros.compareAndSet(
                nextFreeTicket, nowMicros + waitMicros + intervalMicros));
        return waitMicros;
    }

    /** Returns the current {@link System#nanoTime()} reading converted to microseconds. */
    private static long currentMicros() {
        return System.nanoTime() / 1000;
    }
}