Code for Webhooks in Python with Flask Tutorial


View on Github

config.py

#Application configuration File
################################

#Secret key that will be used by Flask for securely signing the session cookie
# and can be used for other security related needs
SECRET_KEY = 'SECRET_KEY'

#######################################
#Minimum Number Of Tasks To Generate
MIN_NBR_TASKS = 1

#Maximum Number Of Tasks To Generate
MAX_NBR_TASKS = 100

#Time to wait when producing tasks
WAIT_TIME = 1

#Webhook endpoint Mapping to the listener
WEBHOOK_RECEIVER_URL = 'http://localhost:5001/consumetasks'
#######################################

#Map to the REDIS Server Port
BROKER_URL = 'redis://localhost:6379'
#######################################

tasks_producer.py

import random
from faker.providers import BaseProvider
from faker import Faker
import config
import time
import requests
import json
import uuid

# Define a TaskProvider
class TaskProvider(BaseProvider):
    def task_priority(self):
        severity_levels = [
            'Low', 'Moderate', 'Major', 'Critical'
        ]
        return severity_levels[random.randint(0, len(severity_levels)-1)]


# Create a Faker instance and seeding to have the same results every time we execute the script
# Return data in English
fakeTasks = Faker('en_US')
# Seed the Faker instance to have the same results every time we run the program
fakeTasks.seed_instance(0)
# Assign the TaskProvider to the Faker instance
fakeTasks.add_provider(TaskProvider)

# Generate A Fake Task
def produce_task(batchid, taskid):
    # Message composition
    message = {
        'batchid': batchid, 'id': taskid, 'owner': fakeTasks.unique.name(), 'priority': fakeTasks.task_priority()
        # ,'raised_date':fakeTasks.date_time_this_year()
        # ,'description':fakeTasks.text()
    }
    return message


def send_webhook(msg):
    """
    Send a webhook to a specified URL
    :param msg: task details
    :return:
    """
    try:
        # Post a webhook message
        # default is a function applied to objects that are not serializable = it converts them to str
        resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=json.dumps(
            msg, sort_keys=True, default=str), headers={'Content-Type': 'application/json'}, timeout=1.0)
        # Returns an HTTPError if an error has occurred during the process (used for debugging).
        resp.raise_for_status()
    except requests.exceptions.HTTPError as err:
        #print("An HTTP Error occurred",repr(err))
        pass
    except requests.exceptions.ConnectionError as err:
        #print("An Error Connecting to the API occurred", repr(err))
        pass
    except requests.exceptions.Timeout as err:
        #print("A Timeout Error occurred", repr(err))
        pass
    except requests.exceptions.RequestException as err:
        #print("An Unknown Error occurred", repr(err))
        pass
    except:
        pass
    else:
        return resp.status_code

# Generate A Bunch Of Fake Tasks
def produce_bunch_tasks():
    """
    Generate a Bunch of Fake Tasks
    """
    n = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
    batchid = str(uuid.uuid4())
    for i in range(n):
        msg = produce_task(batchid, i)
        resp = send_webhook(msg)
        time.sleep(config.WAIT_TIME)
        print(i, "out of ", n, " -- Status", resp, " -- Message = ", msg)
        yield resp, n, msg


if __name__ == "__main__":
    for resp, total, msg in produce_bunch_tasks():
        pass

init_producer.py

from flask import Flask

#Create a Flask instance
app = Flask(__name__)

#Load Flask configurations from config.py
app.secret_key = app.config['SECRET_KEY']
app.config.from_object("config")

app_producer.py

#Flask imports
from flask import Response, render_template
from init_producer import app
import tasks_producer

def stream_template(template_name, **context):
    app.update_template_context(context)
    t = app.jinja_env.get_template(template_name)
    rv = t.stream(context)
    rv.enable_buffering(5)
    return rv

@app.route("/", methods=['GET'])
def index():
    return render_template('producer.html')

@app.route('/producetasks', methods=['POST'])
def producetasks():
    print("producetasks")
    return Response(stream_template('producer.html', data= tasks_producer.produce_bunch_tasks() ))

if __name__ == "__main__":
   app.run(host="localhost",port=5000, debug=True)

init_consumer.py

from flask import Flask

#Create a Flask instance
app = Flask(__name__)

#Load Flask configurations from config.py
app.secret_key = app.config['SECRET_KEY']
app.config.from_object("config")

#Setup the Flask SocketIO integration while mapping the Redis Server.
from flask_socketio import SocketIO
socketio = SocketIO(app,logger=True,engineio_logger=True,message_queue=app.config['BROKER_URL'])

app_consumer.py

#Flask imports
from flask import render_template, request,session
from flask_socketio import join_room
from init_consumer import app, socketio
import json
import uuid

#Render the assigned template file
@app.route("/", methods=['GET'])
def index():
    return render_template('consumer.html')

# Sending Message through the websocket
def send_message(event, namespace, room, message):
    # print("Message = ", message)
    socketio.emit(event, message, namespace=namespace, room=room)

# Registers a function to be run before the first request to this instance of the application
# Create a unique session ID and store it within the application configuration file
@app.before_first_request
def initialize_params():
    if not hasattr(app.config,'uid'):
        sid = str(uuid.uuid4())
        app.config['uid'] = sid
        print("initialize_params - Session ID stored =", sid)

# Receive the webhooks and emit websocket events
@app.route('/consumetasks', methods=['POST'])
def consumetasks():
    if request.method == 'POST':
        data = request.json
        if data:
           print("Received Data = ", data)
           roomid =  app.config['uid']
           var = json.dumps(data)
           send_message(event='msg', namespace='/collectHooks', room=roomid, message=var)
    return 'OK'

#Execute on connecting
@socketio.on('connect', namespace='/collectHooks')
def socket_connect():
    # Display message upon connecting to the namespace
    print('Client Connected To NameSpace /collectHooks - ', request.sid)

#Execute on disconnecting
@socketio.on('disconnect', namespace='/collectHooks')
def socket_connect():
    # Display message upon disconnecting from the namespace
    print('Client disconnected From NameSpace /collectHooks - ', request.sid)

#Execute upon joining a specific room
@socketio.on('join_room', namespace='/collectHooks')
def on_room():
    if app.config['uid']:
        room = str(app.config['uid'])
        # Display message upon joining a room specific to the session previously stored.
        print(f"Socket joining room {room}")
        join_room(room)

#Execute upon encountering any error related to the websocket
@socketio.on_error_default
def error_handler(e):
    # Display message on error.
    print(f"socket error: {e}, {str(request.event)}")

#Run using port 5001
if __name__ == "__main__":
    socketio.run(app,host='localhost', port=5001,debug=True)

templates/producer.html

<!doctype html>
<html>
  <head>
    <title>Tasks Producer</title>
    <style>
      .content {
        width: 100%;
      }
      .container{
        max-width: none;
      }
    </style>
    <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
  </head>

<body class="container">
    <div class="content">
      <form method='post' id="produceTasksForm" action = "/producetasks">
        <button style="height:20%x;width:100%" type="submit" id="produceTasks">Produce Tasks</button>
      </form>
    </div>

    <div class="content">
        <div id="Messages" class="content" style="height:400px;width:100%; border:2px solid gray; overflow-y:scroll;"></div>
        {% for rsp,total, msg in data: %}
         <script>
            var rsp   = "{{ rsp }}";
            var total = "{{ total }}";
            var msg   = "{{ msg }}";
            var lineidx = "{{ loop.index }}";
            //If the webhook request succeeds color it in blue else in red.
            if (rsp == '200') {
                rsp = rsp.fontcolor("blue");
            }
            else {
                rsp = rsp.fontcolor("red");
            }
            //Add the details of the generated task to the Messages section.
            document.getElementById('Messages').innerHTML += "<br>" + lineidx  + " out of " + total + " -- "+ rsp + " -- " + msg;
        </script>
        {% endfor %}
    </div>
</body>
</html>

templates/consumer.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Tasks Consumer</title>

    <link rel="stylesheet" href="{{url_for('static',filename='css/bootstrap.min.css')}}">
    <link rel="stylesheet" href="{{url_for('static',filename='css/Chart.min.css')}}">
</head>
<body>
    <div class="content">
        <div id="Messages" class="content" style="height:200px;width:100%; border:1px solid gray; overflow-y:scroll;"></div>
    </div>

    <div class="container">
        <div class="row">
            <div class="col-12">
                <div class="card">
                    <div class="card-body">
                        <canvas id="canvas"></canvas>
                    </div>
                </div>
            </div>
        </div>
    </div>

    <!-- import the jquery library -->
    <script src="{{ url_for('static',filename='js/jquery.min.js') }}"></script>
    <!-- import the socket.io library -->
    <script src="{{ url_for('static',filename='js/socket.io.js') }}"></script>

    <!-- import the bootstrap library -->
    <script src="{{ url_for('static',filename='js/bootstrap.min.js') }}"></script>
    <!-- import the Chart library -->
    <script src="{{ url_for('static',filename='js/Chart.min.js') }}"></script>

<script>
      $(document).ready(function(){
        const config = {
            //Type of the chart - Bar Chart
            type: 'bar',
            //Data for our chart
            data: {
                labels: ['Low','Moderate','Major','Critical'],
                datasets: [{
                    label: "Count Of Tasks",
                    //Setting a color for each bar
                    backgroundColor: ['green','blue','yellow','red'],
                    borderColor: 'rgb(255, 99, 132)',
                    data: [0,0,0,0],
                    fill: false,
                }],
            },
            //Configuration options
            options: {
                responsive: true,
                title: {
                    display: true,
                    text: 'Tasks Priority Matrix'
                },
                tooltips: {
                    mode: 'index',
                    intersect: false,
                },
                hover: {
                    mode: 'nearest',
                    intersect: true
                },
                scales: {
                    xAxes: [{
                        display: true,
                        scaleLabel: {
                            display: true,
                            labelString: 'Priority'
                        }
                    }],
                    yAxes: [{
                        display: true
                     ,ticks: {
                            beginAtZero: true
                        }
                       ,scaleLabel: {
                            display: true,
                            labelString: 'Total'
                        }
                    }]
                }
            }
        };
        const context = document.getElementById('canvas').getContext('2d');
        //Creating the bar chart
        const lineChart = new Chart(context, config);


        //Reserved for websocket manipulation
        var namespace='/collectHooks';
        var url = 'http://' + document.domain + ':' + location.port + namespace;
        var socket = io.connect(url);

        //When connecting to the socket join the room
        socket.on('connect', function() {
                              socket.emit('join_room');
                            });

        //When receiving a message
        socket.on('msg' , function(data) {
                            var msg = JSON.parse(data);
                            var newLine = $('<li>'+ 'Batch ID. = ' + msg.batchid + ' -- Task ID. = ' + msg.id + ' -- Owner = ' + msg.owner + ' -- Priority = ' + msg.priority +'</li>');
                            newLine.css("color","blue");
                            $("#Messages").append(newLine);

                            //Retrieve the index of the priority of the received message
                            var lindex = config.data.labels.indexOf(msg.priority);

                            //Increment the value of the priority of the received message
                            config.data.datasets[0].data[lindex] += 1;

                            //Update the chart
                            lineChart.update();
                          });
      });
</script>
</body>
</html>