If the SIGNING_SECRET
value is set, each request is verified to have
come from your sesinetd instance and not from an attacker.
To run the script, the MySQL username and MySQL password must be provided. The
default name the script uses for the database is sidefx_webhook.
python3 webhook_server.py <mysql username> <mysql password>
Note
Any time the signing secret changes, the webhook server must be updated so that it can correctly verify the requests from sesinetd.
import sys
import json
import hmac
import hashlib
import time
import os
import asyncio
import threading
import queue

import mysql.connector
from mysql.connector import errorcode

# hwebserver ships with Houdini, so its library directory has to be on the
# path before it can be imported.
sys.path.append(os.path.expandvars("$HFS/houdini/python3.9libs/"))
import hwebserver

Response = hwebserver.Response

# When not None, every request must carry a valid X-SideFX-Signature header.
SIGNING_SECRET = None
DB_NAME = "sidefx_webhook"
USER = None
PASSWORD = None
# Queue shared between the request handler and the background worker thread.
# It is created in main().
EVENT_QUEUE = None

# CREATE TABLE statements, keyed by table name, one table per event kind.
TABLES = {}
TABLES['checkin'] = (
    "CREATE TABLE `checkin` ("
    "  `license_id` char(8) NOT NULL,"
    "  `count` int(64) NOT NULL,"
    "  `product` varchar(100) NOT NULL,"
    "  `version` varchar(100) NOT NULL,"
    "  `total_tokens` int(64) NOT NULL DEFAULT 0,"
    "  `timestamp` int(11) NOT NULL"
    ")"
)
TABLES['checkout'] = (
    "CREATE TABLE `checkout` ("
    "  `license_id` char(8) NOT NULL,"
    "  `count` int(64) NOT NULL,"
    "  `product` varchar(100) NOT NULL,"
    "  `version` varchar(100) NOT NULL,"
    "  `total_tokens` int(64) NOT NULL DEFAULT 0,"
    "  `timestamp` int(11) NOT NULL"
    ")"
)
TABLES['heartbeat'] = (
    "CREATE TABLE `heartbeat` ("
    "  `license_id` char(8) NOT NULL,"
    "  `product` varchar(100) NOT NULL,"
    "  `version` varchar(100) NOT NULL,"
    "  `success_count` int(64) NOT NULL,"
    "  `fail_count` int(64) NOT NULL,"
    "  `timestamp` int(11) NOT NULL"
    ")"
)


def _compute_sidefx_signature(timestamp, body):
    """Return the signature (as bytes) expected for a request.

    The signature is ``v0=`` followed by the hex HMAC-SHA256 of
    ``v0:<timestamp>:<body>`` keyed with SIGNING_SECRET.

    ``timestamp`` is the raw header value and ``body`` is the request body
    as bytes.
    """
    base_string = u"v0:{}:{}".format(timestamp, body.decode())
    digest = hmac.new(
        SIGNING_SECRET.encode(),
        msg=base_string.encode(),
        digestmod=hashlib.sha256,
    ).hexdigest()
    return b"v0=" + digest.encode()


def _sidefx_signature_errors(request):
    """Validate that the request was signed by sesinetd.

    See https://sidefx.com/docs/houdini/licensing/webhooks/

    Returns a string describing the problem, or None when the signature is
    valid.
    """
    expected_signature = request.headers().get("X-SideFX-Signature")
    if expected_signature is None:
        return "Missing header X-SideFX-Signature"
    expected_signature = expected_signature.encode()

    timestamp = request.headers().get("X-SideFX-Request-Timestamp")
    if timestamp is None:
        return "Missing header X-SideFX-Request-Timestamp"
    try:
        request_time = float(timestamp)
    except ValueError:
        return "X-SideFX-Request-Timestamp is not a number"

    # Watch out for replay attacks. Ignore anything that's more than five
    # minutes old. The comparison is written as `not (... <= 300)` so that a
    # NaN timestamp (for which every comparison is False) is rejected too.
    if not abs(time.time() - request_time) <= 300:
        return "The timestamp is too old"

    signature = _compute_sidefx_signature(timestamp, request.body())
    # Constant-time comparison so the check does not leak timing information.
    if not hmac.compare_digest(signature, expected_signature):
        return "Incorrect signature header"
    return None


def _create_database(cursor):
    """Create the mysql db used to store all events the webserver receives.

    Exits the process with status 1 when the database cannot be created.
    """
    try:
        cursor.execute(
            f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'")
    except mysql.connector.Error as err:
        print(f"Failed creating database: {err}")
        sys.exit(1)


def _create_tables(cursor):
    """Create all tables for every event that the webserver may receive.

    Selects DB_NAME as the current database, creating it first when it does
    not exist. Tables that already exist are left alone. Exits the process
    with status 1 when the database cannot be selected.
    """
    try:
        cursor.execute(f"USE {DB_NAME}")
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            print(f"Database {DB_NAME} does not exist.")
            _create_database(cursor)
            print(f"Database {DB_NAME} created successfully.")
            # Select the new database through the cursor; the connection
            # object is not available in this function.
            cursor.execute(f"USE {DB_NAME}")
        else:
            print(err)
            sys.exit(1)

    for table_name, table_description in TABLES.items():
        try:
            print(f"Creating table {table_name}: ", end='')
            cursor.execute(table_description)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists")
            else:
                print(err.msg)
        else:
            print("OK")


def _insert_license_event(cursor, table, info):
    """Insert a checkin/checkout event into ``table``.

    ``table`` must be one of the trusted table names from TABLES; it is
    interpolated into the statement, while all event data is passed as bound
    parameters.
    """
    # The column list matches the CREATE TABLE statements in TABLES and the
    # keys of `values` below.
    sql = (
        f"INSERT INTO `{table}` (`license_id`, `count`, `product`, "
        "`version`, `total_tokens`, `timestamp`) VALUES (%(license_id)s, "
        "%(count)s, %(product)s, %(version)s, %(total_tokens)s, "
        "%(timestamp)s)"
    )
    ev = info["event"]
    values = {
        "license_id": ev.get("license_id", ""),
        "count": ev.get("count", 0),
        "product": ev.get("product", ""),
        "version": ev.get("version", ""),
        "total_tokens": ev.get("total_tokens", 0),
        "timestamp": info["event_time"],
    }
    cursor.execute(sql, values)
    return True


def handle_checkin(cursor, info):
    """Handle all checkin events by saving them into
    the `checkin` sql table.

    Returns True when a row was written and should be committed.
    """
    return _insert_license_event(cursor, "checkin", info)


def handle_checkout(cursor, info):
    """Handle all checkout events by saving them into the `checkout` sql
    table.

    Returns True when a row was written and should be committed.
    """
    return _insert_license_event(cursor, "checkout", info)


def handle_heartbeat(cursor, info):
    """Handle all heartbeat events by saving them into the `heartbeat` sql
    table.

    Returns True when a row was written and should be committed.
    """
    # The column list matches the `heartbeat` CREATE TABLE statement and the
    # keys of `values` below.
    sql = (
        "INSERT INTO `heartbeat` (`license_id`, `product`, `version`, "
        "`success_count`, `fail_count`, `timestamp`) VALUES "
        "(%(license_id)s, %(product)s, %(version)s, %(success_count)s, "
        "%(fail_count)s, %(timestamp)s)"
    )
    ev = info["event"]
    values = {
        "license_id": ev.get("license_id", ""),
        "product": ev.get("product", ""),
        "version": ev.get("version", ""),
        "success_count": ev.get("success_count", 0),
        "fail_count": ev.get("fail_count", 0),
        "timestamp": info["event_time"],
    }
    cursor.execute(sql, values)
    return True


class StopToken():
    """Stop token to indicate to the queue we are done handling events."""

    def __init__(self):
        pass


def handle_rate_limit(ev):
    """Handle events that inform the server they have been rate limited."""
    minute_rate_limited = ev.get("minute_rate_limited", 0)
    print(f"Server has been rate limited: {minute_rate_limited}")


async def handle_webhook_events():
    """Handle the queue of webhook events that the webserver received.

    Connects to MySQL with USER/PASSWORD, makes sure the database and tables
    exist, then consumes EVENT_QUEUE until a StopToken is received. A failure
    while handling a single event is printed and does not stop the loop.
    """
    handlers = {
        "E100": handle_checkin,
        "E101": handle_checkout,
        "E102": handle_heartbeat,
    }

    cnx = mysql.connector.connect(user=USER, password=PASSWORD)
    try:
        cursor = cnx.cursor()
        try:
            _create_tables(cursor)
            while True:
                # get() stays outside the try below so that task_done() is
                # only ever called for an item that was actually dequeued.
                ev = EVENT_QUEUE.get()
                try:
                    if isinstance(ev, StopToken):
                        break
                    ev_type = ev.get("type", "")
                    if ev_type == "event_callback":
                        fn = handlers.get(ev["event_id"])
                        if fn is not None and fn(cursor, ev):
                            cnx.commit()
                    elif ev_type == "rate_limited":
                        handle_rate_limit(ev)
                except Exception as err:
                    # Best effort: one bad event must not kill the worker.
                    print(err)
                finally:
                    EVENT_QUEUE.task_done()
        finally:
            cursor.close()
    finally:
        cnx.close()


def asyncio_handle_webhooks():
    """Kick off handling the webhook events.

    Used as the target of the background thread: creates an event loop for
    the thread and runs handle_webhook_events() to completion on it.
    """
    loop = asyncio.DefaultEventLoopPolicy().new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(handle_webhook_events())


@hwebserver.urlHandler("/sidefx.webhook")
def webhook(request):
    """Handle all incoming webhook events.

    To ensure we handle these events quickly we place all events onto a queue
    that a background thread handles. If the server takes too long sesinetd
    will think the server is in a bad state and will stop sending events.

    Returns a 400 response for a wrong content type, a failed signature
    check or a malformed body; otherwise a 200 response.
    """
    if request.contentType() != "application/json":
        return Response("Expected JSON body", 400)

    if SIGNING_SECRET is not None:
        errors = _sidefx_signature_errors(request)
        if errors is not None:
            print(f"invalid signature: {errors}")
            return Response(errors, 400)

    try:
        data = json.loads(request.body())
    except ValueError:
        # Covers both invalid JSON and a body that is not valid text.
        return Response("Invalid JSON body", 400)
    if not isinstance(data, dict):
        return Response("Expected a JSON object", 400)

    if data.get("type") == "url_verification":
        if "challenge" not in data:
            return Response("Missing challenge", 400)
        resp_json = {"challenge": data["challenge"]}
        return Response(json.dumps(resp_json), 200, "application/json")

    # Handle the event in the background so that we can respond quickly. If we
    # take too long then sesinetd will think this server is in a bad state.
    EVENT_QUEUE.put(data)
    return Response(b"", 200)


def main():
    """Start the background worker and run the web server on port 8080.

    Expects the mysql username and password as the first two command line
    arguments.
    """
    global SIGNING_SECRET, EVENT_QUEUE, USER, PASSWORD

    if len(sys.argv) < 3:
        print(
            f"Usage: python3 {sys.argv[0]} <mysql username> <mysql password>",
            file=sys.stderr,
        )
        sys.exit(1)

    # NOTE(review): example secret. It must match the signing secret
    # configured on sesinetd and be updated whenever that secret changes.
    SIGNING_SECRET = "8fjP3P1EYJiUwU9ONjRGw00UxgbHNm"
    USER = sys.argv[1]
    PASSWORD = sys.argv[2]

    EVENT_QUEUE = queue.Queue()
    background_thread = threading.Thread(target=asyncio_handle_webhooks)
    background_thread.start()
    try:
        hwebserver.run(8080, debug=True, reload_source_changes=False)
    finally:
        # Add the stop token to the queue to inform the background worker
        # thread to stop handling events.
        EVENT_QUEUE.put(StopToken())
        background_thread.join()


if __name__ == "__main__":
    main()