Set up MongoDB 8.0 change streams to monitor database changes in real-time. Configure replica sets, implement Python and Node.js clients, and add filtering and resumability for production applications.
Prerequisites
- MongoDB 8.0
- Python 3.8+
- Node.js 16+
- At least 4GB RAM
- Root or sudo access
What this solves
MongoDB change streams provide real-time notifications when data changes in your database, enabling applications to react immediately to inserts, updates, and deletes. This tutorial shows you how to configure MongoDB 8.0 with replica sets and implement change stream listeners in Python and Node.js for real-time data synchronization, event processing, and application updates.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions.
sudo apt update && sudo apt upgrade -y
sudo apt install -y wget curl gnupg lsb-release
Install MongoDB 8.0 repository
Add the official MongoDB repository to install version 8.0.
curl -fsSL https://www.mongodb.org/static/pgp/server-8.0.asc | sudo gpg -o /usr/share/keyrings/mongodb-server-8.0.gpg --dearmor
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-8.0.gpg ] https://repo.mongodb.org/apt/ubuntu $(lsb_release -cs)/mongodb-org/8.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-8.0.list
sudo apt update
Install MongoDB 8.0
Install the MongoDB server and client tools.
sudo apt install -y mongodb-org
Configure MongoDB for replica set
Change streams require a replica set, even when it has only one member. Edit /etc/mongod.conf and add the replication section; the rest of the file can keep the package defaults shown here.
storage:
  dbPath: /var/lib/mongodb
  # Journaling is always enabled since MongoDB 6.1, so storage.journal.enabled is no longer a valid option
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log
net:
  port: 27017
  bindIp: 127.0.0.1
replication:
  replSetName: "rs0"
processManagement:
  # No fork or pidFilePath here: systemd runs mongod in the foreground
  timeZoneInfo: /usr/share/zoneinfo
Start MongoDB service
Enable and start the MongoDB service with the new configuration.
sudo systemctl enable mongod
sudo systemctl start mongod
sudo systemctl status mongod
Initialize replica set
Connect to MongoDB and initialize the replica set configuration.
mongosh --eval "rs.initiate({
_id: 'rs0',
members: [
{ _id: 0, host: 'localhost:27017' }
]
})"
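Whether initiation succeeded can also be checked from a client by inspecting the server's `hello` response, which reports `setName` and `isWritablePrimary` for replica set members. The following is a minimal sketch, not part of the original setup: the helper name `describe_replica_state` and the sample responses are hypothetical, and the commented-out PyMongo call assumes the local instance configured above.

```python
from typing import Any, Mapping


def describe_replica_state(hello: Mapping[str, Any]) -> str:
    """Classify a `hello` command response (helper name is illustrative)."""
    if "setName" not in hello:
        # Standalone servers, and members that were never initiated,
        # typically omit setName; change streams will not open on them.
        return "no replica set"
    if hello.get("isWritablePrimary"):
        return f"primary of {hello['setName']}"
    if hello.get("secondary"):
        return f"secondary of {hello['setName']}"
    # The member knows its set but has no role yet (for example mid-election).
    return f"member of {hello['setName']} without a role yet"


# Against a live server (assumes PyMongo and the local instance above):
#   from pymongo import MongoClient
#   hello = MongoClient("mongodb://localhost:27017/").admin.command("hello")
#   print(describe_replica_state(hello))

# Hand-written sample responses, for illustration only.
print(describe_replica_state({"isWritablePrimary": True}))
print(describe_replica_state({"setName": "rs0", "isWritablePrimary": True}))
```

A result other than "primary of rs0" right after `rs.initiate()` usually just means the election has not finished; checking again after a few seconds is a reasonable first step.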
Create test database and collection
Set up a sample database with test data to monitor with change streams.
mongosh testdb --eval "
db.events.insertMany([
  { name: 'user_login', userId: 1001, timestamp: new Date() },
  { name: 'order_created', orderId: 2001, amount: 99.99, timestamp: new Date() },
  { name: 'payment_processed', orderId: 2001, status: 'completed', timestamp: new Date() }
])
"
Implement Python change stream client
Install Python MongoDB driver
Install PyMongo to connect to MongoDB from Python applications.
pip3 install pymongo
Create basic change stream monitor
Create a Python script to monitor all changes in the database.
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
from datetime import datetime
import json


def main():
    # Connect to MongoDB
    client = MongoClient('mongodb://localhost:27017/')
    db = client.testdb
    collection = db.events
    print(f"Starting change stream monitor at {datetime.now()}")
    print("Watching for changes in testdb.events collection...")
    try:
        # Open change stream on the collection
        with collection.watch() as stream:
            for change in stream:
                print(f"\n--- Change detected at {datetime.now()} ---")
                print(f"Operation: {change['operationType']}")
                if 'fullDocument' in change:
                    print(f"Document: {json.dumps(change['fullDocument'], default=str, indent=2)}")
                if 'documentKey' in change:
                    print(f"Document ID: {change['documentKey']['_id']}")
                # Resume token for fault tolerance
                print(f"Resume Token: {change['_id']}")
    except KeyboardInterrupt:
        print("\nMonitoring stopped by user")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        client.close()


if __name__ == "__main__":
    main()
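The monitor above reads a handful of fields from each change event. As a rough illustration of that event shape, the sketch below runs a summarizing function over a hand-written sample event, so it needs no database. The function name `summarize_change` and the sample values are hypothetical; the field names (`operationType`, `ns`, `documentKey`, `updateDescription`) are the ones used throughout this tutorial.

```python
from typing import Any, Mapping


def summarize_change(change: Mapping[str, Any]) -> str:
    """Build a one-line summary of a change event (illustrative helper)."""
    op = change["operationType"]
    ns = change.get("ns", {})
    target = f"{ns.get('db')}.{ns.get('coll')}"
    doc_id = change.get("documentKey", {}).get("_id")
    if op == "insert":
        return f"insert into {target}: _id={doc_id}"
    if op == "update":
        # updatedFields maps each changed field name to its new value.
        updated = change.get("updateDescription", {}).get("updatedFields", {})
        return f"update in {target}: _id={doc_id}, fields={sorted(updated)}"
    if op == "delete":
        # Delete events carry documentKey but no fullDocument.
        return f"delete from {target}: _id={doc_id}"
    return f"{op} on {target}"


# Hand-written sample event, for illustration only.
sample_update = {
    "_id": {"_data": "<opaque resume token>"},
    "operationType": "update",
    "ns": {"db": "testdb", "coll": "events"},
    "documentKey": {"_id": 1},
    "updateDescription": {
        "updatedFields": {"lastLogin": "2024-01-01T00:00:00Z"},
        "removedFields": [],
    },
}
print(summarize_change(sample_update))
```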
Create filtered change stream monitor
Implement a more advanced monitor with filtering and per-operation handling. Save it as /opt/mongodb-monitor/filtered_monitor.py, the path used in the test step.
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
from datetime import datetime
import json
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/mongodb-changestream.log'),
        logging.StreamHandler()
    ]
)


class ChangeStreamProcessor:
    def __init__(self, connection_string='mongodb://localhost:27017/'):
        self.client = MongoClient(connection_string)
        self.db = self.client.testdb
        self.collection = self.db.events
        self.resume_token = None

    def process_insert(self, change):
        """Handle insert operations"""
        doc = change['fullDocument']
        logging.info(f"New document inserted: {doc['name']} - ID: {doc['_id']}")
        # Custom logic for different event types
        if doc['name'] == 'user_login':
            self.handle_user_login(doc)
        elif doc['name'] == 'order_created':
            self.handle_order_created(doc)

    def process_update(self, change):
        """Handle update operations"""
        doc_id = change['documentKey']['_id']
        updated_fields = change.get('updateDescription', {}).get('updatedFields', {})
        logging.info(f"Document updated - ID: {doc_id}, Fields: {updated_fields}")

    def process_delete(self, change):
        """Handle delete operations"""
        doc_id = change['documentKey']['_id']
        logging.info(f"Document deleted - ID: {doc_id}")

    def handle_user_login(self, doc):
        """Custom handler for user login events"""
        user_id = doc.get('userId')
        logging.info(f"Processing user login for user {user_id}")
        # Add your business logic here

    def handle_order_created(self, doc):
        """Custom handler for order creation events"""
        order_id = doc.get('orderId')
        amount = doc.get('amount')
        logging.info(f"Processing new order {order_id} with amount ${amount}")
        # Add your business logic here

    def start_monitoring(self):
        """Start the change stream with filtering and resumability"""
        # Inserts are filtered on the event name. Updates and deletes are
        # matched on operation type alone: delete events carry no
        # fullDocument, and update events carry one only when
        # full_document='updateLookup' is requested.
        pipeline = [
            {
                '$match': {
                    '$or': [
                        {
                            'operationType': 'insert',
                            'fullDocument.name': {'$exists': True}
                        },
                        {'operationType': {'$in': ['update', 'delete']}}
                    ]
                }
            }
        ]
        logging.info("Starting filtered change stream monitor")
        try:
            # Resume from last known position if available
            resume_options = {}
            if self.resume_token:
                resume_options['resume_after'] = self.resume_token
            with self.collection.watch(pipeline, **resume_options) as stream:
                for change in stream:
                    # Store resume token for fault tolerance
                    self.resume_token = change['_id']
                    operation = change['operationType']
                    if operation == 'insert':
                        self.process_insert(change)
                    elif operation == 'update':
                        self.process_update(change)
                    elif operation == 'delete':
                        self.process_delete(change)
        except Exception as e:
            logging.error(f"Error in change stream: {e}")
            # In production, you might want to implement retry logic here
        finally:
            self.client.close()


def main():
    processor = ChangeStreamProcessor()
    try:
        processor.start_monitoring()
    except KeyboardInterrupt:
        logging.info("Monitoring stopped by user")


if __name__ == "__main__":
    main()
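One detail of change stream filtering is easy to miss: delete events never include `fullDocument`, and update events include it only when the stream is opened with the update lookup option. A `$match` that requires a `fullDocument` field on every event therefore drops deletes silently. The sketch below shows one way to build a pipeline that filters inserts by event name while still passing updates and deletes; the helper name `build_pipeline` is hypothetical and the event names are the sample ones from this tutorial.

```python
from typing import Any, Dict, List


def build_pipeline(event_names: List[str]) -> List[Dict[str, Any]]:
    """Return a change stream pipeline (illustrative helper, not a driver API)."""
    return [
        {
            "$match": {
                "$or": [
                    # Inserts and replaces always carry fullDocument,
                    # so they can be filtered on its fields.
                    {
                        "operationType": {"$in": ["insert", "replace"]},
                        "fullDocument.name": {"$in": event_names},
                    },
                    # Updates and deletes are matched on type alone.
                    {"operationType": {"$in": ["update", "delete"]}},
                ]
            }
        }
    ]


pipeline = build_pipeline(["user_login", "order_created"])
print(pipeline[0]["$match"]["$or"][1])
```

With PyMongo the result would be passed as `collection.watch(pipeline, full_document="updateLookup")` if update events should also carry the current document.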
Implement Node.js change stream client
Install Node.js MongoDB driver
Install the official MongoDB driver for Node.js applications.
sudo mkdir -p /opt/nodejs-monitor
sudo chown "$USER" /opt/nodejs-monitor
cd /opt/nodejs-monitor
npm init -y
npm install mongodb
Create Node.js change stream monitor
Implement a Node.js-based change stream processor with async/await patterns. Save it as /opt/nodejs-monitor/monitor.js, the file the systemd service runs.
const { MongoClient } = require('mongodb');
const fs = require('fs').promises;
class MongoChangeStreamMonitor {
constructor(connectionString = 'mongodb://localhost:27017/') {
this.connectionString = connectionString;
this.client = null;
this.db = null;
this.collection = null;
this.resumeToken = null;
}
async connect() {
try {
this.client = new MongoClient(this.connectionString);
await this.client.connect();
this.db = this.client.db('testdb');
this.collection = this.db.collection('events');
console.log('Connected to MongoDB');
} catch (error) {
console.error('Connection error:', error);
throw error;
}
}
async processChange(change) {
const timestamp = new Date().toISOString();
console.log(`\n--- Change detected at ${timestamp} ---`);
console.log(`Operation: ${change.operationType}`);
// Store resume token for fault tolerance
this.resumeToken = change._id;
switch (change.operationType) {
case 'insert':
await this.handleInsert(change);
break;
case 'update':
await this.handleUpdate(change);
break;
case 'delete':
await this.handleDelete(change);
break;
case 'replace':
await this.handleReplace(change);
break;
}
// Log to file for audit trail
await this.logChange(change);
}
async handleInsert(change) {
const doc = change.fullDocument;
console.log(`New document: ${JSON.stringify(doc, null, 2)}`);
// Business logic based on document type
if (doc.name === 'user_login') {
await this.processUserLogin(doc);
} else if (doc.name === 'order_created') {
await this.processOrderCreated(doc);
}
}
async handleUpdate(change) {
const docId = change.documentKey._id;
const updatedFields = change.updateDescription?.updatedFields || {};
console.log(`Updated document ${docId}:`, updatedFields);
// If full document is available after update
if (change.fullDocument) {
console.log('Full document after update:', change.fullDocument);
}
}
async handleDelete(change) {
const docId = change.documentKey._id;
console.log(`Deleted document: ${docId}`);
}
async handleReplace(change) {
const docId = change.documentKey._id;
const newDoc = change.fullDocument;
console.log(`Replaced document ${docId}:`, newDoc);
}
async processUserLogin(doc) {
console.log(`Processing user login for user ${doc.userId}`);
// Add your custom logic here
// Example: update user last_seen timestamp, send notifications, etc.
}
async processOrderCreated(doc) {
console.log(`Processing new order ${doc.orderId} with amount $${doc.amount}`);
// Add your custom logic here
// Example: send confirmation email, update inventory, etc.
}
async logChange(change) {
const logEntry = {
timestamp: new Date().toISOString(),
operation: change.operationType,
documentId: change.documentKey._id,
resumeToken: change._id
};
const logLine = JSON.stringify(logEntry) + '\n';
await fs.appendFile('/var/log/mongodb-changestream-nodejs.log', logLine);
}
async startMonitoring() {
try {
// Define pipeline for filtering changes
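const pipeline = [
{
$match: {
$or: [
// Inserts and replaces always carry fullDocument, so filter them by name
{
operationType: { $in: ['insert', 'replace'] },
'fullDocument.name': { $exists: true }
},
// Deletes never carry fullDocument; match them and updates by type alone
{ operationType: { $in: ['update', 'delete'] } }
]
}
}
];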
// Configure change stream options
const options = {
fullDocument: 'updateLookup' // Include full document for updates
};
// Resume from last known position if available
if (this.resumeToken) {
options.resumeAfter = this.resumeToken;
}
console.log('Starting change stream monitoring...');
const changeStream = this.collection.watch(pipeline, options);
// Handle change stream events
changeStream.on('change', (change) => {
this.processChange(change).catch(console.error);
});
changeStream.on('error', (error) => {
console.error('Change stream error:', error);
// Implement retry logic here
});
// Keep the process running
process.on('SIGINT', async () => {
console.log('\nShutting down change stream monitor...');
await changeStream.close();
await this.client.close();
process.exit(0);
});
} catch (error) {
console.error('Error starting monitoring:', error);
throw error;
}
}
}
async function main() {
const monitor = new MongoChangeStreamMonitor();
try {
await monitor.connect();
await monitor.startMonitoring();
} catch (error) {
console.error('Failed to start monitoring:', error);
process.exit(1);
}
}
// Run the monitor
if (require.main === module) {
main().catch(console.error);
}
module.exports = MongoChangeStreamMonitor;
Create systemd service for Node.js monitor
Set up a systemd service to run the Node.js change stream monitor as a daemon. Save this unit file as /etc/systemd/system/mongodb-changestream.service.
[Unit]
Description=MongoDB Change Stream Monitor
After=network.target mongod.service
Requires=mongod.service
[Service]
Type=simple
User=mongodb
Group=mongodb
WorkingDirectory=/opt/nodejs-monitor
ExecStart=/usr/bin/node monitor.js
Restart=always
RestartSec=10
Environment=NODE_ENV=production
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Set permissions and enable service
Configure proper permissions for the monitor files and enable the systemd service.
sudo chown -R mongodb:mongodb /opt/nodejs-monitor
sudo chmod 755 /opt/nodejs-monitor
sudo chmod 644 /opt/nodejs-monitor/monitor.js
sudo touch /var/log/mongodb-changestream-nodejs.log
sudo chown mongodb:mongodb /var/log/mongodb-changestream-nodejs.log
sudo chmod 644 /var/log/mongodb-changestream-nodejs.log
sudo systemctl daemon-reload
sudo systemctl enable mongodb-changestream
Configure production features
Set up change stream resumability
Create a Python script that demonstrates resume token persistence for fault tolerance.
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
import json
import os
import logging
from datetime import datetime


class ResumableChangeStream:
    def __init__(self, connection_string='mongodb://localhost:27017/'):
        self.client = MongoClient(connection_string)
        self.db = self.client.testdb
        self.collection = self.db.events
        self.resume_token_file = '/var/lib/mongodb/resume_token.json'
        self.resume_token = self.load_resume_token()

    def load_resume_token(self):
        """Load resume token from persistent storage"""
        try:
            if os.path.exists(self.resume_token_file):
                with open(self.resume_token_file, 'r') as f:
                    data = json.load(f)
                    return data.get('resume_token')
        except Exception as e:
            logging.warning(f"Could not load resume token: {e}")
        return None

    def save_resume_token(self, token):
        """Save resume token to persistent storage"""
        try:
            os.makedirs(os.path.dirname(self.resume_token_file), exist_ok=True)
            with open(self.resume_token_file, 'w') as f:
                json.dump({
                    'resume_token': token,
                    'timestamp': datetime.now().isoformat()
                }, f)
        except Exception as e:
            logging.error(f"Could not save resume token: {e}")

    def start_monitoring(self):
        """Start monitoring with resume capability"""
        pipeline = [
            {
                '$match': {
                    'operationType': {'$in': ['insert', 'update', 'delete']}
                }
            }
        ]
        options = {}
        if self.resume_token:
            options['resume_after'] = self.resume_token
            logging.info("Resuming from saved token")
        try:
            with self.collection.watch(pipeline, **options) as stream:
                for change in stream:
                    # Process the change
                    logging.info(f"Change: {change['operationType']} on {change['documentKey']['_id']}")
                    # Save resume token after each change
                    self.resume_token = change['_id']
                    self.save_resume_token(self.resume_token)
        except pymongo.errors.PyMongoError as e:
            logging.error(f"MongoDB error: {e}")
            # Implement retry logic


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    monitor = ResumableChangeStream()
    monitor.start_monitoring()
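Because the token file is rewritten after every change, a crash in the middle of a write can leave it truncated. One common way to reduce that risk is to write to a temporary file and rename it over the target. The sketch below illustrates the idea with the standard library only; the function names `save_token_atomically` and `load_token` are hypothetical and the token value is a placeholder, not a real resume token.

```python
import json
import os
import tempfile
from typing import Any, Optional


def save_token_atomically(path: str, token: Any) -> None:
    """Write the token to a temp file, then rename it over the target."""
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    # The temp file lives in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".resume_token.")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"resume_token": token}, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic on POSIX filesystems
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_token(path: str) -> Optional[Any]:
    """Return the saved token, or None if the file is missing or unreadable."""
    try:
        with open(path) as f:
            return json.load(f).get("resume_token")
    except (FileNotFoundError, json.JSONDecodeError):
        return None


# Demonstration in a throwaway directory, with a placeholder token.
with tempfile.TemporaryDirectory() as tmp:
    token_path = os.path.join(tmp, "resume_token.json")
    print(load_token(token_path))
    save_token_atomically(token_path, {"_data": "example-token"})
    print(load_token(token_path))
```

Calling `os.fsync` on every change costs throughput; saving the token every N events or every few seconds is a reasonable trade-off when handlers are idempotent.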
Configure authentication and SSL
For production deployments, enable authentication and TLS encryption in /etc/mongod.conf. Merge the tls settings into the existing net section rather than adding a second net key, and restart mongod afterwards.
security:
  authorization: enabled
net:
  tls:
    mode: requireTLS
    certificateKeyFile: /etc/ssl/mongodb.pem
    CAFile: /etc/ssl/ca.pem
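Once authentication and TLS are enabled, the monitors need a connection string that carries credentials and TLS options instead of the plain `mongodb://localhost:27017/`. The sketch below assembles such a URI with the standard library. The helper name `build_mongo_uri`, the user name, the password, and the choice of `admin` as the authentication database are assumptions for illustration; the CA path is the one from the configuration above.

```python
from urllib.parse import quote, urlencode


def build_mongo_uri(
    user: str,
    password: str,
    host: str = "localhost:27017",
    replica_set: str = "rs0",
    ca_file: str = "/etc/ssl/ca.pem",
) -> str:
    """Assemble a MongoDB URI with credentials and TLS options (illustrative)."""
    options = urlencode(
        {
            "replicaSet": replica_set,
            "authSource": "admin",  # assumption: the user was created in admin
            "tls": "true",
            "tlsCAFile": ca_file,
        },
        safe="/",  # keep the file path readable
    )
    # Reserved characters in credentials must be percent-encoded.
    return f"mongodb://{quote(user, safe='')}:{quote(password, safe='')}@{host}/?{options}"


# Hypothetical credentials, for illustration only.
print(build_mongo_uri("monitor", "p@ss/word"))
```

The resulting string can be passed as the `connection_string` argument of the Python classes in this tutorial. In practice the password would come from an environment variable or a secrets store rather than source code.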
Test change stream functionality
Start the Python monitor
Run the filtered monitor in the background to test change detection. The log file must be writable by the user that runs the script.
sudo touch /var/log/mongodb-changestream.log
sudo chown "$USER" /var/log/mongodb-changestream.log
python3 /opt/mongodb-monitor/filtered_monitor.py &
Generate test changes
Insert, update, and delete documents to trigger change stream events.
mongosh testdb --eval "
// Insert new document
db.events.insertOne({ name: 'user_registration', userId: 1002, email: 'user@example.com', timestamp: new Date() })
// Update existing document
db.events.updateOne({ userId: 1001 }, { \$set: { lastLogin: new Date() } })
// Delete a document
db.events.deleteOne({ name: 'user_registration' })
"
Start Node.js monitor service
Enable and start the systemd service for the Node.js monitor.
sudo systemctl start mongodb-changestream
sudo systemctl status mongodb-changestream
sudo journalctl -u mongodb-changestream -f
Verify your setup
# Check MongoDB replica set status
mongosh --eval "rs.status()"
# Verify change stream is working (the name field lets the insert pass the monitors' filter)
mongosh testdb --eval "db.events.insertOne({name: 'verify_test', test: true, timestamp: new Date()})"
# Check log files
sudo tail -n 20 /var/log/mongodb-changestream.log
sudo tail -n 20 /var/log/mongodb-changestream-nodejs.log
# Check service status
sudo systemctl status mongodb-changestream
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| "The $changeStream stage is only supported on replica sets" error | mongod is running standalone or the replica set was never initiated | Set replication.replSetName, restart mongod, then run rs.initiate() |
| Connection refused | MongoDB service not running | sudo systemctl start mongod |
| Permission denied on log files | Incorrect file ownership | sudo chown mongodb:mongodb /var/log/mongodb-*.log |
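| Resume token invalid or change stream history lost | The token's entry has rolled off the oplog, or the collection was dropped | Remove the resume token file and restart the monitor; consider a larger oplog if this recurs |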
| High memory usage | Change stream buffer overflow | Add pipeline filters to reduce change volume |
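Several of these failures are transient (a restart, a brief network drop), so a monitor usually wraps the stream in a retry loop that resumes from the last token it processed. The sketch below shows that pattern with exponential backoff, using a fake stream so it runs without a database. The names `watch_with_retry` and `flaky_stream` are hypothetical; with PyMongo, `open_stream` would wrap `collection.watch(resume_after=token)` and the `except` clause would catch `pymongo.errors.PyMongoError` instead of `Exception`.

```python
import time
from typing import Any, Callable, Dict, Iterable, List, Optional

Change = Dict[str, Any]


def watch_with_retry(
    open_stream: Callable[[Optional[Any]], Iterable[Change]],
    handle: Callable[[Change], None],
    token: Optional[Any] = None,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[Any]:
    """Consume a stream, reopening it from the last token after failures."""
    attempt = 0
    while attempt < max_attempts:
        try:
            for change in open_stream(token):
                handle(change)
                token = change["_id"]  # remember progress only after handling
                attempt = 0  # any progress resets the backoff
            return token  # the stream ended without an error
        except Exception:
            attempt += 1
            # Delays of 1s, 2s, 4s, ... capped at 30s.
            sleep(min(base_delay * 2 ** (attempt - 1), 30.0))
    raise RuntimeError(f"giving up after {max_attempts} consecutive failures")


# Fake stream for illustration: fails once, then resumes from the token.
calls = {"count": 0}


def flaky_stream(resume_token: Optional[Any]) -> Iterable[Change]:
    calls["count"] += 1
    if calls["count"] == 1:
        yield {"_id": "t1", "operationType": "insert"}
        raise ConnectionError("simulated network drop")
    yield {"_id": "t2", "operationType": "delete", "resumed_from": resume_token}


seen: List[Change] = []
last_token = watch_with_retry(flaky_stream, seen.append, sleep=lambda seconds: None)
print(last_token, [c["operationType"] for c in seen])
```

Retrying does not help when the saved token itself is no longer valid; in that case the token has to be discarded, as described in the table above.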
Next steps
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly NC='\033[0m'
# Configuration
readonly SCRIPT_NAME="$(basename "$0")"
readonly MONGODB_VERSION="8.0"
readonly MONGODB_USER="mongodb"
readonly MONGODB_DATA_DIR="/var/lib/mongodb"
readonly NODEJS_MONITOR_DIR="/opt/nodejs-monitor"
readonly PYTHON_MONITOR_DIR="/opt/python-monitor"
# Global variables
REPLICA_SET_NAME=""
PKG_MGR=""
PKG_INSTALL=""
PKG_UPDATE=""
usage() {
echo "Usage: $SCRIPT_NAME [replica-set-name]"
echo " replica-set-name: Name for MongoDB replica set (default: rs0)"
echo ""
echo "Example: $SCRIPT_NAME myapp-rs"
exit 1
}
log() {
echo -e "${GREEN}[INFO]${NC} $*"
}
warn() {
echo -e "${YELLOW}[WARN]${NC} $*"
}
error() {
echo -e "${RED}[ERROR]${NC} $*" >&2
}
cleanup() {
error "Installation failed. Cleaning up..."
systemctl stop mongod 2>/dev/null || true
systemctl disable mongod 2>/dev/null || true
systemctl disable mongodb-changestream 2>/dev/null || true
rm -f /etc/systemd/system/mongodb-changestream.service
systemctl daemon-reload
}
trap cleanup ERR
detect_distro() {
if [ ! -f /etc/os-release ]; then
error "Cannot detect distribution. /etc/os-release not found."
exit 1
fi
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update && apt upgrade -y"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
;;
*)
error "Unsupported distribution: $ID"
exit 1
;;
esac
}
check_prerequisites() {
if [ "$EUID" -ne 0 ]; then
error "This script must be run as root"
exit 1
fi
if ! command -v curl >/dev/null 2>&1; then
error "curl is required but not installed"
exit 1
fi
}
install_mongodb_repo() {
case "$PKG_MGR" in
apt)
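# Debian and Ubuntu use different repository paths and components
local component="multiverse"
if [ "$ID" = "debian" ]; then component="main"; fi
curl -fsSL https://www.mongodb.org/static/pgp/server-${MONGODB_VERSION}.asc | gpg --yes -o /usr/share/keyrings/mongodb-server-${MONGODB_VERSION}.gpg --dearmor
# VERSION_CODENAME comes from /etc/os-release, sourced in detect_distro
echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-${MONGODB_VERSION}.gpg ] https://repo.mongodb.org/apt/${ID} ${VERSION_CODENAME}/mongodb-org/${MONGODB_VERSION} ${component}" > /etc/apt/sources.list.d/mongodb-org-${MONGODB_VERSION}.list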
apt update
;;
dnf|yum)
cat > /etc/yum.repos.d/mongodb-org-${MONGODB_VERSION}.repo << EOF
[mongodb-org-${MONGODB_VERSION}]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/8/mongodb-org/${MONGODB_VERSION}/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-${MONGODB_VERSION}.asc
EOF
;;
esac
}
configure_replica_set() {
# Wait for MongoDB to be fully started
sleep 5
# Initialize replica set
mongosh --eval "rs.initiate({_id: '${REPLICA_SET_NAME}', members: [{_id: 0, host: 'localhost:27017'}]})" || true
# Wait for replica set to be ready
sleep 10
}
create_python_monitor() {
mkdir -p "$PYTHON_MONITOR_DIR"
cat > "${PYTHON_MONITOR_DIR}/monitor.py" << 'EOF'
#!/usr/bin/env python3
import pymongo
from pymongo import MongoClient
import json
import logging
from datetime import datetime


class MongoChangeStreamMonitor:
    def __init__(self, connection_string='mongodb://localhost:27017/'):
        self.client = MongoClient(connection_string)
        self.db = self.client.testdb
        self.collection = self.db.events

    def start_monitoring(self):
        try:
            with self.collection.watch() as stream:
                for change in stream:
                    print(f"Change detected: {change['operationType']} on {change['ns']['coll']}")
                    print(f"Document ID: {change['documentKey']['_id']}")
                    if 'fullDocument' in change:
                        print(f"Full document: {change['fullDocument']}")
                    print("-" * 50)
        except KeyboardInterrupt:
            print("\nShutting down monitor...")
        except Exception as e:
            logging.error(f"Error in change stream: {e}")
        finally:
            self.client.close()


if __name__ == "__main__":
    monitor = MongoChangeStreamMonitor()
    monitor.start_monitoring()
EOF
chmod 755 "$PYTHON_MONITOR_DIR"
chmod 644 "${PYTHON_MONITOR_DIR}/monitor.py"
chown -R "$MONGODB_USER:$MONGODB_USER" "$PYTHON_MONITOR_DIR"
}
create_nodejs_monitor() {
mkdir -p "$NODEJS_MONITOR_DIR"
cat > "${NODEJS_MONITOR_DIR}/package.json" << 'EOF'
{
"name": "mongodb-changestream-monitor",
"version": "1.0.0",
"description": "MongoDB Change Stream Monitor",
"main": "monitor.js",
"dependencies": {
"mongodb": "^6.3.0"
}
}
EOF
cat > "${NODEJS_MONITOR_DIR}/monitor.js" << 'EOF'
const { MongoClient } = require('mongodb');
class MongoChangeStreamMonitor {
constructor(uri = 'mongodb://localhost:27017/') {
this.client = new MongoClient(uri);
this.db = null;
this.collection = null;
}
async connect() {
await this.client.connect();
this.db = this.client.db('testdb');
this.collection = this.db.collection('events');
console.log('Connected to MongoDB');
}
async startMonitoring() {
const changeStream = this.collection.watch();
changeStream.on('change', (change) => {
console.log(`Change detected: ${change.operationType} on ${change.ns.coll}`);
console.log(`Document ID: ${change.documentKey._id}`);
if (change.fullDocument) {
console.log(`Full document: ${JSON.stringify(change.fullDocument)}`);
}
console.log('-'.repeat(50));
});
process.on('SIGINT', async () => {
console.log('\nShutting down...');
await changeStream.close();
await this.client.close();
process.exit(0);
});
}
}
async function main() {
const monitor = new MongoChangeStreamMonitor();
await monitor.connect();
await monitor.startMonitoring();
}
if (require.main === module) {
main().catch(console.error);
}
EOF
# Install Node.js if not present
if ! command -v node >/dev/null 2>&1; then
if [ "$PKG_MGR" = "apt" ]; then
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
$PKG_INSTALL nodejs
else
$PKG_INSTALL nodejs npm
fi
fi
cd "$NODEJS_MONITOR_DIR"
npm install
chmod 755 "$NODEJS_MONITOR_DIR"
chmod 644 "${NODEJS_MONITOR_DIR}"/*.{js,json}
chown -R "$MONGODB_USER:$MONGODB_USER" "$NODEJS_MONITOR_DIR"
}
create_systemd_service() {
cat > /etc/systemd/system/mongodb-changestream.service << EOF
[Unit]
Description=MongoDB Change Stream Monitor
After=network.target mongod.service
Requires=mongod.service
[Service]
Type=simple
User=$MONGODB_USER
Group=$MONGODB_USER
WorkingDirectory=$NODEJS_MONITOR_DIR
ExecStart=/usr/bin/node monitor.js
Restart=always
RestartSec=10
Environment=NODE_ENV=production
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable mongodb-changestream
}
main() {
REPLICA_SET_NAME="${1:-rs0}"
detect_distro
check_prerequisites
echo "[1/8] Updating system packages..."
# PKG_UPDATE may contain "&&", which only works when the shell re-parses the string
eval "$PKG_UPDATE"
echo "[2/8] Installing prerequisites..."
$PKG_INSTALL wget curl gnupg python3 python3-pip
echo "[3/8] Adding MongoDB repository..."
install_mongodb_repo
echo "[4/8] Installing MongoDB..."
$PKG_INSTALL mongodb-org
echo "[5/8] Configuring MongoDB for replica set..."
sed -i 's/#replication:/replication:\n replSetName: "'$REPLICA_SET_NAME'"/' /etc/mongod.conf
systemctl enable mongod
systemctl start mongod
echo "[6/8] Initializing replica set..."
configure_replica_set
echo "[7/8] Installing Python dependencies..."
pip3 install pymongo
create_python_monitor
echo "[8/8] Setting up Node.js monitor service..."
create_nodejs_monitor
create_systemd_service
log "MongoDB 8.0 with change streams installed successfully!"
log "Replica set name: $REPLICA_SET_NAME"
log "Python monitor: $PYTHON_MONITOR_DIR/monitor.py"
log "Node.js monitor service: mongodb-changestream"
log ""
log "To start the Node.js monitor: systemctl start mongodb-changestream"
log "To run the Python monitor: python3 $PYTHON_MONITOR_DIR/monitor.py"
# Verification
if systemctl is-active --quiet mongod; then
log "✓ MongoDB is running"
else
error "✗ MongoDB is not running"
fi
if mongosh --eval "rs.status()" >/dev/null 2>&1; then
log "✓ Replica set is configured"
else
warn "✗ Replica set may not be ready yet"
fi
}
if [ "${BASH_SOURCE[0]}" = "${0}" ]; then
main "$@"
fi
Review the script before running. It must run as root, so execute it with: sudo bash install.sh