Webhook Security

Azotte signs every webhook payload and requires HTTPS endpoints. This page describes how to verify signatures, restrict and validate incoming requests, apply rate limiting, and monitor for attacks.

Signature Verification

HMAC-SHA256 Signatures

Every webhook payload is signed with HMAC-SHA256 to verify authenticity:

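const crypto = require('crypto');

function verifyWebhookSignature(payload, signature, secret) {
  // Reject a missing or non-string header before comparing
  if (typeof signature !== 'string') {
    return false;
  }

  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');

  const received = Buffer.from(signature, 'utf8');
  const computed = Buffer.from(`sha256=${expectedSignature}`, 'utf8');

  // timingSafeEqual throws if the buffers differ in length, so check that first
  if (received.length !== computed.length) {
    return false;
  }

  // Use constant-time comparison to prevent timing attacks
  return crypto.timingSafeEqual(received, computed);
}

// Express.js middleware
// The HMAC must be computed over the raw request bytes, so mount this route
// with express.raw({ type: 'application/json' }) so that req.body is a Buffer
// rather than a parsed object.
function validateWebhookSignature(req, res, next) {
  const signature = req.headers['azotte-signature'];
  const payload = req.body;

  if (!verifyWebhookSignature(payload, signature, process.env.WEBHOOK_SECRET)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  next();
}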
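
For local testing it can help to produce a signature yourself and check it round-trips. The following is a minimal Python sketch, assuming the `sha256=<hex digest>` header format shown above; the function names and the secret are placeholders, not part of Azotte's API.

```python
import hashlib
import hmac


def sign_payload(raw_body: bytes, secret: str) -> str:
    """Return a header value in the sha256=<hex digest> form."""
    digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify_payload(raw_body: bytes, header_value: str, secret: str) -> bool:
    """Recompute the signature over the raw bytes and compare in constant time."""
    expected = sign_payload(raw_body, secret)
    # compare_digest returns False (it does not raise) when lengths differ
    return hmac.compare_digest(header_value.encode("utf-8"), expected.encode("utf-8"))


body = b'{"id":"evt_test","type":"customer.created"}'
secret = "whsec_test_secret"  # placeholder secret for illustration
header = sign_payload(body, secret)

print(verify_payload(body, header, secret))         # untouched body
print(verify_payload(body + b" ", header, secret))  # body altered by one byte
```

Because the digest covers the exact bytes received, even a whitespace change from re-serializing the JSON makes verification fail, which is why the raw body should be used.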

Multiple Signature Versions

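Azotte supports signature versioning for backward compatibility. The header carries a timestamp and one or more versioned signatures (t=timestamp,v1=signature), and each signature is computed over the timestamp and payload joined by a period: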

import hmac
import hashlib
import time

class WebhookSignatureValidator:
    def __init__(self):
        self.tolerance = 300  # 5 minutes

    def verify_signature(self, payload: str, signature_header: str, secret: str) -> bool:
        """
        Verify webhook signature with timestamp validation
        Format: t=timestamp,v1=signature
        """
        elements = signature_header.split(',')

        timestamp = None
        signatures = {}

        for element in elements:
            key, value = element.split('=', 1)
            if key == 't':
                timestamp = int(value)
            elif key.startswith('v'):
                signatures[key] = value

        if timestamp is None:
            raise ValueError('No timestamp found in signature header')

        # Check timestamp tolerance
        current_time = int(time.time())
        if abs(current_time - timestamp) > self.tolerance:
            raise ValueError('Timestamp outside tolerance')

        # Verify signature
        signed_payload = f"{timestamp}.{payload}"

        for version, signature in signatures.items():
            if version == 'v1':
                expected_signature = hmac.new(
                    secret.encode('utf-8'),
                    signed_payload.encode('utf-8'),
                    hashlib.sha256
                ).hexdigest()

                if hmac.compare_digest(signature, expected_signature):
                    return True

        return False
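
The timestamp is part of the signed string, so it cannot be changed independently of the signature. The sketch below illustrates that property, assuming the `t=timestamp,v1=signature` format described above; `sign` and `signature_matches` are illustrative helpers, and the secret is a placeholder.

```python
import hashlib
import hmac


def sign(payload: str, secret: str, timestamp: int) -> str:
    """Build a header in the t=<timestamp>,v1=<hex digest> format."""
    signed_payload = f"{timestamp}.{payload}"
    digest = hmac.new(
        secret.encode("utf-8"), signed_payload.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    return f"t={timestamp},v1={digest}"


def signature_matches(payload: str, header: str, secret: str) -> bool:
    """Recompute the header from its own timestamp and compare in constant time."""
    fields = dict(part.split("=", 1) for part in header.split(","))
    expected = sign(payload, secret, int(fields["t"]))
    return hmac.compare_digest(header, expected)


payload = '{"id":"evt_test","type":"customer.created"}'
secret = "whsec_test_secret"  # placeholder secret for illustration

original = sign(payload, secret, timestamp=1_700_000_000)

# Someone replaying the request later can swap in a fresh timestamp,
# but cannot recompute the digest without the secret.
digest = original.split("v1=", 1)[1]
replayed = f"t=1700009999,v1={digest}"

print(signature_matches(payload, original, secret))
print(signature_matches(payload, replayed, secret))
```

This is why the tolerance check and the signature check work together: a captured request keeps a valid signature only for its original timestamp, and that timestamp eventually falls outside the tolerance window.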

Endpoint Security

HTTPS Requirements

All webhook endpoints must use HTTPS with valid TLS certificates:

# Nginx configuration for webhook endpoints

# Rate limiting zone. limit_req_zone is only valid in the http context,
# so declare it outside the server block.
limit_req_zone $binary_remote_addr zone=webhook:10m rate=10r/s;

server {
    listen 443 ssl http2;
    server_name webhooks.myapp.com;

    # TLS configuration
    ssl_certificate /path/to/certificate.pem;
    ssl_certificate_key /path/to/private-key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;

    # Security headers
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload";
    add_header X-Content-Type-Options "nosniff";
    add_header X-Frame-Options "DENY";
    add_header X-XSS-Protection "1; mode=block";

    location /webhooks/azotte {
        # Rate limiting
        limit_req zone=webhook burst=20 nodelay;

        # Only allow Azotte IP ranges
        allow 54.187.174.169/32;
        allow 54.187.205.235/32;
        deny all;

        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

IP Allowlisting

Configure firewall rules to only accept webhooks from Azotte's IP ranges:

# iptables rules
iptables -A INPUT -p tcp --dport 443 -s 54.187.174.169/32 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -s 54.187.205.235/32 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -j DROP

# AWS Security Group
aws ec2 authorize-security-group-ingress \
--group-id sg-12345678 \
--protocol tcp \
--port 443 \
--cidr 54.187.174.169/32

// Application-level IP checking
const ALLOWED_IPS = [
  '54.187.174.169',
  '54.187.205.235',
  // Add more Azotte IP ranges as provided
];

function validateSourceIP(req, res, next) {
  // X-Forwarded-For is set by the client and is easy to spoof, so do not read
  // it directly. Behind a reverse proxy, configure Express "trust proxy" so
  // that req.ip is derived only from proxies you control.
  const sourceIP = req.ip || req.socket.remoteAddress;

  if (!ALLOWED_IPS.includes(sourceIP)) {
    return res.status(403).json({ error: 'Forbidden: Invalid source IP' });
  }

  next();
}
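
Exact string comparison breaks down once the allowlist contains CIDR ranges or the socket reports an IPv4-mapped IPv6 address such as `::ffff:54.187.174.169`. A minimal sketch using Python's standard `ipaddress` module is shown below; the two networks are the addresses listed above, and `is_allowed_source` is an illustrative name.

```python
import ipaddress

# The addresses listed above, expressed as networks so wider ranges can be added
ALLOWED_NETWORKS = [
    ipaddress.ip_network("54.187.174.169/32"),
    ipaddress.ip_network("54.187.205.235/32"),
]


def is_allowed_source(remote_addr: str) -> bool:
    """Return True if remote_addr falls inside one of the allowed networks."""
    try:
        address = ipaddress.ip_address(remote_addr)
    except ValueError:
        # Not a parseable IP address at all
        return False

    # Unwrap IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) to plain IPv4
    if address.version == 6 and address.ipv4_mapped is not None:
        address = address.ipv4_mapped

    return any(address in network for network in ALLOWED_NETWORKS)


print(is_allowed_source("54.187.174.169"))
print(is_allowed_source("::ffff:54.187.205.235"))
print(is_allowed_source("10.0.0.1"))
```

The address passed in should be the peer address of the connection (or one set by a proxy you control), not a value taken from a client-supplied header.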

Authentication Methods

API Key Authentication

Secure webhook endpoints with API key validation:

package webhook

import (
	"crypto/subtle"
	"net/http"
)

type WebhookAuthenticator struct {
	apiKeys map[string]bool
}

func NewWebhookAuthenticator() *WebhookAuthenticator {
	return &WebhookAuthenticator{
		apiKeys: map[string]bool{
			"whk_live_abc123": true,
			"whk_test_xyz789": true,
		},
	}
}

func (wa *WebhookAuthenticator) ValidateAPIKey(r *http.Request) bool {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return false
	}

	// Extract API key from "Bearer {key}" format
	if len(authHeader) < 8 || authHeader[:7] != "Bearer " {
		return false
	}

	apiKey := authHeader[7:]

	// Use constant-time comparison
	for validKey := range wa.apiKeys {
		if subtle.ConstantTimeCompare([]byte(apiKey), []byte(validKey)) == 1 {
			return true
		}
	}

	return false
}

Mutual TLS (mTLS)

Implement client certificate authentication for enhanced security:

# Certificate generation
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr
openssl x509 -req -in client.csr -signkey client.key -out client.crt

# Nginx mTLS configuration
server {
    listen 443 ssl;

    ssl_certificate /path/to/server.crt;
    ssl_certificate_key /path/to/server.key;

    # Client certificate verification
    ssl_client_certificate /path/to/ca.crt;
    ssl_verify_client on;
    proxy_set_header SSL-Client-Cert $ssl_client_cert;
    proxy_set_header SSL-Client-Verify $ssl_client_verify;

    location /webhooks/azotte {
        # Only proceed if client cert is valid
        if ($ssl_client_verify != SUCCESS) {
            return 403;
        }

        proxy_pass http://backend;
    }
}

Request Validation

Content-Type Validation

Ensure webhook requests have correct content types:

import { Request } from 'express';

interface WebhookValidationConfig {
  allowedContentTypes: string[];
  maxPayloadSize: number; // bytes
  requiredHeaders: string[];
  allowedMethods: string[];
}

interface ValidationResult {
  isValid: boolean;
  errors: string[];
}

class WebhookValidator {
  private config: WebhookValidationConfig;

  constructor(config: WebhookValidationConfig) {
    this.config = config;
  }

  validateRequest(req: Request): ValidationResult {
    const errors: string[] = [];

    // Validate HTTP method
    if (!this.config.allowedMethods.includes(req.method)) {
      errors.push(`Invalid HTTP method: ${req.method}`);
    }

    // Validate Content-Type, ignoring parameters such as "; charset=utf-8"
    const contentType = (req.headers['content-type'] || '')
      .split(';')[0]
      .trim()
      .toLowerCase();
    if (!this.config.allowedContentTypes.includes(contentType)) {
      errors.push(`Invalid Content-Type: ${contentType}`);
    }

    // Validate payload size
    const contentLength = parseInt(req.headers['content-length'] || '0', 10);
    if (contentLength > this.config.maxPayloadSize) {
      errors.push(`Payload too large: ${contentLength} bytes`);
    }

    // Validate required headers
    for (const header of this.config.requiredHeaders) {
      if (!req.headers[header]) {
        errors.push(`Missing required header: ${header}`);
      }
    }

    return {
      isValid: errors.length === 0,
      errors
    };
  }
}

// Usage
const validator = new WebhookValidator({
  allowedContentTypes: ['application/json'],
  maxPayloadSize: 1024 * 1024, // 1MB
  requiredHeaders: ['azotte-signature', 'user-agent'],
  allowedMethods: ['POST']
});

JSON Schema Validation

Validate webhook payload structure:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["id", "object", "type", "created", "data"],
  "properties": {
    "id": {
      "type": "string",
      "pattern": "^evt_[a-zA-Z0-9]+$"
    },
    "object": {
      "type": "string",
      "enum": ["event"]
    },
    "type": {
      "type": "string",
      "enum": [
        "customer.created", "customer.updated", "customer.deleted",
        "subscription.created", "subscription.updated", "subscription.canceled",
        "payment.succeeded", "payment.failed", "payment.refunded",
        "invoice.created", "invoice.payment_succeeded", "invoice.payment_failed"
      ]
    },
    "created": {
      "type": "integer",
      "minimum": 0
    },
    "data": {
      "type": "object",
      "required": ["object"],
      "properties": {
        "object": {
          "type": "object"
        },
        "previous_attributes": {
          "type": "object"
        }
      }
    }
  }
}

const Ajv = require('ajv');
const webhookSchema = require('./webhook-schema.json');

const ajv = new Ajv();
const validate = ajv.compile(webhookSchema);

function validateWebhookPayload(payload) {
  const valid = validate(payload);

  if (!valid) {
    return {
      isValid: false,
      errors: validate.errors
    };
  }

  return { isValid: true };
}

Rate Limiting & DDoS Protection

Adaptive Rate Limiting

Implement smart rate limiting based on endpoint behavior:

-- Redis Lua script for adaptive rate limiting
local key = KEYS[1]
local window_size = tonumber(ARGV[1])
local max_requests = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])

-- Clean old entries
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window_size)

-- Count current requests
local current_count = redis.call('ZCARD', key)

if current_count < max_requests then
    -- Add current request. The member must be unique: using the timestamp
    -- alone would collapse all requests in the same second into one entry.
    redis.call('ZADD', key, current_time, current_time .. ':' .. current_count)
    redis.call('EXPIRE', key, window_size)
    return {1, max_requests - current_count - 1}
else
    return {0, 0}
end

import redis
import time

class AdaptiveRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.script = self.redis.register_script("""
            -- Lua script content here
        """)

    def is_allowed(self, identifier: str, max_requests: int = 100, window_seconds: int = 60) -> tuple[bool, int]:
        """
        Check if request is allowed under rate limit
        Returns (allowed, remaining_requests)
        """
        current_time = int(time.time())
        key = f"rate_limit:{identifier}"

        result = self.script(
            keys=[key],
            args=[window_seconds, max_requests, current_time]
        )

        allowed = bool(result[0])
        remaining = int(result[1])

        return allowed, remaining

    def get_adaptive_limit(self, identifier: str) -> int:
        """Adjust rate limit based on historical behavior"""
        history_key = f"rate_history:{identifier}"

        # Get request history for last hour
        hour_ago = int(time.time()) - 3600
        request_count = self.redis.zcount(history_key, hour_ago, '+inf')

        # Adaptive limits based on usage patterns
        if request_count > 1000:
            return 50   # Heavy user - stricter limits
        elif request_count > 100:
            return 100  # Normal user
        else:
            return 200  # Light user - more generous
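
To see the sliding-window logic without a Redis instance, the same steps (drop expired entries, count, admit or reject) can be sketched in memory. This is an illustration only, suitable for a single process; `SlidingWindowLimiter` is a hypothetical name, and the caller passes the current time explicitly so the behavior is easy to follow.

```python
from collections import deque


class SlidingWindowLimiter:
    """In-memory analogue of the sorted-set approach, for a single process."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: dict[str, deque] = {}

    def is_allowed(self, identifier: str, now: float) -> tuple[bool, int]:
        hits = self._hits.setdefault(identifier, deque())

        # Drop entries at or before the window start (the ZREMRANGEBYSCORE step)
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()

        # Admit the request only if the window still has room (the ZCARD check)
        if len(hits) < self.max_requests:
            hits.append(now)
            return True, self.max_requests - len(hits)
        return False, 0


limiter = SlidingWindowLimiter(max_requests=3, window_seconds=60)
results = [limiter.is_allowed("203.0.113.7", now=100.0) for _ in range(4)]
later = limiter.is_allowed("203.0.113.7", now=160.0)
print(results)
print(later)
```

Each admitted request is stored as its own entry, so several requests arriving in the same second are all counted.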

Security Monitoring

Webhook Security Events

Monitor webhook security events in real-time:

interface SecurityEvent {
  id: string;
  timestamp: Date;
  type: 'invalid_signature' | 'rate_limit_exceeded' | 'ip_blocked' | 'payload_too_large';
  source_ip: string;
  user_agent?: string;
  endpoint: string;
  details: Record<string, any>;
}

class WebhookSecurityMonitor {
  private eventStore: SecurityEventStore;
  private alerting: AlertingService;

  constructor(eventStore: SecurityEventStore, alerting: AlertingService) {
    this.eventStore = eventStore;
    this.alerting = alerting;
  }

  async recordSecurityEvent(event: SecurityEvent): Promise<void> {
    await this.eventStore.store(event);

    // Check for attack patterns
    await this.analyzeSecurityPatterns(event);
  }

  async analyzeSecurityPatterns(event: SecurityEvent): Promise<void> {
    const recentEvents = await this.eventStore.getRecentEvents(
      event.source_ip,
      300 // last 5 minutes
    );

    // Check for brute force attacks
    if (recentEvents.length > 20) {
      await this.alerting.sendAlert({
        severity: 'high',
        message: `Possible brute force attack from ${event.source_ip}`,
        details: { event_count: recentEvents.length, time_window: '5min' }
      });

      // Auto-block IP
      await this.blockIP(event.source_ip, 3600); // 1 hour
    }

    // Check for signature bypass attempts
    const invalidSignatureEvents = recentEvents.filter(e =>
      e.type === 'invalid_signature'
    );

    if (invalidSignatureEvents.length > 5) {
      await this.alerting.sendAlert({
        severity: 'medium',
        message: `Multiple invalid signature attempts from ${event.source_ip}`,
        details: { attempts: invalidSignatureEvents.length }
      });
    }
  }
}

Anomaly Detection

Implement machine learning-based anomaly detection:

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class WebhookAnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.scaler = StandardScaler()
        self.is_trained = False

    def extract_features(self, request_data: dict) -> np.ndarray:
        """Extract numerical features from webhook request"""
        features = [
            len(request_data.get('payload', '')),       # Payload size
            request_data.get('timestamp', 0) % 86400,   # Time of day
            len(request_data.get('headers', {})),       # Number of headers
            1 if request_data.get('has_valid_signature', False) else 0,
            request_data.get('response_time_ms', 0),
            len(request_data.get('user_agent', '')),
        ]
        return np.array(features).reshape(1, -1)

    def train(self, historical_requests: list):
        """Train the anomaly detection model"""
        features_list = []

        for request in historical_requests:
            features = self.extract_features(request)
            features_list.append(features[0])

        X = np.array(features_list)
        X_scaled = self.scaler.fit_transform(X)

        self.model.fit(X_scaled)
        self.is_trained = True

    def detect_anomaly(self, request_data: dict) -> tuple[bool, float]:
        """
        Detect if request is anomalous
        Returns (is_anomaly, anomaly_score)
        """
        if not self.is_trained:
            return False, 0.0

        features = self.extract_features(request_data)
        features_scaled = self.scaler.transform(features)

        prediction = self.model.predict(features_scaled)[0]
        # score_samples returns negative values; lower means more abnormal
        anomaly_score = self.model.score_samples(features_scaled)[0]

        is_anomaly = prediction == -1

        return is_anomaly, abs(anomaly_score)

Incident Response

Automated Response Actions

Configure automated responses to security threats:

incident_response_rules:
  brute_force_attack:
    trigger:
      event_type: "rate_limit_exceeded"
      threshold: 50
      time_window: 300  # 5 minutes
    actions:
      - block_ip_temporarily: 3600  # 1 hour
      - notify_security_team: true
      - escalate_after: 5  # incidents

  signature_bypass_attempts:
    trigger:
      event_type: "invalid_signature"
      threshold: 10
      time_window: 600  # 10 minutes
    actions:
      - require_additional_auth: true
      - audit_endpoint_config: true
      - notify_devops_team: true

  payload_anomalies:
    trigger:
      anomaly_score: 0.8
    actions:
      - quarantine_request: true
      - deep_payload_analysis: true
      - notify_security_analyst: true
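
One way to read a threshold rule like those above is: count matching events inside the time window and fire the actions once the count reaches the threshold. The sketch below encodes that interpretation in Python. The `Rule` class, `triggered_actions`, and the "reaches the threshold" (>=) semantics are assumptions made for illustration, not a documented Azotte rule engine.

```python
from dataclasses import dataclass


@dataclass
class Rule:
    event_type: str
    threshold: int
    time_window: int  # seconds
    actions: list[str]


def triggered_actions(rule: Rule, events: list[tuple[int, str]], now: int) -> list[str]:
    """Return the rule's actions if enough matching events fall inside the window.

    events is a list of (unix_timestamp, event_type) pairs.
    """
    recent = [
        ts for ts, event_type in events
        if event_type == rule.event_type and 0 <= now - ts <= rule.time_window
    ]
    # Assumed semantics: the rule fires once the count reaches the threshold
    return list(rule.actions) if len(recent) >= rule.threshold else []


signature_rule = Rule(
    event_type="invalid_signature",
    threshold=10,
    time_window=600,
    actions=["require_additional_auth", "audit_endpoint_config", "notify_devops_team"],
)

now = 1_700_000_000
burst = [(now - i * 30, "invalid_signature") for i in range(10)]   # 10 events in 5 minutes
stale = [(now - 3600 - i, "invalid_signature") for i in range(10)]  # 10 events an hour ago

print(triggered_actions(signature_rule, burst, now))
print(triggered_actions(signature_rule, stale, now))
```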

Testing Webhook Security

Security Test Suite

import pytest
import requests
import hmac
import hashlib
import time

class TestWebhookSecurity:
    def setup_method(self):
        self.webhook_url = "https://myapp.com/webhooks/azotte"
        self.valid_secret = "whsec_test_secret"
        self.valid_payload = '{"id":"evt_test","type":"customer.created"}'

    def generate_signature(self, payload: str, secret: str, timestamp: int = None) -> str:
        if timestamp is None:
            timestamp = int(time.time())

        signed_payload = f"{timestamp}.{payload}"
        signature = hmac.new(
            secret.encode(),
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        return f"t={timestamp},v1={signature}"

    def test_valid_signature_accepted(self):
        signature = self.generate_signature(self.valid_payload, self.valid_secret)

        response = requests.post(
            self.webhook_url,
            data=self.valid_payload,
            headers={
                'Content-Type': 'application/json',
                'Azotte-Signature': signature
            }
        )

        assert response.status_code == 200

    def test_invalid_signature_rejected(self):
        invalid_signature = "t=123456789,v1=invalid_signature"

        response = requests.post(
            self.webhook_url,
            data=self.valid_payload,
            headers={
                'Content-Type': 'application/json',
                'Azotte-Signature': invalid_signature
            }
        )

        assert response.status_code == 401

    def test_expired_timestamp_rejected(self):
        old_timestamp = int(time.time()) - 7200  # 2 hours ago
        signature = self.generate_signature(
            self.valid_payload, self.valid_secret, old_timestamp
        )

        response = requests.post(
            self.webhook_url,
            data=self.valid_payload,
            headers={
                'Content-Type': 'application/json',
                'Azotte-Signature': signature
            }
        )

        assert response.status_code == 401

    def test_rate_limiting(self):
        signature = self.generate_signature(self.valid_payload, self.valid_secret)

        # Send requests rapidly to trigger rate limit
        responses = []
        for _ in range(150):  # Exceed rate limit
            response = requests.post(
                self.webhook_url,
                data=self.valid_payload,
                headers={
                    'Content-Type': 'application/json',
                    'Azotte-Signature': signature
                }
            )
            responses.append(response.status_code)

        # Should have some 429 (Too Many Requests) responses
        assert 429 in responses

Best Practices

Implementation Guidelines

  1. Always verify signatures - Never trust unsigned webhooks
  2. Use HTTPS only - Encrypt all webhook traffic
  3. Implement rate limiting - Protect against DoS attacks
  4. Validate payloads - Check structure and content
  5. Log security events - Monitor for suspicious activity
  6. Use IP allowlisting - Restrict webhook sources
  7. Validate timestamps - Reject stale requests to prevent webhook replay attacks

Security Checklist

  • Webhook signatures verified using HMAC-SHA256
  • HTTPS with valid TLS certificates enforced
  • IP allowlisting configured for Azotte ranges
  • Rate limiting implemented with reasonable limits
  • Request payload size limits enforced
  • Security headers configured (HSTS, X-Content-Type-Options, etc.)
  • Error handling doesn't leak sensitive information
  • Logging and monitoring in place
  • Incident response procedures documented
  • Regular security testing performed

Next Steps