SIGNING_SECRET
値が設定されている場合、各リクエストがsesinetdインスタンスから送信され、攻撃者からでないことが検証されます。
スクリプトを実行するには、mysqlユーザ名とmysqlパスワードを指定する必要があります。スクリプトがデータベースに使用するデフォルト名は、sidefx_webhook
です。
python3 webhook_server.py <mysql username> <mysql password>
Note
署名シークレットを変更する度に、Webhookサーバーを更新して、sesinetdからのリクエストを正しく検証できるようにする必要があります。
import sys import json import hmac import hashlib import time import os import asyncio import threading import queue import mysql.connector from mysql.connector import errorcode sys.path.append(os.path.expandvars("$HFS/houdini/python3.9libs/")) import hwebserver Response = hwebserver.Response SIGNING_SECRET = None DB_NAME = "sidefx_webhook" USER = None PASSWORD = None TABLES = {} TABLES['checkin'] = ( "CREATE TABLE `checkin` (" " `license_id` char(8) NOT NULL," " `count` int(64) NOT NULL," " `product` varchar(100) NOT NULL," " `version` varchar(100) NOT NULL," " `total_tokens` int(64) NOT NULL DEFAULT 0," " `timestamp` int(11) NOT NULL" ")" ) TABLES['checkout'] = ( "CREATE TABLE `checkout` (" " `license_id` char(8) NOT NULL," " `count` int(64) NOT NULL," " `product` varchar(100) NOT NULL," " `version` varchar(100) NOT NULL," " `total_tokens` int(64) NOT NULL DEFAULT 0," " `timestamp` int(11) NOT NULL" ")" ) TABLES['heartbeat'] = ( "CREATE TABLE `heartbeat` (" " `license_id` char(8) NOT NULL," " `product` varchar(100) NOT NULL," " `version` varchar(100) NOT NULL," " `success_count` int(64) NOT NULL," " `fail_count` int(64) NOT NULL," " `timestamp` int(11) NOT NULL" ")" ) def _compute_sidefx_signature(timestamp, body): base_string = u"v0:{}:{}".format(timestamp, body.decode()) return b"v0=" + hmac.new(SIGNING_SECRET.encode(), msg=base_string.encode(), digestmod=hashlib.sha256).hexdigest().encode() def _sidefx_signature_errors(request): """Validate that the response was signed by sesinetd. See https://sidefx.com/docs/houdini/licensing/webhooks/ """ expected_signature = request.headers().get("X-SideFX-Signature") if expected_signature is None: return "Missing header X-SideFX-Signature" expected_signature = expected_signature.encode() timestamp = request.headers().get("X-SideFX-Request-Timestamp") if timestamp is None: return "Missing header X-SideFX-Request-Timestamp" try: float(timestamp) except ValueError: return "X-SideFX-Request-Timestamp is not a number" # Watch out for replay attacks. Ignore anything that's more than five # minutes old. if abs(time.time() - float(timestamp)) > 300: return "The timestamp is too old" signature = _compute_sidefx_signature(timestamp, request.body()) if hasattr(hmac, "compare_digest"): signature_matches = hmac.compare_digest(signature, expected_signature) else: signature_matches = signature == expected_signature if not signature_matches: return "Incorrect signature header" return None def _create_database(cursor): """Create the mysql db used to store all events the webserver receives.""" try: cursor.execute(f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'") except mysql.connector.Error as err: print(f"Failed creating database: {err}") sys.exit(1) def _create_tables(cursor): """Create all tables for every event that the webserver may receive.""" try: cursor.execute(f"USE {DB_NAME}") except mysql.connector.Error as err: print(f"Database {DB_NAME} does not exist.") if err.errno == errorcode.ER_BAD_DB_ERROR: _create_database(cursor) print(f"Database {DB_NAME} created successfully.") cnx.database = DB_NAME else: print(err) sys.exit(1) for table_name in TABLES: table_description = TABLES[table_name] try: print(f"Creating table {table_name}: ", end='') cursor.execute(table_description) except mysql.connector.Error as err: if err.errno == errorcode.ER_TABLE_EXISTS_ERROR: print("already exists") else: print(err.msg) else: print("OK") def handle_checkin(cursor, info): """Handle all checkin events by saving them into the `checkin` sql table.""" sql = "INSERT INTO checkin (license_id, seat_id, user_machine, product, \ version, available, total_tokens, timestamp) VALUES (%(license_id)s, \ %(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \ %(available)s, %(total_tokens)s, %(timestamp)s)" timestamp = info["event_time"] ev = info["event"] values = {} values["license_id"] = ev.get("license_id", "") values["count"] = ev.get("count", 0) values["product"] = ev.get("product", "") values["version"] = ev.get("version", "") values["total_tokens"] = ev.get("total_tokens", 0) values["timestamp"] = timestamp cursor.execute(sql, values) return True def handle_checkout(cursor, info): """Handle all checkout events by saving them into the `checkout` sql table.""" sql = "INSERT INTO checkout (license_id, seat_id, user_machine, product, \ version, available, total_tokens, timestamp) VALUES (%(license_id)s, \ %(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \ %(available)s, %(total_tokens)s, %(timestamp)s)" timestamp = info["event_time"] ev = info["event"] values = {} values["license_id"] = ev.get("license_id", "") values["count"] = ev.get("count", 0) values["product"] = ev.get("product", "") values["version"] = ev.get("version", "") values["total_tokens"] = ev.get("total_tokens", 0) values["timestamp"] = timestamp cursor.execute(sql, values) return True def handle_heartbeat(cursor, info): """Handle all heartbeat events by saving them into the `heartbeat` sql table.""" sql = "INSERT INTO heartbeat (license_id, seat_id, user_machine, product, \ success, timestamp) VALUES (%(license_id)s, %(seat_id)s, \ %(user_machine)s, %(product)s, %(success)s, %(timestamp)s)" timestamp = info["event_time"] ev = info["event"] values = {} values["license_id"] = ev.get("license_id", "") values["product"] = ev.get("product", "") values["version"] = ev.get("version", "") values["success_count"] = ev.get("success_count", 0) values["fail_count"] = ev.get("fail_count", 0) values["timestamp"] = timestamp cursor.execute(sql, values) return True class StopToken(): """Stop token to indicate to the queue we are done handling events.""" def __init__(self): pass def handle_rate_limit(ev): """Handle events that inform the server they have been rate limited.""" minute_rate_limited = ev.get("minute_rate_limited", 0) print(f"Server has been rate limited: {minute_rate_limited}") async def handle_webhook_events(): """Handle the queue of webhook events that the webserver received.""" handlers = dict() handlers["E100"] = handle_checkin handlers["E101"] = handle_checkout handlers["E102"] = handle_heartbeat cnx = mysql.connector.connect(user=USER, password=PASSWORD) cursor = cnx.cursor() #_create_database(cursor) _create_tables(cursor) try: while True: try: ev = EVENT_QUEUE.get() if isinstance(ev, StopToken): break ev_type = ev.get("type", "") if ev_type == "event_callback": event_id = ev["event_id"] fn = handlers.get(event_id) if fn is not None: if fn(cursor, ev): cnx.commit() elif ev_type == "rate_limited": handle_rate_limit(ev) except Exception as err: print(err) finally: EVENT_QUEUE.task_done() finally: if cursor is not None: cursor.close() def asyncio_handle_webhooks(): """Kick off handling the webhook events.""" loop = asyncio.DefaultEventLoopPolicy().new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(handle_webhook_events()) @hwebserver.urlHandler("/sidefx.webhook") def webhook(request): """Handle all incoming webhook events. To ensure we handle these events quickly we place all events onto a queue that a background thread handles. If the server takes to long sesinetd will think the server is in a bad state and will stop sending events.""" global EVENT_QUEUE if request.contentType() != "application/json": return Response(400, "Expected JSON body") if SIGNING_SECRET is not None: errors = _sidefx_signature_errors(request) if errors is not None: print(f"invalid signature: {errors}") return Response(errors, 400) data = json.loads(request.body()) if data["type"] == "url_verification": resp_json = dict() resp_json["challenge"] = data["challenge"] return Response(json.dumps(resp_json), 200, "application/json") # Handle the event in the background so that we can respond quickly. If we # take to long then sesinetd will think this server is in a bad state. EVENT_QUEUE.put(data) return Response(b"", 200) def main(): global SIGNING_SECRET, EVENT_QUEUE, USER, PASSWORD SIGNING_SECRET = "8fjP3P1EYJiUwU9ONjRGw00UxgbHNm" USER = sys.argv[1] PASSWORD = sys.argv[2] EVENT_QUEUE = queue.Queue() background_thread = threading.Thread(target=asyncio_handle_webhooks) background_thread.start() try: hwebserver.run(8080, debug=True, reload_source_changes=False) finally: # Add the stop token to the queue to inform the background worker # thread to stop handling events. EVENT_QUEUE.put(StopToken()) if background_thread is not None: background_thread.join() if __name__ == "__main__": main()