diff --git a/gemini/gemini.py b/gemini/gemini.py
index 6abb089..c463687 100644
--- a/gemini/gemini.py
+++ b/gemini/gemini.py
@@ -3,9 +3,84 @@ import argparse
 import json
 import os
 import sys
+import urllib.request
 from google import genai
 from google.genai import types
 
+PRICING_FILE = ".gemini_pricing.json"
+
+def fetch_pricing_for_model(client, target_model):
+    """Fetch per-million-token prices for ``target_model`` from the web.
+
+    Downloads the pricing page at a fixed URL and asks a Gemini model to
+    extract the rates from the HTML as JSON.
+
+    Args:
+        client: Object whose ``models.generate_content`` method is called.
+        target_model: Model name to look up; also the key of the result.
+
+    Returns:
+        ``{target_model: {"input": ..., "cached": ..., "output": ...}}`` with
+        float rates, or ``None`` if the download, the extraction, or the
+        validation of the extracted rates fails.
+    """
+    url = "https://cloud.google.com/gemini-enterprise-agent-platform/generative-ai/pricing"
+    print(f"Fetching live pricing for {target_model} from the web...")
+
+    try:
+        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
+        # Bound the wait so a stalled server cannot hang the CLI forever.
+        with urllib.request.urlopen(req, timeout=30) as response:
+            # Tolerate stray non-UTF-8 bytes; the text only feeds a prompt.
+            html = response.read().decode('utf-8', errors='replace')
+    except Exception as e:
+        print(f"Warning: Failed to fetch HTML from {url}: {e}", file=sys.stderr)
+        return None
+
+    prompt = f"""
+    Extract the API pricing for the model '{target_model}' from the following HTML text.
+    Find the cost per 1 million input tokens, 1 million cached content tokens, and 1 million output tokens.
+    Return ONLY a valid JSON object with this exact structure:
+    {{
+        "{target_model}": {{
+            "input": 0.00,
+            "cached": 0.00,
+            "output": 0.00
+        }}
+    }}
+    If a value is not found, use 0.00.
+
+    HTML DATA:
+    {html}
+    """
+
+    config = types.GenerateContentConfig(
+        response_mime_type="application/json",
+        temperature=0.0
+    )
+
+    print("Parsing pricing data via background AI session...")
+    try:
+        res = client.models.generate_content(
+            model="gemini-3.1-flash-lite",
+            contents=prompt,
+            config=config
+        )
+        parsed = json.loads(res.text)
+        # Keep only the requested model's three rates, coerced to float, so a
+        # malformed reply is rejected here (any KeyError/TypeError/ValueError
+        # lands in the handler below) instead of breaking cost math later.
+        rates = parsed[target_model]
+        new_data = {
+            target_model: {key: float(rates[key]) for key in ("input", "cached", "output")}
+        }
+        print(f"Successfully retrieved pricing for {target_model}.")
+        print(new_data)
+        return new_data
+    except Exception as e:
+        print(f"Warning: Failed to extract pricing using AI: {e}", file=sys.stderr)
+        return None
+
 def main():
     parser = argparse.ArgumentParser(description="Gemini API CLI with File & Context Caching")
     parser.add_argument("-c", "--context", type=str, default=None,
@@ -18,6 +93,8 @@ def main():
                         help="Destroy cloud files/cache, and delete local context")
     parser.add_argument("-x", "--clear-history", action="store_true",
                         help="Clear the conversation history without destroying files/caches")
+    parser.add_argument("--pricing", action="store_true",
+                        help="Force update the pricing info for the specified model from the web")
     parser.add_argument("-o", "--output", type=str,
                         help="Direct the raw output to a specific file instead of stdout")
     parser.add_argument("-p", "--prompt", type=str,
@@ -33,7 +110,40 @@ def main():
         sys.exit(1)
 
     client = genai.Client()
+
+    # ---------------------------------------------------------
+    # PRICING CONFIGURATION
+    # ---------------------------------------------------------
+    pricing_data = {}
+    if os.path.exists(PRICING_FILE):
+        try:
+            with open(PRICING_FILE, "r") as f:
+                file_pricing = json.load(f)
+            # update() needs a mapping; treat any other JSON value as corrupt.
+            if not isinstance(file_pricing, dict):
+                raise ValueError("top-level JSON value is not an object")
+            pricing_data.update(file_pricing)
+        except (OSError, ValueError) as e:
+            # json.JSONDecodeError is a ValueError subclass.
+            print(f"Warning: {PRICING_FILE} is unreadable or corrupted ({e}). Using defaults.", file=sys.stderr)
+
+    if args.pricing or args.model not in pricing_data:
+        new_pricing = fetch_pricing_for_model(client, args.model)
+        if new_pricing:
+            pricing_data.update(new_pricing)
+            with open(PRICING_FILE, "w") as f:
+                json.dump(pricing_data, f, indent=4)
+        elif not os.path.exists(PRICING_FILE):
+            with open(PRICING_FILE, "w") as f:
+                json.dump(pricing_data, f, indent=4)
+
+    if args.pricing and not prompt_text and not args.files and not args.destroy and not args.clear_history:
+        return
+
 
+    # ---------------------------------------------------------
+    # STATE MANAGEMENT
+    # ---------------------------------------------------------
     context_data = {"file_ids": [], "caches": {}, "history": []}
 
     if args.context and os.path.exists(args.context):
@@ -234,6 +344,8 @@ def main():
     )
 
     full_response_text = ""
+    usage_metadata = None
+    finish_reason_str = "UNKNOWN"
 
     if args.output:
         with open(args.output, "w") as f:
@@ -242,16 +354,47 @@ def main():
                     f.write(chunk.text)
                     f.flush()
                     full_response_text += chunk.text
-        print(f"\nDone! Raw output saved directly to {args.output}")
+                if chunk.usage_metadata:
+                    usage_metadata = chunk.usage_metadata
+                if chunk.candidates and chunk.candidates[0].finish_reason:
+                    finish_reason_str = chunk.candidates[0].finish_reason.name
+        print(f"Done! Raw output saved directly to {args.output}")
     else:
-        print("-" * 40)
         for chunk in response_stream:
             if chunk.text:
                 print(chunk.text, end="", flush=True)
                 full_response_text += chunk.text
-        print("\n" + "-" * 40)
+            if chunk.usage_metadata:
+                usage_metadata = chunk.usage_metadata
+            if chunk.candidates and chunk.candidates[0].finish_reason:
+                finish_reason_str = chunk.candidates[0].finish_reason.name
+        print()
+
+    # ---------------------------------------------------------
+    # USAGE AND COST CALCULATION
+    # ---------------------------------------------------------
+    if usage_metadata:
+        prompt_tokens = usage_metadata.prompt_token_count or 0
+        output_tokens = usage_metadata.candidates_token_count or 0
+        cached_tokens = getattr(usage_metadata, 'cached_content_token_count', 0) or 0
+
+        uncached_tokens = max(0, prompt_tokens - cached_tokens)
+        # The lookup can come back empty (web fetch failed and the file has no
+        # "default" entry); report usage without a cost rather than crash.
+        rates = pricing_data.get(args.model) or pricing_data.get("default")
+
+        print("\n[--- Execution Summary ---]")
+        print(f"Finish Reason: {finish_reason_str}")
+        print(f"Token Usage: Input: {uncached_tokens:,} | Cached: {cached_tokens:,} | Output: {output_tokens:,}")
+        if isinstance(rates, dict):
+            input_cost = (uncached_tokens / 1_000_000) * (rates.get("input") or 0)
+            cached_cost = (cached_tokens / 1_000_000) * (rates.get("cached") or 0)
+            output_cost = (output_tokens / 1_000_000) * (rates.get("output") or 0)
+            total_cost = input_cost + cached_cost + output_cost
+            print(f"Est. Cost: ${total_cost:.6f} (Model: {args.model})")
+        else:
+            print(f"Est. Cost: unavailable (no pricing data for model: {args.model})")
 
-    # Append this turn to the local history and save
     context_data["history"].append({"role": "user", "text": prompt_text})
     context_data["history"].append({"role": "model", "text": full_response_text})
 