Last active
February 10, 2026 06:07
-
-
Save Always-Self-Hosted/bb88f73e37cd03ee3d9b24b99098392d to your computer and use it in GitHub Desktop.
Tweetmash Auto Battler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| # Copyright (c) 2025 Ash <always-self-hosted@protonmail.com> | |
| # This file is released under MIT license | |
| from argparse import ArgumentParser, Namespace | |
| from enum import Enum | |
| from itertools import count | |
| from json import loads, dumps | |
| from logging import Formatter, StreamHandler, getLogger | |
| from pathlib import Path | |
| from threading import Event, Thread | |
| from time import monotonic, sleep | |
| from urllib import request | |
| from os import getenv | |
| from urllib.error import HTTPError, URLError | |
| from urllib.request import Request | |
| SCRIPT_NAME = Path(__file__).stem | |
| SCRIPT_VERSION = "0.2.0" | |
| SCRIPT_ASCII = f""" | |
| Tweetmash Auto Battler v{SCRIPT_VERSION} | |
| """ | |
| BASE_URL = "https://api.tweetmash.com/api" | |
| LOG_LEVELS = {"debug": 10, "info": 20, "warning": 30, "error": 40} | |
| CLIENT = request.build_opener() | |
| # This is a good faith attempt at being a good citizen of the open internet. | |
| # If this UA gets blocked, we should reach out to suno and plead for them to reconsider. | |
| CLIENT.addheaders = [("User-agent", f"{SCRIPT_NAME}/{SCRIPT_VERSION}")] | |
| class Colours(Enum): | |
| RESET = "\x1b[0m" | |
| RED = "\x1b[31m" | |
| GREEN = "\x1b[32m" | |
| YELLOW = "\x1b[33m" | |
| BLUE = "\x1b[34m" | |
| class Logger: | |
| last_msg_len = 0 | |
| cr_used = False | |
| extras = {"start": "", "spacing": "", "end": ""} | |
| def __init__(self, log_level: str) -> None: | |
| self.logger = getLogger(SCRIPT_NAME) | |
| self.logger.setLevel(LOG_LEVELS[log_level]) | |
| handler = StreamHandler() | |
| handler.setLevel(LOG_LEVELS[log_level]) | |
| handler.setFormatter(Formatter("{start}{message}{spacing}{end}", style="{")) | |
| handler.terminator = "" | |
| self.logger.addHandler(handler) | |
| def log_message( | |
| self, | |
| level: str, | |
| message: str, | |
| colour: Colours = Colours.RESET, | |
| cr: bool = False, | |
| final_msg: bool = False, | |
| ) -> None: | |
| if LOG_LEVELS[level] < self.logger.level: | |
| return | |
| raw = self.logger.level == LOG_LEVELS["debug"] | |
| msg_len = len(message) | |
| message = f"{colour.value}{message}{Colours.RESET.value}" | |
| spacing = " " * max(0, self.last_msg_len - msg_len) | |
| self.last_msg_len = msg_len | |
| self.extras["start"] = "\n" if raw and self.cr_used else "" | |
| self.extras["end"] = "\r" if cr and not final_msg else "\n" | |
| self.extras["spacing"] = spacing if self.cr_used and not raw else "" | |
| self.cr_used = self.extras["end"] == "\r" | |
| self.logger.log(LOG_LEVELS[level], message, extra=self.extras) | |
| def send_request( | |
| logger: Logger, | |
| url: str, | |
| method: str = "GET", | |
| body: dict | None = None, | |
| retries: int = 1, | |
| backoff: float = 2, | |
| ) -> dict | None: | |
| data = None | |
| headers = {} | |
| if body is not None: | |
| data = dumps(body).encode("utf-8") | |
| headers["content-type"] = "application/json" | |
| req = Request(url, data=data, headers=headers, method=method) | |
| logger.log_message("debug", f"{req.get_method()} {req.get_full_url()}") | |
| while True: | |
| try: | |
| with CLIENT.open(req, timeout=60) as resp: | |
| raw = resp.read() | |
| logger.log_message("debug", f"Response {resp.status} {resp.reason}") | |
| return loads(raw) if raw else None | |
| except (HTTPError, URLError, OSError, TimeoutError) as err: | |
| logger.log_message("debug", f"Error {err.code}: {err.msg}") | |
| if retries > 0: | |
| label = "Rate limited!" if err.code == 429 else err.msg | |
| logger.log_message("debug", f"{label}. Retrying in {backoff:.1f}s") | |
| sleep(backoff) | |
| retries -= 1 | |
| backoff *= 2 | |
| continue | |
| raise | |
| def get_next_battle(logger: Logger) -> tuple[str, str] | None: | |
| data = send_request(logger, f"{BASE_URL}/battles/next") | |
| if not isinstance(data, dict): | |
| logger.log_message("warning", "Invalid response", Colours.YELLOW, cr=True) | |
| return None | |
| battle = data.get("battle") | |
| if not isinstance(battle, dict): | |
| logger.log_message("warning", f"Missing battle object: {data}", Colours.YELLOW, cr=True) | |
| return None | |
| battle_id = battle.get("id") | |
| tweet_a = battle.get("tweetA", {}) | |
| tweet_b = battle.get("tweetB", {}) | |
| tweet_id = tweet_a.get("id") or tweet_b.get("id") | |
| if not battle_id or not tweet_id: | |
| logger.log_message("warning", f"Incomplete battle payload: {battle}", Colours.YELLOW, cr=True) | |
| return None | |
| logger.log_message("debug", f"Parsed battle_id={battle_id} tweet_id={tweet_id}") | |
| return battle_id, tweet_id | |
| def vote(logger: Logger, battle_id: str, tweet_id: str) -> None: | |
| send_request( | |
| logger, | |
| f"{BASE_URL}/battles/{battle_id}/vote", | |
| method="POST", | |
| body={"winnerTweetId": tweet_id}, | |
| ) | |
| def worker(logger: Logger, stop: Event, counter: count, base_interval: float, max_backoff: float) -> None: | |
| interval = base_interval | |
| while not stop.is_set(): | |
| started = monotonic() | |
| try: | |
| result = get_next_battle(logger) | |
| if not result: | |
| logger.log_message("warning", f"No battles available! Waiting {interval:.1f}s", Colours.YELLOW, cr=True) | |
| interval = min(interval * 2, max_backoff) | |
| elif not result[1]: | |
| logger.log_message("debug", "Battle missing tweet id") | |
| else: | |
| battle_id, tweet_id = result | |
| vote(logger, battle_id, tweet_id) | |
| total = next(counter) | |
| logger.log_message("info", f"Voted battle {battle_id}! Total votes: {total}", Colours.GREEN, cr=True) | |
| interval = base_interval | |
| except Exception as err: | |
| logger.log_message("error", f"{err}! Waiting {interval:.1f}s", Colours.RED, cr=True) | |
| interval = min(interval * 2, max_backoff) | |
| elapsed = monotonic() - started | |
| stop.wait(max(0, interval - elapsed)) | |
| def get_cli_args() -> Namespace: | |
| parser = ArgumentParser(prog=SCRIPT_NAME) | |
| parser.add_argument("--token", default=getenv("TWEETMASH_TOKEN", ""), help="Tweetmash session token") | |
| parser.add_argument("--log-level", choices=LOG_LEVELS, default="info") | |
| parser.add_argument("--threads", type=int, default=1, help="Number of concurrent workers") | |
| parser.add_argument("--sleep", type=float, default=0.1, help="Seconds delay between votes") | |
| parser.add_argument("--max-backoff", type=float, default=60, help="Maximum backoff delay in seconds") | |
| return parser.parse_args() | |
| def validate_cli_args(logger: Logger, args: Namespace) -> bool: | |
| if not args.token.strip(): | |
| logger.log_message("error", "No Tweetmash session token provided.", Colours.RED) | |
| return False | |
| return True | |
| def main() -> int: | |
| args = get_cli_args() | |
| logger = Logger(args.log_level) | |
| logger.log_message("info", SCRIPT_ASCII, Colours.BLUE) | |
| if not validate_cli_args(logger, args): | |
| return 1 | |
| CLIENT.addheaders = [ | |
| ("origin", "https://tweetmash.com"), | |
| ("referer", "https://tweetmash.com/"), | |
| ("cookie", f"__Secure-better-auth.session_token={args.token.strip()}"), | |
| ] | |
| stop = Event() | |
| counter = count(1) | |
| threads = [ | |
| Thread(target=worker, args=(logger, stop, counter, args.sleep, args.max_backoff), daemon=True) | |
| for _ in range(args.threads) | |
| ] | |
| for t in threads: | |
| t.start() | |
| logger.log_message("info", f"Started {args.threads} workers", Colours.GREEN, cr=True) | |
| try: | |
| while any(t.is_alive() for t in threads): | |
| sleep(args.sleep) | |
| except KeyboardInterrupt: | |
| logger.log_message("info", f"Keyboard Interrupt detected! Stopping threads...", Colours.BLUE) | |
| stop.set() | |
| for t in threads: | |
| t.join(timeout=1) | |
| total = next(counter) - 1 | |
| logger.log_message("info", f"Stopped after {total} votes", Colours.GREEN, final_msg=True) | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment