SuperTable Documentation

Version 1.0.0
Quick Start

Get started with SuperTable in just a few simple steps:


pip install supertable

export SUPERTABLE_HOME="$HOME/supertable"
            

Table of Contents

Navigation
Sections
Code copied to clipboard!

Documentation

Create Operations

create_super_table.py

Summary: Python script for Create super table.

from supertable.super_table import SuperTable
from examples.defaults import super_name, organization
from supertable.config.defaults import logger

st = SuperTable(super_name, organization)
logger.info(f"Created SuperTable: {st.super_name}")

create_roles.py

Summary: Python script for Create roles.

from supertable.rbac.role_manager import RoleManager
from examples.defaults import super_name, organization
from supertable.config.defaults import logger

# ---------- ROLE OPERATIONS ----------
# Initialize the RoleManager with a base directory.

role_manager = RoleManager(super_name=super_name, organization=organization)

# --- Create roles ---
admin_data = {
    "role": "admin",
    "tables": ["*"]
}
admin_hash = role_manager.create_role(admin_data)
logger.info(f"Admin role created with hash: {admin_hash}")

editor_data = {
    "role": "writer",
    "tables": ["table1", "table2"]
}
editor_hash = role_manager.create_role(editor_data)
logger.info(f"Editor role created with hash: {admin_hash}")

usage_data = {
    "role": "meta",
    "tables": ["table1"]
}
usage_hash = role_manager.create_role(usage_data)
logger.info(f"Usage role created with hash: {usage_hash}")

viewer_data = {
    "role": "reader",
    "tables": ["table1"],
    "columns": ["name", "email", "age"],
    "filters": {"country": "US", "active": True}
}
viewer_hash = role_manager.create_role(viewer_data)
logger.info(f"Viewer role created with hash: {viewer_hash}")

viewer2_data = {
    "role": "reader",
    "tables": ["table2", "table3"],
    "columns": ["name", "email", "age"],
    "filters": {"country": "EU", "active": False}
}
viewer2_hash = role_manager.create_role(viewer2_data)
logger.info(f"Viewer2 role created with hash: {viewer2_hash}")


# --- List all roles ---
all_roles = role_manager.list_roles()
logger.info(f"Listing all roles: {all_roles}")
for role in all_roles:
    logger.info(role)

# --- Retrieve a specific role's configuration ---
admin_config = role_manager.get_role(admin_hash)
logger.info(f"Retrieving Admin role configuration: {admin_config}")
logger.info(f"Admin config: {admin_config}")

create_users.py

Summary: Python script for Create users.

import os
from supertable.config.defaults import logger
from supertable.rbac.role_manager import RoleManager
from supertable.rbac.user_manager import UserManager

from examples.defaults import super_name, organization

# ---------- ROLE OPERATIONS ----------
# Initialize the RoleManager with a base directory.
role_manager = RoleManager(super_name=super_name, organization=organization)

# List valid roles from _roles.json.
valid_roles = role_manager.list_roles()
for role in valid_roles:
    logger.info(f"Role Type: {role['role']}, Hash: {role['hash']}")


# Extract valid role hashes by role type.
admin_role_hash = None
editor_role_hash = None
viewer_role_hash = None
usage_role_hash = None

for role in valid_roles:
    role_type = role["role"].lower()
    if role_type == "admin" and admin_role_hash is None:
        admin_role_hash = role["hash"]
    elif role_type == "writer" and editor_role_hash is None:
        editor_role_hash = role["hash"]
    elif role_type == "reader" and viewer_role_hash is None:
        viewer_role_hash = role["hash"]
    elif role_type == "meta" and usage_role_hash is None:
        usage_role_hash = role["hash"]

# If no viewer role exists, create one.
if viewer_role_hash is None:
    viewer_data = {
        "role": "reader",
        "tables": [],      # Empty list interpreted as all tables
        "columns": [],     # Empty list interpreted as all columns
        "filters": {}      # No filters = all rows
    }
    viewer_role_hash = role_manager.create_role(viewer_data)
    logger.info(f"Default Viewer role created with hash: {viewer_role_hash}")

# ---------- USER OPERATIONS ----------
# Initialize the UserManager with the same base directory.
user_manager = UserManager(super_name=super_name, organization=organization)

# --- Create users ---
# User Alice will have the admin and editor roles.
alice_data = {
    "username": "alice",
    "roles": [admin_role_hash, editor_role_hash]  # valid role hashes
}
alice_hash = user_manager.create_user(alice_data)
logger.info(f"User Alice created with hash: {alice_hash}")

# User Bob will have the viewer role.
bob_data = {
    "username": "bob",
    "roles": [viewer_role_hash]
}
bob_hash = user_manager.create_user(bob_data)
logger.info(f"User Bob created with hash: {bob_hash}")

# User Charlie is created with no roles.
charlie_data = {
    "username": "charlie",
    "roles": []
}
charlie_hash = user_manager.create_user(charlie_data)
logger.info(f"User Charlie created with hash: {charlie_hash}")

# --- Modify user ---
# Update Charlie: change his username and assign him the usage role.
user_manager.modify_user(charlie_hash, {"username": "charlie_updated", "roles": [usage_role_hash]})
charlie_data_updated = user_manager.get_user(charlie_hash)
logger.info(f"User Charlie after modification: {charlie_data_updated}")

# --- Delete a user ---
# Delete Bob.
user_manager.delete_user(bob_hash)
logger.info(f"User Bob deleted: {bob_hash}")


# --- Delete a role ---
# For example, delete the viewer role.
if viewer_role_hash:
    deleted = role_manager.delete_role(viewer_role_hash)
    if deleted:
        logger.info(f"Viewer role deleted: {viewer_role_hash}")
        # Remove the deleted role from all users.
        user_manager.remove_role_from_users(viewer_role_hash)
        logger.info(f"Viewer role removed from all users: {viewer_role_hash}")
    else:
        logger.error(f"Viewer role deletion failed: {viewer_role_hash}")

# --- List all users ---
logger.info(f"Listing all users:")
user_meta = user_manager.storage.read_json(user_manager.user_meta_path)
for user_hash, username in user_meta["users"].items():
    user_file_path = os.path.join(user_manager.user_dir, user_hash + ".json")
    user_data = user_manager.storage.read_json(user_file_path)
    print()
    logger.info(user_data)

Write Operations

write_dummy_data.py

Summary: Python script for Write dummy data.

import polars as pl
import glob, os

from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.data_writer import DataWriter
from supertable.simple_table import SimpleTable

overwrite_columns = ["day", "client"]
dw = DataWriter(super_name=super_name, organization=organization)
st = SimpleTable(dw.super_table, simple_name)
data_dir = st.data_dir

for ds in [1, 2, 6, 7, 3, 4, 5]:
    table_name, arrow = get_dummy_data(ds)
    print(f"\n=== Running write(ds={ds}) === Table: {simple_name} ===")
    _, _, ins, del_ = dw.write(
        user_hash=user_hash,
        simple_name=simple_name,
        data=arrow,
        overwrite_columns=overwrite_columns,
    )
    print(f"inserted={ins}, deleted={del_}")

write_single_data.py

Summary: Python script for Write single data.

from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.data_writer import DataWriter

overwrite_columns = ["day"]
data = get_dummy_data(1)[1]

data_writer = DataWriter(super_name=super_name, organization=organization)

columns, rows, inserted, deleted = data_writer.write(
    user_hash=user_hash,
    simple_name=simple_name,
    data=data,
    overwrite_columns=overwrite_columns,
)

write_staging.py

Summary: Python script for Write staging.

from examples.dummy_data import get_dummy_data
from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.super_table import SuperTable
from supertable.staging_area import StagingArea

super_table = SuperTable(super_name=super_name, organization=organization)

staging_area = StagingArea(super_table=super_table, organization=organization)


staging_area.save_as_parquet(arrow_table=get_dummy_data(1)[1],
                             table_name = simple_name,
                             file_name = "dummy_file_01.parquet" )

write_monitoring_simple.py

Summary: Python script for Write monitoring simple.

import time
import random

from supertable.monitoring_logger import MonitoringLogger
from examples.defaults import super_name, organization, MonitorType

# Use the MonitoringLogger in a context manager to ensure proper setup and teardown
with MonitoringLogger(
        super_name=super_name,
        organization=organization,
        monitor_type=MonitorType.METRICS.value,
        max_rows_per_file=500,
        flush_interval=0.1
) as monitor:

    # Generate a unique ID for this query run
    query_id = random.randint(100000, 999999)

    # Start a high-resolution timer
    start_time = time.perf_counter()

    # --- Place your actual work here ---
    # For demo purposes, we're just creating some random metrics
    stats = {
        "query_id": f"query_{query_id}",
        "rows_read": random.randint(100, 10000),
        "rows_processed": random.randint(100, 10000),
        "query_hash": random.randint(100000, 999999)
    }
    monitor.log_metric(stats)
    # --- Work complete ---

    # Stop the timer
    end_time = time.perf_counter()

    # Calculate and print the elapsed time in seconds
    elapsed = end_time - start_time
    print(f"Total execution time: {elapsed:.4f} seconds")

write_monitoring_parallel.py

Summary: Python script for Write monitoring parallel.

import threading
import time
import random
import os

from concurrent.futures import ThreadPoolExecutor, wait
from supertable.monitoring_logger import MonitoringLogger
from examples.defaults import super_name, organization, MonitorType

def print_monitor_stats(monitor):
    stats = monitor.get_queue_stats()
    print(f"\nProcessed: {stats['total_processed']}/{stats['total_received']} | "
          f"Queue: {stats['current_size']} | "
          f"Rate: {stats['processing_rate']:.1f} msg/s")


def verify_output(monitor, expected_count):
    # Wait for processing to complete
    for _ in range(20):  # 20 second timeout
        stats = monitor.get_queue_stats()
        if stats['total_processed'] >= expected_count:
            break
        time.sleep(1)

    # Verify final output
    if stats['total_processed'] < expected_count:
        print(f"\nWARNING: Only processed {stats['total_processed']}/{expected_count} messages")
    else:
        print(f"\nSUCCESS: Processed all {expected_count} messages")

    # Print final file stats
    catalog = monitor.storage.read_json(monitor.catalog_path)
    print(f"\nFinal Catalog Stats:")
    print(f"Files: {catalog['file_count']}")
    print(f"Rows: {catalog['total_rows']}")
    print(f"Version: {catalog['version']}")


print("Current working directory:", os.getcwd())

with MonitoringLogger(
        super_name=super_name,
        organization=organization,
        monitor_type=MonitorType.METRICS.value,
        max_rows_per_file=500,
        flush_interval=0.1
) as monitor:
    def worker(query_id):
        stats = {
            "query_id": f"query_{query_id}",
            "rows_read": random.randint(100, 10000),
            "rows_processed": random.randint(100, 10000),
            "query_hash": random.randint(100000, 999999)
        }
        monitor.log_metric(stats)
        return query_id


    # Start monitoring thread
    def monitor_loop():
        while not monitor.stop_event.is_set():
            print_monitor_stats(monitor)
            time.sleep(0.5)


    monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
    monitor_thread.start()

    # Submit work
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(worker, i) for i in range(1001)]
        wait(futures)  # Wait for all tasks to be submitted

    # Ensure queue is drained
    print("\nWaiting for queue to drain...")
    while monitor.queue.qsize() > 0 or len(monitor.current_batch) > 0:
        time.sleep(0.1)

    # Verify all messages were processed
    verify_output(monitor, 1000)

    # Extra safety wait
    time.sleep(1)

Read Operations

read_data_error.py

Summary: Python script for Read data error.

from supertable.data_reader import DataReader
from examples.defaults import super_name, simple_name, organization

user_hash="0d261765d0f88e3bdede483c3d41d125"
query = f"select count(*) as cnt from {simple_name}"

dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 85)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 85)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))

read_super_data_ok.py

Summary: Python script for Read super data ok.

from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization

query = f"select * as cnt from {super_name} where 1=1 limit 10"

dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 52)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 52)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))

read_table_data_ok.py

Summary: Python script for Read table data ok.

from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization

query = f"select * as cnt from {simple_name} where 1=1 limit 10"

dr = DataReader(super_name=super_name, organization=organization, query=query)
result = dr.execute(user_hash=user_hash, with_scan=False)
print("-" * 52)
print("Rows: ", result[0].shape[0], ", Columns: ", result[0].shape[1], ", " , result[1], ", Message: ", result[2])
print("-" * 52)
print(dr.timer.timings)
print("-" * len(str(dr.timer.timings)))
print(dr.plan_stats.stats)
print("-" * len(str(dr.plan_stats.stats)))

read_meta.py

Summary: Python script for reading metadata (super and table level).

import os
from supertable.config.defaults import logger
from supertable.meta_handler import MetaReader, find_tables
from examples.defaults import super_name, user_hash, simple_name, organization

current_working_directory = os.getcwd()

# Find available tables
result = find_tables(organization=organization)
logger.info(result)

# Initialize MetaReader
meta_reader = MetaReader(super_name=super_name, organization=organization)

# Super metadata
super_meta = meta_reader.get_super_meta(user_hash)
logger.info(super_meta)

# Table schema
schema = meta_reader.get_table_schema(simple_name, user_hash)
logger.info(schema)

# Table stats
stats = meta_reader.get_table_stats(simple_name, user_hash)
logger.info(stats)

read_staging.py

Summary: Python script for inspecting the staging area structure.

from examples.defaults import super_name, user_hash, simple_name, organization
from supertable.super_table import SuperTable
from supertable.staging_area import StagingArea

super_table = SuperTable(super_name=super_name, organization=organization)
staging_area = StagingArea(super_table=super_table, organization=organization)

# Fetch the staging area directory structure
staging_structure = staging_area.get_directory_structure()
print(staging_structure)

read_query_plans.py

Summary: Python script for reading and printing query plans and scan statistics.

from supertable.data_reader import DataReader
from examples.defaults import super_name, user_hash, simple_name, organization

query = f"SELECT * FROM {simple_name} LIMIT 5"

reader = DataReader(super_name=super_name, organization=organization, query=query)
result = reader.execute(user_hash=user_hash, with_scan=True)

print(reader.plan_stats.stats)
print(reader.timer.timings)

read_write_stats.py

Summary: Python script for retrieving write operation statistics.

from supertable.meta_handler import MetaReader
from examples.defaults import super_name, user_hash, organization

meta_reader = MetaReader(super_name=super_name, organization=organization)

# Fetch write statistics
write_stats = meta_reader.get_write_stats(user_hash)
print(write_stats)

read_user.py

Summary: Python script for fetching user metadata and details.

from supertable.rbac.user_manager import UserManager
from examples.defaults import super_name, organization

user_manager = UserManager(super_name=super_name, organization=organization)

# List users
users = user_manager.list_users()
print(users)

# Fetch a specific user's info if you know their hash
for user_hash in users:
    user_info = user_manager.get_user(user_hash)
    print(user_info)

Drop Operations

delete_table.py

Summary: Python script for deleting a specific table and its metadata.

from supertable.super_table import SuperTable
from examples.defaults import super_name, organization

super_table = SuperTable(super_name=super_name, organization=organization)

# Drop a specific table
success = super_table.drop_table("example_table_name")
print(f"Drop table successful: {success}")

delete_super_table.py

Summary: Python script for deleting an entire SuperTable (all tables inside).

from supertable.super_table import SuperTable
from examples.defaults import super_name, organization

super_table = SuperTable(super_name=super_name, organization=organization)

# Drop the whole SuperTable
success = super_table.drop_super_table()
print(f"Drop super table successful: {success}")

clean_obsolete_files.py

Summary: Python script for cleaning up unused files in staging areas.

from supertable.cleaner import Cleaner
from examples.defaults import super_name, organization

cleaner = Cleaner(super_name=super_name, organization=organization)

# Clean obsolete files
removed_files = cleaner.clean_obsolete_files()
print(f"Removed files: {removed_files}")

End of Documentation.