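This document is a standalone evaluation code template for OpenAI-compatible APIs, provided by 非线智能. Customers can copy the single-file script below, save it as nonelinear_openai_eval.py, and use it to test the OpenAI-compatible aggregator APIs on the market for basic availability, streaming TTFT (time to first token), concurrency, RPM/TPM, prompt cache, tool calls, thinking fields, and multimodal smoke tests.
The script contains no API key. It reads credentials only from environment variables or from a .env file in the current directory.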
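Environment setup
Python 3.9+ is recommended. The script depends only on the Python standard library, so no third-party packages need to be installed.
Option 1: use environment variables.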
export NONELINEAR_OPENAI_BASE_URL=https://api.nonelinear.com/v1
export NONELINEAR_API_KEY=sk-...
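Option 2: create a .env file in the directory you run the script from. Variables already set in the environment take precedence over values in .env.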
NONELINEAR_OPENAI_BASE_URL=https://api.nonelinear.com/v1
NONELINEAR_API_KEY=sk-...
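Result files use the JSON Lines format. Each line is either a single request record or a summary record, which makes it easy to import the results into a spreadsheet, database, or logging system for later analysis.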
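As a rough sketch of how such a result file might be consumed, the following reads a JSON Lines file and computes the success rate over request records. The helper names `read_jsonl` and `success_rate` are illustrative and not part of the script. The sketch assumes that only request records carry a `success` key, which matches the records the script below writes.

```python
import json
from pathlib import Path


def read_jsonl(path):
    """Return one dict per non-empty line of a JSON Lines file."""
    rows = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        if line.strip():
            rows.append(json.loads(line))
    return rows


def success_rate(rows):
    """Share of request records with a truthy success flag.

    Summary records have no "success" key and are skipped.
    Returns None when there are no request records.
    """
    requests = [row for row in rows if "success" in row]
    if not requests:
        return None
    return sum(1 for row in requests if row["success"]) / len(requests)
```

A call such as `success_rate(read_jsonl("results/customer-smoke.jsonl"))` would then give the fraction of successful requests in one run.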
Usage examples
Basic chat + stream:
python3 nonelinear_openai_eval.py smoke \
--models gpt-5.4-nano,claude-sonnet-4.6,gemini-3-flash-preview \
--output results/customer-smoke.jsonl
Concurrency ladder:
python3 nonelinear_openai_eval.py concurrency \
--model gpt-5.4-nano \
--levels 1,5,10,20,50,100 \
--output results/customer-concurrency.jsonl
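When reading the latency percentiles in the concurrency summary, note that the script's `percentile` helper uses the nearest-rank method, so at low concurrency levels p95 and p99 collapse to the slowest request. The sketch below copies that index arithmetic into a hypothetical `nearest_rank` function. The latency values are made up for illustration and are not measurements.

```python
import math


def nearest_rank(values, pct):
    """Nearest-rank percentile, using the same index formula as the script."""
    ordered = sorted(values)
    index = max(0, min(len(ordered) - 1, math.ceil(len(ordered) * pct / 100) - 1))
    return ordered[index]


# Made-up latencies (seconds) for a concurrency level of 5.
latencies = [0.8, 0.9, 1.0, 1.1, 4.0]
print(nearest_rank(latencies, 50))  # 1.0
print(nearest_rank(latencies, 95))  # 4.0, the slowest request
print(nearest_rank(latencies, 99))  # 4.0, the slowest request
```

With only a handful of samples, p95 and p99 are therefore better read as "worst observed latency" than as stable tail estimates.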
RPM window:
python3 nonelinear_openai_eval.py throughput \
--mode rpm \
--model gpt-5.4-nano \
--target 600 \
--duration 60 \
--output results/customer-rpm.jsonl
TPM window:
python3 nonelinear_openai_eval.py throughput \
--mode tpm \
--model gemini-3-flash-preview \
--target 100000 \
--duration 60 \
--approx-prompt-tokens 4000 \
--output results/customer-tpm.jsonl
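To make the relationship between `--target`, `--duration`, and `--approx-prompt-tokens` concrete, the sketch below mirrors the request-count arithmetic in the script's `run_throughput` function. The function name `planned_requests` is hypothetical. Note that in TPM mode the plan is based on the approximate prompt size only, so the `observed_tpm` value, which is computed from `total_tokens`, may differ from the target.

```python
import math


def planned_requests(mode, target, duration_s, approx_prompt_tokens=4000):
    """Number of requests the script schedules over the test window."""
    if mode == "rpm":
        return max(1, math.floor(target * duration_s / 60))
    return max(1, math.ceil(target * duration_s / 60 / approx_prompt_tokens))


# The two throughput commands above.
rpm_plan = planned_requests("rpm", 600, 60)
tpm_plan = planned_requests("tpm", 100000, 60, 4000)
print(rpm_plan, 60 / rpm_plan)  # 600 requests, one every 0.1 s
print(tpm_plan, 60 / tpm_plan)  # 25 requests, one every 2.4 s
```

The requests are spaced evenly across the window, so a higher target shortens the interval between sends rather than sending bursts.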
Prompt Cache:
python3 nonelinear_openai_eval.py cache \
--model gpt-5.4-nano \
--runs 3 \
--output results/customer-cache.jsonl
Tool Call:
python3 nonelinear_openai_eval.py tools \
--models gpt-5.4-nano,gpt-5.4-mini \
--output results/customer-tools.jsonl
Thinking / reasoning fields:
python3 nonelinear_openai_eval.py thinking \
--models gemini-3.1-pro-preview,claude-sonnet-4.6-thinking,gpt-5.4-nano \
--output results/customer-thinking.jsonl
Multimodal smoke:
python3 nonelinear_openai_eval.py vision \
--model GLM-4.6V \
--image ./sample.png \
--output results/customer-vision.jsonl
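Output fields
Commonly used fields:
| Field | Meaning |
|---|---|
| model_requested | Model specified in the request |
| model_returned | Model name returned in the API response |
| task_id | Tracking ID for non-streaming requests |
| http_status | HTTP status code |
| success | Whether the request succeeded |
| elapsed_sec | Elapsed time of a single request, in seconds |
| prompt_tokens | Input tokens |
| completion_tokens | Output tokens |
| total_tokens | Total tokens |
| cached_tokens | Tokens served from the cache |
| api_cost | Request cost returned by the API |
| reasoning_content_present | Whether a reasoning content field was returned |
| tool_calls | Tool calls returned by the model |
| observed_rpm | Actual RPM in the throughput summary |
| observed_tpm | Actual TPM in the throughput summary |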
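One way these fields might be combined, for example, is a per-request cache hit ratio computed from `cached_tokens` and `prompt_tokens` in the records written by the `cache` command. This is only a sketch: `cache_hit_ratio` is a hypothetical helper, and the two sample records use made-up token counts rather than measured results.

```python
def cache_hit_ratio(row):
    """cached_tokens / prompt_tokens for one record, or None if prompt_tokens is missing or zero."""
    prompt = row.get("prompt_tokens")
    cached = row.get("cached_tokens") or 0
    if not prompt:
        return None
    return cached / prompt


# Made-up records shaped like the output of the cache command.
rows = [
    {"run": 1, "prompt_tokens": 5000, "cached_tokens": 0},
    {"run": 2, "prompt_tokens": 5000, "cached_tokens": 4000},
]
for row in rows:
    print(row["run"], cache_hit_ratio(row))
```

A ratio near zero on the first run and a higher ratio on later runs would suggest the static prefix is being cached, though providers may report cached tokens differently or not at all.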
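Notes
- Do not commit .env, API keys, or JSONL files containing sensitive business content to a public repository.
- RPM/TPM tests incur real usage charges. Confirm your budget and rate-limit rules before running high targets.
- stream requests do not send task_id, because this evaluation mainly uses non-streaming requests to close the loop on billing traceability.
- The vision command encodes the local image as a base64 data URL before sending it. Confirm that the image content is suitable for testing.
- The reasoning, cache, and tool call fields may differ between models, so business SDKs should tolerate missing fields.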
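The last note can be illustrated with a small accessor that tolerates missing fields. The helper `dig` below is hypothetical and not part of the script. The field paths it reads (`choices[0].message.tool_calls`, `usage.prompt_tokens_details.cached_tokens`) are the ones the script itself looks up, and the sample response is hand-written.

```python
def dig(data, *path, default=None):
    """Walk nested dicts and lists; return default as soon as a step is missing or None."""
    current = data
    for key in path:
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and isinstance(key, int) and -len(current) <= key < len(current):
            current = current[key]
        else:
            return default
        if current is None:
            return default
    return current


# Hand-written response with no tool_calls and no cache details.
response = {"choices": [{"message": {"content": "ok"}}], "usage": {"prompt_tokens": 12}}
print(dig(response, "choices", 0, "message", "content"))                            # ok
print(dig(response, "choices", 0, "message", "tool_calls", default=[]))             # []
print(dig(response, "usage", "prompt_tokens_details", "cached_tokens", default=0))  # 0
```

Reading optional fields through a single helper like this keeps the calling code free of chained `or {}` fallbacks.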
Complete single-file script
Save as nonelinear_openai_eval.py:
#!/usr/bin/env python3
import argparse
import base64
import concurrent.futures
import json
import math
import mimetypes
import os
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
SCRIPT_VERSION = "customer-openai-compatible-eval-2026-05-22"


def load_env(path=".env"):
    env_path = Path(path)
    if not env_path.exists():
        return
    for raw in env_path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ.setdefault(key.strip(), value.strip())


def now_iso():
    return datetime.now(timezone.utc).isoformat()


def percentile(values, pct):
    values = [item for item in values if isinstance(item, (int, float))]
    if not values:
        return None
    ordered = sorted(values)
    index = max(0, min(len(ordered) - 1, math.ceil(len(ordered) * pct / 100) - 1))
    return round(ordered[index], 3)


def write_jsonl(path, rows):
    output = Path(path)
    output.parent.mkdir(parents=True, exist_ok=True)
    with output.open("w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n")


def openai_request(model, messages, max_tokens=16, stream=False, task_id=None, tools=None, timeout=60, extra=None):
    load_env()
    base_url = os.environ["NONELINEAR_OPENAI_BASE_URL"].rstrip("/")
    api_key = os.environ["NONELINEAR_API_KEY"]
    body = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "stream": stream,
    }
    if task_id:
        body["task_id"] = task_id
    if tools:
        body["tools"] = tools
        body["tool_choice"] = "auto"
    if extra:
        body.update(extra)
    request = urllib.request.Request(
        base_url + "/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": "Bearer " + api_key,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    started = time.perf_counter()
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            if stream:
                data = read_stream(response, started)
            else:
                data = json.loads(response.read().decode("utf-8"))
            return {
                "ok": True,
                "status": response.status,
                "elapsed_sec": round(time.perf_counter() - started, 3),
                "response": data,
                "request_body": body,
            }
    except urllib.error.HTTPError as error:
        return {
            "ok": False,
            "status": error.code,
            "elapsed_sec": round(time.perf_counter() - started, 3),
            "error": error.read().decode(errors="replace")[:2000],
            "request_body": body,
        }
    except Exception as error:
        return {
            "ok": False,
            "status": None,
            "elapsed_sec": round(time.perf_counter() - started, 3),
            "error": type(error).__name__ + ": " + str(error),
            "request_body": body,
        }


def read_stream(response, started):
    first_chunk = None
    first_content = None
    chunks = 0
    content_parts = []
    finish_reason = None
    for raw in response:
        if not raw.strip() or not raw.startswith(b"data:"):
            continue
        payload = raw[5:].strip()
        if payload == b"[DONE]":
            break
        chunks += 1
        if first_chunk is None:
            first_chunk = time.perf_counter()
        try:
            event = json.loads(payload.decode("utf-8"))
        except Exception:
            continue
        choice = (event.get("choices") or [{}])[0]
        finish_reason = choice.get("finish_reason") or finish_reason
        delta = choice.get("delta") or {}
        piece = delta.get("content")
        if piece:
            if first_content is None:
                first_content = time.perf_counter()
            content_parts.append(piece)
    ended = time.perf_counter()
    return {
        "stream_chunks": chunks,
        "ttft_sec": round(first_chunk - started, 3) if first_chunk else None,
        "first_content_delta_sec": round(first_content - started, 3) if first_content else None,
        "total_sec": round(ended - started, 3),
        "finish_reason": finish_reason,
        "content": "".join(content_parts),
    }


def choice(data):
    return (data.get("choices") or [{}])[0] if isinstance(data, dict) else {}


def message(data):
    return choice(data).get("message") or {}


def usage(data):
    if not isinstance(data, dict):
        return {}
    return data.get("usage") or {}


def cached_tokens(data):
    use = usage(data)
    details = use.get("prompt_tokens_details") or use.get("input_tokens_details") or {}
    value = details.get("cached_tokens") or details.get("cache_read_input_tokens") or use.get("cached_tokens")
    return value if isinstance(value, (int, float)) else 0


def summarize_result(result, model, task_id=None, sample_id=None):
    data = result.get("response") if result.get("ok") else {}
    if task_id is None:
        task_id = (result.get("request_body") or {}).get("task_id")
    msg = message(data)
    use = usage(data)
    return {
        "sample_id": sample_id,
        "timestamp": now_iso(),
        "script_version": SCRIPT_VERSION,
        "model_requested": model,
        "model_returned": data.get("model") if isinstance(data, dict) else None,
        "task_id": task_id,
        "http_status": result.get("status"),
        "success": bool(result.get("ok")),
        "elapsed_sec": result.get("elapsed_sec"),
        "error": None if result.get("ok") else result.get("error"),
        "finish_reason": choice(data).get("finish_reason") if isinstance(data, dict) else None,
        "content": msg.get("content"),
        "reasoning_content_present": bool(msg.get("reasoning_content")),
        "tool_calls": msg.get("tool_calls"),
        "prompt_tokens": use.get("prompt_tokens") or use.get("input_tokens"),
        "completion_tokens": use.get("completion_tokens") or use.get("output_tokens"),
        "total_tokens": use.get("total_tokens"),
        "cached_tokens": cached_tokens(data),
        "api_cost": use.get("cost"),
    }


def run_smoke(args):
    rows = []
    for model in args.models.split(","):
        model = model.strip()
        task_id = f"{args.task_id_prefix}-chat-{model}"
        result = openai_request(
            model,
            [{"role": "user", "content": "Reply with exactly: ok"}],
            max_tokens=args.max_tokens,
            task_id=task_id,
            timeout=args.timeout,
        )
        rows.append(summarize_result(result, model, task_id, "chat"))
    stream_model = args.stream_model or args.models.split(",")[0].strip()
    result = openai_request(
        stream_model,
        [{"role": "user", "content": "Count from 1 to 20, one number per short segment."}],
        max_tokens=80,
        stream=True,
        timeout=args.timeout,
    )
    row = summarize_result(result, stream_model, None, "stream")
    if result.get("ok"):
        row.update(result["response"])
    rows.append(row)
    return rows


def run_concurrency(args):
    rows = []
    for level in [int(item) for item in args.levels.split(",") if item]:
        started = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor(max_workers=level) as executor:
            futures = []
            for index in range(level):
                task_id = f"{args.task_id_prefix}-c{level}-{args.model}-req-{index}"
                futures.append(
                    executor.submit(
                        openai_request,
                        args.model,
                        [{"role": "user", "content": f"Reply exactly: A. Request {index}"}],
                        args.max_tokens,
                        False,
                        task_id,
                        None,
                        args.timeout,
                    )
                )
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        wall_sec = round(time.perf_counter() - started, 3)
        request_rows = [summarize_result(item, args.model, sample_id=f"c{level}") for item in results]
        latencies = [row["elapsed_sec"] for row in request_rows if row["success"]]
        rows.extend(request_rows)
        rows.append(
            {
                "sample_id": "concurrency_summary",
                "model_requested": args.model,
                "concurrency": level,
                "requests": len(request_rows),
                "success_count": sum(1 for row in request_rows if row["success"]),
                "success_rate": round(sum(1 for row in request_rows if row["success"]) / len(request_rows), 4),
                "wall_sec": wall_sec,
                "latency_p50_sec": percentile(latencies, 50),
                "latency_p95_sec": percentile(latencies, 95),
                "latency_p99_sec": percentile(latencies, 99),
                "api_cost_sum": round(sum(row.get("api_cost") or 0 for row in request_rows), 6),
            }
        )
    return rows


def build_throughput_prompt(mode, index, approx_prompt_tokens):
    if mode == "rpm":
        return f"Reply exactly: ok-{index}"
    repeat = max(1, int(approx_prompt_tokens / 6))
    padding = " ".join(["NONELINEAR_TPM_EVAL_STATIC_TEXT"] * repeat)
    return "Read the following static text and reply exactly with TPM-OK-%d.\n\n%s" % (index, padding)


def run_throughput(args):
    if args.mode == "rpm":
        planned = max(1, math.floor(args.target * args.duration / 60))
    else:
        planned = max(1, math.ceil(args.target * args.duration / 60 / args.approx_prompt_tokens))
    interval_s = args.duration / planned
    rows = []
    started = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor:
        futures = []
        for index in range(planned):
            due = started + index * interval_s
            delay = due - time.perf_counter()
            if delay > 0:
                time.sleep(delay)
            task_id = f"{args.task_id_prefix}-{args.mode}-{args.model}-target-{args.target}-req-{index}"
            prompt = build_throughput_prompt(args.mode, index, args.approx_prompt_tokens)
            futures.append(
                executor.submit(
                    openai_request,
                    args.model,
                    [{"role": "user", "content": prompt}],
                    args.max_tokens,
                    False,
                    task_id,
                    None,
                    args.timeout,
                )
            )
        for index, future in enumerate(concurrent.futures.as_completed(futures)):
            rows.append(summarize_result(future.result(), args.model, sample_id=f"throughput-{index}"))
    wall_sec = time.perf_counter() - started
    success_rows = [row for row in rows if row["success"]]
    total_tokens = sum(row.get("total_tokens") or 0 for row in success_rows)
    rows.append(
        {
            "sample_id": "throughput_summary",
            "mode": args.mode,
            "target": args.target,
            "duration_s": args.duration,
            "planned_requests": planned,
            "success_count": len(success_rows),
            "failure_count": planned - len(success_rows),
            "success_rate": round(len(success_rows) / planned, 4) if planned else None,
            "wall_sec": round(wall_sec, 3),
            "observed_rpm": round(len(success_rows) / wall_sec * 60, 3) if wall_sec else None,
            "observed_tpm": round(total_tokens / wall_sec * 60, 3) if wall_sec else None,
            "total_tokens": total_tokens,
            "api_cost_sum": round(sum(row.get("api_cost") or 0 for row in success_rows), 6),
            "latency_p50_sec": percentile([row.get("elapsed_sec") for row in success_rows], 50),
            "latency_p95_sec": percentile([row.get("elapsed_sec") for row in success_rows], 95),
            "latency_p99_sec": percentile([row.get("elapsed_sec") for row in success_rows], 99),
        }
    )
    return rows


def run_cache(args):
    rows = []
    static_prefix = " ".join(["STATIC_CACHE_PREFIX_NONELINEAR_EVAL"] * args.repeat)
    for index in range(1, args.runs + 1):
        task_id = f"{args.task_id_prefix}-{args.model}-run-{index}"
        result = openai_request(
            args.model,
            [
                {"role": "system", "content": static_prefix},
                {"role": "user", "content": f"Reply with exactly: ok. Run {index}"},
            ],
            max_tokens=args.max_tokens,
            task_id=task_id,
            timeout=args.timeout,
        )
        row = summarize_result(result, args.model, task_id, f"cache-{index}")
        row["run"] = index
        rows.append(row)
        time.sleep(args.sleep)
    return rows


def run_tools(args):
    tool = {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
    rows = []
    for model in args.models.split(","):
        model = model.strip()
        for index in range(1, args.runs + 1):
            task_id = f"{args.task_id_prefix}-{model}-run-{index}"
            result = openai_request(
                model,
                [{"role": "user", "content": "请调用工具查询 Shanghai 的天气。不要直接回答天气。"}],
                max_tokens=args.max_tokens,
                task_id=task_id,
                tools=[tool],
                timeout=args.timeout,
            )
            row = summarize_result(result, model, task_id, f"tool-{index}")
            calls = row.get("tool_calls") or []
            first = calls[0] if calls else {}
            function = first.get("function") or {}
            try:
                arguments = json.loads(function.get("arguments") or "{}")
            except Exception:
                arguments = {}
            row["tool_call_supported"] = bool(calls)
            row["tool_name_returned"] = function.get("name")
            row["tool_arguments_valid"] = function.get("name") == "get_weather" and arguments.get("city") == "Shanghai"
            rows.append(row)
    return rows


def run_thinking(args):
    rows = []
    prompt = "用一句话回答:1+1 等于几?如果你有内部推理字段,请仍然只在最终答案中输出数字。"
    for model in args.models.split(","):
        model = model.strip()
        task_id = f"{args.task_id_prefix}-{model}"
        result = openai_request(
            model,
            [{"role": "user", "content": prompt}],
            max_tokens=args.max_tokens,
            task_id=task_id,
            timeout=args.timeout,
        )
        row = summarize_result(result, model, task_id, "thinking")
        data = result.get("response") or {}
        details = usage(data).get("completion_tokens_details") or {}
        row["reasoning_tokens"] = details.get("reasoning_tokens")
        row["content_empty"] = row.get("content") in ("", None)
        rows.append(row)
    return rows


def encode_image(path):
    mime, _ = mimetypes.guess_type(str(path))
    mime = mime or "image/jpeg"
    with Path(path).open("rb") as handle:
        payload = base64.b64encode(handle.read()).decode("utf-8")
    return f"data:{mime};base64,{payload}"


def run_vision(args):
    task_id = f"{args.task_id_prefix}-{args.model}-vision"
    content = [
        {"type": "text", "text": args.prompt},
        {"type": "image_url", "image_url": {"url": encode_image(args.image)}},
    ]
    result = openai_request(
        args.model,
        [{"role": "user", "content": content}],
        max_tokens=args.max_tokens,
        task_id=task_id,
        timeout=args.timeout,
    )
    row = summarize_result(result, args.model, task_id, "vision")
    row["image_path"] = args.image
    return [row]


def add_common(parser):
    parser.add_argument("--output", required=True)
    parser.add_argument("--timeout", type=int, default=60)
    parser.add_argument("--max-tokens", type=int, default=16)
    parser.add_argument("--task-id-prefix", default="nl-customer-eval")


def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="command", required=True)

    smoke = subparsers.add_parser("smoke")
    add_common(smoke)
    smoke.add_argument("--models", required=True)
    smoke.add_argument("--stream-model")

    concurrency = subparsers.add_parser("concurrency")
    add_common(concurrency)
    concurrency.add_argument("--model", required=True)
    concurrency.add_argument("--levels", default="1,5,10")

    throughput = subparsers.add_parser("throughput")
    add_common(throughput)
    throughput.add_argument("--mode", choices=["rpm", "tpm"], required=True)
    throughput.add_argument("--model", required=True)
    throughput.add_argument("--target", type=int, required=True)
    throughput.add_argument("--duration", type=int, default=60)
    throughput.add_argument("--approx-prompt-tokens", type=int, default=4000)
    throughput.add_argument("--max-workers", type=int, default=200)

    cache = subparsers.add_parser("cache")
    add_common(cache)
    cache.add_argument("--model", required=True)
    cache.add_argument("--runs", type=int, default=3)
    cache.add_argument("--repeat", type=int, default=1300)
    cache.add_argument("--sleep", type=float, default=1.0)

    tools = subparsers.add_parser("tools")
    add_common(tools)
    tools.add_argument("--models", required=True)
    tools.add_argument("--runs", type=int, default=3)

    thinking = subparsers.add_parser("thinking")
    add_common(thinking)
    thinking.add_argument("--models", required=True)

    vision = subparsers.add_parser("vision")
    add_common(vision)
    vision.add_argument("--model", required=True)
    vision.add_argument("--image", required=True)
    vision.add_argument("--prompt", default="请描述图片内容,并尽量读取其中的文字。")
    vision.set_defaults(max_tokens=512)

    args = parser.parse_args()
    runners = {
        "smoke": run_smoke,
        "concurrency": run_concurrency,
        "throughput": run_throughput,
        "cache": run_cache,
        "tools": run_tools,
        "thinking": run_thinking,
        "vision": run_vision,
    }
    rows = runners[args.command](args)
    write_jsonl(args.output, rows)
    print(json.dumps({"output": args.output, "rows": len(rows)}, ensure_ascii=False, separators=(",", ":")))


if __name__ == "__main__":
    main()
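
The stream parsing in `read_stream` can be checked offline without calling any API. The sketch below is a simplified re-implementation of that parsing loop, not the script's own function: `parse_sse_lines` and the canned byte lines are hypothetical. It may help explain why `ttft_sec` (first data chunk) and `first_content_delta_sec` (first chunk that carries content) can differ. The role-only first chunk in the sample is an assumption about how some endpoints behave, not a guarantee.

```python
import json


def parse_sse_lines(lines):
    """Return (chunk_count, joined_content) from OpenAI-style SSE byte lines."""
    parts = []
    chunks = 0
    for raw in lines:
        if not raw.startswith(b"data:"):
            continue  # skip blank separator lines and anything that is not a data line
        payload = raw[5:].strip()
        if payload == b"[DONE]":
            break
        chunks += 1  # every data chunk counts, even one without content
        try:
            event = json.loads(payload.decode("utf-8"))
        except ValueError:
            continue  # a malformed chunk is counted but contributes no content
        choice = (event.get("choices") or [{}])[0]
        piece = (choice.get("delta") or {}).get("content")
        if piece:
            parts.append(piece)
    return chunks, "".join(parts)


# Canned stream: the first chunk carries a role but no content.
canned = [
    b'data: {"choices":[{"delta":{"role":"assistant"}}]}\n',
    b"\n",
    b'data: {"choices":[{"delta":{"content":"1, "}}]}\n',
    b'data: {"choices":[{"delta":{"content":"2"}}]}\n',
    b"data: [DONE]\n",
]
print(parse_sse_lines(canned))  # (3, '1, 2')
```

In this sample the first of the three chunks has no content, so a timer stopped at the first chunk would fire earlier than one stopped at the first content delta.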