Added target pod to client result and made clients consistent #799

Merged
merged 1 commit on Mar 6, 2025
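
For context, after this change both the streaming and the batch client write one JSON line per request with the same schema, including the serving pod that handled the request. The snippet below only illustrates that record shape; every value, including the pod name, is made up.

# Illustration only (not part of the diff): shape of one success record in the
# JSONL output after this change. All values, including the pod name, are made up.
import json

example_record = {
    "request_id": 0,
    "status": "success",
    "input": [{"role": "user", "content": "Hello"}],
    "output": "Hi there!",
    "prompt_tokens": 5,
    "output_tokens": 3,
    "total_tokens": 8,
    "latency": 0.42,
    "throughput": 7.14,
    "start_time": 1000.00,
    "end_time": 1000.42,
    "ttft": "Unknown",        # the batch client cannot observe time-to-first-token
    "tpot": "Unknown",        # nor time-per-output-token
    "target_pod": "pod-a",    # read from the target-pod response header
}
print(json.dumps(example_record))
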
65 changes: 50 additions & 15 deletions benchmarks/client/client.py
@@ -22,25 +22,27 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
request_id: int):
start_time = asyncio.get_event_loop().time()
first_response_time = None

target_pod = ""
try:
logging.info(f"Request {request_id}: Starting streaming request to {endpoint}")
stream = await client.chat.completions.create(
response_stream = await client.chat.completions.create(
model=model,
messages=prompt,
temperature=0,
max_tokens=2048,
stream=True,
stream_options={"include_usage": True},
)

if hasattr(response_stream, 'response') and hasattr(response_stream.response, 'headers'):
target_pod = response_stream.response.headers.get('target-pod')

text_chunks = []
prompt_tokens = 0
output_tokens = 0
total_tokens = 0

try:
async for chunk in stream:
async for chunk in response_stream:
if chunk.choices:
if chunk.choices[0].delta.content is not None:
if not first_response_time:
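
The new target_pod lookup relies on the response object exposing a raw .response attribute with headers, guarded by hasattr so clients that do not expose it still work. As a point of comparison, a minimal sketch of reading the same header through the openai Python client's with_raw_response accessor (an alternative, assuming openai-python v1.x and a gateway that sets a target-pod header) could look like this:

# Sketch only, not part of the diff. Assumes openai-python v1.x and a gateway
# that returns a "target-pod" response header, as this PR expects.
import openai

async def create_with_pod(client: openai.AsyncOpenAI, model: str, messages):
    raw = await client.chat.completions.with_raw_response.create(
        model=model,
        messages=messages,
        temperature=0,
        max_tokens=2048,
    )
    target_pod = raw.headers.get("target-pod", "")  # "" if the header is absent
    completion = raw.parse()                        # regular ChatCompletion object
    return completion, target_pod
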
@@ -81,6 +83,7 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
"end_time": response_time,
"ttft": ttft,
"tpot": tpot,
"target_pod": target_pod,
}

# Write result to JSONL file
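
The lines that actually compute ttft and tpot are collapsed in this diff. For readers unfamiliar with the metrics, one common definition is time-to-first-token and time-per-output-token; the helper below is an assumption for illustration, not a quote of the hidden code.

# Hypothetical helper, shown only to explain the ttft/tpot fields above; the
# real computation lives in the collapsed portion of send_request_streaming.
def compute_ttft_tpot(start_time: float, first_response_time: float,
                      end_time: float, output_tokens: int) -> tuple:
    ttft = first_response_time - start_time          # time to first token
    decode_time = end_time - first_response_time
    tpot = decode_time / max(output_tokens - 1, 1)   # time per output token
    return ttft, tpot
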
@@ -93,7 +96,6 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
error_time = asyncio.get_event_loop().time()
# Determine error type based on exception class
error_type = type(e).__name__

error_result = {
"request_id": request_id,
"status": "error",
@@ -103,7 +105,8 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
"input": prompt,
"latency": error_time - start_time,
"start_time": start_time,
"end_time": error_time
"end_time": error_time,
"target_pod": target_pod,
}
logging.error(f"Request {request_id}: Error ({error_type}): {str(e)}")
output_file.write(json.dumps(error_result) + "\n")
@@ -144,50 +147,76 @@ async def benchmark_streaming(client: openai.AsyncOpenAI,
logging.warning(f"All {num_requests} requests completed for deployment.")

# Asynchronous request handler
async def send_request_batch(client, model, endpoint, prompt, output_file):
async def send_request_batch(client, model, endpoint, prompt, output_file, request_id):
start_time = asyncio.get_event_loop().time()
target_pod = ""
try:
response = await client.chat.completions.create(
model=model,
messages=prompt,
temperature=0,
max_tokens=2048
)
if hasattr(response, 'response') and hasattr(response.response, 'headers'):
target_pod = response.response.headers.get('target-pod')

latency = asyncio.get_event_loop().time() - start_time
response_time = asyncio.get_event_loop().time()
latency = response_time - start_time
prompt_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
throughput = output_tokens / latency
output_text = response.choices[0].message.content

result = {
"request_id": request_id,
"status": "success",
"input": prompt,
"output": output_text,
"prompt_tokens": prompt_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"start_time": start_time,
"current_time": asyncio.get_event_loop().time(),
"latency": latency,
"throughput": throughput
"throughput": throughput,
"start_time": start_time,
"end_time": response_time,
"ttft": "Unknown",
"tpot": "Unknown",
"target_pod": target_pod,
}
logging.info(result)
# Write result to JSONL file
output_file.write(json.dumps(result) + "\n")
output_file.flush() # Ensure data is written immediately to the file

return result

except Exception as e:
logging.error(f"Error sending request to at {endpoint}: {str(e)}")
return None
error_time = asyncio.get_event_loop().time()
error_type = type(e).__name__
error_result = {
"request_id": request_id,
"status": "error",
"error_type": error_type,
"error_message": str(e),
"error_traceback": traceback.format_exc(),
"input": prompt,
"latency": error_time - start_time,
"start_time": start_time,
"end_time": error_time,
"target_pod": target_pod
}
logging.error(f"Request {request_id}: Error ({error_type}): {str(e)}")
output_file.write(json.dumps(error_result) + "\n")
output_file.flush()
return error_result


async def benchmark_batch(client: openai.AsyncOpenAI,
endpoint: str,
model: str,
load_struct: List,
output_file: io.TextIOWrapper):
request_id = 0
batch_tasks = []
base_time = time.time()
num_requests = 0
@@ -202,8 +231,14 @@ async def benchmark_batch(client: openai.AsyncOpenAI,
formatted_prompts = [wrap_prompt_as_chat_message(request["prompt"]) for request in requests]
for formatted_prompt in formatted_prompts:
task = asyncio.create_task(
send_request_batch(client, model, endpoint, formatted_prompt, output_file)
send_request_batch(client = client,
model = model,
endpoint = endpoint,
prompt = formatted_prompt,
output_file = output_file,
request_id = request_id)
)
request_id += 1
batch_tasks.append(task)
num_requests += len(requests)
await asyncio.gather(*batch_tasks)
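
With target_pod recorded on both success and error records, the JSONL output can be sliced per pod. A small usage sketch follows; the file name is a placeholder for whatever path output_file points at.

# Usage sketch, not part of the diff. "output.jsonl" is a placeholder path.
import json
from collections import defaultdict

latencies = defaultdict(list)
with open("output.jsonl") as f:
    for line in f:
        record = json.loads(line)
        if record.get("status") == "success":
            latencies[record.get("target_pod", "")].append(record["latency"])

for pod, values in sorted(latencies.items()):
    print(f"{pod or '<unknown>'}: n={len(values)}, mean latency={sum(values) / len(values):.3f}s")
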