概要
Pigeon Cloudのwebhookを使用することで、システム上で発生したイベントをリアルタイムで受け取ることができます。
基本的な受信方法
最もシンプルな実装では、以下のようなコードでwebhookを受信できます:
from flask import Flask, request
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.get_json()
# webhookデータの処理
notification_id = data.get('notification_log_id')
action = data.get('action')
table = data.get('table')
records = data.get('records', [])
# レコードデータの処理
if records:
record = records[0] # 最初のレコード
raw_data = record.get('raw_data', {}) # 生データ
view_data = record.get('view_data', {}) # 表示用データ
print(f"Received webhook - Action: {action}, Table: {table}")
print(f"Raw data: {raw_data}")
print(f"View data: {view_data}")
return 'OK', 200
if __name__ == '__main__':
app.run(port=5000)
セキュアな実装(推奨)
セキュリティを強化するため、以下の検証を実装することを強く推奨します:
- 管理画面から公開鍵をダウンロード(「公開鍵ダウンロード」ボタンをクリック)
- 以下のコードを使用して署名検証を実装:
from flask import Flask, request
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_pem_public_key
import time
import base64
app = Flask(__name__)
# 公開鍵の読み込み
with open('webhook_public_key.pem', 'rb') as f:
PUBLIC_KEY = load_pem_public_key(f.read())
def verify_signature(timestamp, request_id, body, signature):
try:
# タイムスタンプの検証(5分以内)
current_time = int(time.time())
if abs(current_time - int(timestamp)) > 300: # 5分
return False
# 署名の検証
data = f"{timestamp}.{request_id}.{body}".encode()
signature_bytes = base64.b64decode(signature)
PUBLIC_KEY.verify(
signature_bytes,
data,
padding.PKCS1v15(),
hashes.SHA256()
)
return True
except Exception as e:
print(f"Signature verification failed: {e}")
return False
@app.route('/webhook', methods=['POST'])
def webhook():
# ヘッダーから検証情報を取得
signature = request.headers.get('X-Webhook-Signature')
timestamp = request.headers.get('X-Webhook-Timestamp')
request_id = request.headers.get('X-Webhook-Request-Id')
if not all([signature, timestamp, request_id]):
return 'Missing required headers', 400
# リクエストボディを取得
body = request.get_data(as_text=True)
# 署名を検証
if not verify_signature(timestamp, request_id, body, signature):
return 'Invalid signature', 401
# JSONデータを処理
data = request.get_json()
records = data.get('records', [])
if records:
record = records[0]
print(f"Verified webhook - ID: {request_id}")
print(f"Raw data: {record.get('raw_data')}")
print(f"View data: {record.get('view_data')}")
return 'OK', 200
if __name__ == '__main__':
app.run(port=5000, ssl_context='adhoc') # HTTPSを有効化
セキュリティに関する注意事項
- HTTPS必須: 本番環境では必ずHTTPSを使用してください。上記コードでは簡易的なSSL設定を使用していますが、本番環境では適切な証明書を使用してください。
- 署名検証: なりすましやデータ改ざんを防ぐため、署名検証を実装することを強く推奨します。
- タイムスタンプ検証: リプレイアタック(同じwebhookの再送)を防ぐため、タイムスタンプの検証を行うことを推奨します。
- べき等性の確保: request_idを保存し、同じIDのwebhookを2回処理しないようにすることを推奨します。
受信するデータ形式
webhookは以下のような形式のJSONデータを送信します:
{
"notification_log_id": 123,
"notification_text": "新規レコードが作成されました",
"action": "create",
"table": "dataset__2",
"table_name": "顧客管理",
"data_id": 456,
"records": [
{
"raw_data": {
"id": "456",
...
},
"view_data": {
"id": "456",
...
}
}
]
}
注意点:
raw_data
: データベースに保存されている生の値view_data
: 画面表示用にフォーマットされた値- フィールド名は管理画面で設定された項目名(ラベル)が使用されます
エラー処理
実装時は以下のエラーケースを考慮してください:
- 署名検証失敗
- タイムスタンプ期限切れ
- 必要なヘッダーの欠落
- JSONパース失敗
- ネットワークエラー
各エラーケースに対して適切なログ記録とエラーハンドリングを実装することを推奨します。
補足
- 署名検証は任意ですが、セキュリティのために強く推奨されます
- HTTPS接続が必須となります
- webhook URLはシステム管理者が設定できます