Last active
April 13, 2026 08:06
-
-
Save eadz/1a8ada11cd3b6008e975aa5b7a7feab9 to your computer and use it in GitHub Desktop.
agent.rb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env ruby | |
| # _________agent.rb_________ | |
| # | |
| # A complete AI coding agent in one file. | |
| %w[net/http json fileutils timeout readline].each { require _1 } | |
# ANSI colour helpers for terminal output.
#
# Implemented as a refinement so String only gains these methods in files
# that opt in with `using Color`; the core class is not patched globally.
module Color
  refine String do
    # Each helper wraps the receiver in an SGR escape sequence and appends
    # the reset sequence (\e[0m) so the styling does not leak into later text.
    def dim = "\e[90m#{self}\e[0m"
    def yellow = "\e[33m#{self}\e[0m"
    def cyan = "\e[36m#{self}\e[0m"
    def bold = "\e[1m#{self}\e[0m"
  end
end

using Color
# Runtime configuration, read once from the environment when the file loads.

# Model name interpolated into the generateContent request path.
MODEL = ENV.fetch("GEMINI_MODEL", "gemini-3-flash-preview")

# API key. There is no default, so a missing variable raises KeyError at startup.
API_KEY = ENV.fetch("GEMINI_API_KEY")

# Context window size in tokens; the REPL divides the last token count by it
# to show a usage percentage. Integer() is used instead of #to_i so a
# non-numeric GEMINI_CONTEXT fails here with a clear error rather than
# silently becoming 0 and breaking that division later.
CONTEXT = Integer(ENV.fetch("GEMINI_CONTEXT", 256_000))
# Helpers

# Expands +path+ to an absolute path, interpreting relative paths against the
# current working directory. With no argument (or nil) it returns the working
# directory itself.
def resolve(path = nil) = File.expand_path(path || ".", Dir.pwd)
# Builders for the hashes that make up the conversation sent to the API.

# A user turn containing a single text part.
def user(text) = { role: "user", parts: [{ text: text }] }

# A model turn; +parts+ is stored as given.
def model(parts) = { role: "model", parts: parts }

# The turn that carries tool results back to the model. It uses the "user"
# role; +responses+ is a list of hashes built by #tool_result.
def tool_responses(responses) = { role: "user", parts: responses }

# Wraps one tool's output as a functionResponse part.
def tool_result(name, content) = { functionResponse: { name: name, response: { content: content } } }
# Shortens +str+ to at most +max+ characters of original content by keeping
# the beginning and the end and replacing the middle with a marker that
# states how many characters were dropped.
#
# Returns +str+ unchanged when it already fits.
#
# The head gets max / 2 characters and the tail gets the remainder, so exactly
# +max+ characters survive and the reported count is accurate for odd +max+
# too. The marker is plain text: callers pass the result on as tool output,
# so terminal colour escapes would only add noise.
def truncate(str, max = 30_000)
  return str if str.length <= max

  head = max / 2
  tail = max - head
  omitted = str.length - max
  "#{str[0, head]}\n\n... (#{omitted} chars truncated) ...\n\n#{str[-tail, tail]}"
end
# Runs the given block while animating a spinner followed by +label+ on the
# current terminal line, then erases that line. Returns the block's value;
# exceptions raised by the block propagate after the spinner is stopped.
#
# When stdout is not a TTY the block simply runs and nothing is printed. The
# cleanup lives in an inner begin/ensure so that this early return does not
# emit the "\r\e[K" erase sequence into piped or redirected output.
def spin(label)
  return yield unless $stdout.tty?

  begin
    done = false
    spinner = Thread.new do
      "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏".chars.cycle do |frame|
        break if done

        print "\r#{frame.cyan} #{label.dim}"
        sleep 0.1
      end
    end
    yield
  ensure
    # Signal the animation loop, wait for its current frame to finish, then
    # return to column 0 and clear the line.
    done = true
    spinner&.join
    print "\r\e[K"
  end
end
# LLM Client

# Minimal client for the Gemini generateContent endpoint.
#
# Tool calls run on separate threads and a sub-agent issues its own requests,
# so .chat can be entered concurrently. A Net::HTTP object drives a single
# socket and must not carry two requests at once; connections are therefore
# checked out of a small idle pool for the duration of one request instead of
# being shared through one memoised instance.
module LLM
  HOST = "generativelanguage.googleapis.com"
  MAX_ATTEMPTS = 3

  # Open connections that are not currently serving a request.
  @idle = Queue.new

  # Returns an open HTTPS connection for exclusive use by the caller: an idle
  # one when available, otherwise a newly started one.
  def self.connection
    @idle.pop(true) # non-blocking; raises ThreadError when the pool is empty
  rescue ThreadError
    Net::HTTP.start(HOST, 443, use_ssl: true, read_timeout: 300)
  end

  # Sends the conversation to the model.
  #
  # model    - model name used in the request path.
  # system   - system instruction text, or nil to omit it.
  # contents - array of turn hashes (see the user/model helpers).
  # tools    - array of function declarations; nil or empty omits the field.
  #
  # Returns { parts:, tokens: } where parts is the first candidate's parts
  # array ([] when absent) and tokens is usageMetadata.totalTokenCount
  # (0 when absent).
  #
  # Raises RuntimeError for any non-200 status. 429 and 503 are retried with
  # a growing delay (2s, then 4s) and raise on the third failed attempt.
  def self.chat(model:, system:, contents:, tools: nil)
    payload = {
      contents: contents,
      systemInstruction: (system && { parts: [{ text: system }] }),
      tools: (tools&.any? ? [{ functionDeclarations: tools }] : nil)
    }.compact
    req = Net::HTTP::Post.new("/v1beta/models/#{model}:generateContent?key=#{API_KEY}", "Content-Type" => "application/json")
    req.body = JSON.generate(payload)

    attempts = 0
    loop do
      attempts += 1
      res = request(req)
      case res.code.to_i
      when 200
        data = JSON.parse(res.body)
        return {
          parts: data.dig("candidates", 0, "content", "parts") || [],
          tokens: data.dig("usageMetadata", "totalTokenCount") || 0
        }
      when 429, 503
        raise "Rate limited after #{attempts} attempts" if attempts >= MAX_ATTEMPTS

        sleep(attempts * 2)
      else
        raise "API error #{res.code}: #{res.body}"
      end
    end
  end

  # Concatenates the text of every part that has one; other parts
  # (for example function calls) are skipped.
  def self.text(parts) = parts.filter_map { _1["text"] }.join

  # Performs one request on a checked-out connection. The connection goes
  # back to the pool only after a complete response; if the request raised,
  # its state is unknown, so it is closed instead of being reused.
  def self.request(req)
    http = connection
    healthy = false
    response = http.request(req)
    healthy = true
    response
  ensure
    if healthy
      @idle << http
    elsif http&.started?
      http.finish
    end
  end
  private_class_method :request
end
# Tool DSL — `tool` above a `def` wires up description + schema via method_added

# Extend a class with this module to declare tools next to their methods:
#
#   tool "Description.", arg: "what arg is", count: [Integer, "how many"]
#   def my_tool(arg:, count: 1) ... end
#
# A parameter spec is either a description String (JSON type "string") or a
# [Class, description] pair whose class is looked up in TYPES.
module ToolDSL
  # Ruby class => JSON schema type. Classes not listed fall back to "string".
  TYPES = { Integer => "integer", Float => "number", TrueClass => "boolean", FalseClass => "boolean" }.freeze

  # Records metadata that will be attached to the next method defined.
  def tool(desc, **params)
    @_next_tool = { desc: desc, params: params }
  end

  # Hook called by Ruby for every instance method defined on the extending
  # class. If a `tool` declaration is pending it is consumed and registered
  # under the method's name; required keyword arguments (:keyreq) become the
  # schema's required list. Methods without a pending declaration are ignored.
  def method_added(name)
    super
    meta = @_next_tool
    return unless meta

    @_next_tool = nil
    required = instance_method(name).parameters.filter_map { |kind, param| param.to_s if kind == :keyreq }
    tools[name] = { desc: meta[:desc], params: meta[:params], required: required }
  end

  # Registry of declared tools, keyed by method name (Symbol).
  def tools = @_tools ||= {}

  # Names of all declared tools, in declaration order.
  def tool_names = tools.keys

  # Builds function declarations for +names+; names that are not registered
  # tools are skipped.
  def schema_for(names)
    names.filter_map do |name|
      entry = tools[name]
      next unless entry

      properties = entry[:params].transform_values { |spec| property_schema(spec) }
      {
        name: name.to_s,
        description: entry[:desc],
        parameters: { type: "object", properties: properties, required: entry[:required] }
      }
    end
  end

  private

  # Converts one parameter spec into its JSON schema fragment. A spec that is
  # neither a String nor a two-element array raises NoMatchingPatternError.
  def property_schema(spec)
    case spec
    in [klass, desc] then { type: TYPES[klass] || "string", description: desc }
    in String => desc then { type: "string", description: desc }
    end
  end
end
# The tools the model may call. Each public method preceded by a `tool`
# declaration is exported through ToolDSL; every tool returns a String that
# is sent back to the model as the tool's result, so results are plain text
# without terminal colour escapes.
class Tools
  extend ToolDSL

  # Seconds a bash command may run before it is killed.
  BASH_TIMEOUT = 120

  # spawn - callable taking a prompt and returning the sub-agent's final
  #         text; used by the `agent` tool.
  def initialize(spawn: nil)
    @spawn = spawn
  end

  tool "Run a bash command, return stdout+stderr.",
       command: "The bash command to execute"
  # Runs +command+ through `bash -c` with stderr merged into stdout. Output is
  # truncated; a non-zero exit prefixes the output with "Exit code: N".
  # Raises Timeout::Error when the command exceeds BASH_TIMEOUT.
  def bash(command:)
    output = IO.popen(["bash", "-c", command], err: %i[child out], chdir: Dir.pwd) do |pipe|
      Timeout.timeout(BASH_TIMEOUT) { pipe.read }
    rescue Timeout::Error
      # Leaving the block closes the pipe, and closing waits for the child.
      # Kill it first, otherwise a command that is still running would make
      # the timeout ineffective.
      begin
        Process.kill("KILL", pipe.pid)
      rescue Errno::ESRCH
        # already gone
      end
      raise
    end
    status = $?
    result = truncate(output)
    status.success? ? result : "Exit code: #{status.exitstatus}\n#{result}"
  end

  tool "Read a file with line numbers. Always read before editing.",
       file_path: "Path to the file",
       offset: [Integer, "Start line, 1-based (default 1)"],
       limit: [Integer, "Max lines to read (default 2000)"]
  # Returns up to +limit+ lines starting at 1-based line +offset+, each
  # prefixed with its right-aligned line number and "| ". An offset past the
  # end of the file yields an empty string.
  def read_file(file_path:, offset: 1, limit: 2000)
    first = [offset.to_i - 1, 0].max
    window = File.readlines(resolve(file_path))[first, limit.to_i] || []
    window.each_with_index.map { |line, index| "#{(first + index + 1).to_s.rjust(6)}| #{line}" }.join
  end

  tool "Write content to a file, creating directories as needed.",
       file_path: "Path to the file",
       content: "Content to write"
  # Overwrites (or creates) the file, creating parent directories first.
  def write_file(file_path:, content:)
    path = resolve(file_path)
    FileUtils.mkdir_p(File.dirname(path))
    File.write(path, content)
    "Wrote #{content.bytesize} bytes to #{path}"
  end

  tool "Find-and-replace in a file. old_string must match exactly. Read first.",
       file_path: "Path to the file",
       old_string: "Exact text to find (must be unique unless replace_all)",
       new_string: "Replacement text",
       replace_all: [TrueClass, "Replace all occurrences (default false)"]
  # Replaces a literal occurrence of +old_string+ with +new_string+.
  # Raises when old_string is empty, absent, or ambiguous without replace_all.
  def edit_file(file_path:, old_string:, new_string:, replace_all: false)
    raise "old_string must not be empty" if old_string.empty?

    path = resolve(file_path)
    content = File.read(path)
    count = content.scan(old_string).length
    raise "old_string not found in #{path}" if count == 0
    raise "old_string matches #{count} times — use replace_all or be more specific" if count > 1 && !replace_all

    # Block form on purpose: a replacement passed as an argument has its
    # backslash sequences (\1, \0, \\) interpreted, which corrupts source
    # code containing backslashes. A block's return value is inserted as-is.
    updated = replace_all ? content.gsub(old_string) { new_string } : content.sub(old_string) { new_string }
    File.write(path, updated)
    "Replaced #{replace_all ? count : 1} occurrence(s) in #{path}"
  end

  tool "Find files matching a glob pattern.",
       pattern: "Glob pattern (e.g. '**/*.rb')",
       path: "Directory to search (default: cwd)"
  # Returns at most the first 200 matches (sorted), one per line, relative
  # to the searched directory.
  def glob(pattern:, path: nil)
    matches = Dir.glob(pattern, base: resolve(path)).sort.first(200)
    matches.empty? ? "No files found matching #{pattern}" : matches.join("\n")
  end

  tool "Search file contents with regex via ripgrep.",
       pattern: "Regex pattern to search for",
       path: "Directory to search (default: cwd)",
       include: "File glob filter (e.g. '*.rb')"
  # Runs `rg` with line numbers and per-file headings; output is truncated.
  # Arguments are passed as an argv array, so no shell quoting is involved.
  def grep(pattern:, path: nil, include: nil)
    cmd = ["rg", "-n", "--heading", "--color=never", "-e", pattern]
    cmd += ["--glob", include] if include
    cmd << resolve(path)
    output = IO.popen(cmd, err: %i[child out], &:read)
    truncate(output.empty? ? "No matches found for /#{pattern}/" : output)
  end

  tool "List directory contents.",
       path: "Directory to list (default: cwd)"
  # Lists entries sorted by name, one per line; directories get a trailing "/".
  def ls(path: nil)
    dir = resolve(path)
    Dir.children(dir).sort.map { |entry| File.directory?(File.join(dir, entry)) ? "#{entry}/" : entry }.join("\n")
  end

  tool "Spawn a sub-agent. Has all tools except agent. Call multiple times for parallel execution.",
       prompt: "The task for the sub-agent"
  # Delegates to the callable supplied at construction.
  def agent(prompt:)
    @spawn.call(prompt)
  end
end
# Drives the conversation: sends history to the model, executes the tool
# calls it requests, feeds the results back, and repeats until the model
# answers with text only.
class Agent
  SYSTEM = <<~PROMPT
    You are a coding agent running in the terminal.
    Read files before editing. Always. Be autonomous — search, read, decide, act.
    Verify changes work. Be concise. Match existing code style.
    Use exact text matches for edits. Use the agent tool for multi-step research.
    Working directory: %{dir} | Platform: %{platform}
  PROMPT

  # Total token count reported by the most recent model response.
  attr_reader :tokens

  # model  - model name passed to LLM.chat.
  # tools  - names (Symbols) of the tools this agent may call.
  # silent - when true, nothing is printed by #emit.
  def initialize(model: MODEL, tools: Tools.tool_names, silent: false)
    @model = model
    @tools = tools
    @prompt = SYSTEM % { dir: Dir.pwd, platform: RUBY_PLATFORM }
    @history = []
    @tokens = 0
    @silent = silent
    @toolbox = Tools.new(spawn: method(:spawn))
  end

  # Runs one user turn to completion and returns the model's final text.
  # Errors from LLM.chat propagate to the caller.
  def run(input)
    @history << user(input)
    loop do
      reply = spin("thinking") do
        LLM.chat(model: @model, system: @prompt, contents: @history, tools: Tools.schema_for(@tools))
      end
      parts = reply[:parts]
      @tokens = reply[:tokens]
      text = LLM.text(parts)
      emit text

      calls = parts.filter_map { _1["functionCall"] }
      if calls.empty?
        @history << model(parts)
        break text
      end

      # The model turn is recorded together with its tool responses, only
      # after the tools have finished. If the user interrupts mid-execution,
      # history is then not left ending in a function-call turn with no
      # responses, which the API is expected to reject on the next request.
      responses = run_tools(calls)
      @history << model(parts) << tool_responses(responses)
    end
  end

  # Forgets the conversation and resets the token gauge.
  def clear!
    @history = []
    @tokens = 0
  end

  # Interactive loop. "/quit" or end-of-input exits, "/clear" resets the
  # conversation, blank input is ignored. Ctrl-C and errors abort only the
  # current turn; the session keeps running.
  def self.repl(model: MODEL)
    agent = new(model: model)
    puts "◆ ".cyan.bold + "agent.rb".bold + " (#{model.dim})"
    puts "Type " + "/quit".yellow + " to exit or " + "/clear".yellow + " to reset"
    puts
    loop do
      # Guarded so a zero CONTEXT cannot raise on every iteration.
      pct = CONTEXT.positive? ? (agent.tokens * 100.0 / CONTEXT).round : 0
      input = Readline.readline("#{pct}% ".dim + "◆ ".cyan, true)
      case input
      when nil, "/quit" then break
      when /\A\s*\z/ then next
      when "/clear"
        agent.clear!
        puts "Cleared."
      else
        agent.run(input)
      end
    rescue Interrupt
      puts "\nInterrupted."
    rescue StandardError => e
      # For example an API error raised by LLM.chat: report it and keep the
      # session (and its history) alive instead of exiting.
      puts "Error: #{e.message}"
    end
  end

  private

  # Prints +text+ unless the agent is silent or there is nothing to print.
  def emit(text)
    puts text unless @silent || text.empty?
  end

  # Executes the requested function calls concurrently, one thread each, and
  # returns their functionResponse parts in the order the calls were made.
  def run_tools(calls)
    jobs = calls.map do |call|
      name = call["name"]
      args = call["args"] || {}
      # Short preview of the first argument for the progress line.
      hint = args.values.first.to_s.gsub(/\s+/, " ")[0, 60]
      [name, hint, Thread.new { execute_tool(name, args) }]
    end

    label = jobs.length == 1 ? jobs.first.first : "#{jobs.length} tools"
    spin(label) { jobs.each { |_, _, thread| thread.join } }

    jobs.map do |name, hint, thread|
      result = thread.value
      emit " #{name.yellow} #{hint.dim} → #{result.length.to_s.bold} chars"
      tool_result(name, result)
    end
  end

  # Calls one tool and returns its output as a String. Failures are returned
  # as "Error: ..." text so the model can react to them.
  #
  # The name comes from the model, so it is checked against this agent's
  # allow-list before dispatch, and public_send keeps private and Kernel
  # methods out of reach. Without the check a sub-agent could still invoke
  # `agent` even though it was removed from its tool list.
  def execute_tool(name, args)
    tool = name.to_s.to_sym
    raise ArgumentError, "unknown tool #{name.inspect}" unless @tools.include?(tool)

    @toolbox.public_send(tool, **args.transform_keys(&:to_sym)).to_s
  rescue StandardError => e
    "Error: #{e.message}"
  end

  # Runs +prompt+ in a fresh agent that has every tool except `agent` and
  # returns its final text.
  def spawn(prompt)
    Agent.new(model: @model, tools: Tools.tool_names - [:agent], silent: false).run(prompt)
  end
end
# Entry point: with command-line arguments, join them into a single prompt and
# run it once; with none, start the interactive REPL.
if ARGV.any?
  Agent.new.run(ARGV.join(" "))
else
  Agent.repl
end
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
agent.rb
A complete AI coding agent in one Ruby file. No gems. Under 250 lines of code.
It can read, write, edit, search, and run your code. It spawns parallel sub-agents for research tasks. It has a REPL with Readline history and a context window indicator. It talks to Gemini over a single persistent HTTP connection.
Tools are just methods. A `tool` DSL above a `def` auto-wires the JSON schema via `method_added`. Required params are inferred from keyword arguments. No boilerplate.

The spinner is a string. `"⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏".chars.cycle` turns ten characters into an infinite animation. The data is the program.

Parallel execution is trivial. Multiple tool calls launch as `Thread`s and results are collected when all finish.

Setup

Ruby 3.1+, a Gemini API key, and `rg` (ripgrep) on PATH.