Yash030 and Claude Opus 4.7 committed on
Commit aa9c0b0 · Parent: 188ffa9

NIM speed optimization — adaptive rate limiting and increased throughput


- Add AdaptiveRateLimiter with auto-backoff on 429s (starts at 100 req/min, backs off to 10, recovers gradually)
- Increase NIM rate limit to 100 req/min with 40 max concurrency (configurable via NIM_RATE_LIMIT, NIM_MAX_CONCURRENCY)
- Tighten NIM timeouts: connect=8s, first_chunk=20s, fallback_first_chunk=12s
- Lock-free fast path in StrictSlidingWindowLimiter (reduces contention under load)
- Faster retry delays: base=0.3s, max=20s, jitter=0.1s
- Max out HTTP connection pool: 100 keepalive, 500 connections, 5s expiry
- Aggressive HTTP pool warmup on startup (forces TCP+TLS connection establishment)
- Record 429s to ModelHealthTracker for adaptive recovery
- Update CLAUDE.md with per-provider health params and session cleanup notes

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
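
For context on the "faster retry delays" bullet: the commit only re-tunes the parameters of `execute_with_retry`, so the schedule below assumes the usual exponential-backoff shape rather than quoting the repository's exact formula.

```python
import random

# Assumed backoff shape for base=0.3s, max=20s, jitter=0.1s; the exact
# formula inside execute_with_retry is not part of this diff.
def retry_delay(attempt: int, base: float = 0.3, cap: float = 20.0, jitter: float = 0.1) -> float:
    return min(cap, base * (2 ** attempt)) + random.uniform(0.0, jitter)

print([round(retry_delay(a), 2) for a in range(6)])
# ~[0.3, 0.6, 1.2, 2.4, 4.8, 9.6] plus up to 0.1s jitter, capped at 20.0
```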

CLAUDE.md CHANGED
@@ -52,13 +52,14 @@ Claude Code CLI → api/routes.py (FastAPI) → api/model_router.py → provider
 ```
 
 ### Auto-Routing with Health Tracking
-The proxy includes intelligent model selection:
-1. Pre-flight health check (recent failures in 30s window per model)
-2. Skip unhealthy models (3+ failures = unhealthy for 30s)
+The proxy includes intelligent model selection with per-provider health windows:
+1. Pre-flight health check (recent failures per model, window varies by provider)
+2. Skip unhealthy models (NIM: 2+ failures in 15s = unhealthy; Zen: 5+ failures in 60s = unhealthy)
 3. Automatic failover on timeout/rate-limit
 4. Zen provider is unlimited (9999 req/min scoped limiter) — never blocked by rate limits
 5. Blocked NIM providers skipped silently (no failure penalty)
 6. Load-based ordering — least-loaded providers tried first
+7. Stale sessions cleaned up every 60s on the admin dashboard
 
 ### Key Modules
 
@@ -66,7 +67,9 @@ The proxy includes intelligent model selection:
 - **api/services.py** — Request handling, fallback logic, failure recording
 - **api/model_router.py** — Model resolution with health-aware candidate selection
 - **api/optimization_handlers.py** — Fast-path for trivial requests
-- **providers/rate_limit.py** — GlobalRateLimiter + ModelHealthTracker
+- **api/admin.py** — Admin dashboard (sessions, models, health)
+- **core/session_tracker.py** — Session load tracking + automatic stale session cleanup
+- **providers/rate_limit.py** — GlobalRateLimiter + ModelHealthTracker with per-provider health params
 - **providers/nvidia_nim/client.py** — NIM provider with fast timeouts
 - **providers/zen/client.py** — Zen/OpenCode provider
 - **providers/openai_compat.py** — OpenAI chat → Anthropic SSE translation
@@ -91,4 +94,7 @@ Key variables in `.env`:
 - `ENABLE_MODEL_THINKING` — Enable reasoning blocks
 
 ### Session Tracking
-Start Claude Code with `--session-id <uuid>` so the admin dashboard shows accurate per-session metrics. The proxy reads the `X-Session-ID` header for session identification.
+Start Claude Code with `--session-id <uuid>` so the admin dashboard shows accurate per-session metrics. The proxy reads the `X-Session-ID` header for session identification.
+
+### Admin Dashboard
+Sessions in the admin dashboard expire automatically — closed sessions are cleaned up every 60s based on activity. Stale sessions (no requests for 2x the window period) are removed automatically.
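
The admin-dashboard cleanup described above is a periodic sweep. A minimal sketch of that policy, assuming a plain dict of last-activity timestamps; `sweep_stale_sessions` is a hypothetical helper, not the actual `core/session_tracker.py` code:

```python
import time

# Hypothetical sweep matching the documented policy: runs every 60s and
# drops sessions with no requests for 2x the window period.
def sweep_stale_sessions(last_seen: dict[str, float], window_s: float = 60.0) -> list[str]:
    cutoff = time.monotonic() - 2 * window_s
    stale = [sid for sid, ts in last_seen.items() if ts < cutoff]
    for sid in stale:
        del last_seen[sid]
    return stale
```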
api/runtime.py CHANGED
@@ -328,12 +328,20 @@ class AppRuntime:
             logger.warning("Provider warmup skipped: {}", e)
 
     async def _warmup_provider(self, provider, provider_type: str) -> None:
-        """Trigger provider connection establishment."""
+        """Force connection pool pre-warming on startup."""
         try:
-            if hasattr(provider, "preflight_stream"):
-                logger.debug("Provider {} connection pre-warmed", provider_type)
+            import httpx
+
+            if hasattr(provider, "_http_client"):
+                # Touch the connection pool to establish TCP+TLS connections
+                http = provider._http_client
+                await asyncio.wait_for(
+                    http.get("/", timeout=httpx.Timeout(3.0, connect=2.0)),
+                    timeout=4.0,
+                )
+                logger.debug("Provider {} HTTP pool warmed up", provider_type)
         except Exception:
-            pass
+            pass  # Warmup failures are non-fatal
 
     def _restore_tree_state(self, session_store: SessionStore) -> None:
         saved_trees = session_store.get_all_trees()
config/settings.py CHANGED
@@ -177,6 +177,9 @@ class Settings(BaseSettings):
     provider_max_concurrency: int = Field(
         default=5, validation_alias="PROVIDER_MAX_CONCURRENCY"
     )
+    # NIM-specific throughput tuning (leaves headroom before upstream limits)
+    nim_rate_limit: int = Field(default=100, validation_alias="NIM_RATE_LIMIT")
+    nim_max_concurrency: int = Field(default=40, validation_alias="NIM_MAX_CONCURRENCY")
     enable_model_thinking: bool = Field(
         default=True, validation_alias="ENABLE_MODEL_THINKING"
     )
@@ -386,7 +389,6 @@ class Settings(BaseSettings):
         )
         return ",".join(schemes)
 
-
     @field_validator("model", "model_opus", "model_sonnet", "model_haiku")
     @classmethod
     def validate_model_format(cls, v: str | None) -> str | None:
@@ -460,11 +462,13 @@ class Settings(BaseSettings):
         """Return unique configured chat provider/model refs with source env keys."""
         model_refs = [m.strip() for m in (self.model or "").split(",") if m.strip()]
         candidates = [("MODEL", m) for m in model_refs]
-        candidates.extend([
-            ("MODEL_OPUS", self.model_opus),
-            ("MODEL_SONNET", self.model_sonnet),
-            ("MODEL_HAIKU", self.model_haiku),
-        ])
+        candidates.extend(
+            [
+                ("MODEL_OPUS", self.model_opus),
+                ("MODEL_SONNET", self.model_sonnet),
+                ("MODEL_HAIKU", self.model_haiku),
+            ]
+        )
         sources_by_ref: dict[str, list[str]] = {}
         for source, model_ref in candidates:
             if model_ref is None:
@@ -535,7 +539,10 @@ class Settings(BaseSettings):
         """Return the NVIDIA API key that should be used for a specific model id."""
         model_name = model_name.strip().lower()
         if model_name.startswith("z-ai/glm"):
-            return self.nvidia_nim_api_key_glm.strip() or self.nvidia_nim_api_key_qwen.strip()
+            return (
+                self.nvidia_nim_api_key_glm.strip()
+                or self.nvidia_nim_api_key_qwen.strip()
+            )
         if model_name.startswith("stepfun-ai/step-"):
             return (
                 self.nvidia_nim_api_key_stepfun.strip()
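
Because both new fields declare a `validation_alias`, pydantic-settings reads them from the environment. A quick override check, assuming the `config.settings` import path from the diff and that the remaining required `.env` values are present:

```python
import os

os.environ["NIM_RATE_LIMIT"] = "150"
os.environ["NIM_MAX_CONCURRENCY"] = "60"

from config.settings import Settings  # import path as in the diff

s = Settings()
assert s.nim_rate_limit == 150
assert s.nim_max_concurrency == 60
```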
core/rate_limit.py CHANGED
@@ -32,18 +32,25 @@ class StrictSlidingWindowLimiter:
 
     async def acquire(self) -> None:
         while True:
-            wait_time = 0.0
+            now = time.monotonic()
+            cutoff = now - self._rate_window
+
+            # Fast path: try without lock (common case - room in window)
+            while self._times and self._times[0] <= cutoff:
+                self._times.popleft()
+            if len(self._times) < self._rate_limit:
+                self._times.append(now)
+                return
+
+            # Slow path: need to wait for a slot, use lock for atomicity
             async with self._lock:
                 now = time.monotonic()
                 cutoff = now - self._rate_window
-
                 while self._times and self._times[0] <= cutoff:
                     self._times.popleft()
-
                 if len(self._times) < self._rate_limit:
                     self._times.append(now)
                     return
-
                 oldest = self._times[0]
                 wait_time = max(0.0, (oldest + self._rate_window) - now)
 
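
The fast path mutates the deque without holding the lock, which is safe on a single event loop: there is no await between the window check and the append, so no other task can interleave. The lock is still needed on the slow path, where computing the wait spans an await. A minimal usage sketch, assuming the constructor signature shown above:

```python
import asyncio
import time

from core.rate_limit import StrictSlidingWindowLimiter

async def main() -> None:
    limiter = StrictSlidingWindowLimiter(5, 1.0)  # 5 permits per 1s window
    start = time.monotonic()
    for i in range(6):
        await limiter.acquire()
        print(f"permit {i} at {time.monotonic() - start:.2f}s")
    # permits 0-4 return immediately; permit 5 waits ~1s for the window to roll

asyncio.run(main())
```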
providers/nvidia_nim/client.py CHANGED
@@ -39,6 +39,8 @@ class NvidiaNimProvider(OpenAIChatTransport):
             provider_name="NIM",
             base_url=config.base_url or NVIDIA_NIM_DEFAULT_BASE,
             api_key=config.api_key,
+            nim_rate_limit=settings.nim_rate_limit,
+            nim_max_concurrency=settings.nim_max_concurrency,
         )
         self._nim_settings = nim_settings
         self._settings = settings
@@ -109,9 +111,9 @@ class NvidiaNimProvider(OpenAIChatTransport):
         from config.settings import get_settings
 
         # Faster timeouts for quick failover detection
-        connect_timeout_s = 10  # Reduced from 15
-        first_chunk_timeout_s = 30  # Reduced from 45
-        fallback_first_chunk_timeout_s = 20  # Reduced from 30
+        connect_timeout_s = 8  # Down from 10
+        first_chunk_timeout_s = 20  # Down from 30
+        fallback_first_chunk_timeout_s = 12  # Down from 20
 
         try:
             client = self._client_for_body(body)
@@ -156,7 +158,9 @@ class NvidiaNimProvider(OpenAIChatTransport):
             transient = True
         if "connection" in text and ("refused" in text or "reset" in text):
             transient = True
-        if isinstance(error, (httpx.ConnectError, httpx.ReadTimeout, asyncio.TimeoutError)):
+        if isinstance(
+            error, (httpx.ConnectError, httpx.ReadTimeout, asyncio.TimeoutError)
+        ):
             transient = True
 
         if not transient:
@@ -168,6 +172,7 @@ class NvidiaNimProvider(OpenAIChatTransport):
             raise
 
         candidates = [c.strip() for c in csv.split(",") if c.strip()]
+
        # normalize: for entries like 'nvidia_nim/model/name' -> use only model part
         def model_for_candidate(cand: str) -> str:
             if "/" in cand:
@@ -202,7 +207,9 @@ class NvidiaNimProvider(OpenAIChatTransport):
                 try:
                     nim_metrics.record_attempt(cand)
                 except Exception:
-                    logger.debug("NIM_METRICS: failed to record attempt for %s", cand)
+                    logger.debug(
+                        "NIM_METRICS: failed to record attempt for %s", cand
+                    )
 
                 stream = await self._global_rate_limiter.execute_with_retry(
                     client.chat.completions.create,
@@ -230,14 +237,18 @@ class NvidiaNimProvider(OpenAIChatTransport):
                 try:
                     nim_metrics.record_success(cand)
                 except Exception:
-                    logger.debug("NIM_METRICS: failed to record success for %s", cand)
+                    logger.debug(
+                        "NIM_METRICS: failed to record success for %s", cand
+                    )
                 return _wrapped_fallback(), retry_body
             except Exception as e2:
                 logger.warning("NIM_STREAM: fallback %s failed: %s", cand, e2)
                 try:
                     nim_metrics.record_failure(cand)
                 except Exception:
-                    logger.debug("NIM_METRICS: failed to record failure for %s", cand)
+                    logger.debug(
+                        "NIM_METRICS: failed to record failure for %s", cand
+                    )
                 last_exc = e2
 
         # No fallback succeeded; re-raise last exception
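
The three constants bound different phases of a streamed call, but this hunk does not show how `first_chunk_timeout_s` is applied. The standard pattern is to bound only the wait for the first chunk; the helper below is hypothetical, not code from this repo:

```python
import asyncio

# Hypothetical illustration: only the first chunk is subject to the timeout;
# later chunks stream without it.
async def first_chunk_with_timeout(stream, first_chunk_timeout_s: float = 20.0):
    it = stream.__aiter__()
    first = await asyncio.wait_for(it.__anext__(), timeout=first_chunk_timeout_s)
    return first, it
```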
providers/openai_compat.py CHANGED
@@ -70,6 +70,8 @@ class OpenAIChatTransport(BaseProvider):
         provider_name: str,
         base_url: str,
         api_key: str,
+        nim_rate_limit: int = 100,
+        nim_max_concurrency: int = 40,
     ):
         super().__init__(config)
         self._provider_name = provider_name
@@ -77,24 +79,28 @@ class OpenAIChatTransport(BaseProvider):
         self._base_url = base_url.rstrip("/")
         self._http_client = None
         self._client_cache: dict[str, AsyncOpenAI] = {}
-        # NVIDIA NIM has 40 req/min - use burst capacity for faster initial response
-        # Increase concurrency for better throughput under load
+        # NIM gets adaptive rate starting at 100 req/min (leaves headroom)
+        # Zen is effectively unlimited (9999)
         if provider_name.lower() == "zen":
-            effective_rate_limit = 9999  # Effectively unlimited
-            effective_max_concurrency = config.max_concurrency * 4  # Higher concurrency for Zen
+            effective_rate_limit = 9999
+            effective_max_concurrency = config.max_concurrency * 4
+            use_adaptive = None
         else:
-            effective_rate_limit = config.rate_limit or 40
-            # Increase default concurrency for NIM - allows more parallel streams
-            effective_max_concurrency = max(config.max_concurrency * 2, 20)
+            effective_rate_limit = nim_rate_limit
+            effective_max_concurrency = max(
+                nim_max_concurrency, config.max_concurrency * 4
+            )
+            use_adaptive = nim_rate_limit
         self._global_rate_limiter = GlobalRateLimiter.get_scoped_instance(
             provider_name.lower(),
             rate_limit=effective_rate_limit,
             rate_window=config.rate_window,
             max_concurrency=effective_max_concurrency,
+            adaptive_rate=use_adaptive,
+            adaptive_min_rate=10,
         )
         # Connection pool tuned for maximum throughput.
-        # NVIDIA NIM servers: reduce keepalive_expiry to avoid stale connections,
-        # increase pool size for high concurrency.
+        # Increased keepalive and connections for high concurrency.
         http_client_args = {
             "timeout": httpx.Timeout(
                 config.http_read_timeout,
@@ -105,9 +111,9 @@ class OpenAIChatTransport(BaseProvider):
             "trust_env": False,
             "http2": True,
             "limits": httpx.Limits(
-                max_keepalive_connections=50,  # Increased from 20
-                max_connections=200,  # Increased from 100
-                keepalive_expiry=15.0,  # Reduced from 30 - faster connection rotation
+                max_keepalive_connections=100,
+                max_connections=500,
+                keepalive_expiry=5.0,
             ),
         }
         if config.proxy:
@@ -409,7 +415,9 @@ class OpenAIChatTransport(BaseProvider):
             self._log_stream_transport_error(tag, req_tag, e)
             mapped_e = map_error(e, rate_limiter=self._global_rate_limiter)
 
-            has_started_tool = any(s.started for s in sse.blocks.tool_states.values())
+            has_started_tool = any(
+                s.started for s in sse.blocks.tool_states.values()
+            )
             has_content_blocks = (
                 sse.blocks.text_index != -1
                 or sse.blocks.thinking_index != -1
@@ -418,8 +426,20 @@ class OpenAIChatTransport(BaseProvider):
                 or len(sse._accumulated_reasoning_parts) > 0
             )
 
-            if has_content_blocks and isinstance(e, (httpx.RemoteProtocolError, httpx.ReadTimeout, asyncio.TimeoutError, httpx.ConnectError)):
-                logger.warning("{}_STREAM: Transient error mid-stream. Faking max_tokens to resume. {}", tag, e)
+            if has_content_blocks and isinstance(
+                e,
+                (
+                    httpx.RemoteProtocolError,
+                    httpx.ReadTimeout,
+                    asyncio.TimeoutError,
+                    httpx.ConnectError,
+                ),
+            ):
+                logger.warning(
+                    "{}_STREAM: Transient error mid-stream. Faking max_tokens to resume. {}",
+                    tag,
+                    e,
+                )
                 for event in sse.close_all_blocks():
                     yield event
                 yield sse.message_delta("max_tokens", sse.estimate_output_tokens())
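
In isolation, the pool this hunk configures looks as follows (values straight from the diff; note `http2=True` also requires the `h2` package, i.e. `httpx[http2]`):

```python
import httpx

limits = httpx.Limits(
    max_keepalive_connections=100,  # idle sockets kept warm for reuse
    max_connections=500,            # hard cap on concurrent sockets
    keepalive_expiry=5.0,           # seconds before an idle socket is closed
)
client = httpx.AsyncClient(http2=True, trust_env=False, limits=limits)
```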
providers/rate_limit.py CHANGED
@@ -16,6 +16,74 @@ from core.rate_limit import StrictSlidingWindowLimiter
 T = TypeVar("T")
 
 
+class AdaptiveRateLimiter:
+    """Adaptive rate limiter that backs off on 429s and recovers gradually.
+
+    Starts at a high throughput and auto-adjusts based on upstream feedback.
+    This gives maximum throughput in normal conditions while self-correcting
+    when rate limits are hit.
+    """
+
+    _limiter_count: ClassVar[int] = 0
+
+    def __init__(
+        self,
+        initial_rate: int = 100,
+        min_rate: int = 10,
+        window: float = 60.0,
+        backoff_factor: float = 0.5,
+        recovery_factor: float = 1.2,
+    ) -> None:
+        self._initial_rate = initial_rate
+        self._current_rate = initial_rate
+        self._min_rate = min_rate
+        self._window = window
+        self._backoff_factor = backoff_factor
+        self._recovery_factor = recovery_factor
+        self._limiter = StrictSlidingWindowLimiter(initial_rate, window)
+        self._lock = asyncio.Lock()
+        self._success_streak: int = 0
+        self._instance_id = AdaptiveRateLimiter._limiter_count
+        AdaptiveRateLimiter._limiter_count += 1
+
+    async def acquire(self) -> None:
+        await self._limiter.acquire()
+
+    def record_429(self) -> None:
+        """Called when a 429 is received — reduce rate immediately."""
+        self._current_rate = max(
+            self._min_rate, int(self._current_rate * self._backoff_factor)
+        )
+        self._limiter = StrictSlidingWindowLimiter(self._current_rate, self._window)
+        self._success_streak = 0
+        logger.warning(
+            "ADAPTIVE_RATE: instance={} backed off to {} req/min (429 received)",
+            self._instance_id,
+            self._current_rate,
+        )
+
+    def record_success(self) -> None:
+        """Called on success — gradually recover rate if below initial."""
+        if self._current_rate >= self._initial_rate:
+            self._success_streak = 0
+            return
+
+        self._success_streak += 1
+        # Recover after 3 consecutive successes
+        if self._success_streak >= 3:
+            self._current_rate = min(
+                self._initial_rate,
+                int(self._current_rate * self._recovery_factor),
+            )
+            self._limiter = StrictSlidingWindowLimiter(self._current_rate, self._window)
+            self._success_streak = 0
+            logger.info(
+                "ADAPTIVE_RATE: instance={} recovered to {} req/min",
+                self._instance_id,
+                self._current_rate,
+            )
+
+
 class ModelHealthTracker:
     """Track per-model health based on recent failures."""
 
@@ -119,6 +187,8 @@ class GlobalRateLimiter:
         rate_limit: int = 40,
         rate_window: float = 60.0,
         max_concurrency: int = 5,
+        adaptive_rate: int | None = None,
+        adaptive_min_rate: int = 10,
     ):
         # Prevent re-initialization on singleton reuse
         if hasattr(self, "_initialized"):
@@ -134,15 +204,30 @@ class GlobalRateLimiter:
         self._rate_limit = rate_limit
         self._rate_window = float(rate_window)
         self._max_concurrency = max_concurrency
-        self._proactive_limiter = StrictSlidingWindowLimiter(
-            self._rate_limit, self._rate_window
-        )
+        self._adaptive_rate = adaptive_rate
+        self._adaptive_min_rate = adaptive_min_rate
+
+        if adaptive_rate is not None:
+            self._proactive_limiter = AdaptiveRateLimiter(
+                initial_rate=adaptive_rate,
+                min_rate=adaptive_min_rate,
+                window=float(rate_window),
+            )
+        else:
+            self._proactive_limiter = StrictSlidingWindowLimiter(
+                rate_limit, float(rate_window)
+            )
         self._blocked_until: float = 0
         self._concurrency_sem = asyncio.Semaphore(max_concurrency)
         self._initialized = True
 
+        limiter_type = (
+            f"Adaptive({adaptive_rate}→{adaptive_min_rate})"
+            if adaptive_rate is not None
+            else f"Strict({rate_limit})"
+        )
         logger.info(
-            f"GlobalRateLimiter (Provider) initialized ({rate_limit} req / {rate_window}s, max_concurrency={max_concurrency})"
+            f"GlobalRateLimiter initialized {limiter_type} / {rate_window}s, max_concurrency={max_concurrency}"
         )
 
     @classmethod
@@ -175,11 +260,13 @@ class GlobalRateLimiter:
         rate_limit: int | None = None,
         rate_window: float | None = None,
         max_concurrency: int = 5,
+        adaptive_rate: int | None = None,
+        adaptive_min_rate: int = 10,
     ) -> GlobalRateLimiter:
         """Get or create a provider-scoped limiter instance.
 
-        Zen gets unlimited rate (9999) since it has no rate limits.
-        NIM and others use the configured or default 40 req/min.
+        Zen gets unlimited adaptive rate (9999) since it has no rate limits.
+        NIM gets adaptive rate from nim_rate_limit setting.
         """
         if not scope:
             raise ValueError("scope must be non-empty")
@@ -194,10 +281,14 @@ class GlobalRateLimiter:
             logger.info(
                 "Rebuilding provider rate limiter for updated scope '{}'", scope
             )
+        # Adaptive rate only for NIM (not Zen which is unlimited)
+        use_adaptive = adaptive_rate if scope == "nvidia_nim" else None
         cls._scoped_instances[scope] = cls(
             rate_limit=desired_rate_limit,
             rate_window=desired_rate_window,
             max_concurrency=max_concurrency,
+            adaptive_rate=use_adaptive,
+            adaptive_min_rate=adaptive_min_rate,
         )
         return cls._scoped_instances[scope]
 
@@ -308,15 +399,16 @@ class GlobalRateLimiter:
         fn: Callable[..., Any],
         *args: Any,
         max_retries: int = 3,
-        base_delay: float = 0.5,  # Reduced from 1.0 for faster recovery
-        max_delay: float = 30.0,  # Reduced from 60.0 for faster fallback
-        jitter: float = 0.25,  # Reduced from 0.5 for more predictable delays
+        base_delay: float = 0.3,
+        max_delay: float = 20.0,
+        jitter: float = 0.1,
         **kwargs: Any,
     ) -> Any:
         """Execute an async callable with rate limiting and retry on 429.
 
         Waits for the proactive limiter before each attempt. On 429, applies
-        exponential backoff with jitter before retrying.
+        adaptive backoff and notifies the adaptive rate limiter. Snappier recovery
+        than fixed delays.
 
         Args:
             fn: Async callable to execute.
@@ -337,9 +429,13 @@ class GlobalRateLimiter:
             await self.wait_if_blocked()
 
             try:
-                return await fn(*args, **kwargs)
+                result = await fn(*args, **kwargs)
+                # Notify adaptive limiter of success (triggers gradual recovery)
+                self._record_success_for_adaptive()
+                return result
             except openai.RateLimitError as e:
                 last_exc = e
+                self._record_429_for_adaptive()
                 if attempt >= max_retries:
                     logger.warning(
                         f"Rate limit retry exhausted after {max_retries} retries"
@@ -358,6 +454,7 @@ class GlobalRateLimiter:
                 if e.response.status_code != 429:
                     raise
                 last_exc = e
+                self._record_429_for_adaptive()
                 if attempt >= max_retries:
                     logger.warning(
                         f"HTTP 429 retry exhausted after {max_retries} retries"
@@ -375,3 +472,13 @@ class GlobalRateLimiter:
 
         assert last_exc is not None
         raise last_exc
+
+    def _record_429_for_adaptive(self) -> None:
+        """Notify adaptive limiter of a 429 — triggers rate backoff."""
+        if isinstance(self._proactive_limiter, AdaptiveRateLimiter):
+            self._proactive_limiter.record_429()
+
+    def _record_success_for_adaptive(self) -> None:
+        """Notify adaptive limiter of success — triggers gradual rate recovery."""
+        if isinstance(self._proactive_limiter, AdaptiveRateLimiter):
+            self._proactive_limiter.record_success()
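
A quick sanity check of the new class (assuming `providers.rate_limit` imports cleanly outside the running app): consecutive 429s walk the rate 100 → 50 → 25 → 12 → 10, and every three recorded successes step it back up by ×1.2.

```python
from providers.rate_limit import AdaptiveRateLimiter

rl = AdaptiveRateLimiter(initial_rate=100, min_rate=10)
for _ in range(4):
    rl.record_429()
print(rl._current_rate)   # 10: halved each time, clamped at min_rate

for _ in range(3):
    rl.record_success()
print(rl._current_rate)   # 12: int(10 * 1.2) after three consecutive successes
```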