initial commits

This commit is contained in:
2026-06-26 17:02:21 +08:00
commit 2851fa01cf
28 changed files with 2411 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
"""Local Anthropic-compatible proxy for AI Nexus Claude models."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,6 @@
from nexus_claude_api.cli import main
if __name__ == "__main__":
    # Use the CLI's integer return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
import argparse
import sys
import uvicorn
from nexus_claude_api.config import (
DEFAULT_ENDPOINT_URL,
DEFAULT_HOST,
DEFAULT_MODEL,
DEFAULT_PORT,
DEFAULT_SMALL_MODEL,
Settings,
)
from nexus_claude_api.server import create_app
from nexus_claude_api.shell import generate_claude_code_powershell
def build_parser() -> argparse.ArgumentParser:
    """Build the ``nexus-claude-api`` command-line parser.

    The parser exposes one subcommand, ``start``; the chosen subcommand name
    is stored on the namespace as ``command``.

    Returns:
        The fully configured top-level parser.
    """
    root = argparse.ArgumentParser(prog="nexus-claude-api")
    commands = root.add_subparsers(dest="command")
    start_cmd = commands.add_parser("start", help="Start the local API server")

    # Options that carry a value, registered in the order shown by --help.
    valued_options = (
        (("--host",), {"default": DEFAULT_HOST}),
        (("--port", "-p"), {"type": int, "default": DEFAULT_PORT}),
        (("--endpoint-url",), {"default": DEFAULT_ENDPOINT_URL}),
        (("--api-key",), {}),
        (("--model",), {"default": DEFAULT_MODEL}),
        (("--small-model",), {"default": DEFAULT_SMALL_MODEL}),
    )
    for flags, options in valued_options:
        start_cmd.add_argument(*flags, **options)

    # Plain boolean switches.
    for flags in (("--claude-code",), ("--verbose", "-v")):
        start_cmd.add_argument(*flags, action="store_true")

    start_cmd.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate config and print helper output without starting the server.",
    )
    return root
def main(argv: list[str] | None = None) -> int:
    """Parse *argv* and dispatch to the selected subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The exit status of ``run_start`` for the ``start`` subcommand,
        otherwise 0 after printing the help text.
    """
    cli = build_parser()
    namespace = cli.parse_args(argv)
    if namespace.command != "start":
        # No (or an unknown) subcommand: show usage and exit cleanly.
        cli.print_help()
        return 0
    return run_start(namespace)
def run_start(args: argparse.Namespace) -> int:
    """Run the ``start`` subcommand.

    Builds the settings from the parsed arguments, optionally prints the
    Claude Code helper commands, and starts the uvicorn server unless
    ``--dry-run`` was given.

    Args:
        args: Namespace produced by the parser from ``build_parser``.

    Returns:
        0 on success (including dry runs), 2 when the configuration is
        invalid or no API key could be resolved.
    """
    try:
        settings = Settings.from_values(
            host=args.host,
            port=args.port,
            endpoint_url=args.endpoint_url,
            api_key=args.api_key,
            model=args.model,
            small_model=args.small_model,
            verbose=args.verbose,
        )
    except ValueError as exc:
        # The local config loader raises ValueError for malformed JSON;
        # report it as a usage error instead of a traceback.
        print(f"Invalid configuration: {exc}", file=sys.stderr)
        return 2

    if not settings.api_key:
        print(
            "Missing Nexus API key. Add nexus-claude-api.local.json, set "
            "NEXUS_API_KEY or AWS_BEARER_TOKEN_BEDROCK, or pass --api-key.",
            file=sys.stderr,
        )
        return 2

    if args.claude_code:
        print("Claude Code command:")
        print(generate_claude_code_powershell(settings))

    if args.dry_run:
        return 0

    app = create_app(settings)
    uvicorn.run(
        app,
        host=settings.host,
        port=settings.port,
        log_level="debug" if settings.verbose else "info",
    )
    return 0
if __name__ == "__main__":
    # Script entry point: exit with whatever status main() reports.
    status = main()
    raise SystemExit(status)

View File

@@ -0,0 +1,90 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from pathlib import Path
# Defaults used by Settings and by the CLI option parser.
DEFAULT_ENDPOINT_URL = "https://genai-nexus.api.corpinter.net"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 4141
DEFAULT_MODEL = "claude-sonnet-4.6"
DEFAULT_SMALL_MODEL = "claude-haiku-4.5"
DEFAULT_OPUS_MODEL = "claude-opus-4.6"
# Default path (relative to the working directory) of the optional local
# JSON config file read by load_local_config().
LOCAL_CONFIG_FILENAME = "nexus-claude-api.local.json"
# Lookup table used by resolve_backend_model(): requested model name ->
# backend model id. Names missing from the table are passed through as-is.
MODEL_ID_MAP = {
    "claude-sonnet-4.6": "claude-sonnet-4.6",
    "claude-opus-4.6": "claude-opus-4.6",
    "claude-haiku-4.5": "claude-haiku-4.5",
    # Major-version-only names are mapped to a specific minor version.
    "claude-sonnet-4": "claude-sonnet-4.6",
    "claude-opus-4": "claude-opus-4.6",
    "claude-haiku-4": "claude-haiku-4.5",
}
@dataclass(frozen=True)
class Settings:
    """Immutable runtime configuration for the local proxy.

    Use ``Settings.from_values`` to also resolve the API key from the local
    config file and the environment.
    """

    host: str = DEFAULT_HOST
    port: int = DEFAULT_PORT
    endpoint_url: str = DEFAULT_ENDPOINT_URL
    api_key: str | None = None
    model: str = DEFAULT_MODEL
    small_model: str = DEFAULT_SMALL_MODEL
    verbose: bool = False
    # NOTE(review): stored but not read by any code visible in this change.
    require_api_key: bool = True

    @classmethod
    def from_values(
        cls,
        *,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        endpoint_url: str = DEFAULT_ENDPOINT_URL,
        api_key: str | None = None,
        model: str = DEFAULT_MODEL,
        small_model: str = DEFAULT_SMALL_MODEL,
        verbose: bool = False,
        require_api_key: bool = True,
    ) -> "Settings":
        """Build settings, resolving the API key from several sources.

        The first non-empty value wins, in this order: the ``api_key``
        argument, ``api_key`` then ``nexus_api_key`` from the local config
        file, then the ``NEXUS_API_KEY`` and ``AWS_BEARER_TOKEN_BEDROCK``
        environment variables.

        Raises:
            ValueError: If the local config file exists but is not a valid
                JSON object (raised by ``load_local_config``).
        """
        local_config = load_local_config()
        resolved_api_key = (
            api_key
            or local_config.get("api_key")
            or local_config.get("nexus_api_key")
            or os.environ.get("NEXUS_API_KEY")
            or os.environ.get("AWS_BEARER_TOKEN_BEDROCK")
        )
        return cls(
            host=host,
            port=port,
            endpoint_url=endpoint_url,
            api_key=resolved_api_key,
            model=model,
            small_model=small_model,
            verbose=verbose,
            require_api_key=require_api_key,
        )

    @property
    def base_url(self) -> str:
        """URL a local client should use to reach this proxy.

        Wildcard bind addresses are not connectable targets on every
        platform, so they are replaced by the matching loopback address.
        Bare IPv6 literals are wrapped in brackets, as URL syntax requires.
        """
        host = self.host
        if host == "0.0.0.0":
            host = "127.0.0.1"
        elif host == "::":
            host = "[::1]"
        elif ":" in host and not host.startswith("["):
            host = f"[{host}]"
        return f"http://{host}:{self.port}"
def resolve_backend_model(model: str) -> str:
    """Translate a requested model name into the backend model id.

    Names without an entry in ``MODEL_ID_MAP`` are returned unchanged.
    """
    try:
        return MODEL_ID_MAP[model]
    except KeyError:
        return model
def load_local_config(path: str | Path = LOCAL_CONFIG_FILENAME) -> dict[str, str]:
    """Read the optional local JSON config file.

    Args:
        path: Location of the config file.

    Returns:
        An empty dict when the file does not exist; otherwise the file's
        top-level entries with keys and values converted to ``str`` and
        ``null`` values dropped.

    Raises:
        ValueError: If the file is not valid JSON or its top level is not
            a JSON object.
    """
    source = Path(path)
    if not source.exists():
        return {}

    text = source.read_text(encoding="utf-8")
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {source}") from exc

    if not isinstance(parsed, dict):
        raise ValueError(f"{source} must contain a JSON object")

    cleaned: dict[str, str] = {}
    for key, value in parsed.items():
        if value is None:
            continue
        cleaned[str(key)] = str(value)
    return cleaned

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class NexusClaudeError(Exception):
    """Application error carrying the data needed for an error response.

    Attributes are set from the constructor arguments: ``message`` (also
    passed to ``Exception``), ``status_code`` and ``error_type``.
    """

    def __init__(
        self,
        message: str,
        *,
        status_code: int = 400,
        error_type: str = "invalid_request_error",
    ) -> None:
        Exception.__init__(self, message)
        self.error_type = error_type
        self.status_code = status_code
        self.message = message
def anthropic_error_response(
    message: str,
    *,
    status_code: int = 400,
    error_type: str = "invalid_request_error",
) -> JSONResponse:
    """Wrap an error in the JSON envelope used by this API.

    The body has the shape
    ``{"type": "error", "error": {"type": <error_type>, "message": <message>}}``.

    Args:
        message: Human-readable error description.
        status_code: HTTP status of the response.
        error_type: Value of the inner ``type`` field.
    """
    error_body = {"type": error_type, "message": message}
    envelope = {"type": "error", "error": error_body}
    return JSONResponse(status_code=status_code, content=envelope)
def map_http_exception(exc: HTTPException) -> JSONResponse:
    """Convert a FastAPI ``HTTPException`` into the API's error envelope.

    The exception's status code is kept; an empty ``detail`` is replaced by
    the generic text ``"Request failed"``.
    """
    message = "Request failed"
    if exc.detail:
        message = str(exc.detail)
    return anthropic_error_response(
        message,
        error_type="invalid_request_error",
        status_code=exc.status_code,
    )

View File

@@ -0,0 +1,137 @@
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
class AnthropicTextBlock(BaseModel):
    """Content block holding plain text; ``type`` must be ``"text"``."""

    type: Literal["text"]
    text: str
class AnthropicImageSource(BaseModel):
    """Inline image payload; only the ``"base64"`` source type validates."""

    type: Literal["base64"]
    media_type: str  # the Bedrock translator requires a "/" in this value
    data: str  # base64 text; decoded by the Bedrock translator
class AnthropicImageBlock(BaseModel):
    """Content block wrapping an ``AnthropicImageSource``."""

    type: Literal["image"]
    source: AnthropicImageSource
class AnthropicToolResultBlock(BaseModel):
    """Content block carrying the result of an earlier tool call."""

    type: Literal["tool_result"]
    tool_use_id: str
    # Either plain text or a list of raw dict items.
    content: str | list[dict[str, Any]]
    # Truthy values make the translator mark the result with status "error".
    is_error: bool | None = None
class AnthropicToolUseBlock(BaseModel):
    """Content block describing a tool invocation (id, tool name, input)."""

    type: Literal["tool_use"]
    id: str
    name: str
    input: dict[str, Any]
class AnthropicThinkingBlock(BaseModel):
    """Content block with ``type`` ``"thinking"`` and its text."""

    type: Literal["thinking"]
    thinking: str
# Union of the block types accepted in a message's ``content`` list.
AnthropicContentBlock = (
    AnthropicTextBlock
    | AnthropicImageBlock
    | AnthropicToolResultBlock
    | AnthropicToolUseBlock
    | AnthropicThinkingBlock
)
class AnthropicMessage(BaseModel):
    """One conversation turn from the ``user`` or the ``assistant``."""

    role: Literal["user", "assistant"]
    # Either a plain string or a list of typed content blocks.
    content: str | list[AnthropicContentBlock]
class AnthropicTool(BaseModel):
    """Tool definition: name, optional description and an input schema."""

    name: str
    description: str | None = None
    input_schema: dict[str, Any]
class AnthropicToolChoice(BaseModel):
    """Tool selection mode sent with a messages request."""

    type: Literal["auto", "any", "tool", "none"]
    # Only consulted by the Bedrock translator when ``type`` is "tool".
    name: str | None = None
class AnthropicMetadata(BaseModel):
    """Optional request metadata; only ``user_id`` is modelled."""

    user_id: str | None = None
class AnthropicMessagesRequest(BaseModel):
    """Body of ``POST /v1/messages``."""

    # Fields not declared below are accepted instead of failing validation.
    model_config = ConfigDict(extra="allow")
    model: str
    messages: list[AnthropicMessage]
    max_tokens: int = Field(default=1024, ge=1)  # must be at least 1
    system: str | list[AnthropicTextBlock] | None = None
    metadata: AnthropicMetadata | None = None
    stop_sequences: list[str] | None = None
    # A truthy value makes the messages route answer with an SSE stream.
    stream: bool | None = False
    temperature: float | None = None
    top_p: float | None = None
    top_k: int | None = None
    tools: list[AnthropicTool] | None = None
    tool_choice: AnthropicToolChoice | None = None
class AnthropicUsage(BaseModel):
    """Token counts reported with a message response."""

    input_tokens: int = 0
    output_tokens: int = 0
    cache_creation_input_tokens: int | None = None
    cache_read_input_tokens: int | None = None
class AnthropicMessageResponse(BaseModel):
    """Non-streaming response body of ``POST /v1/messages``."""

    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    # Responses only ever carry text and tool-use blocks.
    content: list[AnthropicTextBlock | AnthropicToolUseBlock]
    model: str
    stop_reason: str | None
    stop_sequence: str | None = None
    usage: AnthropicUsage
class CountTokensRequest(BaseModel):
    """Body of ``POST /v1/messages/count_tokens``."""

    # Fields not declared below are accepted instead of failing validation.
    model_config = ConfigDict(extra="allow")
    model: str
    messages: list[AnthropicMessage]
    system: str | list[AnthropicTextBlock] | None = None
    tools: list[AnthropicTool] | None = None
class CountTokensResponse(BaseModel):
    """Response body of the token-count endpoint."""

    input_tokens: int
# Catalogue returned by the ``GET /v1/models`` route; each entry supplies
# the ``id``, ``display_name`` and ``owned_by`` fields of one listed model.
SUPPORTED_MODELS = [
    {
        "id": "claude-sonnet-4.6",
        "display_name": "Claude Sonnet 4.6",
        "owned_by": "anthropic",
    },
    {
        "id": "claude-opus-4.6",
        "display_name": "Claude Opus 4.6",
        "owned_by": "anthropic",
    },
    {
        "id": "claude-haiku-4.5",
        "display_name": "Claude Haiku 4.5",
        "owned_by": "anthropic",
    },
]

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import os
from typing import Any
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from nexus_claude_api.config import Settings
from nexus_claude_api.errors import NexusClaudeError
class NexusClient:
    """Wrapper around a boto3 ``bedrock-runtime`` client for the Nexus endpoint.

    boto3/botocore failures are re-raised as ``NexusClaudeError``.
    """

    def __init__(self, settings: Settings) -> None:
        """Create the underlying boto3 client.

        When an API key is configured it is exported process-wide as
        ``AWS_BEARER_TOKEN_BEDROCK`` before the client is created
        (presumably so boto3 uses it as the bearer token — confirm).
        """
        token = settings.api_key
        if token:
            os.environ["AWS_BEARER_TOKEN_BEDROCK"] = token
        self.settings = settings
        self._client = boto3.client(
            region_name="nexus",
            endpoint_url=settings.endpoint_url,
            service_name="bedrock-runtime",
        )

    def converse(self, request: dict[str, Any]) -> dict[str, Any]:
        """Call ``converse`` with *request* as keyword arguments."""
        return self._invoke(
            self._client.converse,
            request,
            "Failed to call Nexus Converse API",
        )

    def converse_stream(self, request: dict[str, Any]) -> Any:
        """Call ``converse_stream`` and return the response's ``stream`` value.

        An empty list is returned when the response has no ``stream`` key.
        """
        response = self._invoke(
            self._client.converse_stream,
            request,
            "Failed to call Nexus Converse Stream API",
        )
        return response.get("stream", [])

    def _invoke(self, operation: Any, request: dict[str, Any], failure_message: str) -> Any:
        """Run *operation* and translate boto3 errors into ``NexusClaudeError``."""
        try:
            return operation(**request)
        except ClientError as exc:
            raise _map_client_error(exc) from exc
        except BotoCoreError as exc:
            raise NexusClaudeError(
                failure_message,
                status_code=502,
                error_type="api_error",
            ) from exc
def _map_client_error(exc: ClientError) -> NexusClaudeError:
    """Convert a botocore ``ClientError`` into a ``NexusClaudeError``.

    The HTTP status reported in the response metadata is used (502 when
    absent), except that access-denied codes map to 403 and throttling
    codes map to 429. The error type is always ``"api_error"``.
    """
    details = exc.response.get("Error", {})
    error_code = details.get("Code", "")
    text = details.get("Message", "Nexus API request failed")
    metadata = exc.response.get("ResponseMetadata", {})
    http_status = int(metadata.get("HTTPStatusCode", 502))

    # Error codes whose status is forced regardless of the reported one.
    forced_status = {
        "AccessDeniedException": 403,
        "UnrecognizedClientException": 403,
        "ThrottlingException": 429,
        "TooManyRequestsException": 429,
    }
    http_status = forced_status.get(error_code, http_status)
    return NexusClaudeError(text, status_code=http_status, error_type="api_error")

View File

@@ -0,0 +1,2 @@
"""HTTP route modules."""

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
def root() -> dict[str, str]:
    """Return a static payload naming the service."""
    banner = {"status": "ok", "service": "nexus-claude-api"}
    return banner
@router.get("/health")
def health() -> dict[str, str]:
    """Return a static ``{"status": "ok"}`` health payload."""
    report = {"status": "ok"}
    return report

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from pydantic import ValidationError
from nexus_claude_api.errors import NexusClaudeError, anthropic_error_response
from nexus_claude_api.models import (
AnthropicMessagesRequest,
CountTokensRequest,
CountTokensResponse,
)
from nexus_claude_api.nexus_client import NexusClient
from nexus_claude_api.tokens import estimate_input_tokens
from nexus_claude_api.translators.anthropic_to_bedrock import anthropic_to_bedrock_request
from nexus_claude_api.translators.bedrock_to_anthropic import bedrock_to_anthropic_response
from nexus_claude_api.translators.stream import (
bedrock_stream_to_anthropic_events,
sse_frame,
)
router = APIRouter()
def get_nexus_client(request: Request) -> NexusClient:
    """FastAPI dependency returning the client stored on the app state."""
    app_state = request.app.state
    return app_state.nexus_client
@router.post("/v1/messages", response_model=None)
async def create_message(
    request: Request,
    client: Annotated[NexusClient, Depends(get_nexus_client)],
) -> Response:
    """Handle ``POST /v1/messages``.

    Validates the body, translates it to a Bedrock Converse request and
    returns either a JSON message or, when ``stream`` is truthy, an SSE
    stream of translated events.

    Returns:
        A 400 error envelope for malformed JSON or a body that fails
        validation, the mapped error envelope for ``NexusClaudeError``,
        otherwise the translated response.
    """
    # Local import so this block does not depend on the module's import list.
    from fastapi.concurrency import run_in_threadpool

    try:
        raw = await request.json()
    except ValueError:
        # A JSON decode failure is a ValueError subclass; without this
        # guard a malformed body surfaces as a 500.
        return anthropic_error_response(
            "Request body must be valid JSON",
            status_code=400,
        )

    try:
        payload = AnthropicMessagesRequest.model_validate(raw)
        bedrock_request = anthropic_to_bedrock_request(payload)
        if payload.stream:
            # The client methods are synchronous; run them off the event
            # loop so other requests are not stalled while they wait.
            stream = await run_in_threadpool(client.converse_stream, bedrock_request)
            return StreamingResponse(
                (
                    sse_frame(event)
                    for event in bedrock_stream_to_anthropic_events(
                        stream,
                        model=payload.model,
                    )
                ),
                media_type="text/event-stream",
            )
        response = await run_in_threadpool(client.converse, bedrock_request)
        anthropic_response = bedrock_to_anthropic_response(
            response,
            model=payload.model,
        )
        return JSONResponse(content=anthropic_response.model_dump(exclude_none=True))
    except ValidationError as exc:
        return anthropic_error_response(str(exc), status_code=400)
    except NexusClaudeError as exc:
        return anthropic_error_response(
            exc.message,
            status_code=exc.status_code,
            error_type=exc.error_type,
        )
@router.post("/v1/messages/count_tokens")
async def count_tokens(request: Request) -> JSONResponse:
    """Handle ``POST /v1/messages/count_tokens``.

    The count is a local estimate from ``estimate_input_tokens``; no
    upstream call is made.

    Returns:
        A 400 error envelope for malformed JSON or a body that fails
        validation, otherwise ``{"input_tokens": <estimate>}``.
    """
    try:
        raw = await request.json()
    except ValueError:
        # A JSON decode failure is a ValueError subclass; without this
        # guard a malformed body surfaces as a 500.
        return anthropic_error_response(
            "Request body must be valid JSON",
            status_code=400,
        )

    try:
        payload = CountTokensRequest.model_validate(raw)
    except ValidationError as exc:
        return anthropic_error_response(str(exc), status_code=400)

    response = CountTokensResponse(input_tokens=estimate_input_tokens(payload))
    return JSONResponse(content=response.model_dump())

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from fastapi import APIRouter
from nexus_claude_api.models import SUPPORTED_MODELS
router = APIRouter()
@router.get("/v1/models")
def list_models() -> dict[str, object]:
    """List the entries of ``SUPPORTED_MODELS`` in a list envelope.

    Every entry reports a fixed epoch creation time; ``has_more`` is
    always ``False``.
    """
    entries = []
    for spec in SUPPORTED_MODELS:
        entries.append(
            {
                "id": spec["id"],
                "object": "model",
                "type": "model",
                "created": 0,
                "created_at": "1970-01-01T00:00:00.000Z",
                "owned_by": spec["owned_by"],
                "display_name": spec["display_name"],
            }
        )
    return {"object": "list", "data": entries, "has_more": False}

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.requests import Request
from fastapi.responses import JSONResponse
from nexus_claude_api.config import Settings
from nexus_claude_api.errors import NexusClaudeError, anthropic_error_response
from nexus_claude_api.nexus_client import NexusClient
from nexus_claude_api.routes.health import router as health_router
from nexus_claude_api.routes.messages import router as messages_router
from nexus_claude_api.routes.models import router as models_router
def create_app(
    settings: Settings | None = None,
    nexus_client: NexusClient | None = None,
) -> FastAPI:
    """Build the FastAPI application.

    Args:
        settings: Runtime settings; resolved via ``Settings.from_values``
            when omitted.
        nexus_client: Upstream client to use; a ``NexusClient`` is built
            from the settings when omitted (useful for injecting a fake).

    Returns:
        The app with CORS, the health/models/messages routers and the
        ``NexusClaudeError`` handler installed. Settings and client are
        stored on ``app.state``.
    """
    if settings is None:
        settings = Settings.from_values(require_api_key=False)
    resolved_settings = settings

    app = FastAPI(title="nexus-claude-api", version="0.1.0")
    app.state.settings = resolved_settings
    # Compare against None so a falsy injected client is still honoured.
    app.state.nexus_client = (
        NexusClient(resolved_settings) if nexus_client is None else nexus_client
    )

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        # Wildcard origins must not be combined with credentialed requests;
        # no route here reads cookies, so credentials stay disabled.
        allow_credentials=False,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    app.include_router(health_router)
    app.include_router(models_router)
    app.include_router(messages_router)

    @app.exception_handler(NexusClaudeError)
    async def handle_nexus_error(_: Request, exc: NexusClaudeError) -> JSONResponse:
        """Render an uncaught ``NexusClaudeError`` as the API error envelope."""
        return anthropic_error_response(
            exc.message,
            status_code=exc.status_code,
            error_type=exc.error_type,
        )

    logging.basicConfig(
        level=logging.DEBUG if resolved_settings.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    return app

View File

@@ -0,0 +1,20 @@
from __future__ import annotations
from nexus_claude_api.config import DEFAULT_OPUS_MODEL, Settings
def _powershell_single_quote(value: object) -> str:
    """Return *value* as a PowerShell single-quoted string literal."""
    # Inside single quotes the only character that needs escaping is the
    # quote itself, which is written doubled.
    return "'" + str(value).replace("'", "''") + "'"


def generate_claude_code_powershell(settings: Settings) -> str:
    """Build a PowerShell snippet that launches ``claude`` against this proxy.

    Args:
        settings: Source of the base URL and the model names.

    Returns:
        One ``$env:NAME='value'`` assignment per line followed by a final
        ``claude`` line. Values are quoted so an embedded single quote
        cannot end the literal early.
    """
    env = {
        "ANTHROPIC_BASE_URL": settings.base_url,
        "ANTHROPIC_AUTH_TOKEN": "dummy",
        "ANTHROPIC_MODEL": settings.model,
        "ANTHROPIC_DEFAULT_SONNET_MODEL": settings.model,
        "ANTHROPIC_DEFAULT_OPUS_MODEL": DEFAULT_OPUS_MODEL,
        "ANTHROPIC_SMALL_FAST_MODEL": settings.small_model,
        "ANTHROPIC_DEFAULT_HAIKU_MODEL": settings.small_model,
        "DISABLE_NON_ESSENTIAL_MODEL_CALLS": "1",
        "CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC": "1",
    }
    assignments = [
        f"$env:{key}={_powershell_single_quote(value)}" for key, value in env.items()
    ]
    return "\n".join([*assignments, "claude"])

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import json
from typing import Any
from nexus_claude_api.models import CountTokensRequest
def estimate_input_tokens(payload: CountTokensRequest | dict[str, Any]) -> int:
    """Estimate the token count of *payload* without calling the backend.

    The payload (its ``model_dump()`` output when that method exists) is
    serialised to JSON and the character count is divided by four,
    rounding up.

    Returns:
        The estimate, never less than 1.
    """
    data = payload.model_dump() if hasattr(payload, "model_dump") else payload
    serialized = json.dumps(data, ensure_ascii=False, default=str)
    # Conservative local approximation: about 4 characters per token.
    approx = (len(serialized) + 3) // 4
    return approx if approx > 1 else 1

View File

@@ -0,0 +1,2 @@
"""Anthropic and Bedrock translation helpers."""

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
import base64
from typing import Any
from nexus_claude_api.config import resolve_backend_model
from nexus_claude_api.errors import NexusClaudeError
from nexus_claude_api.models import (
AnthropicContentBlock,
AnthropicImageBlock,
AnthropicMessage,
AnthropicMessagesRequest,
AnthropicTextBlock,
AnthropicThinkingBlock,
AnthropicToolResultBlock,
AnthropicToolUseBlock,
)
def anthropic_to_bedrock_request(payload: AnthropicMessagesRequest) -> dict[str, Any]:
    """Translate an Anthropic messages request into Converse keyword arguments.

    Args:
        payload: The validated request.

    Returns:
        A dict with ``modelId`` and ``messages`` plus, when present,
        ``system``, ``inferenceConfig`` and ``toolConfig``.

    Raises:
        NexusClaudeError: Propagated from the content translators for
            unsupported or malformed content blocks.
    """
    request: dict[str, Any] = {
        "modelId": resolve_backend_model(payload.model),
        "messages": [_message_to_bedrock(message) for message in payload.messages],
    }

    system = _system_to_bedrock(payload.system)
    if system:
        request["system"] = system

    # NOTE: ``top_k`` is accepted by the request model but not forwarded.
    inference_config: dict[str, Any] = {}
    if payload.max_tokens:
        inference_config["maxTokens"] = payload.max_tokens
    if payload.temperature is not None:
        inference_config["temperature"] = payload.temperature
    if payload.top_p is not None:
        inference_config["topP"] = payload.top_p
    if payload.stop_sequences:
        # Stop sequences are an inference setting in the Converse API, not
        # a top-level request parameter.
        inference_config["stopSequences"] = list(payload.stop_sequences)
    if inference_config:
        request["inferenceConfig"] = inference_config

    tool_config = _tools_to_bedrock(payload)
    if tool_config:
        request["toolConfig"] = tool_config
    return request
def _system_to_bedrock(
    system: str | list[AnthropicTextBlock] | None,
) -> list[dict[str, str]] | None:
    """Convert the ``system`` field into a list of ``{"text": ...}`` entries.

    Returns ``None`` when no system prompt was given; a plain string
    becomes a single-entry list.
    """
    if isinstance(system, str):
        return [{"text": system}]
    if system is None:
        return None
    converted: list[dict[str, str]] = []
    for part in system:
        converted.append({"text": part.text})
    return converted
def _message_to_bedrock(message: AnthropicMessage) -> dict[str, Any]:
    """Convert one message into a ``{"role", "content"}`` dict."""
    role = message.role
    converted_content = _content_to_bedrock(message.content)
    return {"role": role, "content": converted_content}
def _content_to_bedrock(
    content: str | list[AnthropicContentBlock],
) -> list[dict[str, Any]]:
    """Convert message content into a list of Bedrock content blocks.

    A plain string becomes one text block. Thinking blocks are forwarded
    as ordinary text blocks.

    Raises:
        NexusClaudeError: For a block of an unrecognised type, or when an
            image block cannot be converted.
    """
    if isinstance(content, str):
        return [{"text": content}]

    converted: list[dict[str, Any]] = []
    for item in content:
        if isinstance(item, AnthropicTextBlock):
            entry: dict[str, Any] = {"text": item.text}
        elif isinstance(item, AnthropicThinkingBlock):
            entry = {"text": item.thinking}
        elif isinstance(item, AnthropicImageBlock):
            entry = _image_to_bedrock(item)
        elif isinstance(item, AnthropicToolUseBlock):
            entry = {
                "toolUse": {
                    "toolUseId": item.id,
                    "name": item.name,
                    "input": item.input,
                }
            }
        elif isinstance(item, AnthropicToolResultBlock):
            tool_result: dict[str, Any] = {
                "toolUseId": item.tool_use_id,
                "content": _tool_result_content(item.content),
            }
            # The status key is only present for failed tool calls.
            if item.is_error:
                tool_result["status"] = "error"
            entry = {"toolResult": tool_result}
        else:
            raise NexusClaudeError(f"Unsupported content block: {item!r}")
        converted.append(entry)
    return converted
def _image_to_bedrock(block: AnthropicImageBlock) -> dict[str, Any]:
    """Convert a base64 image block into a Bedrock ``image`` block.

    The format is taken from the media type's subtype, lower-cased; the
    non-standard alias ``jpg`` is normalised to ``jpeg``, the name the
    Converse API uses.

    Raises:
        NexusClaudeError: If the media type has no ``/`` or the data is not
            valid base64.
    """
    media_type = block.source.media_type
    if "/" not in media_type:
        raise NexusClaudeError(f"Invalid image media_type: {media_type}")

    image_format = media_type.split("/", 1)[1].strip().lower()
    if image_format == "jpg":
        image_format = "jpeg"

    try:
        image_bytes = base64.b64decode(block.source.data, validate=True)
    except ValueError as exc:
        # binascii.Error, raised for malformed base64, subclasses ValueError.
        raise NexusClaudeError("Invalid base64 image data") from exc

    return {
        "image": {
            "format": image_format,
            "source": {"bytes": image_bytes},
        }
    }
def _tool_result_content(content: str | list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert tool-result content into Bedrock tool-result content entries.

    A string becomes one text entry. In a list, items with ``type`` equal
    to ``"text"`` and a ``text`` key become text entries; every other item
    is passed through under a ``json`` key.
    """
    if isinstance(content, str):
        return [{"text": content}]
    return [
        {"text": item["text"]}
        if item.get("type") == "text" and "text" in item
        else {"json": item}
        for item in content
    ]
def _tools_to_bedrock(payload: AnthropicMessagesRequest) -> dict[str, Any] | None:
    """Build the ``toolConfig`` section, or ``None`` when tools are disabled.

    Returns ``None`` when the request has no tools or its tool choice is
    ``"none"``. A ``toolChoice`` entry is added for ``auto``, ``any`` and a
    named ``tool`` choice; a ``tool`` choice without a name adds nothing.
    """
    if not payload.tools:
        return None
    choice = payload.tool_choice
    if choice and choice.type == "none":
        return None

    specs = []
    for tool in payload.tools:
        specs.append(
            {
                "toolSpec": {
                    "name": tool.name,
                    "description": tool.description or "",
                    "inputSchema": {"json": tool.input_schema},
                }
            }
        )
    config: dict[str, Any] = {"tools": specs}

    if not choice:
        return config
    if choice.type in ("auto", "any"):
        # These two modes use the mode name as key with an empty body.
        config["toolChoice"] = {choice.type: {}}
    elif choice.type == "tool" and choice.name:
        config["toolChoice"] = {"tool": {"name": choice.name}}
    return config

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
import time
import uuid
from typing import Any
from nexus_claude_api.models import (
AnthropicMessageResponse,
AnthropicTextBlock,
AnthropicToolUseBlock,
AnthropicUsage,
)
def bedrock_to_anthropic_response(
    response: dict[str, Any],
    *,
    model: str,
) -> AnthropicMessageResponse:
    """Translate a Converse response dict into an Anthropic message response.

    Args:
        response: Raw response dict from the upstream client.
        model: Model name to echo back in the response.

    Returns:
        The message. Its id is the upstream ``RequestId`` when present,
        otherwise a generated ``msg_<hex>`` id; empty content is replaced
        by a single empty text block.
    """
    message = response.get("output", {}).get("message", {})
    blocks = _content_to_anthropic(message.get("content", []))
    if not blocks:
        blocks = [AnthropicTextBlock(type="text", text="")]

    request_id = response.get("ResponseMetadata", {}).get("RequestId")
    message_id = request_id if request_id else f"msg_{uuid.uuid4().hex}"

    raw_usage = response.get("usage", {})
    token_counts = AnthropicUsage(
        input_tokens=int(raw_usage.get("inputTokens", 0) or 0),
        output_tokens=int(raw_usage.get("outputTokens", 0) or 0),
    )
    return AnthropicMessageResponse(
        id=message_id,
        content=blocks,
        model=model,
        stop_reason=map_stop_reason(response.get("stopReason")),
        stop_sequence=None,
        usage=token_counts,
    )
def _content_to_anthropic(
    content: list[dict[str, Any]],
) -> list[AnthropicTextBlock | AnthropicToolUseBlock]:
    """Convert Bedrock content entries into Anthropic response blocks.

    Entries with a ``text`` key become text blocks and entries with a
    ``toolUse`` key become tool-use blocks; anything else is skipped.
    """
    converted: list[AnthropicTextBlock | AnthropicToolUseBlock] = []
    for entry in content:
        if "text" in entry:
            converted.append(AnthropicTextBlock(type="text", text=str(entry["text"])))
            continue
        if "toolUse" not in entry:
            continue
        call = entry["toolUse"]
        tool_id = call.get("toolUseId") or call.get("id") or ""
        tool_name = call.get("name") or ""
        converted.append(
            AnthropicToolUseBlock(
                type="tool_use",
                id=str(tool_id),
                name=str(tool_name),
                input=call.get("input") or {},
            )
        )
    return converted
def map_stop_reason(reason: str | None) -> str:
    """Normalise a stop reason to its snake_case spelling.

    Both the snake_case and camelCase spellings of the four known reasons
    are accepted; anything else, including ``None`` and the empty string,
    yields ``"end_turn"``.
    """
    if not reason:
        return "end_turn"
    snake_names = ("end_turn", "max_tokens", "stop_sequence", "tool_use")
    camel_names = ("endTurn", "maxTokens", "stopSequence", "toolUse")
    # Snake-case names map to themselves; camelCase names to their twin.
    table = dict(zip(snake_names, snake_names))
    table.update(zip(camel_names, snake_names))
    return table.get(reason, "end_turn")
def message_start_event(*, message_id: str, model: str) -> dict[str, Any]:
    """Build the ``message_start`` event that opens a streamed response.

    The embedded message has empty content, no stop reason and zeroed
    token usage.
    """
    zero_usage = {"input_tokens": 0, "output_tokens": 0}
    message: dict[str, Any] = {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "content": [],
        "model": model,
        "stop_reason": None,
        "stop_sequence": None,
        "usage": zero_usage,
    }
    return {"type": "message_start", "message": message}
def now_message_id() -> str:
    """Return a new id of the form ``msg_<epoch-millis>_<12 hex chars>``."""
    millis = int(time.time() * 1000)
    suffix = uuid.uuid4().hex[:12]
    return f"msg_{millis}_{suffix}"

View File

@@ -0,0 +1,118 @@
from __future__ import annotations
import json
from typing import Any, Iterable
from nexus_claude_api.translators.bedrock_to_anthropic import (
map_stop_reason,
message_start_event,
now_message_id,
)
def sse_frame(event: dict[str, Any]) -> str:
    """Serialise *event* as one server-sent-events frame.

    The frame has an ``event:`` line with the event's ``type`` and a
    ``data:`` line with the whole event as JSON (non-ASCII characters are
    kept as-is), ending with a blank line.

    Raises:
        KeyError: If *event* has no ``type`` key.
    """
    name = event["type"]
    payload = json.dumps(event, ensure_ascii=False)
    header = f"event: {name}\n"
    body = f"data: {payload}\n\n"
    return header + body
def bedrock_stream_to_anthropic_events(
    events: Iterable[dict[str, Any]],
    *,
    model: str,
) -> Iterable[dict[str, Any]]:
    """Translate a Converse stream into Anthropic streaming events.

    Emits ``message_start`` first, then content block start/delta/stop
    events as they arrive, and finally ``message_delta`` (stop reason and
    usage) followed by ``message_stop``. Blocks still open at the end are
    closed before ``message_delta``.

    If the upstream iterator raises, or yields an ``error`` entry, a single
    ``error`` event is emitted and the stream ends without the closing
    events.

    Args:
        events: Iterable of raw stream event dicts.
        model: Model name reported in ``message_start``.

    Yields:
        Anthropic-style event dicts.
    """
    message_id = now_message_id()
    usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
    yield message_start_event(message_id=message_id, model=model)

    open_blocks: set[int] = set()
    final_stop_reason = "end_turn"
    iterator = iter(events)
    while True:
        # Only fetching the next upstream event is guarded: the response
        # has already started, so a failure can only be reported in-band.
        try:
            event = next(iterator)
        except StopIteration:
            break
        except Exception as exc:
            yield {
                "type": "error",
                "error": {
                    "type": "api_error",
                    "message": str(exc) or type(exc).__name__,
                },
            }
            return

        if "messageStart" in event:
            # message_start was already emitted above.
            continue
        if "contentBlockStart" in event:
            start = event["contentBlockStart"]
            index = int(start.get("contentBlockIndex", start.get("index", 0)))
            open_blocks.add(index)
            block = start.get("start", {})
            if "toolUse" in block:
                tool = block["toolUse"]
                content_block = {
                    "type": "tool_use",
                    "id": str(tool.get("toolUseId") or ""),
                    "name": str(tool.get("name") or ""),
                    "input": {},
                }
            else:
                content_block = {"type": "text", "text": ""}
            yield {
                "type": "content_block_start",
                "index": index,
                "content_block": content_block,
            }
        elif "contentBlockDelta" in event:
            delta_event = event["contentBlockDelta"]
            index = int(delta_event.get("contentBlockIndex", delta_event.get("index", 0)))
            if index not in open_blocks:
                # A delta arrived without a preceding start: open a text
                # block so the client sees a well-formed sequence.
                open_blocks.add(index)
                yield {
                    "type": "content_block_start",
                    "index": index,
                    "content_block": {"type": "text", "text": ""},
                }
            delta = delta_event.get("delta", {})
            if "text" in delta:
                yield {
                    "type": "content_block_delta",
                    "index": index,
                    "delta": {"type": "text_delta", "text": delta["text"]},
                }
            elif "toolUse" in delta:
                tool_delta = delta["toolUse"]
                partial = tool_delta.get("input") or tool_delta.get("partialJson") or ""
                if not isinstance(partial, str):
                    partial = json.dumps(partial, ensure_ascii=False)
                yield {
                    "type": "content_block_delta",
                    "index": index,
                    "delta": {"type": "input_json_delta", "partial_json": partial},
                }
        elif "contentBlockStop" in event:
            stop = event["contentBlockStop"]
            index = int(stop.get("contentBlockIndex", stop.get("index", 0)))
            open_blocks.discard(index)
            yield {"type": "content_block_stop", "index": index}
        elif "metadata" in event:
            # Usage is remembered and reported in the final message_delta.
            raw_usage = event["metadata"].get("usage", {})
            usage["input_tokens"] = int(raw_usage.get("inputTokens", usage["input_tokens"]) or 0)
            usage["output_tokens"] = int(
                raw_usage.get("outputTokens", usage["output_tokens"]) or 0
            )
        elif "messageStop" in event:
            final_stop_reason = map_stop_reason(event["messageStop"].get("stopReason"))
        elif "error" in event:
            yield {
                "type": "error",
                "error": {
                    "type": "api_error",
                    "message": str(event["error"]),
                },
            }
            return

    # Close any block the upstream never explicitly stopped.
    for index in sorted(open_blocks):
        yield {"type": "content_block_stop", "index": index}
    yield {
        "type": "message_delta",
        "delta": {"stop_reason": final_stop_reason, "stop_sequence": None},
        "usage": usage,
    }
    yield {"type": "message_stop"}