Code for Detecting Fraudulent Transactions in a Streaming App using Kafka in Python Tutorial


View on GitHub

settings.py

# Address (host:port) of the Kafka broker used to connect to the cluster
KAFKA_BROKER_URL = "localhost:9092"
# Topic that carries the raw transactions waiting to be processed
TRANSACTIONS_TOPIC = "queuing.transactions"
# Rate at which fake transactions are generated, and the matching pause
# (in seconds) between two consecutive transactions
TRANSACTIONS_PER_SECOND = 2.0
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
# Topic name reserved for legitimate transactions
LEGIT_TOPIC = "queuing.legit"
# Topic name reserved for suspicious transactions
FRAUD_TOPIC = "queuing.fraud"

transactions.py

from random import choices, randint
from string import ascii_letters, digits

# Alphabet used for account identifiers: the ten digits followed by all ASCII letters
account_chars: str = "".join((digits, ascii_letters))

def _random_account_id() -> str:
    """Build a random 12-character account number.

    Each character is drawn independently (with replacement) from
    ``account_chars``.
    """
    picked = choices(account_chars, k=12)
    return "".join(picked)

def _random_amount() -> float:
    """Return a random amount between 1.00 and 1000.00 (both inclusive).

    The amount is drawn as a whole number of cents and then converted, so the
    result never carries more than two decimal places.
    """
    # 100 cents = 1.00 and 100000 cents = 1000.00. The previous upper bound of
    # 1000000 cents produced amounts up to 10000.00, contradicting the
    # documented range and pushing most transactions over the 900 threshold.
    return randint(100, 100000) / 100

def create_random_transaction() -> dict:
    """Assemble a fake transaction with random accounts and a random amount.

    The returned dict has the keys ``source``, ``target``, ``amount`` and
    ``currency``; the currency is always ``"EUR"``.
    """
    sender = _random_account_id()
    receiver = _random_account_id()
    value = _random_amount()
    return dict(source=sender, target=receiver, amount=value, currency="EUR")

producer.py

import os
import json
from time import sleep
from kafka import KafkaProducer
# import initialization parameters
from settings import *
from transactions import create_random_transaction


if __name__ == "__main__":
    # Every value handed to the producer is serialised to JSON and encoded to bytes
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        value_serializer=lambda payload: json.dumps(payload).encode(),
    )
    # Publish one random transaction, echo it, pause for SLEEP_TIME, and repeat forever
    while True:
        transaction: dict = create_random_transaction()
        producer.send(TRANSACTIONS_TOPIC, value=transaction)
        print(transaction)  # DEBUG
        sleep(SLEEP_TIME)

detector.py

import os
import json
from kafka import KafkaConsumer, KafkaProducer
from settings import *

def is_suspicious(transaction: dict) -> bool:
    """Tell whether a transaction is suspicious: its ``amount`` is 900 or more."""
    amount = transaction["amount"]
    return amount >= 900

if __name__ == "__main__":
    # Subscribe to the transactions topic; each message value is decoded from JSON
    consumer = KafkaConsumer(
        TRANSACTIONS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        value_deserializer=json.loads,
    )

    # Classify every incoming transaction and print the topic name it maps to
    for record in consumer:
        transaction: dict = record.value
        if is_suspicious(transaction):
            topic = FRAUD_TOPIC
        else:
            topic = LEGIT_TOPIC
        print(topic, transaction)  # DEBUG

appdetector.py

from flask import Flask, Response, stream_with_context, render_template, json, url_for

from kafka import KafkaConsumer
from settings import *

# Create the Flask application object; the '/' route and stream_template() below rely on it
app = Flask(__name__)

def stream_template(template_name, **context):
    """Return a Jinja template stream for ``template_name``.

    The keyword arguments form the template context; the application's own
    template context is merged into it before the template is streamed.
    """
    print('template name =', template_name)
    app.update_template_context(context)
    template = app.jinja_env.get_template(template_name)
    stream = template.stream(context)
    # Buffer size passed to the stream; see Jinja's TemplateStream.enable_buffering
    stream.enable_buffering(5)
    return stream

def is_suspicious(transaction: dict) -> bool:
    """Report whether the transaction's ``amount`` reaches the 900 threshold."""
    if transaction["amount"] >= 900:
        return True
    return False

# Route for the landing page: renders the template named index.html and passes
# it two parameters, the page title and the stream of classified Kafka transactions
@app.route('/')
def index():
    """Stream classified transactions to the browser as they are consumed.

    Returns:
        A ``Response`` wrapping the streamed rendering of ``index.html``; the
        template receives ``title`` and ``data``, a generator of
        ``(topic, transaction)`` pairs.
    """
    def g():
        """Yield a ``(topic, transaction)`` pair for every message consumed."""
        consumer = KafkaConsumer(
            TRANSACTIONS_TOPIC,
            bootstrap_servers=KAFKA_BROKER_URL,
            value_deserializer=lambda value: json.loads(value),
        )
        try:
            for message in consumer:
                transaction: dict = message.value
                topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
                print(topic, transaction)  # DEBUG
                yield topic, transaction
        finally:
            # A consumer is created for every request. Close it once this
            # generator is closed or fails, otherwise each abandoned page
            # view would leave its broker connections open.
            consumer.close()

    return Response(stream_template('index.html', title='Fraud Detector / Kafka', data=g()))

# When run as a script, start the Flask server on localhost with debug mode enabled
if __name__ == "__main__":
   app.run(host="localhost" , debug=True)

templates/index.html

<!doctype html>
<html>

<head>
    <title> Send Javascript with template demo </title>
</head>

<body>
    <div class="container">
        <h1>{{title}}</h1>
    </div>
    <div id="data"></div>
    {# One <script> element is rendered per (topic, transaction) pair taken from
       `data`; each one appends a coloured line to the #data element. #}
    {% for topic, transaction in data %}
    <script>
        var topic = "{{ topic }}";
        var transaction = "{{ transaction }}";
        // search() returns -1 when there is no match, so index 0 is also a hit
        if (topic.search("fraud") >= 0) {
            topic = topic.fontcolor("red")
        } else {
            topic = topic.fontcolor("green")
        }
        document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;
    </script>
    {% endfor %}
</body>

</html>