Skip to content

Instantly share code, notes, and snippets.

@Always-Self-Hosted
Last active February 10, 2026 06:07
Show Gist options
  • Select an option

  • Save Always-Self-Hosted/bb88f73e37cd03ee3d9b24b99098392d to your computer and use it in GitHub Desktop.

Select an option

Save Always-Self-Hosted/bb88f73e37cd03ee3d9b24b99098392d to your computer and use it in GitHub Desktop.
Tweetmash Auto Battler
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2025 Ash <always-self-hosted@protonmail.com>
# This file is released under MIT license
from argparse import ArgumentParser, Namespace
from enum import Enum
from itertools import count
from json import loads, dumps
from logging import Formatter, StreamHandler, getLogger
from pathlib import Path
from threading import Event, Thread
from time import monotonic, sleep
from urllib import request
from os import getenv
from urllib.error import HTTPError, URLError
from urllib.request import Request
# Script identity; the name is derived from this file's name without extension.
SCRIPT_NAME = Path(__file__).stem
SCRIPT_VERSION = "0.2.0"
# Banner logged once at start-up.
SCRIPT_ASCII = f"""
Tweetmash Auto Battler v{SCRIPT_VERSION}
"""
# Root of the API; endpoint paths are appended to it.
BASE_URL = "https://api.tweetmash.com/api"
# CLI level names mapped to the numeric levels handed to the logging module.
LOG_LEVELS = {"debug": 10, "info": 20, "warning": 30, "error": 40}
# Shared opener through which every HTTP request is sent.
CLIENT = request.build_opener()
# This is a good faith attempt at being a good citizen of the open internet.
# If this UA gets blocked, we should reach out to Tweetmash and plead for them to reconsider.
# NOTE(review): this comment originally named "suno", presumably a leftover
# from another script -- confirm.
CLIENT.addheaders = [("User-agent", f"{SCRIPT_NAME}/{SCRIPT_VERSION}")]
class Colours(Enum):
    """ANSI escape sequences used to colour terminal log output."""

    # Appended after every message so the colour does not leak onwards.
    RESET = "\x1b[0m"

    # Foreground colours.
    RED = "\x1b[31m"
    GREEN = "\x1b[32m"
    YELLOW = "\x1b[33m"
    BLUE = "\x1b[34m"
class Logger:
    """Console logger with coloured output and in-place line rewriting.

    Messages are written through a stdlib logger whose handler has an empty
    terminator; the line ending is supplied per message so a status line can
    end in a carriage return and be overwritten by the next message.
    """

    # Defaults for per-instance state; log_message rebinds them on the instance.
    last_msg_len = 0
    cr_used = False
    # Names of the custom formatter fields. Kept as a read-only template:
    # log_message builds a fresh dict per call instead of mutating this one,
    # because a class-level dict is shared by all instances and threads.
    extras = {"start": "", "spacing": "", "end": ""}

    def __init__(self, log_level: str) -> None:
        """Configure the script's logger to emit at *log_level*.

        Args:
            log_level: A key of LOG_LEVELS ("debug", "info", ...).
        """
        self.logger = getLogger(SCRIPT_NAME)
        self.logger.setLevel(LOG_LEVELS[log_level])
        handler = StreamHandler()
        handler.setLevel(LOG_LEVELS[log_level])
        handler.setFormatter(Formatter("{start}{message}{spacing}{end}", style="{"))
        # Line endings come from the "end" field, not from the handler.
        handler.terminator = ""
        self.logger.addHandler(handler)

    def log_message(
        self,
        level: str,
        message: str,
        colour: Colours = Colours.RESET,
        cr: bool = False,
        final_msg: bool = False,
    ) -> None:
        """Emit *message* at *level*, wrapped in *colour*.

        Args:
            level: A key of LOG_LEVELS; messages below the logger's level are
                dropped.
            message: Text to print.
            colour: Colour escape placed before the message; a reset always
                follows it.
            cr: End the line with a carriage return so the next message
                overwrites it.
            final_msg: Force a newline ending even when *cr* is set.
        """
        if LOG_LEVELS[level] < self.logger.level:
            return
        # At debug level nothing is overwritten: lines are kept intact.
        raw = self.logger.level == LOG_LEVELS["debug"]
        msg_len = len(message)
        # Padding that blanks out the tail of a longer previous message.
        padding = " " * max(0, self.last_msg_len - msg_len)
        end = "\r" if cr and not final_msg else "\n"
        extras = {
            # In raw mode, step off a line that was left ending in "\r".
            "start": "\n" if raw and self.cr_used else "",
            "spacing": padding if self.cr_used and not raw else "",
            "end": end,
        }
        self.last_msg_len = msg_len
        self.cr_used = end == "\r"
        self.logger.log(
            LOG_LEVELS[level],
            f"{colour.value}{message}{Colours.RESET.value}",
            extra=extras,
        )
def send_request(
    logger: Logger,
    url: str,
    method: str = "GET",
    body: dict | None = None,
    retries: int = 1,
    backoff: float = 2,
) -> dict | None:
    """Send an HTTP request through CLIENT and decode the JSON response.

    Args:
        logger: Receives debug messages about the request and any failures.
        url: Full URL to request.
        method: HTTP method.
        body: Optional payload, sent JSON-encoded with a JSON content type.
        retries: Number of additional attempts after a failed one.
        backoff: Seconds to sleep before the first retry; doubled each retry.

    Returns:
        The decoded JSON body, or None when the response body is empty.

    Raises:
        OSError: The last network/HTTP error (HTTPError, URLError, timeouts)
            once the retries are used up.
    """
    data = None
    headers = {}
    if body is not None:
        data = dumps(body).encode("utf-8")
        headers["content-type"] = "application/json"
    req = Request(url, data=data, headers=headers, method=method)
    logger.log_message("debug", f"{req.get_method()} {req.get_full_url()}")
    while True:
        try:
            with CLIENT.open(req, timeout=60) as resp:
                raw = resp.read()
                logger.log_message("debug", f"Response {resp.status} {resp.reason}")
            return loads(raw) if raw else None
        # HTTPError, URLError and TimeoutError are all OSError subclasses.
        except OSError as err:
            # Only HTTPError carries .code/.msg; URLError exposes .reason and
            # plain OSError neither, so read them defensively.
            code = getattr(err, "code", None)
            detail = getattr(err, "msg", None) or getattr(err, "reason", None) or err
            prefix = "Error" if code is None else f"Error {code}"
            logger.log_message("debug", f"{prefix}: {detail}")
            if retries <= 0:
                raise
            label = "Rate limited!" if code == 429 else detail
            logger.log_message("debug", f"{label}. Retrying in {backoff:.1f}s")
            sleep(backoff)
            retries -= 1
            backoff *= 2
def get_next_battle(logger: Logger) -> tuple[str, str] | None:
    """Fetch the next battle and pick the tweet to vote for.

    The first of "tweetA" / "tweetB" that has a truthy "id" is chosen.

    Args:
        logger: Receives warnings about unusable responses.

    Returns:
        A (battle_id, tweet_id) pair, or None when the response is not a
        dict, has no "battle" object, or lacks a battle id or tweet id.
    """
    data = send_request(logger, f"{BASE_URL}/battles/next")
    if not isinstance(data, dict):
        logger.log_message("warning", "Invalid response", Colours.YELLOW, cr=True)
        return None
    battle = data.get("battle")
    if not isinstance(battle, dict):
        logger.log_message("warning", f"Missing battle object: {data}", Colours.YELLOW, cr=True)
        return None
    battle_id = battle.get("id")
    tweet_id = None
    for key in ("tweetA", "tweetB"):
        tweet = battle.get(key)
        # A present-but-null (or otherwise non-dict) tweet is treated as
        # missing rather than crashing on .get().
        if isinstance(tweet, dict) and tweet.get("id"):
            tweet_id = tweet["id"]
            break
    if not battle_id or not tweet_id:
        logger.log_message("warning", f"Incomplete battle payload: {battle}", Colours.YELLOW, cr=True)
        return None
    logger.log_message("debug", f"Parsed battle_id={battle_id} tweet_id={tweet_id}")
    return battle_id, tweet_id
def vote(logger: Logger, battle_id: str, tweet_id: str) -> None:
    """POST a vote naming *tweet_id* as the winner of battle *battle_id*.

    Errors raised by send_request propagate to the caller; the response
    body is discarded.
    """
    endpoint = f"{BASE_URL}/battles/{battle_id}/vote"
    payload = {"winnerTweetId": tweet_id}
    send_request(logger, endpoint, method="POST", body=payload)
def worker(logger: Logger, stop: Event, counter: count, base_interval: float, max_backoff: float) -> None:
    """Fetch and vote on battles in a loop until *stop* is set.

    After a successful vote the pause between iterations is *base_interval*.
    When no battle is returned, or any exception occurs, the pause doubles
    (capped at *max_backoff*) until the next success.

    Args:
        logger: Receives progress and error messages.
        stop: Event that ends the loop; also used for interruptible waits.
        counter: Shared iterator; each successful vote draws the next value
            as the running total.
        base_interval: Pause in seconds between successful votes.
        max_backoff: Upper bound in seconds for the backed-off pause.
    """
    interval = base_interval
    while not stop.is_set():
        started = monotonic()
        try:
            result = get_next_battle(logger)
            if result is None:
                # Back off first so the logged delay is the one actually used.
                interval = min(interval * 2, max_backoff)
                logger.log_message("warning", f"No battles available! Waiting {interval:.1f}s", Colours.YELLOW, cr=True)
            else:
                # get_next_battle only returns pairs with both ids present.
                battle_id, tweet_id = result
                vote(logger, battle_id, tweet_id)
                total = next(counter)
                logger.log_message("info", f"Voted battle {battle_id}! Total votes: {total}", Colours.GREEN, cr=True)
                interval = base_interval
        except Exception as err:
            # Boundary of the worker thread: report, back off and keep going.
            interval = min(interval * 2, max_backoff)
            logger.log_message("error", f"{err}! Waiting {interval:.1f}s", Colours.RED, cr=True)
        # Time spent on the requests counts towards the pause.
        elapsed = monotonic() - started
        stop.wait(max(0, interval - elapsed))
def get_cli_args() -> Namespace:
    """Build the command-line parser and return the parsed arguments.

    The token defaults to the TWEETMASH_TOKEN environment variable, or an
    empty string when that is unset.
    """
    env_token = getenv("TWEETMASH_TOKEN", "")
    cli = ArgumentParser(prog=SCRIPT_NAME)
    add = cli.add_argument
    add("--token", default=env_token, help="Tweetmash session token")
    add("--log-level", choices=LOG_LEVELS, default="info")
    add("--threads", type=int, default=1, help="Number of concurrent workers")
    add("--sleep", type=float, default=0.1, help="Seconds delay between votes")
    add("--max-backoff", type=float, default=60, help="Maximum backoff delay in seconds")
    return cli.parse_args()
def validate_cli_args(logger: Logger, args: Namespace) -> bool:
    """Return True when a non-blank session token was supplied.

    Logs an error through *logger* and returns False otherwise.
    """
    has_token = bool(args.token.strip())
    if has_token:
        return True
    logger.log_message("error", "No Tweetmash session token provided.", Colours.RED)
    return False
def main() -> int:
    """Run the auto battler until interrupted.

    Parses the CLI arguments, adds the session headers to CLIENT, starts the
    worker threads and waits for them; Ctrl+C asks the workers to stop.

    Returns:
        1 when no session token was provided, otherwise 0.
    """
    args = get_cli_args()
    logger = Logger(args.log_level)
    logger.log_message("info", SCRIPT_ASCII, Colours.BLUE)
    if not validate_cli_args(logger, args):
        return 1
    session_headers = [
        ("origin", "https://tweetmash.com"),
        ("referer", "https://tweetmash.com/"),
        ("cookie", f"__Secure-better-auth.session_token={args.token.strip()}"),
    ]
    # Extend the opener's headers instead of replacing them, so the
    # User-agent configured at module level is still sent. Headers of the
    # same name are replaced, which keeps repeated calls from stacking up.
    replaced = {name.lower() for name, _ in session_headers}
    CLIENT.addheaders = [
        header for header in CLIENT.addheaders if header[0].lower() not in replaced
    ] + session_headers
    stop = Event()
    counter = count(1)
    threads = [
        Thread(target=worker, args=(logger, stop, counter, args.sleep, args.max_backoff), daemon=True)
        for _ in range(args.threads)
    ]
    for t in threads:
        t.start()
    logger.log_message("info", f"Started {args.threads} workers", Colours.GREEN, cr=True)
    try:
        # Poll rather than join so Ctrl+C is noticed promptly.
        while any(t.is_alive() for t in threads):
            sleep(args.sleep)
    except KeyboardInterrupt:
        logger.log_message("info", "Keyboard Interrupt detected! Stopping threads...", Colours.BLUE)
        stop.set()
    for t in threads:
        t.join(timeout=1)
    # The counter starts at 1, so the next value is one past the vote count.
    total = next(counter) - 1
    logger.log_message("info", f"Stopped after {total} votes", Colours.GREEN, final_msg=True)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment