Get started with SuperTable in just a few simple steps:
pip install supertable
export SUPERTABLE_HOME="$HOME/supertable"
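All of the example scripts below import their shared settings (super_name, organization, simple_name, user_hash) from examples/defaults.py. A minimal sketch of that module is shown here; the values are placeholders for illustration, not the library's shipped defaults:
# examples/defaults.py -- illustrative placeholder values
super_name = "example_super"            # name of the SuperTable used throughout the examples
organization = "example_org"            # organization/namespace the tables belong to
simple_name = "table1"                  # name of a simple table inside the SuperTable
user_hash = "replace-with-a-user-hash"  # hash returned by UserManager.create_user
# The monitoring examples additionally import MonitorType from this module.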
Summary: Python script for creating a SuperTable.
from supertable.super_table import SuperTable
from examples.defaults import super_name, organization
from supertable.config.defaults import logger
st = SuperTable(super_name, organization)
logger.info(f"Created SuperTable: {st.super_name}")
Summary: Python script for creating roles.
from supertable.rbac.role_manager import RoleManager
from examples.defaults import super_name, organization
from supertable.config.defaults import logger
# ---------- ROLE OPERATIONS ----------
# Initialize the RoleManager for this SuperTable and organization.
role_manager = RoleManager(super_name=super_name, organization=organization)
# --- Create roles ---
admin_data = {
"role": "admin",
"tables": ["*"]
}
admin_hash = role_manager.create_role(admin_data)
logger.info(f"Admin role created with hash: {admin_hash}")
editor_data = {
"role": "writer",
"tables": ["table1", "table2"]
}
editor_hash = role_manager.create_role(editor_data)
logger.info(f"Editor role created with hash: {admin_hash}")
usage_data = {
"role": "meta",
"tables": ["table1"]
}
usage_hash = role_manager.create_role(usage_data)
logger.info(f"Usage role created with hash: {usage_hash}")
viewer_data = {
"role": "reader",
"tables": ["table1"],
"columns": ["name", "email", "age"],
"filters": {"country": "US", "active": True}
}
viewer_hash = role_manager.create_role(viewer_data)
logger.info(f"Viewer role created with hash: {viewer_hash}")
viewer2_data = {
"role": "reader",
"tables": ["table2", "table3"],
"columns": ["name", "email", "age"],
"filters": {"country": "EU", "active": False}
}
viewer2_hash = role_manager.create_role(viewer2_data)
logger.info(f"Viewer2 role created with hash: {viewer2_hash}")
# --- List all roles ---
all_roles = role_manager.list_roles()
logger.info(f"Listing all roles: {all_roles}")
for role in all_roles:
logger.info(role)
# --- Retrieve a specific role's configuration ---
admin_config = role_manager.get_role(admin_hash)
logger.info(f"Retrieving Admin role configuration: {admin_config}")
logger.info(f"Admin config: {admin_config}")
Summary: Python script for creating users.
import os
from supertable.config.defaults import logger
from supertable.rbac.role_manager import RoleManager
from supertable.rbac.user_manager import UserManager
from examples.defaults import super_name, organization
# ---------- ROLE OPERATIONS ----------
# Initialize the RoleManager for this SuperTable and organization.
role_manager = RoleManager(super_name=super_name, organization=organization)
# List valid roles from _roles.json.
valid_roles = role_manager.list_roles()
for role in valid_roles:
logger.info(f"Role Type: {role['role']}, Hash: {role['hash']}")
# Extract valid role hashes by role type.
admin_role_hash = None
editor_role_hash = None
viewer_role_hash = None
usage_role_hash = None
for role in valid_roles:
role_type = role["role"].lower()
if role_type == "admin" and admin_role_hash is None:
admin_role_hash = role["hash"]
elif role_type == "writer" and editor_role_hash is None:
editor_role_hash = role["hash"]
elif role_type == "reader" and viewer_role_hash is None:
viewer_role_hash = role["hash"]
elif role_type == "meta" and usage_role_hash is None:
usage_role_hash = role["hash"]
# If no viewer role exists, create one.
if viewer_role_hash is None:
viewer_data = {
"role": "reader",
"tables": [], # Empty list interpreted as all tables
"columns": [], # Empty list interpreted as all columns
"filters": {} # No filters = all rows
}
viewer_role_hash = role_manager.create_role(viewer_data)
logger.info(f"Default Viewer role created with hash: {viewer_role_hash}")
# ---------- USER OPERATIONS ----------
# Initialize the UserManager for the same SuperTable and organization.
user_manager = UserManager(super_name=super_name, organization=organization)
# --- Create users ---
# User Alice will have the admin and editor roles.
alice_data = {
"username": "alice",
"roles": [admin_role_hash, editor_role_hash] # valid role hashes
}
alice_hash = user_manager.create_user(alice_data)
logger.info(f"User Alice created with hash: {alice_hash}")
# User Bob will have the viewer role.
bob_data = {
"username": "bob",
"roles": [viewer_role_hash]
}
bob_hash = user_manager.create_user(bob_data)
logger.info(f"User Bob created with hash: {bob_hash}")
# User Charlie is created with no roles.
charlie_data = {
"username": "charlie",
"roles": []
}
charlie_hash = user_manager.create_user(charlie_data)
logger.info(f"User Charlie created with hash: {charlie_hash}")
# --- Modify user ---
# Update Charlie: change his username and assign him the usage role.
user_manager.modify_user(charlie_hash, {"username": "charlie_updated", "roles": [usage_role_hash]})
charlie_data_updated = user_manager.get_user(charlie_hash)
logger.info(f"User Charlie after modification: {charlie_data_updated}")
# --- Delete a user ---
# Delete Bob.
user_manager.delete_user(bob_hash)
logger.info(f"User Bob deleted: {bob_hash}")
# --- Delete a role ---
# For example, delete the viewer role.
if viewer_role_hash:
deleted = role_manager.delete_role(viewer_role_hash)
if deleted:
logger.info(f"Viewer role deleted: {viewer_role_hash}")
# Remove the deleted role from all users.
user_manager.remove_role_from_users(viewer_role_hash)
logger.info(f"Viewer role removed from all users: {viewer_role_hash}")
else:
logger.error(f"Viewer role deletion failed: {viewer_role_hash}")
# --- List all users ---
logger.info(f"Listing all users:")
user_meta = user_manager.storage.read_json(user_manager.user_meta_path)
for user_hash, username in user_meta["users"].items():
user_file_path = os.path.join(user_manager.user_dir, user_hash + ".json")
user_data = user_manager.storage.read_json(user_file_path)
print()
logger.info(user_data)
Summary: Python script for writing dummy data.
from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.data_writer import DataWriter
from supertable.simple_table import SimpleTable
overwrite_columns = ["day", "client"]
dw = DataWriter(super_name=super_name, organization=organization)
st = SimpleTable(dw.super_table, simple_name)
data_dir = st.data_dir
for ds in [1, 2, 6, 7, 3, 4, 5]:
table_name, arrow = get_dummy_data(ds)
print(f"\n=== Running write(ds={ds}) === Table: {simple_name} ===")
_, _, ins, del_ = dw.write(
user_hash=user_hash,
simple_name=simple_name,
data=arrow,
overwrite_columns=overwrite_columns,
)
print(f"inserted={ins}, deleted={del_}")
Summary: Python script for writing a single dataset.
from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.data_writer import DataWriter
overwrite_columns = ["day"]
data = get_dummy_data(1)[1]
data_writer = DataWriter(super_name=super_name, organization=organization)
columns, rows, inserted, deleted = data_writer.write(
user_hash=user_hash,
simple_name=simple_name,
data=data,
overwrite_columns=overwrite_columns,
)
Summary: Python script for writing to the staging area.
from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.super_table import SuperTable
from supertable.staging_area import StagingArea
super_table = SuperTable(super_name=super_name, organization=organization)
staging_area = StagingArea(super_table=super_table, organization=organization)
staging_area.save_as_parquet(
    arrow_table=get_dummy_data(1)[1],
    table_name=simple_name,
    file_name="dummy_file_01.parquet",
)
Summary: Python script for writing monitoring metrics (single run).
import time
import random
from supertable.monitoring_logger import MonitoringLogger
from examples.defaults import super_name, organization, MonitorType
# Use the MonitoringLogger in a context manager to ensure proper setup and teardown
with MonitoringLogger(
super_name=super_name,
organization=organization,
monitor_type=MonitorType.METRICS.value,
max_rows_per_file=500,
flush_interval=0.1
) as monitor:
# Generate a unique ID for this query run
query_id = random.randint(100000, 999999)
# Start a high-resolution timer
start_time = time.perf_counter()
# --- Place your actual work here ---
# For demo purposes, we're just creating some random metrics
stats = {
"query_id": f"query_{query_id}",
"rows_read": random.randint(100, 10000),
"rows_processed": random.randint(100, 10000),
"query_hash": random.randint(100000, 999999)
}
monitor.log_metric(stats)
# --- Work complete ---
# Stop the timer
end_time = time.perf_counter()
# Calculate and print the elapsed time in seconds
elapsed = end_time - start_time
print(f"Total execution time: {elapsed:.4f} seconds")
Summary: Python script for writing monitoring metrics in parallel.
import threading
import time
import random
import os
from concurrent.futures import ThreadPoolExecutor, wait
from supertable.monitoring_logger import MonitoringLogger
from examples.defaults import super_name, organization, MonitorType
def print_monitor_stats(monitor):
stats = monitor.get_queue_stats()
print(f"\nProcessed: {stats['total_processed']}/{stats['total_received']} | "
f"Queue: {stats['current_size']} | "
f"Rate: {stats['processing_rate']:.1f} msg/s")
def verify_output(monitor, expected_count):
# Wait for processing to complete
for _ in range(20): # 20 second timeout
stats = monitor.get_queue_stats()
if stats['total_processed'] >= expected_count:
break
time.sleep(1)
# Verify final output
if stats['total_processed'] < expected_count:
print(f"\nWARNING: Only processed {stats['total_processed']}/{expected_count} messages")
else:
print(f"\nSUCCESS: Processed all {expected_count} messages")
# Print final file stats
catalog = monitor.storage.read_json(monitor.catalog_path)
print(f"\nFinal Catalog Stats:")
print(f"Files: {catalog['file_count']}")
print(f"Rows: {catalog['total_rows']}")
print(f"Version: {catalog['version']}")
print("Current working directory:", os.getcwd())
with MonitoringLogger(
super_name=super_name,
organization=organization,
monitor_type=MonitorType.METRICS.value,
max_rows_per_file=500,
flush_interval=0.1
) as monitor:
def worker(query_id):
stats = {
"query_id": f"query_{query_id}",
"rows_read": random.randint(100, 10000),
"rows_processed": random.randint(100, 10000),
"query_hash": random.randint(100000, 999999)
}
monitor.log_metric(stats)
return query_id
# Start monitoring thread
def monitor_loop():
while not monitor.stop_event.is_set():
print_monitor_stats(monitor)
time.sleep(0.5)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start()
# Submit work
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(worker, i) for i in range(1000)]
wait(futures)  # Wait for all submitted tasks to complete
# Ensure queue is drained
print("\nWaiting for queue to drain...")
while monitor.queue.qsize() > 0 or len(monitor.current_batch) > 0:
time.sleep(0.1)
# Verify all messages were processed
verify_output(monitor, 1000)
# Extra safety wait
time.sleep(1)
Summary: Python script for a read that fails (invalid user hash).
from supertable.data_reader import DataReader
from examples.defaults import super_name, simple_name, organization
user_hash="0d261765d0f88e3bdede483c3d41d125"
query = f"select count(*) as cnt from {simple_name}"
dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 85)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 85)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))
Summary: Python script for reading from the super table (successful query).
from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization
query = f"select * as cnt from {super_name} where 1=1 limit 10"
dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 52)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 52)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))
Summary: Python script for reading from a simple table (successful query).
from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization
query = f"select * as cnt from {simple_name} where 1=1 limit 10"
dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 52)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 52)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))
Summary: Python script for reading metadata (super and table level).
from supertable.config.defaults import logger
from supertable.meta_handler import MetaReader, find_tables
from examples.defaults import super_name, user_hash, simple_name, organization
# Find available tables
result = find_tables(organization=organization)
logger.info(result)
# Initialize MetaReader
meta_reader = MetaReader(super_name=super_name, organization=organization)
# Super metadata
super_meta = meta_reader.get_super_meta(user_hash)
logger.info(super_meta)
# Table schema
schema = meta_reader.get_table_schema(simple_name, user_hash)
logger.info(schema)
# Table stats
stats = meta_reader.get_table_stats(simple_name, user_hash)
logger.info(stats)
Summary: Python script for inspecting the staging area structure.
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.super_table import SuperTable
from supertable.staging_area import StagingArea
super_table = SuperTable(super_name=super_name, organization=organization)
staging_area = StagingArea(super_table=super_table, organization=organization)
# Fetch the staging area directory structure
staging_structure = staging_area.get_directory_structure()
print(staging_structure)
Summary: Python script for reading and printing query plans and scan statistics.
from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization
query = f"SELECT * FROM {simple_name} LIMIT 5"
reader = DataReader(super_name=super_name, organization=organization, query=query)
result = reader.execute(user_hash=user_hash, with_scan=True)
print(reader.plan_stats.stats)
print(reader.timer.timings)
Summary: Python script for retrieving write operation statistics.
from supertable.meta_handler import MetaReader
from examples.defaults import super_name, user_hash, organization
meta_reader = MetaReader(super_name=super_name, organization=organization)
# Fetch write statistics
write_stats = meta_reader.get_write_stats(user_hash)
print(write_stats)
Summary: Python script for fetching user metadata and details.
from supertable.rbac.user_manager import UserManager
from examples.defaults import super_name, organization
user_manager = UserManager(super_name=super_name, organization=organization)
# List users
users = user_manager.list_users()
print(users)
# Fetch a specific user's info if you know their hash
for user_hash in users:
user_info = user_manager.get_user(user_hash)
print(user_info)
Summary: Python script for deleting a specific table and its metadata.
from supertable.super_table import SuperTable
from examples.defaults import super_name, organization
super_table = SuperTable(super_name=super_name, organization=organization)
# Drop a specific table
success = super_table.drop_table("example_table_name")
print(f"Drop table successful: {success}")
Summary: Python script for deleting an entire SuperTable (all tables inside).
from supertable.super_table import SuperTable
from examples.defaults import super_name, organization
super_table = SuperTable(super_name=super_name, organization=organization)
# Drop the whole SuperTable
success = super_table.drop_super_table()
print(f"Drop super table successful: {success}")
Summary: Python script for cleaning up unused files in staging areas.
from supertable.cleaner import Cleaner
from examples.defaults import super_name, organization
cleaner = Cleaner(super_name=super_name, organization=organization)
# Clean obsolete files
removed_files = cleaner.clean_obsolete_files()
print(f"Removed files: {removed_files}")
End of Documentation.