签名验证

为了确保回调通知的安全性和完整性,Scenext使用HMAC-SHA256算法对所有回调请求进行签名。

签名算法

生成过程

  1. 将回调数据转换为JSON字符串(按键名排序)
  2. 使用您的API密钥作为密钥
  3. 使用HMAC-SHA256算法生成签名
  4. 将签名转换为十六进制字符串

Python实现

import hmac
import hashlib
import json

def generate_signature(payload, api_key):
    """
    生成HMAC-SHA256签名
    :param payload: 要发送的数据
    :param api_key: 用于签名的API key
    :return: 十六进制签名
    """
    # 将payload转换为JSON字符串
    if isinstance(payload, dict):
        payload_str = json.dumps(payload, sort_keys=True)
    else:
        payload_str = str(payload)
    
    # 使用HMAC-SHA256生成签名
    signature = hmac.new(
        key=api_key.encode('utf-8'),
        msg=payload_str.encode('utf-8'),
        digestmod=hashlib.sha256
    ).hexdigest()
    
    return signature

def verify_signature(payload, signature, api_key):
    """
    验证HMAC-SHA256签名
    """
    expected_signature = generate_signature(payload, api_key)
    return hmac.compare_digest(expected_signature, signature)

JavaScript实现

const crypto = require('crypto');

function generateSignature(payload, apiKey) {
    // 将payload转换为JSON字符串(按键排序)
    const payloadStr = JSON.stringify(payload, Object.keys(payload).sort());
    
    // 生成签名
    const signature = crypto
        .createHmac('sha256', apiKey)
        .update(payloadStr, 'utf8')
        .digest('hex');
    
    return signature;
}

function verifySignature(payload, signature, apiKey) {
    const expectedSignature = generateSignature(payload, apiKey);
    
    return crypto.timingSafeEqual(
        Buffer.from(expectedSignature, 'hex'),
        Buffer.from(signature, 'hex')
    );
}

PHP实现

<?php
function generateSignature($payload, $apiKey) {
    // 将payload转换为JSON字符串(按键排序)
    if (is_array($payload)) {
        ksort($payload);
        $payloadStr = json_encode($payload);
    } else {
        $payloadStr = (string)$payload;
    }
    
    // 生成签名
    return hash_hmac('sha256', $payloadStr, $apiKey);
}

function verifySignature($payload, $signature, $apiKey) {
    $expectedSignature = generateSignature($payload, $apiKey);
    return hash_equals($expectedSignature, $signature);
}
?>

完整验证示例

Flask (Python) 服务器

from flask import Flask, request, jsonify
import hmac
import hashlib
import json

app = Flask(__name__)

API_KEY = "your_api_key_here"  # 您的API密钥

def verify_signature(payload, signature):
    """验证签名"""
    payload_str = json.dumps(payload, sort_keys=True)
    expected_signature = hmac.new(
        key=API_KEY.encode('utf-8'),
        msg=payload_str.encode('utf-8'),
        digestmod=hashlib.sha256
    ).hexdigest()
    
    return hmac.compare_digest(expected_signature, signature)

@app.route('/webhook', methods=['POST'])
def webhook():
    # 获取签名
    signature = request.headers.get('X-Signature')
    if not signature:
        return jsonify({'error': 'Missing signature'}), 400
    
    # 获取请求数据
    try:
        data = request.get_json()
    except Exception:
        return jsonify({'error': 'Invalid JSON'}), 400
    
    # 验证签名
    if not verify_signature(data, signature):
        return jsonify({'error': 'Invalid signature'}), 401
    
    # 处理回调数据
    task_id = data.get('task_id')
    status = data.get('status')
    
    print(f"Received webhook for task {task_id} with status {status}")
    
    # 根据状态处理逻辑
    if status == 'COMPLETED':
        # 任务完成处理
        result = data.get('result', {})
        video_url = result.get('video_url')
        print(f"Video completed: {video_url}")
        
    elif status == 'FAILED':
        # 任务失败处理
        result = data.get('result', {})
        error = result.get('error')
        print(f"Video generation failed: {error}")
    
    return jsonify({'message': 'OK'}), 200

if __name__ == '__main__':
    app.run(debug=True)

Express (Node.js) 服务器

const express = require('express');
const crypto = require('crypto');

const app = express();
const API_KEY = 'your_api_key_here';  // 您的API密钥

app.use(express.json());

function verifySignature(payload, signature) {
    const payloadStr = JSON.stringify(payload, Object.keys(payload).sort());
    const expectedSignature = crypto
        .createHmac('sha256', API_KEY)
        .update(payloadStr, 'utf8')
        .digest('hex');
    
    return crypto.timingSafeEqual(
        Buffer.from(expectedSignature, 'hex'),
        Buffer.from(signature, 'hex')
    );
}

app.post('/webhook', (req, res) => {
    const signature = req.headers['x-signature'];
    if (!signature) {
        return res.status(400).json({ error: 'Missing signature' });
    }
    
    const data = req.body;
    
    if (!verifySignature(data, signature)) {
        return res.status(401).json({ error: 'Invalid signature' });
    }
    
    // 处理回调数据
    const { task_id, status, result } = data;
    
    console.log(`Received webhook for task ${task_id} with status ${status}`);
    
    if (status === 'COMPLETED') {
        console.log(`Video completed: ${result.video_url}`);
    } else if (status === 'FAILED') {
        console.log(`Video generation failed: ${result.error}`);
    }
    
    res.status(200).json({ message: 'OK' });
});

app.listen(3000, () => {
    console.log('Webhook server listening on port 3000');
});

安全最佳实践

重要提示

常见问题

  1. 检查API密钥是否正确
  2. 确保JSON序列化时按键名排序
  3. 验证请求数据是否被修改
  4. 检查字符编码是否为UTF-8
强烈不建议跳过签名验证。这会使您的系统面临安全风险,包括但不限于:伪造的回调请求、数据篡改等。
时间戳验证不是必需的,但建议实现。您可以检查时间戳是否在合理范围内(比如5分钟内),以防止重放攻击。