Skip to content

Database Schema Reference

Database: MongoDB Atlas ODM: Beanie (async Document Object Mapper) Driver: Motor (async MongoDB driver) Last Updated: 2026-01-30

Overview

Bayit+ uses MongoDB Atlas as its primary database with Beanie ODM for Python object-document mapping. The database contains 64+ collections covering content management, user data, subscriptions, AI features, and platform operations.

Key Characteristics

  • Async Operations: All database operations use Motor's async API
  • Document Validation: Pydantic models for schema validation
  • Indexes: Strategic indexes for query performance
  • Multi-Language: Hebrew, English, Spanish support
  • Audit Trail: Comprehensive logging for critical operations

Database Connection

python
# Connection configuration
MONGODB_URI = "mongodb+srv://..."  # From Google Cloud Secret Manager
DATABASE_NAME = "bayit_plus"

# Async connection via olorin-shared
from olorin_shared.database import init_mongodb, get_mongodb_client

client = await init_mongodb(settings.MONGODB_URI)
db = get_mongodb_database(client, "bayit_plus")

Core Collections

Users & Authentication

Collection: user

Purpose: User accounts, profiles, and authentication

Model: app.models.user.User

Key Fields:

FieldTypeDescriptionRequiredIndexed
_idObjectIdUser IDYesPrimary
emailEmailStrUser email (unique)YesUnique
namestrDisplay nameYesNo
firebase_uidstrFirebase Auth UIDNoUnique
password_hashstrBcrypt password hashNoNo
is_activeboolAccount active statusYesYes
rolestrUser role (user/admin/beta)YesYes
subscription_tierstrSubscription levelNoYes
devicesList[Device]Registered devicesNoNo
preferencesdictUser preferencesNoNo
created_atdatetimeAccount creationYesYes
updated_atdatetimeLast modificationYesNo
last_logindatetimeLast login timeNoYes

Device Sub-Document:

python
class Device:
    device_id: str         # SHA-256 hash
    device_name: str       # "iPhone 15 Pro"
    device_type: str       # mobile, desktop, tv, tablet
    browser: Optional[str] # Chrome, Safari, Firefox
    os: Optional[str]      # iOS 17.2, Windows 11
    platform: Optional[str] # iOS, Android, Web, tvOS
    ip_address: Optional[str]
    last_active: datetime
    registered_at: datetime
    is_current: bool

Indexes:

python
IndexModel([("email", ASCENDING)], unique=True)
IndexModel([("firebase_uid", ASCENDING)], unique=True, sparse=True)
IndexModel([("is_active", ASCENDING)])
IndexModel([("role", ASCENDING)])
IndexModel([("created_at", DESCENDING)])

Example Document:

json
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "email": "user@example.com",
  "name": "John Doe",
  "firebase_uid": "abc123xyz",
  "is_active": true,
  "role": "user",
  "subscription_tier": "premium",
  "devices": [
    {
      "device_id": "a3f5...",
      "device_name": "iPhone 15 Pro",
      "device_type": "mobile",
      "platform": "iOS",
      "last_active": ISODate("2026-01-30T12:00:00Z")
    }
  ],
  "created_at": ISODate("2026-01-01T00:00:00Z"),
  "updated_at": ISODate("2026-01-30T12:00:00Z")
}

Collection: profile

Purpose: User profiles (separate from auth for performance)

Model: app.models.profile.Profile

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
biostrUser biography
avatar_urlstrProfile picture URL
language_preferencestrPreferred language (he/en/es)
subtitle_preferencesdictSubtitle settings
playback_preferencesdictPlayback settings

Indexes:

python
IndexModel([("user_id", ASCENDING)], unique=True)

Content Management

Collection: content

Purpose: All video-on-demand (VOD) content

Model: app.models.content.Content

Key Fields:

FieldTypeDescriptionIndexed
_idObjectIdContent IDPrimary
titlestrHebrew titleText
title_enstrEnglish titleText
title_esstrSpanish titleText
descriptionstrHebrew descriptionText
description_enstrEnglish descriptionText
description_esstrSpanish descriptionText
thumbnailstrThumbnail URLNo
backdropstrBackdrop image URLNo
poster_urlstrTMDB poster URLNo
section_idsList[str]Content sectionsYes
primary_section_idstrMain sectionYes
content_formatstrmovie/series/documentaryYes
audience_idstrAge ratingYes
genre_idsList[str]Genre classificationsYes
topic_tagsList[str]Theme tagsYes
subcategory_idsList[str]Sub-categoriesYes
stream_urlstrHLS/DASH stream URLNo
durationstr"1:45:00"No
yearintRelease yearYes
ratingfloatTMDB ratingNo
castList[str]Cast membersNo
directorstrDirector nameNo
tmdb_idintTMDB identifierUnique
is_featuredboolFeatured contentYes
view_countintTotal viewsYes
created_atdatetimeAdded dateYes

5-Axis Taxonomy System:

  1. Section - Where content lives in navigation (can be in multiple)

    • Example: ["movies", "judaism"] - cross-listing support
  2. Format - Structural content type

    • Values: movie, series, documentary, short, clip
  3. Audience - Age appropriateness

    • Values: general, kids, family, mature
  4. Genre - Mood/style (multiple allowed)

    • Example: ["drama", "thriller"]
  5. Topic Tags - Themes (multiple allowed)

    • Example: ["jewish", "educational", "history"]

Indexes:

python
# Full-text search
IndexModel([
    ("title", TEXT),
    ("title_en", TEXT),
    ("title_es", TEXT),
    ("description", TEXT)
], weights={"title": 10, "title_en": 10, "title_es": 10, "description": 5})

# Performance indexes
IndexModel([("section_ids", ASCENDING)])
IndexModel([("primary_section_id", ASCENDING)])
IndexModel([("content_format", ASCENDING)])
IndexModel([("audience_id", ASCENDING)])
IndexModel([("genre_ids", ASCENDING)])
IndexModel([("topic_tags", ASCENDING)])
IndexModel([("is_featured", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("view_count", DESCENDING)])
IndexModel([("tmdb_id", ASCENDING)], unique=True, sparse=True)
IndexModel([("year", DESCENDING)])
IndexModel([("created_at", DESCENDING)])

Collection: podcast

Purpose: Podcast shows and metadata

Model: app.models.content.Podcast

Key Fields:

FieldTypeDescription
titlestrPodcast title
descriptionstrPodcast description
thumbnailstrPodcast artwork
rss_feed_urlstrRSS feed URL
authorstrPodcast author
languagestrPrimary language
categoriesList[str]Podcast categories
episode_countintTotal episodes
last_updateddatetimeLast RSS check

Indexes:

python
IndexModel([("rss_feed_url", ASCENDING)], unique=True)
IndexModel([("title", TEXT)])
IndexModel([("created_at", DESCENDING)])

Collection: podcastepisode

Purpose: Individual podcast episodes

Model: app.models.content.PodcastEpisode

Key Fields:

FieldTypeDescription
podcast_idstrReference to Podcast._id
titlestrEpisode title
descriptionstrEpisode description
audio_urlstrAudio file URL
durationintDuration in seconds
published_datedatetimePublication date
episode_numberintEpisode number
season_numberintSeason number
transcriptstrEpisode transcript

Indexes:

python
IndexModel([("podcast_id", ASCENDING), ("published_date", DESCENDING)])
IndexModel([("podcast_id", ASCENDING), ("episode_number", ASCENDING)])

Collection: livechannel

Purpose: Live TV channels and streams

Model: app.models.content.LiveChannel

Key Fields:

FieldTypeDescription
channel_namestrChannel name
channel_numberintChannel number
stream_urlstrHLS stream URL
logo_urlstrChannel logo
languagestrPrimary language
countrystrCountry of origin
is_activeboolChannel status
supports_dubbingboolAI dubbing available
epg_enabledboolEPG data available

Indexes:

python
IndexModel([("channel_number", ASCENDING)], unique=True)
IndexModel([("is_active", ASCENDING)])
IndexModel([("country", ASCENDING)])

Collection: radiostation

Purpose: Internet radio stations

Model: app.models.content.RadioStation

Key Fields:

FieldTypeDescription
station_namestrStation name
stream_urlstrStream URL
logo_urlstrStation logo
countrystrCountry
languagestrLanguage
genrestrMusic genre
is_activeboolStation status

Beta 500 Program (Closed Beta)

Collection: betauser

Purpose: Beta 500 program participants

Model: app.models.beta.BetaUser

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
emailstrUser email
credits_balanceintAI credits remaining
total_credits_grantedintTotal credits ever granted
session_checkpointdatetimeLast credit deduction
fraud_scorefloatFraud risk score (0.0-1.0)
fingerprint_hashstrSHA-256 device fingerprint
is_activeboolBeta access status
enrolled_atdatetimeEnrollment date
last_credit_updatedatetimeLast credit change

Indexes:

python
IndexModel([("user_id", ASCENDING)], unique=True)
IndexModel([("email", ASCENDING)])
IndexModel([("is_active", ASCENDING)])
IndexModel([("fingerprint_hash", ASCENDING)])

Collection: betacredittransaction

Purpose: Beta credit transaction history

Model: app.models.beta.BetaCreditTransaction

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
transaction_typestrgrant/deduct/refund
amountintCredits changed
balance_afterintBalance after transaction
reasonstrTransaction reason
feature_usedstrai_search/recommendations/catchup
metadatadictAdditional context
created_atdatetimeTransaction time

Indexes:

python
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("feature_used", ASCENDING)])
IndexModel([("transaction_type", ASCENDING)])

Channel Live Chat

Collection: channelchatmessage

Purpose: Live chat messages for TV channels

Model: app.models.channel_chat.ChannelChatMessage

Key Fields:

FieldTypeDescription
channel_idstrChannel identifier
user_idstrReference to User._id
usernamestrDisplay name
messagestrSanitized message text
is_pinnedboolPinned by admin
is_deletedboolSoft delete flag
session_tokenstrWebSocket session
created_atdatetimeMessage timestamp

Indexes:

python
IndexModel([("channel_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("user_id", ASCENDING)])
IndexModel([("is_pinned", ASCENDING), ("created_at", DESCENDING)])

AI Features

Collection: contentembedding

Purpose: Vector embeddings for AI search

Model: app.models.content_embedding.ContentEmbedding

Key Fields:

FieldTypeDescription
content_idstrReference to Content._id
embedding_vectorList[float]1536-dim vector (OpenAI)
embedding_modelstrModel used
text_contentstrText used for embedding
metadatadictAdditional context
created_atdatetimeEmbedding creation

Indexes:

python
IndexModel([("content_id", ASCENDING)], unique=True)
IndexModel([("created_at", DESCENDING)])
# Note: Vector search uses Atlas Search index

Collection: recapsession

Purpose: AI catch-up summary sessions

Model: app.models.content_embedding.RecapSession

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
channel_idstrChannel identifier
summarystrAI-generated summary
start_timedatetimeRecap start
end_timedatetimeRecap end
credits_usedintAI credits deducted
transcript_lengthintInput text length
created_atdatetimeSession creation
expires_atdatetimeCache expiration (3 min TTL)

Indexes:

python
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("channel_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("expires_at", ASCENDING)], expireAfterSeconds=0) # TTL index

Subscriptions & Payments

Collection: subscription

Purpose: User subscription records

Model: app.models.subscription.Subscription

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
plan_idstrSubscription plan
statusstractive/canceled/expired
start_datedatetimeSubscription start
end_datedatetimeSubscription end
auto_renewboolAuto-renewal enabled
payment_methodstrPayment type
amountfloatSubscription price
currencystrCurrency code

Indexes:

python
IndexModel([("user_id", ASCENDING)])
IndexModel([("status", ASCENDING)])
IndexModel([("end_date", ASCENDING)])

Collection: transaction

Purpose: Payment transaction history

Model: app.models.admin.Transaction

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
amountfloatTransaction amount
currencystrCurrency code
transaction_typestrpayment/refund
statusstrsuccess/failed/pending
payment_providerstrstripe/paypal
provider_transaction_idstrExternal transaction ID
created_atdatetimeTransaction time

Indexes:

python
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("provider_transaction_id", ASCENDING)], unique=True, sparse=True)
IndexModel([("status", ASCENDING)])

User Activity & Analytics

Collection: watchhistory

Purpose: User viewing history

Model: app.models.watchlist.WatchHistory

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
content_idstrReference to Content._id
watched_atdatetimeWatch timestamp
duration_watchedintSeconds watched
progressfloat0.0-1.0 completion
device_typestrDevice used
platformstrPlatform used

Indexes:

python
IndexModel([("user_id", ASCENDING), ("watched_at", DESCENDING)])
IndexModel([("content_id", ASCENDING)])

Collection: searchquery

Purpose: Search analytics

Model: app.models.search_analytics.SearchQuery

Key Fields:

FieldTypeDescription
user_idstrReference to User._id
query_textstrSearch query
search_typestrtraditional/ai
results_countintResults returned
clicked_resultstrContent clicked
response_time_msintQuery latency
created_atdatetimeSearch timestamp

Indexes:

python
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("search_type", ASCENDING)])
IndexModel([("created_at", DESCENDING)])

Security & Audit

Collection: auditlog

Purpose: System audit trail

Model: app.models.admin.AuditLog

Key Fields:

FieldTypeDescription
user_idstrUser who performed action
action_typestrAction performed
resource_typestrResource affected
resource_idstrResource identifier
changesdictBefore/after values
ip_addressstrRequest IP
user_agentstrRequest user agent
created_atdatetimeAction timestamp

Indexes:

python
IndexModel([("user_id", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("action_type", ASCENDING)])
IndexModel([("resource_type", ASCENDING), ("resource_id", ASCENDING)])
IndexModel([("created_at", DESCENDING)])

Collection: securityauditlog

Purpose: Security event logging

Model: app.models.security_audit.SecurityAuditLog

Key Fields:

FieldTypeDescription
event_typestrSecurity event type
severitystrlow/medium/high/critical
user_idstrUser involved
ip_addressstrSource IP
detailsdictEvent details
created_atdatetimeEvent timestamp

Indexes:

python
IndexModel([("event_type", ASCENDING), ("created_at", DESCENDING)])
IndexModel([("severity", ASCENDING)])
IndexModel([("user_id", ASCENDING)])
IndexModel([("created_at", DESCENDING)])

Database Operations

Connecting to MongoDB

python
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie
from app.models.user import User
from app.models.content import Content

async def connect_to_mongo():
    client = AsyncIOMotorClient(settings.MONGODB_URI)
    await init_beanie(
        database=client.bayit_plus,
        document_models=[User, Content, ...]
    )

Query Examples

Find user by email:

python
user = await User.find_one(User.email == "user@example.com")

Find active content in section:

python
content = await Content.find(
    Content.section_ids.in_(["movies"]),
    Content.is_featured == True
).sort(-Content.created_at).limit(10).to_list()

Aggregate view count by section:

python
pipeline = [
    {"$match": {"is_featured": True}},
    {"$group": {
        "_id": "$primary_section_id",
        "total_views": {"$sum": "$view_count"},
        "content_count": {"$sum": 1}
    }}
]
results = await Content.aggregate(pipeline).to_list()

Index Management

Create indexes:

python
# Done automatically by Beanie during init_beanie()
# Manual creation:
await User.find_one().get_motor_collection().create_index([("email", 1)], unique=True)

Check existing indexes:

python
collection = User.find_one().get_motor_collection()
indexes = await collection.index_information()
print(indexes)

Drop index:

python
await collection.drop_index("email_1")

Data Migrations

Migration Strategy

Migration Tracking Collection: migrationrecord

Model: app.models.migration.MigrationRecord

python
class MigrationRecord(Document):
    migration_name: str
    applied_at: datetime
    status: str  # success/failed/rolled_back
    details: dict

Running Migrations

python
# Example migration script
from app.models.content import Content

async def migrate_content_taxonomy():
    """Migrate old category_id to new section_ids"""
    contents = await Content.find_all().to_list()

    for content in contents:
        if content.category_id and not content.section_ids:
            content.section_ids = [content.category_id]
            content.primary_section_id = content.category_id
            await content.save()

    # Record migration
    migration = MigrationRecord(
        migration_name="content_taxonomy_migration_v1",
        applied_at=datetime.now(timezone.utc),
        status="success",
        details={"migrated_count": len(contents)}
    )
    await migration.insert()

Backup & Recovery

Backup Strategy

Automated Backups:

  • MongoDB Atlas automatic backups (daily)
  • Point-in-time recovery available (last 7 days)
  • Backup retention: 30 days

Manual Backup:

bash
mongodump --uri="mongodb+srv://..." --db=bayit_plus --out=/backup/$(date +%Y-%m-%d)

Restore from Backup

bash
mongorestore --uri="mongodb+srv://..." --db=bayit_plus /backup/2026-01-30/bayit_plus

Disaster Recovery

  1. Verify backup availability in Atlas console
  2. Create restore point from desired backup
  3. Test restore in staging environment first
  4. Execute production restore with downtime announcement
  5. Verify data integrity post-restore

Performance Optimization

Query Performance

Best Practices:

  1. Use indexes - Always index fields used in queries
  2. Limit results - Use .limit() and pagination
  3. Project only needed fields - Use .project() to reduce data transfer
  4. Batch operations - Use bulk_write() for multiple updates
  5. Connection pooling - Motor handles this automatically

Monitoring:

python
# Enable slow query logging
await db.command("profile", 2, slowms=100)  # Log queries > 100ms

# Get slow queries
slow_queries = await db.system.profile.find({"millis": {"$gt": 100}}).to_list()

Index Optimization

Check index usage:

python
explain = await Content.find(Content.title == "Example").explain()
print(explain["executionStats"])

Remove unused indexes:

bash
# Check index usage in Atlas UI
# Drop indexes with 0 operations

Security

Access Control

Database User Roles:

  • Application User: Read/write access to bayit_plus database
  • Admin User: Full access + user management
  • Backup User: Read-only access for backups

Connection Security:

  • SSL/TLS encryption required
  • IP whitelist in Atlas
  • Database credentials in Google Cloud Secret Manager

Data Encryption

At Rest:

  • MongoDB Atlas encryption at rest (enabled)
  • Customer-managed keys (optional, not currently used)

In Transit:

  • SSL/TLS for all connections
  • Certificate verification enabled

Sensitive Data Handling

Encrypted Fields:

  • password_hash - Bcrypt hashing
  • Payment info - Tokenized via Stripe (not stored)
  • Personal data - PII flagged for GDPR compliance

Data Retention:

  • Deleted users: Anonymize after 30 days
  • Audit logs: Retain 1 year
  • Backups: 30 days retention

Monitoring & Alerts

MongoDB Atlas Monitoring

Key Metrics:

  • Query execution time (p95, p99)
  • Connection count
  • Disk usage
  • Index efficiency
  • Replication lag

Alerts Configured:

  • Slow queries > 500ms
  • Connection pool exhaustion
  • Disk usage > 80%
  • Replication lag > 10s

Application Monitoring

python
from app.core.metrics import mongodb_query_duration

# Track query performance
with mongodb_query_duration.time():
    results = await Content.find_all().to_list()

Troubleshooting

Common Issues

Issue: Connection Timeout

Solution:

python
# Increase timeout
client = AsyncIOMotorClient(
    settings.MONGODB_URI,
    serverSelectionTimeoutMS=5000,  # 5 seconds
    connectTimeoutMS=10000  # 10 seconds
)

Issue: Slow Queries

Solution:

  1. Check query uses indexes: .explain()
  2. Add missing indexes
  3. Use projection to reduce data transfer
  4. Implement caching for frequent queries

Issue: Duplicate Key Error

Solution:

python
try:
    await user.insert()
except DuplicateKeyError:
    # Handle duplicate email
    raise HTTPException(status_code=400, detail="Email already exists")


Document Status: ✅ Complete Last Updated: 2026-01-30 Maintained by: Backend Team Next Review: 2026-04-30

Released under the MIT License.