Implement MongoDB 8.0 change streams for real-time data processing and application synchronization

Intermediate · 45 min · Apr 16, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up MongoDB 8.0 change streams to monitor database changes in real-time. Configure replica sets, implement Python and Node.js clients, and add filtering and resumability for production applications.

Prerequisites

  • MongoDB 8.0
  • Python 3.8+
  • Node.js 16+
  • At least 4GB RAM
  • Root or sudo access

What this solves

MongoDB change streams provide real-time notifications when data changes in your database, enabling applications to react immediately to inserts, updates, and deletes. This tutorial shows you how to configure MongoDB 8.0 with replica sets and implement change stream listeners in Python and Node.js for real-time data synchronization, event processing, and application updates.
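A change stream delivers each change as a structured event document. The sketch below dispatches on `operationType` using a hand-written sample event (the values are illustrative, not captured from a live server) to show the fields the listeners later in this tutorial rely on:

```python
# A change event as delivered by collection.watch(); this sample is
# hand-written for illustration, not output from a running server.
sample_change = {
    "_id": {"_data": "..."},  # resume token (opaque to the application)
    "operationType": "insert",
    "ns": {"db": "testdb", "coll": "events"},
    "documentKey": {"_id": 1001},
    "fullDocument": {"_id": 1001, "name": "user_login", "userId": 1001},
}

def describe(change):
    """Return a one-line summary of a change event."""
    op = change["operationType"]
    doc_id = change["documentKey"]["_id"]
    if op == "insert":
        return f"insert {doc_id}: {change['fullDocument']['name']}"
    if op == "update":
        fields = change.get("updateDescription", {}).get("updatedFields", {})
        return f"update {doc_id}: {sorted(fields)}"
    return f"{op} {doc_id}"

print(describe(sample_change))
```

Note that only inserts and replaces always carry `fullDocument`; updates carry `updateDescription`, and deletes carry only `documentKey`.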

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you get the latest versions.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
sudo apt install -y wget curl gnupg lsb-release

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y wget curl gnupg2

Install MongoDB 8.0 repository

Add the official MongoDB repository to install version 8.0.

# Ubuntu (on Debian, use https://repo.mongodb.org/apt/debian and the
# "main" component instead of ubuntu/multiverse)
curl -fsSL https://www.mongodb.org/static/pgp/server-8.0.asc | sudo gpg -o /usr/share/keyrings/mongodb-server-8.0.gpg --dearmor
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-8.0.gpg ] https://repo.mongodb.org/apt/ubuntu $(lsb_release -cs)/mongodb-org/8.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-8.0.list
sudo apt update

# AlmaLinux / Rocky Linux
sudo tee /etc/yum.repos.d/mongodb-org-8.0.repo << EOF
[mongodb-org-8.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/9/mongodb-org/8.0/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-8.0.asc
EOF

Install MongoDB 8.0

Install the MongoDB server and client tools.

# Ubuntu / Debian
sudo apt install -y mongodb-org

# AlmaLinux / Rocky Linux
sudo dnf install -y mongodb-org

Configure MongoDB for replica set

Change streams require a replica set. Edit the MongoDB configuration file (/etc/mongod.conf) to add a replication section. Do not add journal or fork settings: the storage.journal option was removed in recent MongoDB versions, and the packaged systemd unit manages the process.

storage:
  dbPath: /var/lib/mongo   # use /var/lib/mongodb on Ubuntu/Debian

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log

net:
  port: 27017
  bindIp: 127.0.0.1

replication:
  replSetName: "rs0"

processManagement:
  timeZoneInfo: /usr/share/zoneinfo

Start MongoDB service

Enable and start the MongoDB service with the new configuration.

sudo systemctl enable mongod
sudo systemctl start mongod
sudo systemctl status mongod

Initialize replica set

Connect to MongoDB and initialize the replica set configuration.

mongosh --eval "rs.initiate({
  _id: 'rs0',
  members: [
    { _id: 0, host: 'localhost:27017' }
  ]
})"

Create test database and collection

Set up a sample database with test data to monitor with change streams.

mongosh testdb --eval "
db.events.insertMany([
  { name: 'user_login', userId: 1001, timestamp: new Date() },
  { name: 'order_created', orderId: 2001, amount: 99.99, timestamp: new Date() },
  { name: 'payment_processed', orderId: 2001, status: 'completed', timestamp: new Date() }
])
"

Implement Python change stream client

Install Python MongoDB driver

Install PyMongo to connect to MongoDB from Python applications.

pip3 install pymongo

Create basic change stream monitor

Create a Python script to monitor all changes in the database.

#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
from datetime import datetime
import json

def main():
    # Connect to MongoDB
    client = MongoClient('mongodb://localhost:27017/')
    db = client.testdb
    collection = db.events
    
    print(f"Starting change stream monitor at {datetime.now()}")
    print("Watching for changes in testdb.events collection...")
    
    try:
        # Open change stream on the collection
        with collection.watch() as stream:
            for change in stream:
                print(f"\n--- Change detected at {datetime.now()} ---")
                print(f"Operation: {change['operationType']}")
                
                if 'fullDocument' in change:
                    print(f"Document: {json.dumps(change['fullDocument'], default=str, indent=2)}")
                
                if 'documentKey' in change:
                    print(f"Document ID: {change['documentKey']['_id']}")
                
                # Resume token for fault tolerance
                print(f"Resume Token: {change['_id']}")
                
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        client.close()

if __name__ == "__main__":
    main()

Create filtered change stream monitor

Implement a more advanced monitor with filtering and specific operation handling. Save it as /opt/mongodb-monitor/filtered_monitor.py, since that path is used in the test steps later.

#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
from datetime import datetime
import json
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/mongodb-changestream.log'),
        logging.StreamHandler()
    ]
)

class ChangeStreamProcessor:
    def __init__(self, connection_string='mongodb://localhost:27017/'):
        self.client = MongoClient(connection_string)
        self.db = self.client.testdb
        self.collection = self.db.events
        self.resume_token = None

    def process_insert(self, change):
        """Handle insert operations"""
        doc = change['fullDocument']
        logging.info(f"New document inserted: {doc['name']} - ID: {doc['_id']}")
        # Custom logic for different event types
        if doc['name'] == 'user_login':
            self.handle_user_login(doc)
        elif doc['name'] == 'order_created':
            self.handle_order_created(doc)

    def process_update(self, change):
        """Handle update operations"""
        doc_id = change['documentKey']['_id']
        updated_fields = change.get('updateDescription', {}).get('updatedFields', {})
        logging.info(f"Document updated - ID: {doc_id}, Fields: {updated_fields}")

    def process_delete(self, change):
        """Handle delete operations"""
        doc_id = change['documentKey']['_id']
        logging.info(f"Document deleted - ID: {doc_id}")

    def handle_user_login(self, doc):
        """Custom handler for user login events"""
        user_id = doc.get('userId')
        logging.info(f"Processing user login for user {user_id}")
        # Add your business logic here

    def handle_order_created(self, doc):
        """Custom handler for order creation events"""
        order_id = doc.get('orderId')
        amount = doc.get('amount')
        logging.info(f"Processing new order {order_id} with amount ${amount}")
        # Add your business logic here

    def start_monitoring(self):
        """Start the change stream with filtering and resumability"""
        # Watch inserts, updates, and deletes; deletes carry no
        # fullDocument, so only require the name field on operations
        # that include one
        pipeline = [
            {
                '$match': {
                    'operationType': {'$in': ['insert', 'update', 'delete']},
                    '$or': [
                        {'operationType': 'delete'},
                        {'fullDocument.name': {'$exists': True}}
                    ]
                }
            }
        ]

        logging.info("Starting filtered change stream monitor")

        try:
            # Resume from last known position if available
            resume_options = {}
            if self.resume_token:
                resume_options['resume_after'] = self.resume_token

            # full_document='updateLookup' makes update events carry the
            # post-update document
            with self.collection.watch(pipeline, full_document='updateLookup',
                                       **resume_options) as stream:
                for change in stream:
                    # Store resume token for fault tolerance
                    self.resume_token = change['_id']
                    operation = change['operationType']
                    if operation == 'insert':
                        self.process_insert(change)
                    elif operation == 'update':
                        self.process_update(change)
                    elif operation == 'delete':
                        self.process_delete(change)
        except Exception as e:
            logging.error(f"Error in change stream: {e}")
            # In production, you might want to implement retry logic here
        finally:
            self.client.close()

def main():
    processor = ChangeStreamProcessor()
    try:
        processor.start_monitoring()
    except KeyboardInterrupt:
        logging.info("Monitoring stopped by user")

if __name__ == "__main__":
    main()
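The code above leaves retry logic open. One sketch of an exponential backoff wrapper (generic and illustrative; `run_with_backoff` is not part of PyMongo) could be:

```python
import time

def run_with_backoff(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on failure, wait base_delay * 2**attempt and retry.

    The sleep function is injectable so the policy can be tested
    without actually waiting.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error
            delay = base_delay * (2 ** attempt)
            print(f"attempt {attempt + 1} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)

# A monitor could then be restarted with:
#     run_with_backoff(ChangeStreamProcessor().start_monitoring)
```

Because the processor saves its resume token before each restart attempt, resumed streams pick up where the failed one stopped.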

Implement Node.js change stream client

Install Node.js MongoDB driver

Install the official MongoDB driver for Node.js applications.

mkdir /opt/nodejs-monitor
cd /opt/nodejs-monitor
npm init -y
npm install mongodb

Create Node.js change stream monitor

Implement a Node.js change stream processor using async/await patterns. Save it as /opt/nodejs-monitor/monitor.js, the path referenced by the systemd service below.

const { MongoClient } = require('mongodb');
const fs = require('fs').promises;

class MongoChangeStreamMonitor {
    constructor(connectionString = 'mongodb://localhost:27017/') {
        this.connectionString = connectionString;
        this.client = null;
        this.db = null;
        this.collection = null;
        this.resumeToken = null;
    }

    async connect() {
        try {
            this.client = new MongoClient(this.connectionString);
            await this.client.connect();
            this.db = this.client.db('testdb');
            this.collection = this.db.collection('events');
            console.log('Connected to MongoDB');
        } catch (error) {
            console.error('Connection error:', error);
            throw error;
        }
    }

    async processChange(change) {
        const timestamp = new Date().toISOString();
        console.log(`\n--- Change detected at ${timestamp} ---`);
        console.log(`Operation: ${change.operationType}`);
        
        // Store resume token for fault tolerance
        this.resumeToken = change._id;
        
        switch (change.operationType) {
            case 'insert':
                await this.handleInsert(change);
                break;
            case 'update':
                await this.handleUpdate(change);
                break;
            case 'delete':
                await this.handleDelete(change);
                break;
            case 'replace':
                await this.handleReplace(change);
                break;
        }
        
        // Log to file for audit trail
        await this.logChange(change);
    }

    async handleInsert(change) {
        const doc = change.fullDocument;
        console.log(`New document: ${JSON.stringify(doc, null, 2)}`);
        
        // Business logic based on document type
        if (doc.name === 'user_login') {
            await this.processUserLogin(doc);
        } else if (doc.name === 'order_created') {
            await this.processOrderCreated(doc);
        }
    }

    async handleUpdate(change) {
        const docId = change.documentKey._id;
        const updatedFields = change.updateDescription?.updatedFields || {};
        console.log(`Updated document ${docId}:`, updatedFields);
        
        // If full document is available after update
        if (change.fullDocument) {
            console.log('Full document after update:', change.fullDocument);
        }
    }

    async handleDelete(change) {
        const docId = change.documentKey._id;
        console.log(`Deleted document: ${docId}`);
    }

    async handleReplace(change) {
        const docId = change.documentKey._id;
        const newDoc = change.fullDocument;
        console.log(`Replaced document ${docId}:`, newDoc);
    }

    async processUserLogin(doc) {
        console.log(`Processing user login for user ${doc.userId}`);
        // Add your custom logic here
        // Example: update user last_seen timestamp, send notifications, etc.
    }

    async processOrderCreated(doc) {
        console.log(`Processing new order ${doc.orderId} with amount $${doc.amount}`);
        // Add your custom logic here
        // Example: send confirmation email, update inventory, etc.
    }

    async logChange(change) {
        const logEntry = {
            timestamp: new Date().toISOString(),
            operation: change.operationType,
            documentId: change.documentKey._id,
            resumeToken: change._id
        };
        
        const logLine = JSON.stringify(logEntry) + '\n';
        await fs.appendFile('/var/log/mongodb-changestream-nodejs.log', logLine);
    }

    async startMonitoring() {
        try {
            // Define pipeline for filtering changes; deletes carry no
            // fullDocument, so only require the name field on operations
            // that include one
            const pipeline = [
                {
                    $match: {
                        operationType: { $in: ['insert', 'update', 'delete', 'replace'] },
                        $or: [
                            { operationType: 'delete' },
                            { 'fullDocument.name': { $exists: true } }
                        ]
                    }
                }
            ];

            // Configure change stream options
            const options = {
                fullDocument: 'updateLookup' // Include full document for updates
            };

            // Resume from last known position if available
            if (this.resumeToken) {
                options.resumeAfter = this.resumeToken;
            }

            console.log('Starting change stream monitoring...');
            const changeStream = this.collection.watch(pipeline, options);

            // Handle change stream events
            changeStream.on('change', (change) => {
                this.processChange(change).catch(console.error);
            });

            changeStream.on('error', (error) => {
                console.error('Change stream error:', error);
                // Implement retry logic here
            });

            // Keep the process running
            process.on('SIGINT', async () => {
                console.log('\nShutting down change stream monitor...');
                await changeStream.close();
                await this.client.close();
                process.exit(0);
            });

        } catch (error) {
            console.error('Error starting monitoring:', error);
            throw error;
        }
    }
}

async function main() {
    const monitor = new MongoChangeStreamMonitor();
    
    try {
        await monitor.connect();
        await monitor.startMonitoring();
    } catch (error) {
        console.error('Failed to start monitoring:', error);
        process.exit(1);
    }
}

// Run the monitor
if (require.main === module) {
    main().catch(console.error);
}

module.exports = MongoChangeStreamMonitor;
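The `logChange` method writes one JSON object per line, so the audit trail is easy to post-process. A small Python sketch (a hypothetical helper, not part of the monitors above) that counts operations in such a log:

```python
import json
from collections import Counter

def summarize_audit_log(path):
    """Count change operations in a newline-delimited JSON audit log."""
    counts = Counter()
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            entry = json.loads(line)
            counts[entry["operation"]] += 1
    return dict(counts)

# e.g. summarize_audit_log('/var/log/mongodb-changestream-nodejs.log')
```

This is useful as a quick sanity check that the monitor is seeing the mix of operations you expect.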

Create systemd service for Node.js monitor

Set up a systemd service to run the Node.js change stream monitor as a daemon. Create /etc/systemd/system/mongodb-changestream.service with the following content.

[Unit]
Description=MongoDB Change Stream Monitor
After=network.target mongod.service
Requires=mongod.service

[Service]
Type=simple
User=mongodb
Group=mongodb
WorkingDirectory=/opt/nodejs-monitor
ExecStart=/usr/bin/node monitor.js
Restart=always
RestartSec=10
Environment=NODE_ENV=production
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

Set permissions and enable service

Configure proper permissions for the monitor files and enable the systemd service.

sudo chown -R mongodb:mongodb /opt/nodejs-monitor
sudo chmod 755 /opt/nodejs-monitor
sudo chmod 644 /opt/nodejs-monitor/monitor.js
sudo touch /var/log/mongodb-changestream-nodejs.log
sudo chown mongodb:mongodb /var/log/mongodb-changestream-nodejs.log
sudo chmod 644 /var/log/mongodb-changestream-nodejs.log
sudo systemctl daemon-reload
sudo systemctl enable mongodb-changestream

Configure production features

Set up change stream resumability

Create a Python script that demonstrates resume token persistence for fault tolerance.

#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
import json
import os
import logging
from datetime import datetime

class ResumableChangeStream:
    def __init__(self, connection_string='mongodb://localhost:27017/'):
        self.client = MongoClient(connection_string)
        self.db = self.client.testdb
        self.collection = self.db.events
        self.resume_token_file = '/var/lib/mongodb/resume_token.json'
        self.resume_token = self.load_resume_token()
        
    def load_resume_token(self):
        """Load resume token from persistent storage"""
        try:
            if os.path.exists(self.resume_token_file):
                with open(self.resume_token_file, 'r') as f:
                    data = json.load(f)
                    return data.get('resume_token')
        except Exception as e:
            logging.warning(f"Could not load resume token: {e}")
        return None
        
    def save_resume_token(self, token):
        """Save resume token to persistent storage"""
        try:
            os.makedirs(os.path.dirname(self.resume_token_file), exist_ok=True)
            with open(self.resume_token_file, 'w') as f:
                json.dump({
                    'resume_token': token,
                    'timestamp': datetime.now().isoformat()
                }, f)
        except Exception as e:
            logging.error(f"Could not save resume token: {e}")
            
    def start_monitoring(self):
        """Start monitoring with resume capability"""
        pipeline = [
            {
                '$match': {
                    'operationType': {'$in': ['insert', 'update', 'delete']}
                }
            }
        ]
        
        options = {}
        if self.resume_token:
            options['resume_after'] = self.resume_token
            logging.info(f"Resuming from saved token")
        
        try:
            with self.collection.watch(pipeline, **options) as stream:
                for change in stream:
                    # Process the change
                    logging.info(f"Change: {change['operationType']} on {change['documentKey']['_id']}")
                    
                    # Save resume token after each change
                    self.resume_token = change['_id']
                    self.save_resume_token(self.resume_token)
                    
        except pymongo.errors.PyMongoError as e:
            logging.error(f"MongoDB error: {e}")
            # Implement retry logic
            
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    monitor = ResumableChangeStream()
    monitor.start_monitoring()
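The persistence logic can be exercised without a running server. The round trip below mirrors save_resume_token/load_resume_token against a temporary file (a standalone sketch; the token value is a made-up placeholder, since real resume tokens are opaque):

```python
import json
import os
import tempfile
from datetime import datetime

def save_token(path, token):
    """Persist a resume token with a timestamp, like the class above."""
    with open(path, "w") as f:
        json.dump({"resume_token": token,
                   "timestamp": datetime.now().isoformat()}, f)

def load_token(path):
    """Return the saved token, or None if the file is missing/unreadable."""
    try:
        with open(path) as f:
            return json.load(f).get("resume_token")
    except (OSError, ValueError):
        return None

token_file = os.path.join(tempfile.mkdtemp(), "resume_token.json")
assert load_token(token_file) is None  # nothing persisted yet
save_token(token_file, {"_data": "placeholder"})
assert load_token(token_file) == {"_data": "placeholder"}
```

Treating a missing or corrupt token file as "start fresh" (returning None) is what makes the monitor safe to restart after a crash.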

Configure authentication and SSL

For production deployments, configure MongoDB with authentication and SSL encryption following security best practices.

security:
  authorization: enabled
  
net:
  tls:
    mode: requireTLS
    certificateKeyFile: /etc/ssl/mongodb.pem
    CAFile: /etc/ssl/ca.pem
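With authorization enabled, clients must authenticate, and special characters in credentials must be percent-escaped in the connection string. A small helper (hypothetical, with placeholder credentials) for building such a URI:

```python
from urllib.parse import quote_plus

def build_mongo_uri(user, password, host="localhost", port=27017,
                    auth_db="admin"):
    """Build a TLS-enabled MongoDB URI with URL-escaped credentials."""
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/?tls=true&authSource={auth_db}"
    )

# '@' and ':' in the password are escaped so they cannot break URI parsing
print(build_mongo_uri("appUser", "p@ss:word"))
```

Pass the resulting string to MongoClient (PyMongo) or new MongoClient (Node.js) in place of the plain localhost URI used earlier.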

Test change stream functionality

Start the Python monitor

Run the filtered monitor in the background to test change detection.

sudo touch /var/log/mongodb-changestream.log
sudo chown mongodb:mongodb /var/log/mongodb-changestream.log
sudo -u mongodb python3 /opt/mongodb-monitor/filtered_monitor.py &

Generate test changes

Insert, update, and delete documents to trigger change stream events.

mongosh testdb --eval "
// Insert new document
db.events.insertOne({ name: 'user_registration', userId: 1002, email: 'user@example.com', timestamp: new Date() })

// Update existing document
db.events.updateOne({ userId: 1001 }, { \$set: { lastLogin: new Date() } })

// Delete a document
db.events.deleteOne({ name: 'user_registration' })
"

Start Node.js monitor service

Enable and start the systemd service for the Node.js monitor.

sudo systemctl start mongodb-changestream
sudo systemctl status mongodb-changestream
sudo journalctl -u mongodb-changestream -f

Verify your setup

# Check MongoDB replica set status
mongosh --eval "rs.status()"

Verify change stream is working

mongosh testdb --eval "db.events.insertOne({test: true, timestamp: new Date()})"

Check log files

sudo tail -f /var/log/mongodb-changestream.log
sudo tail -f /var/log/mongodb-changestream-nodejs.log

Check service status

sudo systemctl status mongodb-changestream

Common issues

  • ChangeStreamNotEnabled error: MongoDB is not configured as a replica set. Fix: initialize one with rs.initiate().
  • Connection refused: the MongoDB service is not running. Fix: sudo systemctl start mongod.
  • Permission denied on log files: incorrect file ownership. Fix: sudo chown mongodb:mongodb /var/log/mongodb-*.log.
  • Resume token invalid: the token expired or the collection was dropped. Fix: delete the resume token file and restart the monitor.
  • High memory usage: change stream buffer overflow. Fix: add pipeline filters to reduce change volume.
