Webhook Security
Azotte implements comprehensive security measures to ensure webhook integrity, authenticity, and reliable delivery while protecting against malicious attacks.
Signature Verification
HMAC-SHA256 Signatures
Every webhook payload is signed with HMAC-SHA256 to verify authenticity:
const crypto = require('crypto');
function verifyWebhookSignature(payload, signature, secret) {
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(payload, 'utf8')
.digest('hex');
const computedSignature = `sha256=${expectedSignature}`;
// Use constant-time comparison to prevent timing attacks
return crypto.timingSafeEqual(
Buffer.from(signature, 'utf8'),
Buffer.from(computedSignature, 'utf8')
);
}
// Express.js middleware
function validateWebhookSignature(req, res, next) {
const signature = req.headers['azotte-signature'];
const payload = req.body;
if (!verifyWebhookSignature(payload, signature, process.env.WEBHOOK_SECRET)) {
return res.status(401).json({ error: 'Invalid signature' });
}
next();
}
Multiple Signature Versions
Azotte supports signature versioning for backward compatibility:
import hmac
import hashlib
import time
class WebhookSignatureValidator:
def __init__(self):
self.tolerance = 300 # 5 minutes
def verify_signature(self, payload: str, signature_header: str, secret: str) -> bool:
"""
Verify webhook signature with timestamp validation
Format: t=timestamp,v1=signature
"""
elements = signature_header.split(',')
timestamp = None
signatures = {}
for element in elements:
key, value = element.split('=', 1)
if key == 't':
timestamp = int(value)
elif key.startswith('v'):
signatures[key] = value
if timestamp is None:
raise ValueError('No timestamp found in signature header')
# Check timestamp tolerance
current_time = int(time.time())
if abs(current_time - timestamp) > self.tolerance:
raise ValueError('Timestamp outside tolerance')
# Verify signature
signed_payload = f"{timestamp}.{payload}"
for version, signature in signatures.items():
if version == 'v1':
expected_signature = hmac.new(
secret.encode('utf-8'),
signed_payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
if hmac.compare_digest(signature, expected_signature):
return True
return False
Endpoint Security
HTTPS Requirements
All webhook endpoints must use HTTPS with valid TLS certificates:
# Nginx configuration for webhook endpoints
server {
listen 443 ssl http2;
server_name webhooks.myapp.com;
# TLS configuration
ssl_certificate /path/to/certificate.pem;
ssl_certificate_key /path/to/private-key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
# Security headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload";
add_header X-Content-Type-Options "nosniff";
add_header X-Frame-Options "DENY";
add_header X-XSS-Protection "1; mode=block";
# Rate limiting
limit_req_zone $binary_remote_addr zone=webhook:10m rate=10r/s;
location /webhooks/azotte {
limit_req zone=webhook burst=20 nodelay;
# Only allow Azotte IP ranges
allow 54.187.174.169/32;
allow 54.187.205.235/32;
deny all;
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
IP Allowlisting
Configure firewall rules to only accept webhooks from Azotte's IP ranges:
# iptables rules
iptables -A INPUT -p tcp --dport 443 -s 54.187.174.169/32 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -s 54.187.205.235/32 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -j DROP
# AWS Security Group
aws ec2 authorize-security-group-ingress \
--group-id sg-12345678 \
--protocol tcp \
--port 443 \
--cidr 54.187.174.169/32
# Application-level IP checking
const ALLOWED_IPS = [
'54.187.174.169',
'54.187.205.235',
// Add more Azotte IP ranges as provided
];
function validateSourceIP(req, res, next) {
const clientIP = req.ip || req.connection.remoteAddress;
const forwardedIPs = req.headers['x-forwarded-for']?.split(',') || [];
const sourceIP = forwardedIPs[0]?.trim() || clientIP;
if (!ALLOWED_IPS.includes(sourceIP)) {
return res.status(403).json({ error: 'Forbidden: Invalid source IP' });
}
next();
}
Authentication Methods
API Key Authentication
Secure webhook endpoints with API key validation:
package webhook
import (
"crypto/subtle"
"net/http"
)
type WebhookAuthenticator struct {
apiKeys map[string]bool
}
func NewWebhookAuthenticator() *WebhookAuthenticator {
return &WebhookAuthenticator{
apiKeys: map[string]bool{
"whk_live_abc123": true,
"whk_test_xyz789": true,
},
}
}
func (wa *WebhookAuthenticator) ValidateAPIKey(r *http.Request) bool {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
return false
}
// Extract API key from "Bearer {key}" format
if len(authHeader) < 8 || authHeader[:7] != "Bearer " {
return false
}
apiKey := authHeader[7:]
// Use constant-time comparison
for validKey := range wa.apiKeys {
if subtle.ConstantTimeCompare([]byte(apiKey), []byte(validKey)) == 1 {
return true
}
}
return false
}
Mutual TLS (mTLS)
Implement client certificate authentication for enhanced security:
# Certificate generation
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr
openssl x509 -req -in client.csr -signkey client.key -out client.crt
# Nginx mTLS configuration
server {
listen 443 ssl;
ssl_certificate /path/to/server.crt;
ssl_certificate_key /path/to/server.key;
# Client certificate verification
ssl_client_certificate /path/to/ca.crt;
ssl_verify_client on;
proto_set_header SSL-Client-Cert $ssl_client_cert;
proxy_set_header SSL-Client-Verify $ssl_client_verify;
location /webhooks/azotte {
# Only proceed if client cert is valid
if ($ssl_client_verify != SUCCESS) {
return 403;
}
proxy_pass http://backend;
}
}
Request Validation
Content-Type Validation
Ensure webhook requests have correct content types:
interface WebhookValidationConfig {
allowedContentTypes: string[];
maxPayloadSize: number; // bytes
requiredHeaders: string[];
allowedMethods: string[];
}
class WebhookValidator {
private config: WebhookValidationConfig;
constructor(config: WebhookValidationConfig) {
this.config = config;
}
validateRequest(req: Request): ValidationResult {
const errors: string[] = [];
// Validate HTTP method
if (!this.config.allowedMethods.includes(req.method)) {
errors.push(`Invalid HTTP method: ${req.method}`);
}
// Validate Content-Type
const contentType = req.headers['content-type'];
if (!this.config.allowedContentTypes.includes(contentType)) {
errors.push(`Invalid Content-Type: ${contentType}`);
}
// Validate payload size
const contentLength = parseInt(req.headers['content-length'] || '0');
if (contentLength > this.config.maxPayloadSize) {
errors.push(`Payload too large: ${contentLength} bytes`);
}
// Validate required headers
for (const header of this.config.requiredHeaders) {
if (!req.headers[header]) {
errors.push(`Missing required header: ${header}`);
}
}
return {
isValid: errors.length === 0,
errors
};
}
}
// Usage
const validator = new WebhookValidator({
allowedContentTypes: ['application/json'],
maxPayloadSize: 1024 * 1024, // 1MB
requiredHeaders: ['azotte-signature', 'user-agent'],
allowedMethods: ['POST']
});
JSON Schema Validation
Validate webhook payload structure:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["id", "object", "type", "created", "data"],
"properties": {
"id": {
"type": "string",
"pattern": "^evt_[a-zA-Z0-9]+$"
},
"object": {
"type": "string",
"enum": ["event"]
},
"type": {
"type": "string",
"enum": [
"customer.created", "customer.updated", "customer.deleted",
"subscription.created", "subscription.updated", "subscription.canceled",
"payment.succeeded", "payment.failed", "payment.refunded",
"invoice.created", "invoice.payment_succeeded", "invoice.payment_failed"
]
},
"created": {
"type": "integer",
"minimum": 0
},
"data": {
"type": "object",
"required": ["object"],
"properties": {
"object": {
"type": "object"
},
"previous_attributes": {
"type": "object"
}
}
}
}
}
const Ajv = require('ajv');
const webhookSchema = require('./webhook-schema.json');
const ajv = new Ajv();
const validate = ajv.compile(webhookSchema);
function validateWebhookPayload(payload) {
const valid = validate(payload);
if (!valid) {
return {
isValid: false,
errors: validate.errors
};
}
return { isValid: true };
}
Rate Limiting & DDoS Protection
Adaptive Rate Limiting
Implement smart rate limiting based on endpoint behavior:
-- Redis Lua script for adaptive rate limiting
local key = KEYS[1]
local window_size = tonumber(ARGV[1])
local max_requests = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- Clean old entries
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window_size)
-- Count current requests
local current_count = redis.call('ZCARD', key)
if current_count < max_requests then
-- Add current request
redis.call('ZADD', key, current_time, current_time)
redis.call('EXPIRE', key, window_size)
return {1, max_requests - current_count - 1}
else
return {0, 0}
end
import redis
import time
class AdaptiveRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
self.script = self.redis.register_script("""
-- Lua script content here
""")
def is_allowed(self, identifier: str, max_requests: int = 100, window_seconds: int = 60) -> tuple[bool, int]:
"""
Check if request is allowed under rate limit
Returns (allowed, remaining_requests)
"""
current_time = int(time.time())
key = f"rate_limit:{identifier}"
result = self.script(
keys=[key],
args=[window_seconds, max_requests, current_time]
)
allowed = bool(result[0])
remaining = int(result[1])
return allowed, remaining
def get_adaptive_limit(self, identifier: str) -> int:
"""Adjust rate limit based on historical behavior"""
history_key = f"rate_history:{identifier}"
# Get request history for last hour
hour_ago = int(time.time()) - 3600
request_count = self.redis.zcount(history_key, hour_ago, '+inf')
# Adaptive limits based on usage patterns
if request_count > 1000:
return 50 # Heavy user - stricter limits
elif request_count > 100:
return 100 # Normal user
else:
return 200 # Light user - more generous
Security Monitoring
Webhook Security Events
Monitor webhook security events in real-time:
interface SecurityEvent {
id: string;
timestamp: Date;
type: 'invalid_signature' | 'rate_limit_exceeded' | 'ip_blocked' | 'payload_too_large';
source_ip: string;
user_agent?: string;
endpoint: string;
details: Record<string, any>;
}
class WebhookSecurityMonitor {
private eventStore: SecurityEventStore;
private alerting: AlertingService;
constructor(eventStore: SecurityEventStore, alerting: AlertingService) {
this.eventStore = eventStore;
this.alerting = alerting;
}
async recordSecurityEvent(event: SecurityEvent): Promise<void> {
await this.eventStore.store(event);
// Check for attack patterns
await this.analyzeSecurityPatterns(event);
}
async analyzeSecurityPatterns(event: SecurityEvent): Promise<void> {
const recentEvents = await this.eventStore.getRecentEvents(
event.source_ip,
300 // last 5 minutes
);
// Check for brute force attacks
if (recentEvents.length > 20) {
await this.alerting.sendAlert({
severity: 'high',
message: `Possible brute force attack from ${event.source_ip}`,
details: { event_count: recentEvents.length, time_window: '5min' }
});
// Auto-block IP
await this.blockIP(event.source_ip, 3600); // 1 hour
}
// Check for signature bypass attempts
const invalidSignatureEvents = recentEvents.filter(e =>
e.type === 'invalid_signature'
);
if (invalidSignatureEvents.length > 5) {
await this.alerting.sendAlert({
severity: 'medium',
message: `Multiple invalid signature attempts from ${event.source_ip}`,
details: { attempts: invalidSignatureEvents.length }
});
}
}
}
Anomaly Detection
Implement machine learning-based anomaly detection:
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class WebhookAnomalyDetector:
def __init__(self):
self.model = IsolationForest(contamination=0.1, random_state=42)
self.scaler = StandardScaler()
self.is_trained = False
def extract_features(self, request_data: dict) -> np.array:
"""Extract numerical features from webhook request"""
features = [
len(request_data.get('payload', '')), # Payload size
request_data.get('timestamp', 0) % 86400, # Time of day
len(request_data.get('headers', {})), # Number of headers
1 if request_data.get('has_valid_signature', False) else 0,
request_data.get('response_time_ms', 0),
len(request_data.get('user_agent', '')),
]
return np.array(features).reshape(1, -1)
def train(self, historical_requests: list):
"""Train the anomaly detection model"""
features_list = []
for request in historical_requests:
features = self.extract_features(request)
features_list.append(features[0])
X = np.array(features_list)
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled)
self.is_trained = True
def detect_anomaly(self, request_data: dict) -> tuple[bool, float]:
"""
Detect if request is anomalous
Returns (is_anomaly, anomaly_score)
"""
if not self.is_trained:
return False, 0.0
features = self.extract_features(request_data)
features_scaled = self.scaler.transform(features)
prediction = self.model.predict(features_scaled)[0]
anomaly_score = self.model.score_samples(features_scaled)[0]
is_anomaly = prediction == -1
return is_anomaly, abs(anomaly_score)
Incident Response
Automated Response Actions
Configure automated responses to security threats:
incident_response_rules:
brute_force_attack:
trigger:
event_type: "rate_limit_exceeded"
threshold: 50
time_window: 300 # 5 minutes
actions:
- block_ip_temporarily: 3600 # 1 hour
- notify_security_team: true
- escalate_after: 5 # incidents
signature_bypass_attempts:
trigger:
event_type: "invalid_signature"
threshold: 10
time_window: 600 # 10 minutes
actions:
- require_additional_auth: true
- audit_endpoint_config: true
- notify_devops_team: true
payload_anomalies:
trigger:
anomaly_score: 0.8
actions:
- quarantine_request: true
- deep_payload_analysis: true
- notify_security_analyst: true
Testing Webhook Security
Security Test Suite
import pytest
import requests
import hmac
import hashlib
import time
class TestWebhookSecurity:
def setup_method(self):
self.webhook_url = "https://myapp.com/webhooks/azotte"
self.valid_secret = "whsec_test_secret"
self.valid_payload = '{"id":"evt_test","type":"customer.created"}'
def generate_signature(self, payload: str, secret: str, timestamp: int = None) -> str:
if timestamp is None:
timestamp = int(time.time())
signed_payload = f"{timestamp}.{payload}"
signature = hmac.new(
secret.encode(),
signed_payload.encode(),
hashlib.sha256
).hexdigest()
return f"t={timestamp},v1={signature}"
def test_valid_signature_accepted(self):
signature = self.generate_signature(self.valid_payload, self.valid_secret)
response = requests.post(
self.webhook_url,
data=self.valid_payload,
headers={
'Content-Type': 'application/json',
'Azotte-Signature': signature
}
)
assert response.status_code == 200
def test_invalid_signature_rejected(self):
invalid_signature = "t=123456789,v1=invalid_signature"
response = requests.post(
self.webhook_url,
data=self.valid_payload,
headers={
'Content-Type': 'application/json',
'Azotte-Signature': invalid_signature
}
)
assert response.status_code == 401
def test_expired_timestamp_rejected(self):
old_timestamp = int(time.time()) - 7200 # 2 hours ago
signature = self.generate_signature(
self.valid_payload, self.valid_secret, old_timestamp
)
response = requests.post(
self.webhook_url,
data=self.valid_payload,
headers={
'Content-Type': 'application/json',
'Azotte-Signature': signature
}
)
assert response.status_code == 401
def test_rate_limiting(self):
signature = self.generate_signature(self.valid_payload, self.valid_secret)
# Send requests rapidly to trigger rate limit
responses = []
for _ in range(150): # Exceed rate limit
response = requests.post(
self.webhook_url,
data=self.valid_payload,
headers={
'Content-Type': 'application/json',
'Azotte-Signature': signature
}
)
responses.append(response.status_code)
# Should have some 429 (Too Many Requests) responses
assert 429 in responses
Best Practices
Implementation Guidelines
- Always verify signatures - Never trust unsigned webhooks
- Use HTTPS only - Encrypt all webhook traffic
- Implement rate limiting - Protect against DoS attacks
- Validate payloads - Check structure and content
- Log security events - Monitor for suspicious activity
- Use IP allowlisting - Restrict webhook sources
- Implement timeouts - Prevent webhook replay attacks
Security Checklist
- Webhook signatures verified using HMAC-SHA256
- HTTPS with valid SSL certificates enforced
- IP allowlisting configured for Azotte ranges
- Rate limiting implemented with reasonable limits
- Request payload size limits enforced
- Security headers configured (HSTS, CSP, etc.)
- Error handling doesn't leak sensitive information
- Logging and monitoring in place
- Incident response procedures documented
- Regular security testing performed
Next Steps
- Learn about Event Types
- Understand Retry Policy
- Explore Webhooks Overview