Houdini 20.5 Installation And Licensing Webhooks

Using Hwebserver and MySQL

Example script to run hwebserver and saving events to MySQL.

If the SIGNING_SECRET value is set then each request will be verified that it came from your sesinetd instance and not from an attacker.

To run the script the mysql username and mysql password must be provided. The default name the script uses for the database is sidefx_webhook.

python3 webhook_server.py <mysql username> <mysql password>

Note

Anytime the signing secret changes the webhook server must be updated so that it can correctly verify the requests from sesinetd.

import sys
import json
import hmac
import hashlib
import time
import os
import asyncio
import threading
import queue

import mysql.connector
from mysql.connector import errorcode

sys.path.append(os.path.expandvars("$HFS/houdini/python3.9libs/"))

import hwebserver

Response = hwebserver.Response

SIGNING_SECRET = None

DB_NAME = "sidefx_webhook"
USER = None
PASSWORD = None

TABLES = {}
TABLES['checkin'] = (
    "CREATE TABLE `checkin` ("
    " `license_id` char(8) NOT NULL,"
    " `count` int(64) NOT NULL,"
    " `product` varchar(100) NOT NULL,"
    " `version` varchar(100) NOT NULL,"
    " `total_tokens` int(64) NOT NULL DEFAULT 0,"
    " `timestamp` int(11) NOT NULL"
    ")"
)
TABLES['checkout'] = (
    "CREATE TABLE `checkout` ("
    " `license_id` char(8) NOT NULL,"
    " `count` int(64) NOT NULL,"
    " `product` varchar(100) NOT NULL,"
    " `version` varchar(100) NOT NULL,"
    " `total_tokens` int(64) NOT NULL DEFAULT 0,"
    " `timestamp` int(11) NOT NULL"
    ")"
)
TABLES['heartbeat'] = (
    "CREATE TABLE `heartbeat` ("
    " `license_id` char(8) NOT NULL,"
    " `product` varchar(100) NOT NULL,"
    " `version` varchar(100) NOT NULL,"
    " `success_count` int(64) NOT NULL,"
    " `fail_count` int(64) NOT NULL,"
    " `timestamp` int(11) NOT NULL"
    ")"
)

def _compute_sidefx_signature(timestamp, body):
    base_string = u"v0:{}:{}".format(timestamp, body.decode())
    return b"v0=" + hmac.new(SIGNING_SECRET.encode(), msg=base_string.encode(),
                             digestmod=hashlib.sha256).hexdigest().encode()

def _sidefx_signature_errors(request):
    """Validate that the response was signed by sesinetd. See
    https://sidefx.com/docs/houdini/licensing/webhooks/
    """
    expected_signature = request.headers().get("X-SideFX-Signature")
    if expected_signature is None:
        return "Missing header X-SideFX-Signature"
    expected_signature = expected_signature.encode()

    timestamp = request.headers().get("X-SideFX-Request-Timestamp")
    if timestamp is None:
        return "Missing header X-SideFX-Request-Timestamp"

    try:
        float(timestamp)
    except ValueError:
        return "X-SideFX-Request-Timestamp is not a number"

    # Watch out for replay attacks. Ignore anything that's more than five
    # minutes old.
    if abs(time.time() - float(timestamp)) > 300:
        return "The timestamp is too old"

    signature = _compute_sidefx_signature(timestamp, request.body())
    if hasattr(hmac, "compare_digest"):
        signature_matches = hmac.compare_digest(signature, expected_signature)
    else:
        signature_matches = signature == expected_signature
    if not signature_matches:
        return "Incorrect signature header"
    return None

def _create_database(cursor):
    """Create the mysql db used to store all events the webserver receives."""
    try:
        cursor.execute(f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'")
    except mysql.connector.Error as err:
        print(f"Failed creating database: {err}")
        sys.exit(1)

def _create_tables(cursor):
    """Create all tables for every event that the webserver may receive."""
    try:
        cursor.execute(f"USE {DB_NAME}")
    except mysql.connector.Error as err:
        print(f"Database {DB_NAME} does not exist.")
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            _create_database(cursor)
            print(f"Database {DB_NAME} created successfully.")
            cnx.database = DB_NAME
        else:
            print(err)
            sys.exit(1)

    for table_name in TABLES:
        table_description = TABLES[table_name]
        try:
            print(f"Creating table {table_name}: ", end='')
            cursor.execute(table_description)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists")
            else:
                print(err.msg)
        else:
            print("OK")

def handle_checkin(cursor, info):
    """Handle all checkin events by saving them into the `checkin` sql
    table."""
    sql = "INSERT INTO checkin (license_id, seat_id, user_machine, product, \
         version, available, total_tokens, timestamp) VALUES (%(license_id)s, \
         %(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \
         %(available)s, %(total_tokens)s, %(timestamp)s)"

    timestamp = info["event_time"]
    ev = info["event"]

    values = {}
    values["license_id"] = ev.get("license_id", "")
    values["count"] = ev.get("count", 0)
    values["product"] = ev.get("product", "")
    values["version"] = ev.get("version", "")
    values["total_tokens"] = ev.get("total_tokens", 0)
    values["timestamp"] = timestamp

    cursor.execute(sql, values)
    return True

def handle_checkout(cursor, info):
    """Handle all checkout events by saving them into the `checkout` sql
    table."""
    sql = "INSERT INTO checkout (license_id, seat_id, user_machine, product, \
         version, available, total_tokens, timestamp) VALUES (%(license_id)s, \
         %(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \
         %(available)s, %(total_tokens)s, %(timestamp)s)"

    timestamp = info["event_time"]
    ev = info["event"]

    values = {}
    values["license_id"] = ev.get("license_id", "")
    values["count"] = ev.get("count", 0)
    values["product"] = ev.get("product", "")
    values["version"] = ev.get("version", "")
    values["total_tokens"] = ev.get("total_tokens", 0)
    values["timestamp"] = timestamp

    cursor.execute(sql, values)
    return True

def handle_heartbeat(cursor, info):
    """Handle all heartbeat events by saving them into the `heartbeat` sql
    table."""
    sql = "INSERT INTO heartbeat (license_id, seat_id, user_machine, product, \
         success, timestamp) VALUES (%(license_id)s, %(seat_id)s,  \
        %(user_machine)s, %(product)s, %(success)s, %(timestamp)s)"

    timestamp = info["event_time"]
    ev = info["event"]

    values = {}
    values["license_id"] = ev.get("license_id", "")
    values["product"] = ev.get("product", "")
    values["version"] = ev.get("version", "")
    values["success_count"] = ev.get("success_count", 0)
    values["fail_count"] = ev.get("fail_count", 0)
    values["timestamp"] = timestamp

    cursor.execute(sql, values)
    return True

class StopToken():
    """Stop token to indicate to the queue we are done handling events."""
    def __init__(self):
        pass

def handle_rate_limit(ev):
    """Handle events that inform the server they have been rate limited."""
    minute_rate_limited = ev.get("minute_rate_limited", 0)
    print(f"Server has been rate limited: {minute_rate_limited}")

async def handle_webhook_events():
    """Handle the queue of webhook events that the webserver received."""
    handlers = dict()
    handlers["E100"] = handle_checkin
    handlers["E101"] = handle_checkout
    handlers["E102"] = handle_heartbeat

    cnx = mysql.connector.connect(user=USER, password=PASSWORD)
    cursor = cnx.cursor()
    #_create_database(cursor)
    _create_tables(cursor)

    try:
        while True:
            try:
                ev = EVENT_QUEUE.get()
                if isinstance(ev, StopToken):
                    break
                ev_type = ev.get("type", "")
                if ev_type == "event_callback":
                    event_id = ev["event_id"]
                    fn = handlers.get(event_id)
                    if fn is not None:
                        if fn(cursor, ev):
                            cnx.commit()
                elif ev_type == "rate_limited":
                    handle_rate_limit(ev)
            except Exception as err:
                print(err)
            finally:
                EVENT_QUEUE.task_done()
    finally:
        if cursor is not None:
            cursor.close()

def asyncio_handle_webhooks():
    """Kick off handling the webhook events."""
    loop = asyncio.DefaultEventLoopPolicy().new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(handle_webhook_events())

@hwebserver.urlHandler("/sidefx.webhook")
def webhook(request):
    """Handle all incoming webhook events. To ensure we handle these events
    quickly we place all events onto a queue that a background thread handles.
    If the server takes to long sesinetd will think the server is in a bad
    state and will stop sending events."""
    global EVENT_QUEUE

    if request.contentType() != "application/json":
        return Response(400, "Expected JSON body")

    if SIGNING_SECRET is not None:
        errors = _sidefx_signature_errors(request)
        if errors is not None:
            print(f"invalid signature: {errors}")
            return Response(errors, 400)

    data = json.loads(request.body())
    if data["type"] == "url_verification":
        resp_json = dict()
        resp_json["challenge"] = data["challenge"]
        return Response(json.dumps(resp_json), 200, "application/json")

    # Handle the event in the background so that we can respond quickly. If we
    # take to long then sesinetd will think this server is in a bad state.
    EVENT_QUEUE.put(data)

    return Response(b"", 200)

def main():
    global SIGNING_SECRET, EVENT_QUEUE, USER, PASSWORD
    SIGNING_SECRET = "8fjP3P1EYJiUwU9ONjRGw00UxgbHNm" 

    USER = sys.argv[1]
    PASSWORD = sys.argv[2]

    EVENT_QUEUE = queue.Queue()

    background_thread = threading.Thread(target=asyncio_handle_webhooks)
    background_thread.start()

    try:
        hwebserver.run(8080, debug=True, reload_source_changes=False)
    finally:
        # Add the stop token to the queue to inform the background worker
        # thread to stop handling events.
        EVENT_QUEUE.put(StopToken())
        if background_thread is not None:
            background_thread.join()

if __name__ == "__main__":
    main()

Webhooks

Examples

License

  • Checkin

    Event generated by license seat checkins.

  • Checkout

    Event generated by checking out a license seat.

  • Heartbeat

    Event generated by hserver heartbeating the license seat.