如果你的团队已经按 OpenAI SDK 或 OpenAI-compatible 协议接入大模型,迁移到非线智能时,除了单次调通,更值得关注的是三个工程化指标:高并发表现、RPM/TPM 吞吐、费用和请求追踪。

本文中的费用均来自 API 响应里的 usage.cost 字段;复现时需要在本地环境变量或 .env 中配置自己的api key。

先说结论

本轮 OpenAI-compatible 专项评测的结果可以概括为一句话:非线智能的 OpenAI-compatible 主链路已经具备第一轮工程验证的初步证据;对于生产级 SLA 验收、reasoning 字段、Claude tool call、长窗口稳定性等更细场景,建议结合业务负载继续扩展验证。

核心数据如下:

维度 评测结果
基础 chat gpt-5.4-nanoclaude-sonnet-4.6gemini-3-flash-preview 均返回 200
stream gpt-5.4-nano 流式请求成功,TTFT 2.053s
100 并发 gpt-5.4-nanogemini-3-flash-preview 均 100/100 成功
60 秒调度 RPM 最高观测:gpt-5.4-nano 581.010 RPM,gemini-3-flash-preview 569.301 RPM
60 秒调度 TPM 按 100k TPM 调度时,实际观测:gpt-5.4-nano 167,029 TPM,gemini-3-flash-preview 211,226 TPM
请求追踪 非流式请求可传 task_id,结果文件可按前缀分组
费用可观测 OpenAI-compatible 响应返回 usage.cost,可直接汇总
Prompt Cache gpt-5.4-nano 在本轮相同长前缀 3 次样本中,命中后的单次费用约下降 84%
Tool Call gpt-5.4-nanogpt-5.4-mini 最小样本通过;Claude 系工具调用建议按业务 schema 单独验证
Thinking/Reasoning gemini-3.1-pro-previewclaude-sonnet-4.6-thinking 可返回 reasoning_content
多模态 smoke GLM-4.6V 图片输入链路可用,提高输出预算后返回可读 OCR 内容

需要特别说明:OpenAI-compatible 下调用 Claude 系列模型,区别于 Claude 原生协议测试。 本文中的 claude-sonnet-4.6claude-haiku-4.5claude-sonnet-4.6-thinking 都是通过 OpenAI-compatible /chat/completions 路径调用,因此仍属于本文范围。

这次评测怎么设计

普通 smoke test 主要覆盖基础可用性。为了更接近开发者真实接入,我们把测试拆成五类:

测试类型 回答的问题 本轮做法
Smoke 基础请求是否可用 chat、stream、Claude/Gemini/GPT 模型族各取样
并发 同一时刻能否承载请求 1/5/10/20/50/100 并发短输出
RPM 固定调度窗口内能处理多少请求 60 秒调度窗口,目标 300/600 RPM
TPM 固定调度窗口内能处理多少 token 60 秒调度窗口,按 50k/100k TPM 计划请求量
功能兼容 OpenAI-compatible 细节是否可用 task_idusage.cost、cache、tool call、reasoning、多模态

并发和 RPM/TPM 是不同指标:并发更像瞬时压力,RPM/TPM 更像单位时间吞吐。为了避免把两个概念混在一起,本文分别报告。RPM/TPM 请求按 60 秒调度,观测 RPM/TPM 按实际墙钟耗时和成功请求的实际 token 数计算。

本轮聚焦同日短窗口与成本可控样本。Claude Sonnet 做 OpenAI-compatible smoke、轻量并发、tool call 与 thinking 对照;RPM/TPM 阶梯测试放在低成本模型上。

1. OpenAI-compatible 基础链路

基础请求结果如下:

场景 模型 HTTP 延迟/TTFT API 返回费用 备注
chat gpt-5.4-nano 200 3.578s ¥0.000050 基础文本请求成功
chat claude-sonnet-4.6 200 3.900s ¥0.000672 通过 OpenAI-compatible 调用 Claude
chat gemini-3-flash-preview 200 4.301s ¥0.000107 返回 reasoning 相关字段
stream gpt-5.4-nano 200 TTFT 2.053s 流式样本 42 chunks,总耗时 2.347s

这组 smoke test 的重点是协议兼容性:同一个 /chat/completions 路径下,可以请求 GPT、Claude、Gemini 等不同模型族。对于已经写好 OpenAI SDK 调用层的团队,这会显著降低迁移成本。Claude 系基础 chat、轻量并发和 thinking 样本已纳入本文;工具调用等扩展能力建议按业务 schema 单独验证。

stream 样本按当前评测约定省略 task_id,本文的 task_id 追踪样本聚焦非流式请求。

2. 并发:低成本模型测到 100,Claude 轻量验证到 20

并发测试使用短输出请求,主要观察同一时刻放量时的成功率、延迟和费用表现。

gpt-5.4-nano

并发 成功 成功率 P95 延迟 墙钟耗时 API 返回费用
1 1/1 100% 2.704s 2.708s ¥0.000091
5 5/5 100% 3.198s 3.459s ¥0.000455
10 10/10 100% 3.266s 3.320s ¥0.000910
20 20/20 100% 4.407s 8.937s ¥0.001820
50 50/50 100% 5.285s 7.908s ¥0.004550
100 100/100 100% 7.554s 11.123s ¥0.009100

gemini-3-flash-preview

并发 成功 成功率 P95 延迟 墙钟耗时 API 返回费用
1 1/1 100% 4.351s 4.354s ¥0.000142
5 5/5 100% 5.385s 5.592s ¥0.000689
10 10/10 100% 4.971s 5.274s ¥0.001273
20 20/20 100% 5.990s 5.996s ¥0.002599
50 50/50 100% 6.855s 10.118s ¥0.006540
100 100/100 100% 8.163s 8.194s ¥0.012743

低成本模型在 100 并发下均未出现 429、5xx 或系统性超时。

claude-sonnet-4.6

Claude Sonnet 本轮聚焦轻量并发,覆盖 1/5/10/20 四个阶梯。

并发 成功 成功率 P95 延迟 墙钟耗时 API 返回费用 备注
1 1/1 100% 2.894s 2.898s ¥0.001176 重试成功
5 5/5 100% 4.092s 4.146s ¥0.005880 -
10 10/10 100% 5.688s 6.012s ¥0.011760 -
20 20/20 100% 6.339s 6.806s ¥0.023520 -

结论很直接:OpenAI-compatible 下 claude-sonnet-4.6 可以调用,1/5/10/20 并发阶梯均取得成功样本。

3. RPM/TPM:60 秒调度窗口结果

RPM/TPM 测试使用 scripts/throughput_test.py。每组请求按 60 秒窗口调度,非流式请求均传入 task_id;观测 RPM/TPM 按实际墙钟耗时计算。

RPM 结果

模型 目标 RPM 计划请求 成功 状态分布 观测 RPM 观测 TPM P50 延迟 P95 延迟 P99 延迟 API 返回费用
gpt-5.4-nano 300 300 300 200:300 290.943 5,236.967 2.112s 6.680s 9.059s ¥0.020700
gpt-5.4-nano 600 600 599 200:599, None:1 581.010 10,458.187 2.135s 12.449s 15.246s ¥0.041331
gemini-3-flash-preview 300 300 300 200:300 283.060 6,068.805 3.524s 4.363s 4.650s ¥0.091052
gemini-3-flash-preview 600 600 600 200:600 569.301 12,295.947 3.544s 4.405s 4.952s ¥0.182215

两个模型都完成了 600 RPM 目标窗口,未观察到 429 或 5xx。gpt-5.4-nano 在 600 RPM 窗口成功率为 99.83%;gemini-3-flash-preview 600 RPM 窗口 600/600 成功。

TPM 结果

模型 调度目标 TPM 计划请求 成功 状态分布 观测 RPM 观测 TPM total_tokens P50 延迟 P95 延迟 P99 延迟 API 返回费用
gpt-5.4-nano 50,000 13 12 200:12, None:1 12.421 83,159.835 80,340 2.451s 5.430s 5.430s ¥0.113268
gpt-5.4-nano 100,000 25 25 200:25 24.948 167,029.004 167,375 2.405s 2.868s 3.378s ¥0.146947
gemini-3-flash-preview 50,000 13 13 200:13 13.231 115,014.975 113,004 4.616s 9.273s 9.273s ¥0.404024
gemini-3-flash-preview 100,000 25 25 200:25 24.298 211,225.875 217,330 4.534s 6.931s 7.343s ¥0.594364

两个模型在按 100k TPM 调度的窗口中均完成请求,实际观测 token 负载高于调度目标。原因是脚本按 approx_prompt_tokens 估算请求数,而最终 TPM 使用 API 返回的 total_tokens 和实际墙钟耗时计算。需要注意的是,gemini-3-flash-previewtotal_tokens 可能受到 reasoning 口径影响,解读时应同时看 prompt_tokenscompletion_tokenstotal_tokens

4. 费用和请求追踪:usage.costtask_id

OpenAI-compatible 响应返回 usage.cost,这对工程团队很关键:它让成本统计从“事后估算”变成“请求级汇总”。

非流式请求可以传入 task_id。本轮通过 task_id_prefix 将 smoke、并发、缓存、RPM/TPM、tool call、thinking、多模态 smoke 分组,方便在 JSONL 结果中追踪。

示例:

nl-20260519-throughput-rpm-gpt-5.4-nano-target-600-req-0
nl-20260518-supp-tool-gpt-5.4-mini-run-1
nl-20260518-supp-thinking-claude-sonnet-4.6-thinking-run-1

stream 样本按当前评测约定省略 task_id,本文的 task_id 追踪样本聚焦非流式请求。

5. Prompt Cache:GPT nano 命中明显

gpt-5.4-nano 长前缀缓存测试:

Run 输入 token cached_tokens API 返回费用 结论
1 11719 0 ¥0.016477 首次请求
2 11719 11008 ¥0.002607 命中
3 11719 11008 ¥0.002607 稳定命中

在本轮相同长前缀 3 次样本中,缓存命中后单次费用从 ¥0.016477 降至 ¥0.002607,降幅约 84%。这是一组明确的样本级成本优化证据,适合作为后续业务 prompt 复测的参考基线。

6. Tool Call、Thinking、多模态:兼容层细节比“能调通”更重要

Tool Call

工具定义为 get_weather(city),prompt 要求查询 Shanghai 天气,不直接回答。

模型 请求数 HTTP 成功 返回 tool_calls 参数正确 API 返回费用
gpt-5.4-nano 3 3 3 3 ¥0.001023
gpt-5.4-mini 3 3 3 3 ¥0.002568
claude-sonnet-4.6 3 3 3 3 ¥0.046053

GPT 系低成本模型的 OpenAI-compatible tool call 最小样本通过。如果业务强依赖工具调用,建议继续用更贴近生产的 schema、tool_choice 和输出预算做专项验证。

Thinking / Reasoning

模型 HTTP content reasoning_content finish_reason API 返回费用
gemini-3.1-pro-preview 200 reasoning 输出为主 length ¥0.001519
claude-sonnet-4.6 200 文本输出 stop ¥0.001575
claude-sonnet-4.6-thinking 200 reasoning 输出为主 length ¥0.002625
gpt-5.4-nano 200 文本输出 stop ¥0.000085
gpt-5.4-nano-high 200 文本输出 stop ¥0.000085

gemini-3.1-pro-previewclaude-sonnet-4.6-thinking 在短输出 thinking 样本中体现出 reasoning 字段价值。SDK 适配层建议同时处理 contentreasoning_contentfinish_reason 和相关扩展字段,让 thinking 模型的输出结构被完整利用。

多模态 Smoke

本轮先做最小多模态 smoke:

模型 样本 max_tokens HTTP content reasoning_content API 返回费用 备注
GLM-4.6V 3 条 64 200 reasoning 输出为主 ¥0.002609 图片输入链路可调用
GLM-4.6V 1 条复查 512 200 ¥0.001710 返回可读 OCR 内容

结论:OpenAI-compatible 图片输入链路可用。视觉/OCR 样本更适合使用高于纯文本压测的输出预算,512 token 复查样本已经返回可读 OCR 内容。

适用场景与专项验证

更适合:

• 已经使用 OpenAI SDK 或 OpenAI-compatible 协议,希望接入多模型的团队。
• 需要同时调用 GPT、Claude、Gemini 等模型族,但不想维护多套网关适配层的团队。
• 对请求级成本、task_id 追踪、JSONL 留痕有要求的开发团队。
• 需要做并发、RPM/TPM、缓存、tool call、reasoning 等工程验证的模型应用团队。

建议专项验证:

• 需要全天 SLA 证明的生产业务:建议在本文短窗口基础上追加跨时段与长窗口测试。
• 强依赖 OpenAI-compatible tool call 调 Claude 的业务:建议使用真实工具 schema、tool_choice 和业务 prompt 做专项验证。
• 使用 thinking 模型的业务:建议同时解析 message.contentreasoning_content
• 多模态质量评测场景:建议在 smoke 基础上追加全量图片理解、OCR、图表问答评测。

解读注意事项

关注项 本轮证据 建议
Gemini Flash 费用口径 固定 prompt 样本包含 reasoning 字段和 cost 差异 同时披露 prompt、completion、total token,明确 reasoning 口径
Gemini Flash 编号测试 短输出设置下 marker 输出受 reasoning 与 token 预算影响 换更短 marker,或明确 reasoning/输出参数
Claude Sonnet tool call 3/3 HTTP 200,对照样本已记录 使用 tool_choice=required、更高输出预算和业务工具参数做专项验证
thinking 模型输出结构 Gemini Pro 与 Claude thinking 样本体现 reasoning 字段 SDK 层同时处理 content 与 reasoning 字段
多模态输出预算 GLM-4.6V max_tokens=64 与 512 复查样本差异明显 多模态专项单独设置输出预算
RPM/TPM 长窗口 本轮为 60 秒调度窗口 后续补 10-30 分钟窗口和跨时段测试

如何复现

本文保留原始 JSONL 文件名和脚本源码,方便同行或开发者复跑。复现前只需要准备环境变量,不要把 .env 或 API Key 提交到公开仓库。

复现前提:

• Python 3.x;本文附录中的 RPM/TPM 脚本只使用标准库,通用请求函数同样只使用标准库。
• 从项目根目录运行命令,确保 scripts/_common.py 可被正确导入。
• 账号需要具备对应模型的调用权限;模型权限、网络状态和限流策略会影响复现结果。
--output 指向的 JSONL 文件会被覆盖;需要保留历史结果时请换新文件名。

OpenAI-compatible 主要结果文件分两类:

results/ 目录:

results/throughput-rpm-gpt-nano-300-20260519.jsonl
results/throughput-rpm-gpt-nano-600-20260519.jsonl
results/throughput-rpm-gemini-flash-300-20260519.jsonl
results/throughput-rpm-gemini-flash-600-20260519.jsonl
results/throughput-tpm-gpt-nano-50k-20260519.jsonl
results/throughput-tpm-gpt-nano-100k-20260519.jsonl
results/throughput-tpm-gemini-flash-50k-20260519.jsonl
results/throughput-tpm-gemini-flash-100k-20260519.jsonl
results/supp-tool-calls-20260518.jsonl
results/supp-thinking-fields-20260518.jsonl
results/supp-vision-smoke-20260518.jsonl

项目根目录:

results-concurrency-claude-sonnet-20260518.jsonl
results-cache-claude-haiku-openai-20260518.jsonl

复现前准备 .env

NONELINEAR_OPENAI_BASE_URL=https://api.nonelinear.com/v1
NONELINEAR_API_KEY=<redacted>

RPM 复现示例:

python3 scripts/throughput_test.py \
  --mode rpm \
  --model gpt-5.4-nano \
  --target 600 \
  --duration 60 \
  --max-tokens 16 \
  --task-id-prefix nl-20260519-throughput \
  --output results/throughput-rpm-gpt-nano-600-20260519.jsonl

TPM 复现示例:

python3 scripts/throughput_test.py \
  --mode tpm \
  --model gemini-3-flash-preview \
  --target 100000 \
  --duration 60 \
  --approx-prompt-tokens 4000 \
  --max-tokens 16 \
  --task-id-prefix nl-20260519-throughput \
  --output results/throughput-tpm-gemini-flash-100k-20260519.jsonl

结论

从 OpenAI-compatible 接入视角看,非线智能 API 在本轮短窗口样本中表现稳定:低成本模型完成 100 并发,60 秒调度窗口下完成 600 RPM 测试;按 100k TPM 调度时,实际观测 token 负载达到 gpt-5.4-nano 167,029 TPM、gemini-3-flash-preview 211,226 TPM。usage.cost 可读,task_id 可追踪,gpt-5.4-nano 在长前缀小样本中缓存命中后费用下降明显,GPT 系 tool call 最小样本通过。

同时,这些数据也给出了后续专项验证方向:Claude 工具调用适合结合真实 schema 继续验证;Gemini 和 Claude thinking 样本建议显式处理 reasoning_content;多模态 smoke 适合使用更高输出预算;60 秒窗口之后可以继续扩展到跨时段和长窗口测试。

如果你的目标是用 OpenAI-compatible 协议接入一个多模型 API 网关,本文的数据可以作为第一轮工程验证依据;如果你的目标是生产级 SLA 验收,还需要继续补充跨时段、长窗口和真实业务负载测试。

附录 A:RPM/TPM 测试脚本

以下脚本不包含 API Key。脚本通过 scripts/_common.py 读取 .env,并调用 OpenAI-compatible /chat/completions

文件路径:scripts/throughput_test.py

#!/usr/bin/env python3
import argparse
import concurrent.futures
import json
import math
import statistics
import time
from datetime import datetime, timezone
from pathlib import Path

from _common import openai_chat_request

SCRIPT_VERSION = "2026-05-19-throughput-v1"

def now_iso():
    return datetime.now(timezone.utc).isoformat()

def percentile(values, pct):
    if not values:
        return None
    ordered = sorted(values)
    idx = max(0, min(len(ordered) - 1, math.ceil(len(ordered) * pct / 100) - 1))
    return ordered[idx]

def write_jsonl(path, rows):
    output = Path(path)
    output.parent.mkdir(parents=True, exist_ok=True)
    with output.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n")

def build_prompt(mode, index, approx_prompt_tokens):
    if mode == "rpm":
        return f"Reply exactly: ok-{index}"

    repeat = max(1, int(approx_prompt_tokens / 6))
    padding = " ".join(["NONELINEAR_TPM_EVAL_STATIC_TEXT"] * repeat)
    return (
        "You are testing throughput. Read the following static text and reply exactly "
        f"with TPM-OK-{index}.\n\n{padding}"
    )

def run_one(args, index):
    task_id = f"{args.task_id_prefix}-{args.mode}-{args.model}-target-{args.target}-req-{index}"
    prompt = build_prompt(args.mode, index, args.approx_prompt_tokens)
    start = time.perf_counter()
    result = openai_chat_request(
        args.model,
        [{"role": "user", "content": prompt}],
        max_tokens=args.max_tokens,
        timeout=args.timeout,
        extra={"task_id": task_id},
    )
    elapsed = result.get("elapsed_sec")
    data = result.get("response") or {}
    usage = data.get("usage") or {}
    choice = (data.get("choices") or [{}])[0] if isinstance(data, dict) else {}
    message = choice.get("message") or {}
    return {
        "record_type": "request",
        "run_id": args.run_id,
        "timestamp": now_iso(),
        "script_path": "scripts/throughput_test.py",
        "script_version": SCRIPT_VERSION,
        "mode": args.mode,
        "target": args.target,
        "duration_s": args.duration,
        "model_requested": args.model,
        "model_returned": data.get("model") if isinstance(data, dict) else None,
        "task_id": task_id,
        "request_index": index,
        "scheduled_offset_s": round(index * args.interval_s, 3),
        "http_status": result.get("status"),
        "success": bool(result.get("ok")),
        "elapsed_sec": elapsed,
        "prompt_tokens": usage.get("prompt_tokens"),
        "completion_tokens": usage.get("completion_tokens"),
        "total_tokens": usage.get("total_tokens"),
        "api_cost": usage.get("cost"),
        "finish_reason": choice.get("finish_reason"),
        "content": message.get("content"),
        "reasoning_content_present": bool(message.get("reasoning_content")),
        "error_type": None if result.get("ok") else result.get("error", "").split(":", 1)[0],
        "error_message": None if result.get("ok") else result.get("error"),
        "wall_elapsed_sec": round(time.perf_counter() - start, 3),
    }

def summarize(args, rows, wall_sec):
    request_rows = [row for row in rows if row.get("record_type") == "request"]
    successes = [row for row in request_rows if row.get("success")]
    failures = [row for row in request_rows if not row.get("success")]
    latencies = [row["elapsed_sec"] for row in successes if isinstance(row.get("elapsed_sec"), (int, float))]
    total_tokens = sum(row.get("total_tokens") or 0 for row in successes)
    prompt_tokens = sum(row.get("prompt_tokens") or 0 for row in successes)
    completion_tokens = sum(row.get("completion_tokens") or 0 for row in successes)
    cost = sum(row.get("api_cost") or 0 for row in successes)
    status_counts = {}
    for row in request_rows:
        status = str(row.get("http_status"))
        status_counts[status] = status_counts.get(status, 0) + 1
    observed_rpm = len(successes) / wall_sec * 60 if wall_sec else None
    observed_tpm = total_tokens / wall_sec * 60 if wall_sec else None

    return {
        "record_type": "summary",
        "run_id": args.run_id,
        "timestamp": now_iso(),
        "script_path": "scripts/throughput_test.py",
        "script_version": SCRIPT_VERSION,
        "mode": args.mode,
        "target": args.target,
        "target_rpm": args.target if args.mode == "rpm" else None,
        "target_tpm": args.target if args.mode == "tpm" else None,
        "duration_s": args.duration,
        "wall_sec": round(wall_sec, 3),
        "model_requested": args.model,
        "planned_requests": args.planned_requests,
        "success_count": len(successes),
        "failure_count": len(failures),
        "success_rate": round(len(successes) / len(request_rows), 4) if request_rows else None,
        "status_counts": status_counts,
        "observed_rpm": round(observed_rpm, 3) if observed_rpm is not None else None,
        "observed_tpm": round(observed_tpm, 3) if observed_tpm is not None else None,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "api_cost_sum": round(cost, 6),
        "latency_avg_sec": round(statistics.mean(latencies), 3) if latencies else None,
        "latency_p50_sec": round(percentile(latencies, 50), 3) if latencies else None,
        "latency_p95_sec": round(percentile(latencies, 95), 3) if latencies else None,
        "latency_p99_sec": round(percentile(latencies, 99), 3) if latencies else None,
        "stop_rule": "如出现大量 429/5xx/timeout,停止后续升阶并记录限流点",
    }

def compute_plan(args):
    if args.mode == "rpm":
        planned = max(1, math.floor(args.target * args.duration / 60))
    else:
        planned = max(1, math.ceil(args.target * args.duration / 60 / args.approx_prompt_tokens))
    args.planned_requests = planned
    args.interval_s = args.duration / planned

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["rpm", "tpm"], required=True)
    parser.add_argument("--model", required=True)
    parser.add_argument("--target", type=int, required=True, help="Target RPM or TPM based on --mode")
    parser.add_argument("--duration", type=int, default=60)
    parser.add_argument("--max-tokens", type=int, default=16)
    parser.add_argument("--approx-prompt-tokens", type=int, default=4000)
    parser.add_argument("--timeout", type=int, default=45)
    parser.add_argument("--max-workers", type=int, default=200)
    parser.add_argument("--run-id", default="nl-throughput-20260519")
    parser.add_argument("--task-id-prefix", default="nl-20260519-throughput")
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    compute_plan(args)

    rows = []
    started = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor:
        futures = []
        for index in range(args.planned_requests):
            due = started + index * args.interval_s
            delay = due - time.perf_counter()
            if delay > 0:
                time.sleep(delay)
            futures.append(executor.submit(run_one, args, index))
        for future in concurrent.futures.as_completed(futures):
            rows.append(future.result())

    wall_sec = time.perf_counter() - started
    rows.sort(key=lambda row: row.get("request_index", 10**9))
    rows.append(summarize(args, rows, wall_sec))
    write_jsonl(args.output, rows)
    print(json.dumps(rows[-1], ensure_ascii=False, separators=(",", ":")))

if __name__ == "__main__":
    main()

附录 B:最小 OpenAI-compatible 请求函数

下面是可独立复用的最小请求函数示例。它只从 .env 读取 NONELINEAR_OPENAI_BASE_URLNONELINEAR_API_KEY,不会输出 API Key。

import json
import os
import time
import urllib.error
import urllib.request
from pathlib import Path

def load_env(path=".env"):
    env_path = Path(path)
    if not env_path.exists():
        raise FileNotFoundError(f"Missing {path}")

    for raw in env_path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ.setdefault(key, value)

def openai_chat_request(model, messages, max_tokens=8, stream=False, timeout=45, extra=None):
    load_env()
    base_url = os.environ["NONELINEAR_OPENAI_BASE_URL"].rstrip("/")
    api_key = os.environ["NONELINEAR_API_KEY"]
    body = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "stream": stream,
    }
    if extra:
        body.update(extra)

    request = urllib.request.Request(
        base_url + "/chat/completions",
        data=json.dumps(body).encode(),
        headers={
            "Authorization": "Bearer " + api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    start = time.perf_counter()
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            data = json.loads(response.read().decode())
            return {
                "ok": True,
                "status": response.status,
                "elapsed_sec": round(time.perf_counter() - start, 3),
                "response": data,
            }
    except urllib.error.HTTPError as error:
        return {
            "ok": False,
            "status": error.code,
            "elapsed_sec": round(time.perf_counter() - start, 3),
            "error": error.read().decode(errors="replace")[:2000],
        }
    except Exception as error:
        return {
            "ok": False,
            "status": None,
            "elapsed_sec": round(time.perf_counter() - start, 3),
            "error": type(error).__name__ + ": " + str(error),
        }