From 06d8b189ddafb1e59fde597ad375f8a5e9f98484 Mon Sep 17 00:00:00 2001 From: Abijah Date: Thu, 4 Jun 2026 17:19:03 -0700 Subject: [PATCH] Added the >200k tier. Fixed problems with using a non-existent model --- gemini/gemini.py | 95 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 32 deletions(-) diff --git a/gemini/gemini.py b/gemini/gemini.py index 41937d2..bec079c 100644 --- a/gemini/gemini.py +++ b/gemini/gemini.py @@ -23,16 +23,23 @@ def fetch_pricing_for_model(client, target_model): prompt = f""" Extract the API pricing for the model '{target_model}' from the following HTML text. - Find the cost per 1 million input tokens, 1 million cached content tokens, and 1 million output tokens. - Return ONLY a valid JSON object with this exact structure: + Find the cost per 1 million tokens for input, cached content, and output. + Many models have a split tier where the cost increases if the prompt exceeds 200k tokens. + + CRITICAL: If the model '{target_model}' is definitively NOT found in the HTML data, return an empty JSON object: {{}} + + Otherwise, return ONLY a valid JSON object with this exact structure: {{ "{target_model}": {{ "input": 0.00, "cached": 0.00, - "output": 0.00 + "output": 0.00, + "input_over_200k": 0.00, + "cached_over_200k": 0.00, + "output_over_200k": 0.00 }} }} - If a value is not found, use 0.00. + If a tier value is not found, duplicate the base tier values. HTML DATA: {html} @@ -104,7 +111,7 @@ def main(): with open(PRICING_FILE, "r") as f: pricing_data = json.load(f) except json.JSONDecodeError: - print(f"Warning: {PRICING_FILE} is corrupted. Starting fresh.") + print(f"Warning: {PRICING_FILE} is corrupted. 
Starting fresh.", file=sys.stderr) # Fetch pricing if forced, or if the model isn't currently tracked if args.pricing or args.model not in pricing_data: @@ -319,38 +326,49 @@ def main(): print("Generating response (this may take a moment for large outputs)...\n") - response_stream = client.models.generate_content_stream( - model=args.model, - contents=api_contents, - config=config - ) - full_response_text = "" usage_metadata = None finish_reason_str = "UNKNOWN" - if args.output: - with open(args.output, "w") as f: + try: + response_stream = client.models.generate_content_stream( + model=args.model, + contents=api_contents, + config=config + ) + + if args.output: + with open(args.output, "w") as f: + for chunk in response_stream: + if chunk.text: + f.write(chunk.text) + f.flush() + full_response_text += chunk.text + if chunk.usage_metadata: + usage_metadata = chunk.usage_metadata + if chunk.candidates and chunk.candidates[0].finish_reason: + finish_reason_str = chunk.candidates[0].finish_reason.name + print(f"Done! Raw output saved directly to {args.output}", file=sys.stderr) + else: for chunk in response_stream: if chunk.text: - f.write(chunk.text) - f.flush() + print(chunk.text, end="", flush=True) full_response_text += chunk.text if chunk.usage_metadata: usage_metadata = chunk.usage_metadata if chunk.candidates and chunk.candidates[0].finish_reason: finish_reason_str = chunk.candidates[0].finish_reason.name - print(f"Done! 
Raw output saved directly to {args.output}") - else: - for chunk in response_stream: - if chunk.text: - print(chunk.text, end="", flush=True) - full_response_text += chunk.text - if chunk.usage_metadata: - usage_metadata = chunk.usage_metadata - if chunk.candidates and chunk.candidates[0].finish_reason: - finish_reason_str = chunk.candidates[0].finish_reason.name - print() + print() + + except Exception as e: + # Catch the 404 Model Not Found error (or other API failures) gracefully + if "404" in str(e) and "NOT_FOUND" in str(e): + print(f"\n[Error] The model '{args.model}' does not exist or is not available.", file=sys.stderr) + else: + print(f"\n[API Error] {e}", file=sys.stderr) + + # Exit cleanly so we don't calculate costs or save bad history + return # --------------------------------------------------------- # USAGE AND COST CALCULATION @@ -361,18 +379,31 @@ def main(): cached_tokens = getattr(usage_metadata, 'cached_content_token_count', 0) or 0 uncached_tokens = max(0, prompt_tokens - cached_tokens) + rates = pricing_data.get(args.model, {}) - # Fetch the rate dynamically from the parsed JSON or default to 0.0 if fetch failed - rates = pricing_data.get(args.model, {"input": 0.0, "cached": 0.0, "output": 0.0}) + input_rate = rates.get("input", 0.0) + cached_rate = rates.get("cached", 0.0) + output_rate = rates.get("output", 0.0) + tier_label = "Base Tier" - input_cost = (uncached_tokens / 1_000_000) * rates["input"] - cached_cost = (cached_tokens / 1_000_000) * rates["cached"] - output_cost = (output_tokens / 1_000_000) * rates["output"] + # Check if prompt exceeded 200k tokens to apply tier pricing + if prompt_tokens > 200_000 and "input_over_200k" in rates: + if rates.get("input_over_200k", 0.0) > 0: + input_rate = rates["input_over_200k"] + tier_label = ">200k Tier" + if rates.get("cached_over_200k", 0.0) > 0: + cached_rate = rates["cached_over_200k"] + if rates.get("output_over_200k", 0.0) > 0: + output_rate = rates["output_over_200k"] + + input_cost = 
(uncached_tokens / 1_000_000) * input_rate + cached_cost = (cached_tokens / 1_000_000) * cached_rate + output_cost = (output_tokens / 1_000_000) * output_rate total_cost = input_cost + cached_cost + output_cost print("\n[--- Execution Summary ---]") print(f"Finish Reason: {finish_reason_str}") - print(f"Token Usage: Input: {uncached_tokens:,} | Cached: {cached_tokens:,} | Output: {output_tokens:,}") + print(f"Token Usage: Input: {uncached_tokens:,} | Cached: {cached_tokens:,} | Output: {output_tokens:,} ({tier_label})") print(f"Est. Cost: ${total_cost:.6f} (Model: {args.model})") context_data["history"].append({"role": "user", "text": prompt_text})