This tutorial will teach you how to store telemetry sessions in a database for historical queries and long-term analysis.
By the end of this tutorial, you will know how to:
- Set up a SQLite or PostgreSQL database for telemetry storage
- Save sessions, laps, and telemetry points through the repository
- Query historical data: best laps, session statistics, and comparisons
- Export and import sessions as JSON

Estimated time: 45 minutes
Create a new script database_integration.py:
"""
Database Integration
Persistent storage for telemetry data.
"""
from datetime import datetime
from pathlib import Path
from typing import List, Dict
import json
from src.database.repository import TelemetryRepository
from src.database.models import Base
from src.utils import setup_logger
logger = setup_logger("db_integration", "INFO")
SQLite is perfect for local development and doesn’t require any server setup:
def setup_sqlite_database(db_path: str = "data/telemetry.db") -> TelemetryRepository:
"""
Configure SQLite database.
Args:
db_path: Path to the database file
Returns:
TelemetryRepository: Configured repository instance
"""
logger.info(f"Setting up SQLite: {db_path}")
# Ensure data directory exists
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
# Create repository
repository = TelemetryRepository(
connection_string=f'sqlite:///{db_path}',
echo=False # Set to True for SQL query logging
)
# Create tables
repository.create_tables()
logger.info("✓ Database initialized")
logger.info(f"✓ Tables created at: {db_path}")
return repository
PostgreSQL is recommended for production deployments with multiple users:
def setup_postgresql_database(
host: str = "localhost",
port: int = 5432,
database: str = "lfs_telemetry",
user: str = "lfs_user",
password: str = "password"
) -> TelemetryRepository:
"""
Configure PostgreSQL database.
Args:
host: PostgreSQL server host
port: Server port (default 5432)
database: Database name
user: Username
password: User password
Returns:
TelemetryRepository: Configured repository instance
"""
logger.info(f"Connecting to PostgreSQL: {host}:{port}/{database}")
# Build connection string
conn_string = f'postgresql://{user}:{password}@{host}:{port}/{database}'
# Create repository
repository = TelemetryRepository(
connection_string=conn_string,
echo=False,
pool_size=5 # Connection pool size
)
# Create tables
repository.create_tables()
logger.info("✓ Connected to PostgreSQL")
logger.info("✓ Tables created/verified")
return repository
The database schema is already defined in src/database/models.py. Here’s an overview:
1. Sessions Table
2. Laps Table
3. TelemetryPoint Table
4. Circuits & Vehicles Tables
# Example of the database structure:
Session (1) --- (N) Laps (1) --- (N) TelemetryPoints
    |                                      |
    |--- (N) Circuit                   Includes:
    |--- (N) Vehicle                   - Speed, RPM
                                       - Position data
                                       - Controls (throttle, brake)
The repository handles all these relationships automatically.
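For orientation, here is a minimal sketch of what such SQLAlchemy models could look like. The real definitions live in src/database/models.py; the actual column names and types may differ, so treat this only as an illustration of the relationships above.

from sqlalchemy import Column, DateTime, Float, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Circuit(Base):
    __tablename__ = "circuits"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    short_name = Column(String)

class Vehicle(Base):
    __tablename__ = "vehicles"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    short_name = Column(String)

class Session(Base):
    __tablename__ = "sessions"
    id = Column(Integer, primary_key=True)
    datetime = Column(DateTime)
    driver_name = Column(String)
    circuit_id = Column(Integer, ForeignKey("circuits.id"))
    vehicle_id = Column(Integer, ForeignKey("vehicles.id"))
    laps = relationship("Lap", back_populates="session")

class Lap(Base):
    __tablename__ = "laps"
    id = Column(Integer, primary_key=True)
    session_id = Column(Integer, ForeignKey("sessions.id"))
    lap_number = Column(Integer)
    lap_time = Column(Integer)  # milliseconds
    valid = Column(Integer)
    session = relationship("Session", back_populates="laps")
    telemetry_points = relationship("TelemetryPoint")

class TelemetryPoint(Base):
    __tablename__ = "telemetry_points"
    id = Column(Integer, primary_key=True)
    lap_id = Column(Integer, ForeignKey("laps.id"))
    timestamp = Column(Integer)  # milliseconds
    speed = Column(Float)
    rpm = Column(Integer)
    gear = Column(Integer)
    throttle = Column(Float)
    brake = Column(Float)
    position_x = Column(Float)
    position_y = Column(Float)
    position_z = Column(Float)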
def load_session_data(filepath: str) -> List[Dict]:
"""
Load session data from JSON file.
Args:
filepath: Path to JSON file from Tutorial 1
Returns:
List of telemetry data points
"""
logger.info(f"Loading session data: {filepath}")
with open(filepath, 'r') as f:
data = json.load(f)
logger.info(f"✓ Loaded {len(data)} telemetry points")
return data
def extract_laps(telemetry_data: List[Dict]) -> List[List[Dict]]:
"""
Separate telemetry data into individual laps.
Args:
telemetry_data: Complete telemetry data
Returns:
List of laps, where each lap is a list of data points
"""
logger.info("Extracting individual laps...")
laps = []
current_lap = []
last_lap_number = -1
for sample in telemetry_data:
lap_number = sample.get('lap', 0)
if lap_number != last_lap_number and current_lap:
# New lap detected, save previous one
laps.append(current_lap)
current_lap = []
current_lap.append(sample)
last_lap_number = lap_number
# Add last lap
if current_lap:
laps.append(current_lap)
logger.info(f"✓ Found {len(laps)} laps")
return laps
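A quick sanity check with a tiny hand-made sample (hypothetical values, following the Tutorial 1 JSON format) shows how the splitter behaves:

# Two laps of two samples each
sample = [
    {'lap': 1, 'timestamp': 0.0}, {'lap': 1, 'timestamp': 0.5},
    {'lap': 2, 'timestamp': 1.0}, {'lap': 2, 'timestamp': 1.5},
]
laps = extract_laps(sample)
assert len(laps) == 2 and len(laps[0]) == 2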
def save_session_to_database(
repository: TelemetryRepository,
telemetry_data: List[Dict],
circuit_name: str = "Blackwood GP",
vehicle_name: str = "XF GTI",
driver_name: str = "Driver"
) -> int:
"""
Save a complete session to the database.
Args:
repository: TelemetryRepository instance
telemetry_data: Telemetry data from collection
circuit_name: Name of the circuit (short name like "BL1")
vehicle_name: Name of the vehicle (short name like "XFG")
driver_name: Name of the driver
Returns:
Session ID
"""
logger.info("Saving session to database...")
# Separate into laps
laps = extract_laps(telemetry_data)
# Map common circuit/vehicle names to short codes
# In practice, you'd have a proper mapping or configuration
circuit_short = "BL1" # Blackwood GP
vehicle_short = "XFG" # XF GTI
# First, ensure circuit and vehicle exist in database
repository.get_or_create_circuit(
name=circuit_name,
short_name=circuit_short,
length=2000.0 # Circuit length in meters
)
repository.get_or_create_vehicle(
name=vehicle_name,
short_name=vehicle_short,
class_type="TBO"
)
# Create session (uses short names for foreign key lookup)
session_id = repository.save_session(
datetime_start=datetime.now(),
circuit_name=circuit_short, # Must use short name
vehicle_name=vehicle_short, # Must use short name
driver_name=driver_name,
duration=None # Will calculate later if needed
)
logger.info(f"✓ Session created: ID {session_id}")
# Process each lap
for lap_idx, lap_data in enumerate(laps):
# Calculate lap time (in milliseconds)
if lap_data:
first_time = lap_data[0].get('timestamp', 0)
last_time = lap_data[-1].get('timestamp', 0)
lap_time = int((last_time - first_time) * 1000) # Convert to ms
else:
lap_time = 0
# Save lap
lap_id = repository.save_lap(
session_id=session_id,
lap_number=lap_idx + 1,
lap_time=lap_time,
valid=True
)
# Prepare telemetry points for batch insert
telemetry_points = []
for point in lap_data:
telemetry_point = {
'timestamp': int(point.get('timestamp', 0) * 1000), # ms
'speed': float(point.get('speed', 0)),
'rpm': int(point.get('rpm', 0)),
'gear': int(point.get('gear', 0)),
'throttle': float(point.get('throttle', 0)),
'brake': float(point.get('brake', 0)),
'position_x': float(point.get('pos_x', 0)),
'position_y': float(point.get('pos_y', 0)),
'position_z': float(point.get('pos_z', 0))
}
telemetry_points.append(telemetry_point)
# Save telemetry points in batch
count = repository.save_telemetry_points(lap_id, telemetry_points)
logger.info(f" Lap {lap_idx + 1}: {count} points saved")
logger.info(f"✓ Session saved: ID {session_id}, {len(laps)} laps")
return session_id
Next, add query helpers for historical analysis. NumPy is used for the lap-time statistics:
import numpy as np
def query_best_laps(repository: TelemetryRepository, circuit_name: str = None, limit: int = 10):
"""
Query the best laps from the database across all sessions.
Args:
repository: TelemetryRepository instance
circuit_name: Filter by circuit (optional)
limit: Maximum number of results
Returns:
List of best laps
"""
logger.info("Querying best laps...")
# Get all sessions (filtered by circuit if specified)
sessions = repository.get_sessions(circuit=circuit_name, limit=100)
# Collect best lap from each session
all_best_laps = []
for session in sessions:
best_lap = repository.get_best_lap(session.id)
if best_lap and best_lap.lap_time: # Only include laps with valid times
all_best_laps.append((best_lap, session))
# Sort by lap time (ascending) and limit results
all_best_laps.sort(key=lambda x: x[0].lap_time)
best_laps = all_best_laps[:limit]
logger.info(f"\n🏆 Top {len(best_laps)} Best Laps:")
for idx, (lap, session) in enumerate(best_laps, 1):
lap_time_seconds = lap.lap_time / 1000.0 # Convert ms to seconds
circuit = session.circuit.name if session.circuit else "Unknown"
driver = session.driver_name or "Unknown"
logger.info(f"{idx}. Lap {lap.lap_number} - "
f"Time: {lap_time_seconds:.3f}s - "
f"Circuit: {circuit} - "
f"Driver: {driver}")
return [lap for lap, _ in best_laps]
def query_session_statistics(repository: TelemetryRepository, session_id: int):
"""
Get statistics for a specific session.
Args:
repository: TelemetryRepository instance
session_id: Session ID
"""
logger.info(f"Querying session {session_id} statistics...")
# Get session (relationships are eagerly loaded)
session = repository.get_session(session_id)
if not session:
logger.error(f"Session {session_id} not found")
return
circuit = session.circuit.name if session.circuit else "Unknown"
vehicle = session.vehicle.name if session.vehicle else "Unknown"
driver = session.driver_name or "Unknown"
logger.info(f"\n📊 Session #{session.id}")
logger.info(f" Circuit: {circuit}")
logger.info(f" Vehicle: {vehicle}")
logger.info(f" Driver: {driver}")
logger.info(f" Date/Time: {session.datetime}")
logger.info(f" Total Laps: {len(session.laps)}")
if session.laps:
lap_times = [lap.lap_time / 1000.0 for lap in session.laps if lap.lap_time] # Convert to seconds
if lap_times:
logger.info(f" Best Lap: {min(lap_times):.3f}s")
logger.info(f" Worst Lap: {max(lap_times):.3f}s")
logger.info(f" Average: {np.mean(lap_times):.3f}s")
logger.info(f" Std Dev: {np.std(lap_times):.3f}s")
return session
def compare_sessions(repository: TelemetryRepository, session_id1: int, session_id2: int):
"""
Compare two sessions.
Args:
repository: TelemetryRepository instance
session_id1: First session ID
session_id2: Second session ID
"""
logger.info(f"\n=== Comparing Sessions {session_id1} vs {session_id2} ===")
# Get both sessions
s1 = repository.get_session(session_id1)
s2 = repository.get_session(session_id2)
if not s1 or not s2:
logger.error("One or both sessions do not exist")
return
# Get best laps using repository method
best_lap1 = repository.get_best_lap(session_id1)
best_lap2 = repository.get_best_lap(session_id2)
if best_lap1 and best_lap2:
time1 = best_lap1.lap_time / 1000.0
time2 = best_lap2.lap_time / 1000.0
diff = time2 - time1
logger.info(f"\n⏱️ Best Laps Comparison:")
logger.info(f" Session {session_id1}: {time1:.3f}s")
logger.info(f" Session {session_id2}: {time2:.3f}s")
logger.info(f" Difference: {abs(diff):.3f}s")
if diff < 0:
logger.info(f" ✓ Session {session_id1} was faster by {abs(diff):.3f}s")
elif diff > 0:
logger.info(f" ✓ Session {session_id2} was faster by {abs(diff):.3f}s")
else:
logger.info(f" = Same lap time!")
# Compare session stats
logger.info(f"\n📊 Session Statistics:")
logger.info(f" Session {session_id1}: {len(s1.laps)} laps")
logger.info(f" Session {session_id2}: {len(s2.laps)} laps")
return s1, s2
def list_all_sessions(repository: TelemetryRepository):
"""
List all sessions in the database.
Args:
repository: TelemetryRepository instance
"""
logger.info("\n📋 All Sessions:")
# Get all sessions (limit can be increased if needed)
sessions = repository.get_sessions(limit=100)
if not sessions:
logger.info(" No sessions found in database")
return []
for session in sessions:
circuit = session.circuit.name if session.circuit else "Unknown"
driver = session.driver_name or "Unknown"
logger.info(f" ID {session.id}: {circuit} - {driver} - "
f"{session.datetime.strftime('%Y-%m-%d %H:%M')} - "
f"{session.total_laps} laps")
return sessions
The database models already include optimized indexes for common queries:
# Indexes are defined in src/database/models.py:
# Session indexes (for filtering by circuit and date)
Index('idx_sessions_circuit', Session.circuit_id)
Index('idx_sessions_datetime', Session.datetime)
# Lap indexes (for finding by session)
Index('idx_laps_session', Lap.session_id)
# Telemetry indexes (for lap queries)
Index('idx_telemetry_lap', TelemetryPoint.lap_id)
These indexes significantly improve query performance when:
- Filtering sessions by circuit or date
- Retrieving all laps of a session
- Loading the telemetry points of a single lap
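If you want to confirm that a query actually hits an index, SQLite can show its query plan. This is a minimal check assuming the table and index names match the excerpt above (laps, idx_laps_session); adjust the names to whatever src/database/models.py actually defines.

import sqlite3

conn = sqlite3.connect("data/telemetry.db")
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM laps WHERE session_id = ?", (1,)
).fetchall()
print(plan)  # expect a row like "SEARCH laps USING INDEX idx_laps_session (session_id=?)"
conn.close()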
For large datasets, use batch operations to improve performance:
def batch_save_sessions(repository: TelemetryRepository, session_files: List[str]):
"""
Save multiple sessions efficiently.
Args:
repository: TelemetryRepository instance
session_files: List of JSON files to import
"""
logger.info(f"Batch saving {len(session_files)} sessions...")
for idx, filepath in enumerate(session_files, 1):
logger.info(f"\n[{idx}/{len(session_files)}] Processing {filepath}")
# Load data
telemetry_data = load_session_data(filepath)
# Extract metadata from filename or data
# Format: session_YYYYMMDD_HHMMSS.json
filename = Path(filepath).stem
# Save to database
session_id = save_session_to_database(
repository,
telemetry_data,
circuit_name="Blackwood GP",
vehicle_name="XF GTI",
driver_name="Driver"
)
logger.info(f" ✓ Saved as session ID: {session_id}")
logger.info(f"\n✓ Batch save complete: {len(session_files)} sessions")
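For example, to import every session file collected in Tutorial 1 (assuming repository is the instance returned by setup_sqlite_database()):

session_files = sorted(str(p) for p in Path("data").glob("session_*.json"))
if session_files:
    batch_save_sessions(repository, session_files)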
Also use the limit parameter when querying so that large result sets are returned in manageable chunks.

Sessions can also be exported to JSON for sharing or archiving:

def export_session_to_json(repository: TelemetryRepository, session_id: int,
                           output_file: str):
"""
Export a session to JSON format.
Args:
repository: TelemetryRepository instance
session_id: Session ID to export
output_file: Output file path
"""
logger.info(f"Exporting session {session_id} to {output_file}...")
# Get session
session = repository.get_session(session_id)
if not session:
logger.error(f"Session {session_id} not found")
return
# Build export data structure
data = {
'session_id': session.id,
'datetime': session.datetime.isoformat(),
'circuit': session.circuit.name if session.circuit else "Unknown",
'vehicle': session.vehicle.name if session.vehicle else "Unknown",
'driver': session.driver_name or "Unknown",
'total_laps': len(session.laps),
'laps': []
}
# Export each lap
for lap in session.laps:
lap_data = {
'lap_number': lap.lap_number,
'lap_time': lap.lap_time,
'sector1_time': lap.sector1_time,
'sector2_time': lap.sector2_time,
'sector3_time': lap.sector3_time,
'valid': lap.valid,
'telemetry_points': []
}
# Get telemetry points for this lap
telemetry_points = repository.get_telemetry_points(lap.id)
for point in telemetry_points:
telemetry_point = {
'timestamp': point.timestamp,
'speed': point.speed,
'rpm': point.rpm,
'gear': point.gear,
'throttle': point.throttle,
'brake': point.brake,
'position_x': point.position_x,
'position_y': point.position_y,
'position_z': point.position_z
}
lap_data['telemetry_points'].append(telemetry_point)
data['laps'].append(lap_data)
# Write to file
with open(output_file, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"✓ Session exported: {output_file}")
logger.info(f" {len(data['laps'])} laps exported")
def export_all_sessions(repository: TelemetryRepository, output_dir: str = "exports"):
"""
Export all sessions to individual JSON files.
Args:
repository: TelemetryRepository instance
output_dir: Directory for export files
"""
logger.info("Exporting all sessions...")
# Create output directory
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Get all sessions
sessions = repository.get_sessions(limit=1000)
for session in sessions:
# Generate filename
filename = f"session_{session.id}_{session.datetime.strftime('%Y%m%d_%H%M%S')}.json"
filepath = Path(output_dir) / filename
# Export
export_session_to_json(repository, session.id, str(filepath))
logger.info(f"✓ Exported {len(sessions)} sessions to {output_dir}/")
def import_session_from_json(repository: TelemetryRepository, json_file: str) -> int:
"""
Import a session from JSON format.
Args:
repository: TelemetryRepository instance
json_file: Path to JSON file
Returns:
New session ID
"""
logger.info(f"Importing session from {json_file}...")
# Load JSON data
with open(json_file, 'r') as f:
data = json.load(f)
# Create session
session_id = repository.save_session(
datetime_start=datetime.fromisoformat(data['datetime']) if 'datetime' in data else datetime.now(),
circuit_name=data.get('circuit', 'Unknown'),
vehicle_name=data.get('vehicle', 'Unknown'),
driver_name=data.get('driver', 'Unknown'),
duration=None
)
# Import laps
laps_data = data.get('laps', [])
for lap_data in laps_data:
# Save lap
lap_id = repository.save_lap(
session_id=session_id,
lap_number=lap_data['lap_number'],
lap_time=lap_data.get('lap_time'),
sector1_time=lap_data.get('sector1_time'),
sector2_time=lap_data.get('sector2_time'),
sector3_time=lap_data.get('sector3_time'),
valid=lap_data.get('valid', True)
)
# Import telemetry points
telemetry_points = lap_data.get('telemetry_points', [])
if telemetry_points:
repository.save_telemetry_points(lap_id, telemetry_points)
logger.info(f"✓ Session imported: ID {session_id}")
logger.info(f" {len(laps_data)} laps imported")
return session_id
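A minimal round trip using the two functions above, assuming session ID 1 exists in the database:

Path("exports").mkdir(parents=True, exist_ok=True)
export_session_to_json(repository, 1, "exports/session_1.json")

# Re-import it into a separate archive database
archive_repo = setup_sqlite_database("data/archive.db")
archived_id = import_session_from_json(archive_repo, "exports/session_1.json")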
Here’s a complete script that demonstrates all the concepts:
def main():
"""
Main function demonstrating database integration.
"""
logger.info("=== Database Integration Tutorial ===\n")
# Step 1: Setup database
logger.info("Step 1: Setting up database...")
repository = setup_sqlite_database("data/telemetry.db")
# Step 2: Load sample data (from Tutorial 1)
logger.info("\nStep 2: Loading sample data...")
# Check if we have session files
data_dir = Path("data")
session_files = list(data_dir.glob("session_*.json"))
if not session_files:
logger.warning("No session files found in data/ directory")
logger.info("Please run Tutorial 1 first to collect telemetry data")
return
# Use the first session file
sample_file = session_files[0]
telemetry_data = load_session_data(str(sample_file))
# Step 3: Save to database
logger.info("\nStep 3: Saving session to database...")
session_id = save_session_to_database(
repository,
telemetry_data,
circuit_name="Blackwood GP",
vehicle_name="XF GTI",
driver_name="YourName"
)
# Step 4: Query data
logger.info("\nStep 4: Querying historical data...")
# List all sessions
list_all_sessions(repository)
# Get best laps
query_best_laps(repository, limit=5)
# Get session statistics
query_session_statistics(repository, session_id)
# Step 5: Export data
logger.info("\nStep 5: Exporting data...")
export_session_to_json(repository, session_id, "data/session_export.json")
# Step 6: Compare sessions (if we have multiple)
if len(session_files) > 1:
logger.info("\nStep 6: Loading and comparing second session...")
# Load second session
telemetry_data2 = load_session_data(str(session_files[1]))
session_id2 = save_session_to_database(
repository,
telemetry_data2,
circuit_name="Blackwood GP",
vehicle_name="XF GTI",
driver_name="YourName"
)
# Compare
compare_sessions(repository, session_id, session_id2)
# Get final session count
all_sessions = list_all_sessions(repository)
logger.info("\n✓ Tutorial completed successfully!")
logger.info(f" Database location: data/telemetry.db")
logger.info(f" Total sessions: {len(all_sessions)}")
logger.info(f" Use a SQLite browser to explore the data!")
if __name__ == "__main__":
main()
Save the script and run it:
cd LFS-Ayats
python database_integration.py
Expected output:
=== Database Integration Tutorial ===
Step 1: Setting up database...
INFO - Setting up SQLite: data/telemetry.db
INFO - ✓ Database initialized
INFO - ✓ Tables created at: data/telemetry.db
Step 2: Loading sample data...
INFO - Loading session data: data/session_20240115_143022.json
INFO - ✓ Loaded 2543 telemetry points
Step 3: Saving session to database...
INFO - Saving session to database...
INFO - Extracting individual laps...
INFO - ✓ Found 5 laps
INFO - Lap 1: 512 points saved
INFO - Lap 2: 498 points saved
...
INFO - ✓ Session saved: ID 1, 5 laps
Step 4: Querying historical data...
INFO - 📋 All Sessions:
INFO - ID 1: Blackwood GP - YourName - 2024-01-15 14:30 - 5 laps
INFO - 🏆 Top 5 Best Laps:
INFO - 1. Lap 3 - Time: 93.542s - Circuit: Blackwood GP - Driver: YourName
...
✓ Tutorial completed successfully!
For production environments, PostgreSQL is recommended:
# Create database and user
sudo -u postgres psql
CREATE DATABASE lfs_telemetry;
CREATE USER lfs_user WITH PASSWORD 'secure_password_here';
GRANT ALL PRIVILEGES ON DATABASE lfs_telemetry TO lfs_user;
\q
Then update your script to use PostgreSQL:
repository = setup_postgresql_database(
host="localhost",
port=5432,
database="lfs_telemetry",
user="lfs_user",
password="secure_password_here"
)
The project includes Alembic for database migrations:
# Initialize Alembic (already done in this repo)
alembic init alembic
# Generate a new migration (after model changes)
alembic revision --autogenerate -m "Description of changes"
# Apply migrations
alembic upgrade head
# Rollback last migration
alembic downgrade -1
SQLite Backups:
# Simple file copy
cp data/telemetry.db data/backups/telemetry_$(date +%Y%m%d_%H%M%S).db
# Compressed backup
tar -czf telemetry_backup_$(date +%Y%m%d).tar.gz data/telemetry.db
PostgreSQL Backups:
# Full database dump
pg_dump lfs_telemetry > backup_$(date +%Y%m%d).sql
# Compressed dump
pg_dump lfs_telemetry | gzip > backup_$(date +%Y%m%d).sql.gz
# Restore from backup
psql lfs_telemetry < backup_20240115.sql
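To automate this, a nightly cron entry can run the compressed dump. The schedule and paths below are examples; note that % must be escaped in crontab:

# crontab -e
0 3 * * * pg_dump lfs_telemetry | gzip > /var/backups/lfs/backup_$(date +\%Y\%m\%d).sql.gz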
For multi-user applications, configure connection pooling:
repository = TelemetryRepository(
connection_string="postgresql://user:pass@localhost/lfs_telemetry",
pool_size=10, # Number of connections in pool
echo=False # Disable SQL logging in production
)
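These options map onto SQLAlchemy's engine configuration. Assuming TelemetryRepository wraps create_engine, the equivalent direct configuration would look roughly like this if you ever need finer control:

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost/lfs_telemetry",
    pool_size=10,        # connections kept open in the pool
    max_overflow=20,     # extra connections allowed under burst load
    pool_pre_ping=True,  # verify connections before handing them out
    echo=False,
)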
Store credentials securely using environment variables:
import os
from dotenv import load_dotenv
load_dotenv()
repository = TelemetryRepository(
connection_string=os.getenv("DATABASE_URL"),
echo=os.getenv("DEBUG", "False") == "True"
)
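A matching .env file might look like this (values are placeholders; keep the file out of version control):

# .env
DATABASE_URL=postgresql://lfs_user:secure_password_here@localhost:5432/lfs_telemetry
DEBUG=False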
Try these exercises to deepen your understanding:
Create queries to track your improvement over time:
def analyze_improvement_over_time(repository: TelemetryRepository,
circuit_name: str,
driver_name: str):
"""
Analyze lap time improvement over multiple sessions.
Track progress and identify trends.
"""
# TODO: Implement this function
# Hint: Query sessions ordered by date, extract best lap from each
pass
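One possible sketch for this exercise, using only the repository methods already seen in this tutorial (get_sessions and get_best_lap):

def analyze_improvement_over_time(repository: TelemetryRepository,
                                  circuit_name: str,
                                  driver_name: str):
    """Best lap per session for one driver, in chronological order."""
    sessions = repository.get_sessions(circuit=circuit_name, limit=1000)
    history = []
    for session in sorted(sessions, key=lambda s: s.datetime):
        if (session.driver_name or "") != driver_name:
            continue
        best = repository.get_best_lap(session.id)
        if best and best.lap_time:
            history.append((session.datetime, best.lap_time / 1000.0))
    for date, lap_time in history:
        logger.info(f"{date:%Y-%m-%d %H:%M}  best lap: {lap_time:.3f}s")
    return history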
Implement a global ranking system:
def generate_global_rankings(repository: TelemetryRepository,
circuit_name: str = None):
"""
Generate rankings across all drivers for a circuit.
Include statistics like best lap, average lap, consistency.
"""
# TODO: Implement this function
# Hint: Use aggregation queries (GROUP BY driver)
pass
Create a function to analyze telemetry data spatially:
def generate_speed_heatmap(repository: TelemetryRepository,
lap_id: int):
"""
Generate a heatmap of speeds across the circuit.
Useful for identifying braking and acceleration zones.
"""
# TODO: Implement this function
# Hint: Load telemetry points, group by position, calculate avg speed
pass
Implement an archiving system for old sessions:
def archive_old_sessions(repository: TelemetryRepository,
days_old: int = 90):
"""
Archive sessions older than specified days.
Export to JSON and optionally delete from active database.
"""
# TODO: Implement this function
# Hint: Query sessions by date, export, then delete if needed
pass
1. Database Locked (SQLite)
sqlite3.OperationalError: database is locked
Solution: Close all connections properly, or use PostgreSQL for concurrent access.
2. Connection Pool Exhausted
sqlalchemy.exc.TimeoutError: QueuePool limit exceeded
Solution: Increase pool_size or ensure connections are closed after use.
3. Migration Conflicts
alembic.util.exc.CommandError: Target database is not up to date
Solution: Run alembic upgrade head to apply pending migrations.
4. Large Dataset Performance
If queries are slow with large datasets:
- Check that the indexes from src/database/models.py are in place
- Insert telemetry points in batches with save_telemetry_points rather than row by row
- Use the limit parameter to keep result sets small
- Consider moving to PostgreSQL for very large or multi-user databases
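For the SQLite "database is locked" error specifically, raising the driver's busy timeout often helps. If your setup lets you configure the engine directly, the SQLAlchemy equivalent is roughly:

from sqlalchemy import create_engine

# Wait up to 30 seconds for a lock instead of failing immediately
engine = create_engine(
    "sqlite:///data/telemetry.db",
    connect_args={"timeout": 30},
)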
Congratulations! You've learned how to:
- Set up SQLite and PostgreSQL databases for telemetry storage
- Save complete sessions with their laps and telemetry points
- Query historical data: best laps, session statistics, and comparisons
- Export, import, and back up your sessions
- Apply production practices: migrations, connection pooling, and secure credentials

🎉 You can now store and query historical telemetry data! Continue exploring the visualization and API modules (src/api/) to build a complete telemetry analysis system.