Added the ability to look up pricing, but it is broken.

This commit is contained in:
2026-06-04 16:42:41 -07:00
parent 6bd3c6e3ab
commit 9c2dd68a28
+116 -4
View File
@@ -3,9 +3,61 @@ import argparse
import json import json
import os import os
import sys import sys
import urllib.request
from google import genai from google import genai
from google.genai import types from google.genai import types
PRICING_FILE = ".gemini_pricing.json"
# NOTE(review): confirm this is the intended pricing page; it could not be
# verified from this file alone.
_PRICING_URL = "https://cloud.google.com/gemini-enterprise-agent-platform/generative-ai/pricing"

# Upper bound on the page download so a stalled connection cannot hang the CLI.
_PRICING_FETCH_TIMEOUT_SECONDS = 30

# The three per-million-token rates callers read from each pricing entry.
_PRICING_RATE_KEYS = ("input", "cached", "output")


def _normalize_pricing(raw, target_model):
    """Coerce the decoded AI reply into ``{target_model: {input, cached, output}}``.

    Accepts the requested nested object, a flat ``{"input": ...}`` object, or
    a one-element list wrapping either. Missing or null rates become ``0.0``
    (the "not found" convention stated in the prompt).

    Raises:
        ValueError: if the reply has no entry for ``target_model`` or a rate
            is not a finite, non-negative number.
        TypeError: if a rate has a type ``float()`` cannot convert.
    """
    if isinstance(raw, list) and len(raw) == 1:
        raw = raw[0]
    if not isinstance(raw, dict):
        raise ValueError(f"expected a JSON object, got {type(raw).__name__}")

    if target_model in raw:
        rates = raw[target_model]
    elif any(key in raw for key in _PRICING_RATE_KEYS):
        # The model answered with the inner object only.
        rates = raw
    else:
        raise ValueError(f"reply has no entry for '{target_model}'")
    if not isinstance(rates, dict):
        raise ValueError(f"entry for '{target_model}' is not a JSON object")

    cleaned = {}
    for key in _PRICING_RATE_KEYS:
        value = rates.get(key)
        rate = 0.0 if value is None else float(value)
        # Written this way so NaN, infinities and negatives are all rejected.
        if not (0.0 <= rate < float("inf")):
            raise ValueError(f"invalid '{key}' rate: {value!r}")
        cleaned[key] = rate
    return {target_model: cleaned}


def fetch_pricing_for_model(client, target_model):
    """Download the pricing page and have the AI extract rates for one model.

    Args:
        client: object whose ``models.generate_content(model=, contents=,
            config=)`` call returns a response with a ``.text`` attribute
            (the caller passes a ``genai.Client``).
        target_model: model name to look up; also the key of the result.

    Returns:
        ``{target_model: {"input": float, "cached": float, "output": float}}``
        with costs per 1 million tokens, or ``None`` if the page could not be
        fetched or the reply could not be parsed and validated. A warning is
        written to stderr in every failure case.
    """
    print(f"Fetching live pricing for {target_model} from the web...")
    try:
        req = urllib.request.Request(_PRICING_URL, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=_PRICING_FETCH_TIMEOUT_SECONDS) as response:
            # Honor the charset the server declares; never abort on a stray byte.
            charset = response.headers.get_content_charset() or 'utf-8'
            html = response.read().decode(charset, errors='replace')
    except Exception as e:
        # Best-effort lookup: any network or decoding failure means "no data".
        print(f"Warning: Failed to fetch HTML from {_PRICING_URL}: {e}", file=sys.stderr)
        return None

    # NOTE(review): the whole page is embedded in the prompt; if the page is
    # large this may exceed the model's input limit.
    prompt = f"""
Extract the API pricing for the model '{target_model}' from the following HTML text.
Find the cost per 1 million input tokens, 1 million cached content tokens, and 1 million output tokens.
Return ONLY a valid JSON object with this exact structure:
{{
"{target_model}": {{
"input": 0.00,
"cached": 0.00,
"output": 0.00
}}
}}
If a value is not found, use 0.00.
HTML DATA:
{html}
"""
    config = types.GenerateContentConfig(
        response_mime_type="application/json",
        temperature=0.0
    )

    print("Parsing pricing data via background AI session...")
    try:
        res = client.models.generate_content(
            model="gemini-3.1-flash-lite",
            contents=prompt,
            config=config
        )
        if not res.text:
            raise ValueError("the model returned an empty response")
        new_data = _normalize_pricing(json.loads(res.text), target_model)
    except Exception as e:
        # SDK errors, malformed JSON and failed validation all land here.
        print(f"Warning: Failed to extract pricing using AI: {e}", file=sys.stderr)
        return None

    rates = new_data[target_model]
    if rates["input"] == 0.0 and rates["output"] == 0.0:
        # Still returned so callers keep working, but flag that the model was
        # probably not found on the page and costs will be reported as $0.
        print(f"Warning: No non-zero pricing found for {target_model}; "
              "cost estimates will be $0.", file=sys.stderr)

    print(f"Successfully retrieved pricing for {target_model}.")
    print(new_data)
    return new_data
def main(): def main():
parser = argparse.ArgumentParser(description="Gemini API CLI with File & Context Caching") parser = argparse.ArgumentParser(description="Gemini API CLI with File & Context Caching")
parser.add_argument("-c", "--context", type=str, default=None, parser.add_argument("-c", "--context", type=str, default=None,
@@ -18,6 +70,8 @@ def main():
help="Destroy cloud files/cache, and delete local context") help="Destroy cloud files/cache, and delete local context")
parser.add_argument("-x", "--clear-history", action="store_true", parser.add_argument("-x", "--clear-history", action="store_true",
help="Clear the conversation history without destroying files/caches") help="Clear the conversation history without destroying files/caches")
parser.add_argument("--pricing", action="store_true",
help="Force update the pricing info for the specified model from the web")
parser.add_argument("-o", "--output", type=str, parser.add_argument("-o", "--output", type=str,
help="Direct the raw output to a specific file instead of stdout") help="Direct the raw output to a specific file instead of stdout")
parser.add_argument("-p", "--prompt", type=str, parser.add_argument("-p", "--prompt", type=str,
@@ -33,7 +87,36 @@ def main():
sys.exit(1) sys.exit(1)
client = genai.Client() client = genai.Client()
# ---------------------------------------------------------
# PRICING CONFIGURATION
# ---------------------------------------------------------
pricing_data = {}
if os.path.exists(PRICING_FILE):
try:
with open(PRICING_FILE, "r") as f:
file_pricing = json.load(f)
pricing_data.update(file_pricing)
except json.JSONDecodeError:
print(f"Warning: {PRICING_FILE} is corrupted. Using defaults.", file=sys.stderr)
if args.pricing or args.model not in pricing_data:
new_pricing = fetch_pricing_for_model(client, args.model)
if new_pricing:
pricing_data.update(new_pricing)
with open(PRICING_FILE, "w") as f:
json.dump(pricing_data, f, indent=4)
elif not os.path.exists(PRICING_FILE):
with open(PRICING_FILE, "w") as f:
json.dump(pricing_data, f, indent=4)
if args.pricing and not prompt_text and not args.files and not args.destroy and not args.clear_history:
return
# ---------------------------------------------------------
# STATE MANAGEMENT
# ---------------------------------------------------------
context_data = {"file_ids": [], "caches": {}, "history": []} context_data = {"file_ids": [], "caches": {}, "history": []}
if args.context and os.path.exists(args.context): if args.context and os.path.exists(args.context):
@@ -234,6 +317,8 @@ def main():
) )
full_response_text = "" full_response_text = ""
usage_metadata = None
finish_reason_str = "UNKNOWN"
if args.output: if args.output:
with open(args.output, "w") as f: with open(args.output, "w") as f:
@@ -242,16 +327,43 @@ def main():
f.write(chunk.text) f.write(chunk.text)
f.flush() f.flush()
full_response_text += chunk.text full_response_text += chunk.text
print(f"\nDone! Raw output saved directly to {args.output}") if chunk.usage_metadata:
usage_metadata = chunk.usage_metadata
if chunk.candidates and chunk.candidates[0].finish_reason:
finish_reason_str = chunk.candidates[0].finish_reason.name
print(f"Done! Raw output saved directly to {args.output}")
else: else:
print("-" * 40)
for chunk in response_stream: for chunk in response_stream:
if chunk.text: if chunk.text:
print(chunk.text, end="", flush=True) print(chunk.text, end="", flush=True)
full_response_text += chunk.text full_response_text += chunk.text
print("\n" + "-" * 40) if chunk.usage_metadata:
usage_metadata = chunk.usage_metadata
if chunk.candidates and chunk.candidates[0].finish_reason:
finish_reason_str = chunk.candidates[0].finish_reason.name
print()
# ---------------------------------------------------------
# USAGE AND COST CALCULATION
# ---------------------------------------------------------
if usage_metadata:
prompt_tokens = usage_metadata.prompt_token_count or 0
output_tokens = usage_metadata.candidates_token_count or 0
cached_tokens = getattr(usage_metadata, 'cached_content_token_count', 0) or 0
uncached_tokens = max(0, prompt_tokens - cached_tokens)
rates = pricing_data.get(args.model, pricing_data.get("default"))
input_cost = (uncached_tokens / 1_000_000) * rates["input"]
cached_cost = (cached_tokens / 1_000_000) * rates["cached"]
output_cost = (output_tokens / 1_000_000) * rates["output"]
total_cost = input_cost + cached_cost + output_cost
print("\n[--- Execution Summary ---]")
print(f"Finish Reason: {finish_reason_str}")
print(f"Token Usage: Input: {uncached_tokens:,} | Cached: {cached_tokens:,} | Output: {output_tokens:,}")
print(f"Est. Cost: ${total_cost:.6f} (Model: {args.model})")
# Append this turn to the local history and save
context_data["history"].append({"role": "user", "text": prompt_text}) context_data["history"].append({"role": "user", "text": prompt_text})
context_data["history"].append({"role": "model", "text": full_response_text}) context_data["history"].append({"role": "model", "text": full_response_text})