129 lines
4.5 KiB
Python
129 lines
4.5 KiB
Python
![]() |
import requests
|
|||
|
import json
|
|||
|
import sys
|
|||
|
import time
|
|||
|
from datetime import datetime
|
|||
|
|
|||
|
# 外部API地址
|
|||
|
API_URL = "http://81.69.223.133:48329/api/application/chat_message/94922d0e-20e5-11f0-ac62-0242ac120002"
|
|||
|
|
|||
|
# 测试数据
|
|||
|
test_data = {
|
|||
|
"message": "1+1",
|
|||
|
"re_chat": False,
|
|||
|
"stream": True
|
|||
|
}
|
|||
|
|
|||
|
# 请求头
|
|||
|
headers = {
|
|||
|
"Content-Type": "application/json",
|
|||
|
"User-Agent": "Apifox/1.0.0 (https://apifox.com)",
|
|||
|
"Accept": "*/*",
|
|||
|
"Host": "81.69.223.133:48329",
|
|||
|
"Connection": "keep-alive"
|
|||
|
}
|
|||
|
|
|||
|
print(f"API URL: {API_URL}")
|
|||
|
print(f"发送请求: {json.dumps(test_data, ensure_ascii=False)}")
|
|||
|
print("等待响应...")
|
|||
|
|
|||
|
start_time = time.time()
|
|||
|
# 发送请求并获取流式响应
|
|||
|
response = requests.post(
|
|||
|
url=API_URL,
|
|||
|
json=test_data,
|
|||
|
headers=headers,
|
|||
|
stream=True # 启用流式传输
|
|||
|
)
|
|||
|
|
|||
|
print(f"响应状态码: {response.status_code}")
|
|||
|
print(f"收到初始响应时间: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
|
|||
|
|
|||
|
if response.status_code != 200 and response.status_code != 201:
|
|||
|
print(f"错误: {response.status_code}, {response.text}")
|
|||
|
sys.exit(1)
|
|||
|
|
|||
|
# 处理流式响应
|
|||
|
print("\n----- 开始接收流式响应 -----\n")
|
|||
|
|
|||
|
buffer = ""
|
|||
|
last_time = start_time
|
|||
|
response_count = 0
|
|||
|
full_content = "" # 用于收集完整内容
|
|||
|
|
|||
|
# 使用小的chunk_size,更好地展示流式效果
|
|||
|
for chunk in response.iter_content(chunk_size=1):
|
|||
|
if chunk:
|
|||
|
current_time = time.time()
|
|||
|
time_diff = current_time - last_time
|
|||
|
elapsed = current_time - start_time
|
|||
|
|
|||
|
# 解码字节为字符串
|
|||
|
try:
|
|||
|
chunk_str = chunk.decode('utf-8')
|
|||
|
buffer += chunk_str
|
|||
|
|
|||
|
# 检查是否有完整的数据行
|
|||
|
if '\n\n' in buffer:
|
|||
|
lines = buffer.split('\n\n')
|
|||
|
# 除了最后一行,其他都是完整的
|
|||
|
for line in lines[:-1]:
|
|||
|
if line.strip():
|
|||
|
response_count += 1
|
|||
|
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
|||
|
|
|||
|
print(f"\n[{timestamp}] 响应 #{response_count} (距上次: {time_diff:.3f}s, 总计: {elapsed:.3f}s)")
|
|||
|
print(f"{line}")
|
|||
|
|
|||
|
# 如果想解析JSON内容
|
|||
|
if line.startswith('data: '):
|
|||
|
try:
|
|||
|
json_str = line[6:] # 去掉 "data: " 前缀
|
|||
|
data = json.loads(json_str)
|
|||
|
|
|||
|
# 提取并累积内容
|
|||
|
if 'content' in data:
|
|||
|
content = data.get('content', '')
|
|||
|
full_content += content
|
|||
|
|
|||
|
# 如果内容太长,只显示前50个字符
|
|||
|
if len(content) > 50:
|
|||
|
content_display = content[:50] + "..."
|
|||
|
else:
|
|||
|
content_display = content
|
|||
|
|
|||
|
print(f"内容片段: '{content_display}'")
|
|||
|
print(f"是否结束: {data.get('is_end', False)}")
|
|||
|
except json.JSONDecodeError:
|
|||
|
pass
|
|||
|
|
|||
|
# 保留最后一个可能不完整的行
|
|||
|
buffer = lines[-1]
|
|||
|
|
|||
|
# 重置计时器以准确测量下一个数据包间隔
|
|||
|
last_time = current_time
|
|||
|
except UnicodeDecodeError:
|
|||
|
# 忽略解码错误,可能是部分Unicode字符
|
|||
|
pass
|
|||
|
|
|||
|
# 处理可能的剩余数据
|
|||
|
if buffer.strip():
|
|||
|
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
|
|||
|
elapsed = time.time() - start_time
|
|||
|
print(f"\n[{timestamp}] 最终响应 (总计: {elapsed:.3f}s)")
|
|||
|
print(f"{buffer}")
|
|||
|
|
|||
|
# 尝试处理最后一段数据
|
|||
|
if buffer.startswith('data: '):
|
|||
|
try:
|
|||
|
json_str = buffer[6:] # 去掉 "data: " 前缀
|
|||
|
data = json.loads(json_str)
|
|||
|
if 'content' in data:
|
|||
|
full_content += data.get('content', '')
|
|||
|
except:
|
|||
|
pass
|
|||
|
|
|||
|
total_time = time.time() - start_time
|
|||
|
print(f"\n----- 响应结束 -----")
|
|||
|
print(f"总响应时间: {total_time:.3f}秒, 共接收 {response_count} 个数据包")
|
|||
|
print(f"完整收集的内容: {full_content}")
|