Skip to content

Instantly share code, notes, and snippets.

@onacit
Last active December 13, 2025 10:56
Show Gist options
  • Select an option

  • Save onacit/8c38e22acd793a7fc644e1c283ab5bbb to your computer and use it in GitHub Desktop.

Select an option

Save onacit/8c38e22acd793a7fc644e1c283ab5bbb to your computer and use it in GitHub Desktop.
Generates SQLite database from the rickandmortyapi.com
.venv
.idea
rickandmortyapi.db
--
-- Exploratory queries against the character table, one section per column.
-- Sections without a statement are placeholders for columns not yet inspected.
-- ------------------------------------------------------------------------------------------------------------------ id
-- SELECT * is intentional here: this is an ad-hoc "show me everything" query.
SELECT *
FROM character
ORDER BY id ASC
;
-- ---------------------------------------------------------------------------------------------------------------- name
-- -------------------------------------------------------------------------------------------------------------- status
SELECT DISTINCT status
FROM character
ORDER BY status ASC
;
-- -------------------------------------------------------------------------------------------------------------- species
SELECT DISTINCT species
FROM character
ORDER BY species ASC
;
-- ---------------------------------------------------------------------------------------------------------------- type
SELECT DISTINCT type
FROM character
ORDER BY type ASC
;
-- -------------------------------------------------------------------------------------------------------------- gender
SELECT DISTINCT gender
FROM character
ORDER BY gender ASC
;
-- -------------------------------------------------------------------------------------------------------------- origin
-- ------------------------------------------------------------------------------------------------------------ location
-- --------------------------------------------------------------------------------------------------------------- image
-- ------------------------------------------------------------------------------------------------------------- episode
-- ----------------------------------------------------------------------------------------------------------------- url
-- ------------------------------------------------------------------------------------------------------------- created
-- ----------------------------------------------------------------------------------------------------------- origin_id
-- Inner join: characters whose origin_id is NULL (or matches no location) are not listed.
SELECT
    c.name AS character_name,
    l.name AS origin_name
FROM character AS c
INNER JOIN location AS l ON c.origin_id = l.id
ORDER BY c.id ASC
;
-- --------------------------------------------------------------------------------------------------------- location_id
-- Inner join: characters whose location_id is NULL (or matches no location) are not listed.
SELECT
    c.name AS character_name,
    l.name AS location_name
FROM character AS c
INNER JOIN location AS l ON c.location_id = l.id
ORDER BY c.id ASC
;
--
-- Integrity checks on location_resident. Each query is an anti-join
-- (LEFT JOIN ... WHERE <joined key> IS NULL), so every row returned is a violation.
-- --------------------------------------------------------------------------------------------------------- location_id
-- location_resident rows whose location_id matches no location.
SELECT
    lr.location_id,
    l.id
FROM location_resident AS lr
LEFT JOIN location AS l
    ON lr.location_id = l.id
WHERE l.id IS NULL
;
-- --------------------------------------------------------------------------------------------------------- resident_id
-- location_resident rows whose resident_id matches no character.
SELECT
    lr.*,
    c.id
FROM location_resident AS lr
LEFT JOIN character AS c
    ON lr.resident_id = c.id
WHERE c.id IS NULL
;
-- location_resident rows with no character that has BOTH the same id and the same
-- location_id, i.e. the resident is missing or its character.location_id differs.
SELECT
    lr.*,
    c.id,
    c.location_id
FROM location_resident AS lr
LEFT JOIN character AS c
    ON lr.resident_id = c.id
    AND lr.location_id = c.location_id
WHERE c.id IS NULL
;
@startuml
' Diagram of the tables in the generated SQLite database and the
' foreign-key style relations between them.
!theme plain
top to bottom direction
skinparam linetype ortho
' --- entity tables ---
class character {
name: text
status: text
species: text
type: text
gender: text
origin: text
location: text
image: text
episode: text
url: text
created: text
origin_id: integer
location_id: integer
id: integer
}
' character_episode and episode_character (further down) both pair a character id with an episode id.
class character_episode {
character_id: integer
episode_id: integer
}
class episode {
name: text
air_date: text
episode: text
characters: text
url: text
created: text
id: integer
}
class episode_character {
episode_id: integer
character_id: integer
}
class location {
name: text
type: text
dimension: text
residents: text
url: text
created: text
id: integer
}
class location_resident {
location_id: integer
resident_id: integer
}
' sqlite_master has no relation lines in this diagram.
class sqlite_master {
type: text
name: text
tbl_name: text
rootpage: int
sql: text
}
' --- relations; each label reads "<referencing column>:<referenced column>" ---
character -[#595959,plain]-^ location : "location_id:id"
character -[#595959,plain]-^ location : "origin_id:id"
character_episode -[#595959,plain]-^ character : "character_id:id"
character_episode -[#595959,plain]-^ episode : "episode_id:id"
episode_character -[#595959,plain]-^ character : "character_id:id"
episode_character -[#595959,plain]-^ episode : "episode_id:id"
location_resident -[#595959,plain]-^ character : "resident_id:id"
location_resident -[#595959,plain]-^ location : "location_id:id"
@enduml
# altgraph, future, macholib and six were recorded by `pip freeze` as direct references to
# machine-local wheel files (file:///AppleInternal/...), which cannot be installed anywhere
# else. They are pinned here to the versions named in those wheel files instead.
altgraph==0.17.2
black==25.11.0
certifi==2025.11.12
charset-normalizer==3.4.4
click==8.1.8
future==0.18.2
idna==3.11
macholib==1.15.2
mypy_extensions==1.1.0
packaging==25.0
pathspec==0.12.1
platformdirs==4.4.0
pytokens==0.3.0
requests==2.32.5
six==1.15.0
tomli==2.3.0
typing_extensions==4.15.0
urllib3==2.6.2
import json
import os
import requests
import sqlite3
def log_sql_callback(statement):
    """Print one SQL statement.

    Meant to be registered as a trace callback, e.g.
    ``connection.set_trace_callback(log_sql_callback)``.

    :param statement: the SQL text handed to the callback.
    """
    message = f"Executing SQL statement: {statement}"
    print(message)
db_file = "rickandmortyapi.db"


def create():
    """Recreate the SQLite database from the DDL script ``rickandmortyapi.sql``.

    The DDL script is read *before* any existing database file is deleted, so a
    missing or unreadable script leaves the previous database untouched. Once the
    script has been read, an existing ``db_file`` is removed, a new database is
    created and the script is executed against it.

    Errors reading the script and ``sqlite3`` errors are printed, not raised.
    """
    sql_file = "rickandmortyapi.sql"
    try:
        with open(sql_file, "r", encoding="utf-8") as f:
            sql_script = f.read()
    except IOError as e:
        print(f"an error reading the SQL file occurred: {e}")
        return
    if os.path.exists(db_file):
        print(f"removing existing database file: '{db_file}'")
        os.remove(db_file)
    connection = None
    try:
        # sqlite3.connect() creates the file when it does not exist yet.
        connection = sqlite3.connect(db_file)
        cursor = connection.cursor()
        print(f"new database '{db_file}' created and connected.")
        cursor.executescript(sql_script)
        connection.commit()
        print(f"successfully executed DDL script from '{sql_file}'.")
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    finally:
        if connection:
            connection.close()
base_url = "https://rickandmortyapi.com/api"


def read(path, page=1, timeout=30):
    """Fetch one page of an API resource and return the decoded JSON.

    :param path: resource path appended to ``base_url``, e.g. ``"/location"``.
    :param page: page number sent as the ``page`` query parameter.
    :param timeout: seconds passed to ``requests.get``; without a timeout a
        stalled connection would block the script indefinitely.
    :return: the parsed JSON body, or ``None`` when the server answers 404 or
        when the request fails (the failure is printed, not raised).
    """
    full_url = f"{base_url}{path}?page={page}"
    print(f"fetching {full_url}")
    try:
        response = requests.get(full_url, timeout=timeout)
        if response.status_code == 404:
            # Callers treat None as "no more pages".
            return None
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"failed to read {full_url}: {e}")
        return None
def location():
    """Fetch every page of ``/location`` and insert the rows into the ``location`` table.

    Pages are requested starting at 1 until ``read`` returns ``None``. Blank
    ``type`` / ``dimension`` values and an empty resident list are stored as NULL;
    resident URLs are stored comma-joined in ``residents``. Everything is
    committed once, after the last page. Errors are printed, not raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        # connection.set_trace_callback(log_sql_callback)
        cursor = connection.cursor()
        page = 1
        while True:
            response = read("/location", page)
            if response is None:
                # read() returns None both past the last page (404) and on a failed request.
                break
            # One executemany() per page instead of one execute() per row.
            cursor.executemany(
                """INSERT INTO location
                   (id, name, type, dimension, residents, url, created)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                [
                    (
                        result["id"],
                        result["name"],
                        result["type"].strip() or None,
                        result["dimension"].strip() or None,
                        ",".join(result["residents"]).strip() or None,
                        result["url"],
                        result["created"],
                    )
                    for result in response["results"]
                ],
            )
            page += 1
        connection.commit()
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
# NOTE: character() is defined once, after episode(). The byte-identical duplicate that
# stood here was dead code (the later definition rebinds the name before the script
# calls it) and has been removed.
def episode():
    """Fetch every page of ``/episode`` and insert the rows into the ``episode`` table.

    Pages are requested starting at 1 until ``read`` returns ``None``. Character
    URLs are stored comma-joined in ``characters``. Everything is committed once,
    after the last page. Errors are printed, not raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        # connection.set_trace_callback(log_sql_callback)
        cursor = connection.cursor()
        page = 1
        while True:
            response = read("/episode", page)
            if response is None:
                # read() returns None both past the last page (404) and on a failed request.
                break
            # One executemany() per page instead of one execute() per row.
            cursor.executemany(
                """INSERT INTO episode
                   (id, name, air_date, episode, characters, url, created)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                [
                    (
                        result["id"],
                        result["name"],
                        result["air_date"],
                        result["episode"],
                        ",".join(result["characters"]),
                        result["url"],
                        result["created"],
                    )
                    for result in response["results"]
                ],
            )
            page += 1
        connection.commit()
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
def character():
    """Fetch every page of ``/character`` and insert the rows into the ``character`` table.

    Pages are requested starting at 1 until ``read`` returns ``None``. The
    ``origin`` and ``location`` objects are stored as JSON text, episode URLs are
    stored comma-joined, and ``origin_id`` / ``location_id`` are taken from the
    last path segment of the respective URL (NULL when that segment is empty).
    Everything is committed once, after the last page. Errors are printed, not
    raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        # connection.set_trace_callback(log_sql_callback)
        cursor = connection.cursor()
        page = 1
        while True:
            response = read("/character", page)
            if response is None:
                # read() returns None both past the last page (404) and on a failed request.
                break
            # One executemany() per page instead of one execute() per row.
            cursor.executemany(
                """INSERT INTO character
                   (id, name, status, species, type, gender, origin, location, image, episode, url, created,
                    origin_id, location_id)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                [
                    (
                        result["id"],
                        result["name"],
                        result["status"],
                        result["species"],
                        result["type"].strip() or None,
                        result["gender"],
                        json.dumps(result["origin"]),
                        json.dumps(result["location"]),
                        result["image"],
                        ",".join(result["episode"]),
                        result["url"],
                        result["created"],
                        # An empty URL yields "" as its last segment, which becomes NULL.
                        result["origin"]["url"].split("/")[-1] or None,
                        result["location"]["url"].split("/")[-1] or None,
                    )
                    for result in response["results"]
                ],
            )
            page += 1
        connection.commit()
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
def location_resident():
    """Populate ``location_resident`` from the comma-joined URLs in ``location.residents``.

    For each location, the last path segment of every resident URL is taken as
    the resident id; ids are inserted in ascending numeric order. Locations with
    no residents (NULL or empty) are skipped. Errors are printed, not raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        # connection.set_trace_callback(log_sql_callback)
        cursor = connection.cursor()
        cursor.execute("""SELECT id, residents FROM location ORDER BY id ASC""")
        # Named location_id rather than id so the builtin id() is not shadowed.
        for location_id, residents in cursor.fetchall():
            if not residents:
                continue
            resident_ids = sorted(
                int(url.split("/")[-1]) for url in residents.split(",")
            )
            cursor.executemany(
                """INSERT INTO location_resident (location_id, resident_id) VALUES (?, ?)""",
                [(location_id, resident_id) for resident_id in resident_ids],
            )
        connection.commit()
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
def character_episode():
    """Populate ``character_episode`` from the comma-joined URLs in ``character.episode``.

    For each character, the last path segment of every episode URL is taken as
    the episode id; ids are inserted in ascending numeric order. Characters whose
    ``episode`` value is NULL or empty are skipped. Errors are printed, not raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        cursor = connection.cursor()
        cursor.execute("""SELECT id, episode FROM character ORDER BY id ASC""")
        # Named character_id rather than id so the builtin id() is not shadowed.
        for character_id, episodes in cursor.fetchall():
            # The importer stores "" (not NULL) for an empty list, and
            # "".split(",") yields [""], which int() cannot parse.
            if not episodes:
                continue
            episode_ids = sorted(
                int(url.split("/")[-1]) for url in episodes.split(",")
            )
            cursor.executemany(
                """INSERT INTO character_episode (character_id, episode_id) VALUES (?, ?)""",
                [(character_id, episode_id) for episode_id in episode_ids],
            )
        connection.commit()
    except sqlite3.Error as e:
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
def episode_character():
    """Populate ``episode_character`` from the comma-joined URLs in ``episode.characters``.

    For each episode, the last path segment of every character URL is taken as
    the character id; ids are inserted in ascending numeric order. Episodes whose
    ``characters`` value is NULL or empty are skipped. Errors are printed, not
    raised.
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        cursor = connection.cursor()
        cursor.execute("""SELECT id, characters FROM episode ORDER BY id ASC""")
        # Named episode_id rather than id so the builtin id() is not shadowed.
        for episode_id, characters in cursor.fetchall():
            # The importer stores "" (not NULL) for an empty list, and
            # "".split(",") yields [""], which int() cannot parse.
            if not characters:
                continue
            character_ids = sorted(
                int(url.split("/")[-1]) for url in characters.split(",")
            )
            cursor.executemany(
                """INSERT INTO episode_character (episode_id, character_id) VALUES (?, ?)""",
                [(episode_id, character_id) for character_id in character_ids],
            )
        connection.commit()
    except sqlite3.Error as e:
        # The exception used to be bound to the name character_id, clashing with the loop variable.
        print(f"a database error occurred: {e}")
    except IOError as e:
        # No SQL file is read here; the previous message was copied from create().
        print(f"an I/O error occurred: {e}")
    finally:
        if connection:
            connection.close()
def vacuum():
    """Run ``VACUUM`` on the database file.

    The connection is opened with ``isolation_level=None`` (autocommit) because
    SQLite refuses to VACUUM from within a transaction. ``sqlite3`` errors are
    printed, not raised.
    """
    # Bound before the try so the finally clause is safe even when connect() itself raises.
    connection = None
    try:
        connection = sqlite3.connect(db_file, isolation_level=None)
        cursor = connection.cursor()
        cursor.execute("VACUUM")
        print(f"database '{db_file}' successfully vacuumed.")
    except sqlite3.Error as e:
        print(f"an error occurred during VACUUM: {e}")
    finally:
        if connection:
            connection.close()
def count(table):
    """Return the number of rows in ``table``.

    :param table: table name; it is interpolated directly into the SQL text, so
        only trusted, literal names should be passed.
    :return: the row count, or ``None`` when the query yields no row or a
        ``sqlite3`` error occurs (the error is printed, not raised).
    """
    connection = None
    try:
        connection = sqlite3.connect(db_file)
        statement = f"SELECT COUNT(*) FROM {table}"
        row = connection.cursor().execute(statement).fetchone()
        return row[0] if row else None
    except sqlite3.Error as e:
        print(f"an error occurred while counting: {table} {e}")
    finally:
        if connection:
            connection.close()
# Build the database: recreate the schema, load the three entity tables, then derive
# the link tables from the comma-joined URL columns stored by the loaders.
create()
location()
character()
episode()
# These three read from the tables filled above, so they must run after the loaders.
location_resident()
character_episode()
episode_character()
vacuum()
# Sanity checks: the script expects exactly these row counts and aborts otherwise.
# NOTE(review): the numbers are hard-coded; presumably they mirror the API's current
# totals and need updating when the API grows — confirm.
location_count = count("location")
print(f"location count: {location_count}")
assert location_count == 126
character_count = count("character")
print(f"character count: {character_count}")
assert character_count == 826
episode_count = count("episode")
print(f"episode count: {episode_count}")
assert episode_count == 51
print("done")
-- One row per location; type, dimension and residents may be NULL.
CREATE TABLE IF NOT EXISTS location (
    id        INTEGER PRIMARY KEY,
    name      TEXT    NOT NULL,
    type      TEXT,
    dimension TEXT,
    residents TEXT,  -- raw list column; normalised into location_resident
    url       TEXT    NOT NULL,
    created   TEXT    NOT NULL
);
-- One row per character; origin_id and location_id are nullable references to location.
CREATE TABLE IF NOT EXISTS character (
    id          INTEGER PRIMARY KEY,
    name        TEXT    NOT NULL,
    status      TEXT    NOT NULL,
    species     TEXT    NOT NULL,
    type        TEXT,
    gender      TEXT    NOT NULL,
    origin      TEXT    NOT NULL,
    location    TEXT    NOT NULL,
    image       TEXT    NOT NULL,
    episode     TEXT    NOT NULL,  -- raw list column; normalised into character_episode
    url         TEXT    NOT NULL,
    created     TEXT    NOT NULL,
    origin_id   INTEGER,
    location_id INTEGER,
    FOREIGN KEY (origin_id) REFERENCES location (id),
    FOREIGN KEY (location_id) REFERENCES location (id)
);
-- One row per episode; the episode code column must be unique.
CREATE TABLE IF NOT EXISTS episode (
    id         INTEGER PRIMARY KEY,
    name       TEXT    NOT NULL,
    air_date   TEXT    NOT NULL,
    episode    TEXT    NOT NULL UNIQUE,
    characters TEXT    NOT NULL,  -- raw list column; normalised into episode_character
    url        TEXT    NOT NULL,
    created    TEXT    NOT NULL
);
-- location.residents
-- Link table: one row per (location, resident character) pair.
CREATE TABLE IF NOT EXISTS location_resident
(
    location_id INTEGER NOT NULL,
    resident_id INTEGER NOT NULL,
    FOREIGN KEY (location_id) REFERENCES location (id),
    FOREIGN KEY (resident_id) REFERENCES character (id),
    UNIQUE (location_id, resident_id)
);
-- IF NOT EXISTS keeps the script re-runnable, matching the CREATE TABLE statements.
-- The UNIQUE constraint already indexes lookups by location_id; this covers resident_id.
CREATE INDEX IF NOT EXISTS location_resident_resident_id ON location_resident (resident_id);
-- character.episode
-- Link table: one row per (character, episode) pair.
CREATE TABLE IF NOT EXISTS character_episode
(
    character_id INTEGER NOT NULL,
    episode_id   INTEGER NOT NULL,
    FOREIGN KEY (character_id) REFERENCES character (id),
    FOREIGN KEY (episode_id) REFERENCES episode (id),
    UNIQUE (character_id, episode_id)
);
-- IF NOT EXISTS keeps the script re-runnable, matching the CREATE TABLE statements.
-- The UNIQUE constraint already indexes lookups by character_id; this covers episode_id.
CREATE INDEX IF NOT EXISTS character_episode_episode_id ON character_episode (episode_id);
-- CREATE TABLE IF NOT EXISTS character_location
-- (
-- character_id INTEGER NOT NULL,
-- location_id INTEGER NOT NULL,
-- FOREIGN KEY (character_id) REFERENCES character (id),
-- FOREIGN KEY (location_id) REFERENCES location (id),
-- UNIQUE (character_id, location_id)
-- );
-- episode.characters
-- Link table: one row per (episode, character) pair.
CREATE TABLE IF NOT EXISTS episode_character
(
    episode_id   INTEGER NOT NULL,
    character_id INTEGER NOT NULL,
    FOREIGN KEY (episode_id) REFERENCES episode (id),
    FOREIGN KEY (character_id) REFERENCES character (id),
    UNIQUE (episode_id, character_id)
);
-- IF NOT EXISTS keeps the script re-runnable, matching the CREATE TABLE statements.
-- The UNIQUE constraint already indexes lookups by episode_id; this covers character_id.
CREATE INDEX IF NOT EXISTS episode_character_character_id ON episode_character (character_id);
# Python script to find whitespace-only values in the text columns of every table
import sqlite3


def _quote_identifier(name):
    """Return ``name`` as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def find_whitespace_only_values(db_path):
    """Report text-column values that are empty or consist only of whitespace.

    Every user table listed in ``sqlite_master`` is inspected. For each column
    whose declared type contains ``TEXT`` or ``CHAR``, rows whose value trims
    down to the empty string are printed and collected.

    :param db_path: path of the SQLite database file to inspect.
    :return: list of ``(table_name, column_name, rowid, value)`` tuples, in
        table / column / rowid order.
    """
    findings = []
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Query the sqlite_master table to get all user-defined table names
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
        )
        tables = cursor.fetchall()
        for table_name_tuple in tables:
            table_name = table_name_tuple[0]
            quoted_table = _quote_identifier(table_name)
            print(f"\n--- Checking table: {table_name} ---")
            # table_info rows are (cid, name, type, notnull, dflt_value, pk)
            cursor.execute(f"PRAGMA table_info({quoted_table});")
            columns_info = cursor.fetchall()
            # Keep columns that are likely to be text ('TEXT', 'VARCHAR', etc.)
            text_columns = [
                col[1]
                for col in columns_info
                if "TEXT" in col[2].upper() or "CHAR" in col[2].upper()
            ]
            for column_name in text_columns:
                # Identifiers must be double-quoted: in single quotes they are string
                # literals, so TRIM('name') = '' compared the literal column *name* and
                # never matched any row.
                quoted_column = _quote_identifier(column_name)
                # One-argument TRIM() strips spaces only; the explicit character set
                # also strips tab, LF, VT, FF and CR.
                query = (
                    f"SELECT rowid, {quoted_column} FROM {quoted_table} "
                    f"WHERE TRIM({quoted_column}, char(9, 10, 11, 12, 13, 32)) = '' "
                    f"AND {quoted_column} IS NOT NULL "
                    f"ORDER BY rowid;"
                )
                cursor.execute(query)
                rows = cursor.fetchall()
                if rows:
                    print(f" Found whitespace-only values in column '{column_name}':")
                    for row_id, value in rows:
                        # Display the raw value
                        print(f" Row ID {row_id}: {value!r}")
                        findings.append((table_name, column_name, row_id, value))
    finally:
        # Close the connection even when a query fails.
        conn.close()
    return findings


find_whitespace_only_values("rickandmortyapi.db")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment