daren_project/test_external_api.py
2025-04-29 10:22:57 +08:00

129 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import sys
import time
from datetime import datetime
# 外部API地址
API_URL = "http://81.69.223.133:48329/api/application/chat_message/94922d0e-20e5-11f0-ac62-0242ac120002"
# 测试数据
test_data = {
"message": "1+1",
"re_chat": False,
"stream": True
}
# 请求头
headers = {
"Content-Type": "application/json",
"User-Agent": "Apifox/1.0.0 (https://apifox.com)",
"Accept": "*/*",
"Host": "81.69.223.133:48329",
"Connection": "keep-alive"
}
print(f"API URL: {API_URL}")
print(f"发送请求: {json.dumps(test_data, ensure_ascii=False)}")
print("等待响应...")
start_time = time.time()
# 发送请求并获取流式响应
response = requests.post(
url=API_URL,
json=test_data,
headers=headers,
stream=True # 启用流式传输
)
print(f"响应状态码: {response.status_code}")
print(f"收到初始响应时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
if response.status_code != 200 and response.status_code != 201:
print(f"错误: {response.status_code}, {response.text}")
sys.exit(1)
# 处理流式响应
print("\n----- 开始接收流式响应 -----\n")
buffer = ""
last_time = start_time
response_count = 0
full_content = "" # 用于收集完整内容
# 使用小的chunk_size更好地展示流式效果
for chunk in response.iter_content(chunk_size=1):
if chunk:
current_time = time.time()
time_diff = current_time - last_time
elapsed = current_time - start_time
# 解码字节为字符串
try:
chunk_str = chunk.decode('utf-8')
buffer += chunk_str
# 检查是否有完整的数据行
if '\n\n' in buffer:
lines = buffer.split('\n\n')
# 除了最后一行,其他都是完整的
for line in lines[:-1]:
if line.strip():
response_count += 1
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
print(f"\n[{timestamp}] 响应 #{response_count} (距上次: {time_diff:.3f}s, 总计: {elapsed:.3f}s)")
print(f"{line}")
# 如果想解析JSON内容
if line.startswith('data: '):
try:
json_str = line[6:] # 去掉 "data: " 前缀
data = json.loads(json_str)
# 提取并累积内容
if 'content' in data:
content = data.get('content', '')
full_content += content
# 如果内容太长只显示前50个字符
if len(content) > 50:
content_display = content[:50] + "..."
else:
content_display = content
print(f"内容片段: '{content_display}'")
print(f"是否结束: {data.get('is_end', False)}")
except json.JSONDecodeError:
pass
# 保留最后一个可能不完整的行
buffer = lines[-1]
# 重置计时器以准确测量下一个数据包间隔
last_time = current_time
except UnicodeDecodeError:
# 忽略解码错误可能是部分Unicode字符
pass
# 处理可能的剩余数据
if buffer.strip():
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
elapsed = time.time() - start_time
print(f"\n[{timestamp}] 最终响应 (总计: {elapsed:.3f}s)")
print(f"{buffer}")
# 尝试处理最后一段数据
if buffer.startswith('data: '):
try:
json_str = buffer[6:] # 去掉 "data: " 前缀
data = json.loads(json_str)
if 'content' in data:
full_content += data.get('content', '')
except:
pass
total_time = time.time() - start_time
print(f"\n----- 响应结束 -----")
print(f"总响应时间: {total_time:.3f}秒, 共接收 {response_count} 个数据包")
print(f"完整收集的内容: {full_content}")