Create & Validate Secure Webhook Token
#
import jwt
import hashlib
import json
import time
import uuid
from datetime import datetime, timedelta
def create_swt(webhook_data, payload=None, secret=None):
now = int(time.time())
claims = {
'webhook': {
'event': webhook_data['event'],
'data': webhook_data.get('data', {})
},
'iss': 'webhook-service.example.com',
'exp': now + 300, # Valid for 5 minutes
'nbf': now,
'iat': now,
'jti': str(uuid.uuid4())
}
# Add payload hash if present
if payload:
payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
hash_obj = hashlib.sha256(payload_str.encode('utf-8'))
claims['webhook']['hash'] = f"sha256:{hash_obj.hexdigest()}"
return jwt.encode(claims, secret, algorithm='HS256')
def validate_swt(token, payload=None, secret=None):
try:
decoded = jwt.decode(token, secret, algorithms=['HS256'])
# Check required claims
required_claims = ['webhook', 'iss', 'exp', 'nbf', 'iat', 'jti']
for claim in required_claims:
if claim not in decoded:
raise ValueError(f'Missing required claim: {claim}')
# Validate payload hash if present
if payload and 'hash' in decoded['webhook']:
algorithm, expected_hash = decoded['webhook']['hash'].split(':')
payload_str = json.dumps(payload, separators=(',', ':'), sort_keys=True)
if algorithm == 'sha256':
actual_hash = hashlib.sha256(payload_str.encode('utf-8')).hexdigest()
else:
raise ValueError(f'Unsupported hash algorithm: {algorithm}')
if actual_hash != expected_hash:
raise ValueError('Payload hash mismatch')
return decoded
except jwt.InvalidTokenError as e:
raise ValueError(f'SWT validation failed: {str(e)}')