
Customer 360: Fraud Detection in Fintech With PySpark and ML

A big data ML solution using PySpark to link bank accounts and transactions for a unified customer view (Customer 360), while detecting fraud implicitly.

By Ram Ghadiyaram · May 15, 2025

Every bank uses Customer 360 to maintain its customer records in a unified way, and the same approach can also be used for fraud detection.

What Is Customer 360?

Customer 360 is like creating a complete picture of a customer by pulling together all the data you have about them into a comprehensive profile. Imagine a bank with data from accounts, transactions, and customer service calls. Instead of holding several disconnected records for the same customer, Customer 360 links them so you can say, "All of this data, in its various forms, belongs to customer John Doe." It helps businesses understand customers better, personalize services, and spot patterns in customer data.
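
To make the idea concrete, here is a minimal PySpark sketch of assembling one unified view per customer. It assumes the records already carry a resolved customer_id (a hypothetical key that the record linkage later in this article would produce); the table contents and column names are illustrative, not part of this article's dataset.

Python
 
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Customer360Sketch").master("local[*]").getOrCreate()

# Hypothetical inputs: both tables already carry a resolved customer_id
accounts = spark.createDataFrame(
    [("C001", "A001", "Savings"), ("C001", "A002", "Credit Card")],
    ["customer_id", "acct_id", "account_type"])
transactions = spark.createDataFrame(
    [("C001", "T001", 50.25), ("C001", "T002", 200.75)],
    ["customer_id", "tx_id", "amount"])

# Total spend per customer from the transaction side ...
spend = transactions.groupBy("customer_id").agg(F.sum("amount").alias("total_spend"))

# ... combined with the set of accounts each customer holds: one profile row per customer
profile = (accounts.groupBy("customer_id")
           .agg(F.collect_set("acct_id").alias("accounts"))
           .join(spend, "customer_id", "left"))
profile.show()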

What Is Fraud Detection?

Fraud detection is about finding suspicious activity, such as someone posing as John Doe to steal money. It uses data (e.g., transactions, account details) to flag anything unusual, like mismatched names or addresses that don't quite add up. Machine learning can help by learning what normal looks like and pointing out what's wrong.
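
Even before any machine learning, a simple rule can surface the kind of mismatch described above. The sketch below flags transactions whose name differs from the name on the linked account; the column names and sample rows are made up for illustration.

Python
 
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("MismatchFlagSketch").master("local[*]").getOrCreate()

# Illustrative linked pairs: the name on the account vs. the name used on the transaction
pairs = spark.createDataFrame(
    [("A001", "T001", "john doe", "john doe"),
     ("A001", "T002", "john doe", "j doe")],
    ["acct_id", "tx_id", "acc_name", "tx_name"])

# Flag pairs where the transaction name does not exactly match the account name
flagged = pairs.withColumn(
    "name_mismatch", F.when(F.col("acc_name") != F.col("tx_name"), 1).otherwise(0))
flagged.show()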

How Does the Code Fit In?

The code uses PySpark (a tool for handling big data) and machine learning to:

  1. Link records (Customer 360): It matches account data (e.g., savings accounts) with transaction data to figure out which records belong to the same person.
  2. Flag mismatches (fraud detection): It calculates how similar names, addresses, and birth dates are. If they're close but not exact, that could signal either fraud or just a typo.

The code cleans unevenly formatted data, compares it, and uses a machine learning model (Random Forest) to decide whether two records match. Let's see how. The full code block is below, followed by an explanation and the results.

The solution below scales in cloud-native environments and can also be tested locally on Linux, Windows, or IntelliJ IDEA by changing the master. It requires a PySpark/Python installation plus the jellyfish library (pip install jellyfish).

High-level flow



Python
 
import os
import sys

import jellyfish
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, udf
from pyspark.sql.types import StringType, FloatType

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session
spark = SparkSession.builder \
    .appName("BankRecordLinkageComplex") \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.network.timeout", "5000s") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Sample data for accounts and transactions
accounts = [
    ("A001", "John", "Doe", "1980-05-15", "123 Main St", "555-123-4567", "Savings"),
    ("A002", "Jon", "Doe", "15-May-80", "123 Main Street", "555-123-4568", "Credit Card"),
    ("A003", "Jane", "Smith", "1990-03-22", "456 Oak Ave", "555-987-6543", "Loan"),
    ("A004", "Michael", "Jones", "1975-12-01", "789 Pine Rd Apt 5", "555-456-7890", "Savings")
]
transactions = [
    ("T001", "John", "Doe", "1980-05-15", "123 Main St", 50.25, "2025-04-01"),
    ("T002", "J Doe", "", "1980-05-15", "123 Main Street", 200.75, "2025-04-02"),
    ("T003", "Jane", "Smyth", "22-Mar-90", "456 Oak Avenue", 100.00, "2025-04-03"),
    ("T004", "Mike", "Jones", "1975-12-01", "789 Pine Road", 75.50, "2025-04-04")
]

df_accounts = spark.createDataFrame(accounts, ["acct_id", "name", "surname", "dob", "address", "phone", "account_type"])
df_transactions = spark.createDataFrame(transactions, ["tx_id", "name", "surname", "dob", "address", "amount", "tx_date"])

df_accounts.show()
df_transactions.show()

# Normalize date strings (YYYY-MM-DD or DD-MMM-YY) to a single YYYY-MM-DD format
def standardize_date(dob):
    from datetime import datetime
    try:
        if "-" in dob and len(dob.split("-")[0]) == 4:  # YYYY-MM-DD
            return datetime.strptime(dob, "%Y-%m-%d").strftime("%Y-%m-%d")
        else:  # DD-MMM-YY
            return datetime.strptime(dob, "%d-%b-%y").strftime("%Y-%m-%d")
    except ValueError:
        return dob

date_udf = udf(standardize_date, StringType())
df_accounts = df_accounts.withColumn("name", lower(col("name"))) \
    .withColumn("surname", lower(col("surname"))) \
    .withColumn("dob", date_udf(col("dob")))
df_transactions = df_transactions.withColumn("name", lower(col("name"))) \
    .withColumn("surname", lower(col("surname"))) \
    .withColumn("dob", date_udf(col("dob")))
# print('standardised data')
# df_accounts.show()
# df_transactions.show()

# String-similarity helpers, registered below as Spark UDFs
def jaro_winkler_sim(str1, str2):
    if not str1 or not str2:
        return 0.0
    return jellyfish.jaro_winkler_similarity(str1, str2)

def levenshtein_sim(str1, str2):
    if not str1 or not str2:
        return 0.0
    distance = jellyfish.levenshtein_distance(str1, str2)
    max_len = max(len(str1), len(str2))
    return 1.0 - (distance / max_len) if max_len > 0 else 1.0

jaro_udf = udf(jaro_winkler_sim, FloatType())
lev_udf = udf(levenshtein_sim, FloatType())

df_accounts = df_accounts.withColumn("year", col("dob").substr(1, 4)) \
    .withColumn("surname_initial", col("surname").substr(1, 1))
df_transactions = df_transactions.withColumn("year", col("dob").substr(1, 4)) \
    .withColumn("surname_initial", col("surname").substr(1, 1))
df_accounts.cache()
df_transactions.cache()
# Blocking join: only compare pairs that share a birth year and
# either the same surname initial or a missing surname on the transaction side
pairs = df_accounts.alias("acc").join(df_transactions.alias("tx"),
                                      (col("acc.year") == col("tx.year")) &
                                      ((col("acc.surname_initial") == col("tx.surname_initial")) |
                                       (col("tx.surname") == "")),
                                      "inner")
pairs = pairs.withColumn("name_sim", jaro_udf(col("acc.name"), col("tx.name"))) \
    .withColumn("surname_sim", jaro_udf(col("acc.surname"), col("tx.surname"))) \
    .withColumn("address_sim", lev_udf(col("acc.address"), col("tx.address")))

# Assemble the similarity scores into a single feature vector
assembler = VectorAssembler(inputCols=["name_sim", "surname_sim", "address_sim"], outputCol="features")
pairs_with_features = assembler.transform(pairs)

# Small hand-labeled training set: similarity scores plus a match (1) / no-match (0) label
training_data = spark.createDataFrame([
    (0.944, 1.0, 0.923, 1),
    (0.833, 0.0, 0.923, 0),
    (0.933, 0.911, 0.857, 1),
    (0.5, 0.5, 0.5, 0),
    (0.78, 0.0, 1.0, 1),
    (0.925, 1.0, 0.764, 1),
    (0.6, 0.8, 0.9, 0),
    (0.95, 0.95, 0.95, 1),
], ["name_sim", "surname_sim", "address_sim", "label"])
training_data = assembler.transform(training_data)
# print('training data')
# training_data.show()
# Train a Random Forest classifier on the labeled examples
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10, maxBins=4)
model = rf.fit(training_data)

# Score the real candidate pairs: prediction 1.0 = match, 0.0 = no match
predictions = model.transform(pairs_with_features)
predictions.select("acct_id", "tx_id", "name_sim", "surname_sim", "address_sim", "prediction").show()

spark.stop()


Step-by-Step Explanation of the Code

1. Take Sample Data

Here’s what the data looks like:

Accounts Table

acct_id  name     surname  dob         address            phone         account_type
A001     John     Doe      1980-05-15  123 Main St        555-123-4567  Savings
A002     Jon      Doe      15-May-80   123 Main Street    555-123-4568  Credit Card
A003     Jane     Smith    1990-03-22  456 Oak Ave        555-987-6543  Loan
A004     Michael  Jones    1975-12-01  789 Pine Rd Apt 5  555-456-7890  Savings


Transactions Table

tx_id  name   surname  dob         address          amount  tx_date
T001   John   Doe      1980-05-15  123 Main St      50.25   2025-04-01
T002   J Doe           1980-05-15  123 Main Street  200.75  2025-04-02
T003   Jane   Smyth    22-Mar-90   456 Oak Avenue   100.00  2025-04-03
T004   Mike   Jones    1975-12-01  789 Pine Road    75.50   2025-04-04

This mimics real bank data with slight variations (for example, "Jon" vs. "John") to test matching.

2. Cleaning the Data

  • What it does: Makes names lowercase (e.g., "John" → "john") and standardizes dates (e.g., "15-May-80" → "1980-05-15").
  • Why it's needed: Ensures "John" and "john" aren't treated as different people. (A standalone check of this logic follows below.)
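
The cleaning logic can be sanity-checked outside Spark. This standalone snippet mirrors the standardize_date UDF from the listing above:

Python
 
from datetime import datetime

def standardize_date(dob):
    # Mirror of the UDF above: accept YYYY-MM-DD or DD-MMM-YY, return YYYY-MM-DD
    try:
        if "-" in dob and len(dob.split("-")[0]) == 4:
            return datetime.strptime(dob, "%Y-%m-%d").strftime("%Y-%m-%d")
        return datetime.strptime(dob, "%d-%b-%y").strftime("%Y-%m-%d")
    except ValueError:
        return dob

print("John".lower())                  # john
print(standardize_date("15-May-80"))   # 1980-05-15
print(standardize_date("1980-05-15"))  # 1980-05-15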

3. Comparing Records

  • Similarity scores: Uses two measures (both reproduced in the snippet after this list): 
    • Jaro-Winkler: Measures how close two strings are (for example, "john" vs. "jon" ≈ 0.933).
    • Levenshtein: Derives similarity from the number of edits needed (for example, "smith" vs. "smyth" = 0.8); widely used for comparing address strings.
  • Blocking: Pairs records by birth year and first letter of surname to avoid comparing everyone to everyone.
  • Result: A table of possible matches with scores.
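
Both scores can be reproduced directly with jellyfish, which is where the name_sim and address_sim values in the output table below come from (a quick standalone check, assuming jellyfish is installed):

Python
 
import jellyfish

# Jaro-Winkler rewards shared prefixes, which suits short names
print(round(jellyfish.jaro_winkler_similarity("john", "jon"), 3))   # 0.933

# Levenshtein distance turned into a similarity: 1 - edits / max length
def levenshtein_sim(a, b):
    d = jellyfish.levenshtein_distance(a, b)
    return 1.0 - d / max(len(a), len(b))

print(levenshtein_sim("smith", "smyth"))  # 0.8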

4. Machine Learning

  • Features: Combines similarity scores into a "features" vector.
  • Training Data: A small set of examples tells the model what a match looks like:

    name_sim  surname_sim  address_sim  label (1 = match, 0 = no match)
    0.944     1.0          0.923        1
    0.833     0.0          0.923        0
    0.933     0.911        0.857        1
    0.5       0.5          0.5          0
    0.78      0.0          1.0          1
    0.925     1.0          0.764        1
    0.6       0.8          0.9          0
    0.95      0.95         0.95         1
  • Random Forest: A model learns from these examples to predict matches. (A short sketch after this list shows how to inspect the model's probabilities and feature importances.)
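
Beyond the hard 0/1 prediction, the fitted model also exposes per-pair probabilities and feature importances, which help when choosing a review threshold. This short sketch continues from the listing above (it assumes model and pairs_with_features are still in scope and runs on Spark 3.x, where vector_to_array is available):

Python
 
from pyspark.ml.functions import vector_to_array

# probability is a 2-element vector: [P(no match), P(match)]
scored = (model.transform(pairs_with_features)
          .withColumn("match_prob", vector_to_array("probability")[1]))
scored.select("acct_id", "tx_id", "match_prob", "prediction").show()

# Which similarity feature drives the model's decisions the most?
print(dict(zip(["name_sim", "surname_sim", "address_sim"],
               model.featureImportances.toArray())))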

5. Predictions Output

The following table shows the results of the PySpark record linkage process, linking account records to transactions. The Random Forest model predicts matches (1.0) or non-matches (0.0) based on similarity scores for names, surnames, and addresses. 

Example Output

acct_id  tx_id  name_sim    surname_sim  address_sim  prediction
A002     T001   0.93333334  1.0          0.73333335   1.0
A001     T001   1.0         1.0          1.0          1.0
A002     T002   0.51111114  0.0          1.0          0.0
A001     T002   0.48333332  0.0          0.73333335   0.0
A003     T003   1.0         0.8933333    0.78571427   1.0
A004     T004   0.7809524   1.0          0.5882353    0.0


Note: This is just an example of the methodology/approach. If needed, you can tighten the blocking rules or add further cleanup to suit your data.

Connecting to Customer 360 and Fraud Detection

  • Customer 360: Linking "A001" to "T001" builds a full view of John Doe’s banking activity.
  • Fraud Detection: If "T002" (J Doe) has a low surname similarity but matches otherwise, it might be either a fraud attempt or just a data entry error. The model helps decide; a short routing sketch follows below.
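
Put together, one simple policy is to auto-link confident matches into the Customer 360 profile and route predicted matches with a weak surname score to a manual fraud-review queue. The sketch below continues from the predictions DataFrame in the listing (run it before spark.stop(); the 0.5 threshold is an assumption for illustration):

Python
 
# Confident matches feed the unified customer profile ...
linked = predictions.filter("prediction = 1.0 AND surname_sim >= 0.5")
linked.select("acct_id", "tx_id").show()

# ... while predicted matches with a weak surname score (e.g., a "J Doe" record)
# go to a manual review queue instead of being auto-linked
review_queue = predictions.filter("prediction = 1.0 AND surname_sim < 0.5")
review_queue.select("acct_id", "tx_id", "surname_sim").show()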

This process scales to millions of records, making it practical for real banks! Happy learning. :-)

