Customer 360: Fraud Detection in Fintech With PySpark and ML
A big data ML solution that uses PySpark to link bank accounts and transactions into a unified customer view (Customer 360), surfacing potential fraud along the way.
Every bank uses Customer 360 to maintain its customer records in a unified way, and it can also be used for fraud detection.
What Is Customer 360?
Customer 360 is like creating a complete picture of a customer by pulling together all the data you have about them — think of it as a "comprehensive profile." Imagine a bank with data from accounts, transactions, and customer service calls. Instead of treating these as unrelated records of the same customer, Customer 360 links them to say, "all of this data belongs to customer John Doe." It helps businesses understand customers better, personalize services, and spot patterns in customer data.
What Is Fraud Detection?
Fraud detection is about finding suspicious activity, like someone posing as John Doe to steal money. It uses data (e.g., transactions, account details) to flag anything unusual, such as mismatched names or addresses that don't quite add up. Machine learning can help by learning what normal looks like and pointing out what deviates from it.
How Does the Code Fit In?
The code uses PySpark (a tool for handling big data) and machine learning algorithms to:
- Link records (Customer 360): It matches account data (e.g., savings accounts) with transaction data to figure out which records belong to the same person.
- Spot mismatches (fraud detection): It calculates how similar names, addresses, and birth dates are. If they're close but not exact, that could signal either fraud or just a typo.
The code cleans unevenly formatted data, compares records, and uses a machine learning model (Random Forest) to decide whether two records match. Let's see how. The full code block is below, followed by an explanation and results.
The solution scales to cloud-native deployments and can also be tested on Linux/Windows or in an IDE by changing the master (see the deployment sketch after the code). It requires a PySpark/Python installation plus the jellyfish library:
pip install jellyfish
import os
import sys
import jellyfish
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, udf
from pyspark.sql.types import StringType, FloatType
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
    .appName("BankRecordLinkageComplex") \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.network.timeout", "5000s") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()
# Sample data for accounts and transactions
accounts = [
    ("A001", "John", "Doe", "1980-05-15", "123 Main St", "555-123-4567", "Savings"),
    ("A002", "Jon", "Doe", "15-May-80", "123 Main Street", "555-123-4568", "Credit Card"),
    ("A003", "Jane", "Smith", "1990-03-22", "456 Oak Ave", "555-987-6543", "Loan"),
    ("A004", "Michael", "Jones", "1975-12-01", "789 Pine Rd Apt 5", "555-456-7890", "Savings")
]
transactions = [
    ("T001", "John", "Doe", "1980-05-15", "123 Main St", 50.25, "2025-04-01"),
    ("T002", "J Doe", "", "1980-05-15", "123 Main Street", 200.75, "2025-04-02"),
    ("T003", "Jane", "Smyth", "22-Mar-90", "456 Oak Avenue", 100.00, "2025-04-03"),
    ("T004", "Mike", "Jones", "1975-12-01", "789 Pine Road", 75.50, "2025-04-04")
]
df_accounts = spark.createDataFrame(accounts, ["acct_id", "name", "surname", "dob", "address", "phone", "account_type"])
df_transactions = spark.createDataFrame(transactions, ["tx_id", "name", "surname", "dob", "address", "amount", "tx_date"])
df_accounts.show()
df_transactions.show()
def standardize_date(dob):
    from datetime import datetime
    try:
        if "-" in dob and len(dob.split("-")[0]) == 4:  # YYYY-MM-DD
            return datetime.strptime(dob, "%Y-%m-%d").strftime("%Y-%m-%d")
        else:  # DD-MMM-YY
            return datetime.strptime(dob, "%d-%b-%y").strftime("%Y-%m-%d")
    except ValueError:
        return dob
date_udf = udf(standardize_date, StringType())
df_accounts = df_accounts.withColumn("name", lower(col("name"))) \
    .withColumn("surname", lower(col("surname"))) \
    .withColumn("dob", date_udf(col("dob")))
df_transactions = df_transactions.withColumn("name", lower(col("name"))) \
    .withColumn("surname", lower(col("surname"))) \
    .withColumn("dob", date_udf(col("dob")))
# print('standardised data')
# df_accounts.show()
# df_transactions.show()
# Jaro-Winkler similarity: well suited to short strings such as names
def jaro_winkler_sim(str1, str2):
    if not str1 or not str2:
        return 0.0
    return jellyfish.jaro_winkler_similarity(str1, str2)

# Normalized Levenshtein similarity: 1 - (edit distance / longer length)
def levenshtein_sim(str1, str2):
    if not str1 or not str2:
        return 0.0
    distance = jellyfish.levenshtein_distance(str1, str2)
    max_len = max(len(str1), len(str2))
    return 1.0 - (distance / max_len) if max_len > 0 else 1.0
jaro_udf = udf(jaro_winkler_sim, FloatType())
lev_udf = udf(levenshtein_sim, FloatType())
df_accounts = df_accounts.withColumn("year", col("dob").substr(1, 4)) \
    .withColumn("surname_initial", col("surname").substr(1, 1))
df_transactions = df_transactions.withColumn("year", col("dob").substr(1, 4)) \
    .withColumn("surname_initial", col("surname").substr(1, 1))
df_accounts.cache()
df_transactions.cache()
# Blocking: only pair records that share a birth year and a surname initial
# (or where the transaction surname is blank)
pairs = df_accounts.alias("acc").join(df_transactions.alias("tx"),
    (col("acc.year") == col("tx.year")) &
    ((col("acc.surname_initial") == col("tx.surname_initial")) |
     (col("tx.surname") == "")),
    "inner")
pairs = pairs.withColumn("name_sim", jaro_udf(col("acc.name"), col("tx.name"))) \
    .withColumn("surname_sim", jaro_udf(col("acc.surname"), col("tx.surname"))) \
    .withColumn("address_sim", lev_udf(col("acc.address"), col("tx.address")))
assembler = VectorAssembler(inputCols=["name_sim", "surname_sim", "address_sim"], outputCol="features")
pairs_with_features = assembler.transform(pairs)
# Hand-labeled examples of similarity scores: label 1 = match, 0 = no match
training_data = spark.createDataFrame([
    (0.944, 1.0, 0.923, 1),
    (0.833, 0.0, 0.923, 0),
    (0.933, 0.911, 0.857, 1),
    (0.5, 0.5, 0.5, 0),
    (0.78, 0.0, 1.0, 1),
    (0.925, 1.0, 0.764, 1),
    (0.6, 0.8, 0.9, 0),
    (0.95, 0.95, 0.95, 1),
], ["name_sim", "surname_sim", "address_sim", "label"])
training_data = assembler.transform(training_data)
# print('training data')
# training_data.show()
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10, maxBins=4)
model = rf.fit(training_data)
predictions = model.transform(pairs_with_features)
predictions.select("acct_id", "tx_id", "name_sim", "surname_sim", "address_sim", "prediction").show()
spark.stop()
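As noted above, moving this script from a laptop to a cluster mostly means swapping the master and resource settings. A minimal sketch of the cluster variant, assuming a YARN cluster (the resource values are illustrative assumptions, not tested settings):

from pyspark.sql import SparkSession

# Cluster variant of the session setup; the rest of the script is unchanged.
# jellyfish must also be available on every executor (e.g., preinstalled on the
# nodes or shipped as a packaged environment).
spark = SparkSession.builder \
    .appName("BankRecordLinkageComplex") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.instances", "20") \
    .getOrCreate()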
Step-by-Step Explanation of Code
1. Take Sample Data
Here’s what the data looks like:
Accounts Table
acct_id | name | surname | dob | address | phone | account_type |
---|---|---|---|---|---|---|
A001 | John | Doe | 1980-05-15 | 123 Main St | 555-123-4567 | Savings |
A002 | Jon | Doe | 15-May-80 | 123 Main Street | 555-123-4568 | Credit Card |
A003 | Jane | Smith | 1990-03-22 | 456 Oak Ave | 555-987-6543 | Loan |
A004 | Michael | Jones | 1975-12-01 | 789 Pine Rd Apt 5 | 555-456-7890 | Savings |
Transactions Table
tx_id | name | surname | dob | address | amount | tx_date |
---|---|---|---|---|---|---|
T001 | John | Doe | 1980-05-15 | 123 Main St | 50.25 | 2025-04-01 |
T002 | J Doe |  | 1980-05-15 | 123 Main Street | 200.75 | 2025-04-02 |
T003 | Jane | Smyth | 22-Mar-90 | 456 Oak Avenue | 100.00 | 2025-04-03 |
T004 | Mike | Jones | 1975-12-01 | 789 Pine Road | 75.50 | 2025-04-04 |
This mimics real bank data with slight variations (for example, "Jon" vs. "John") to test matching.
2. Cleaning the Data
- What it does: Makes names lowercase (e.g., "John" → "john") and standardizes dates (e.g., "15-May-80" → "1980-05-15").
- Why it is needed: Ensures "John" and "john" aren't treated as different people. A standalone sketch of the date logic follows below.
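To see the standardization in isolation, here is a minimal plain-Python sketch of the same date logic used in the UDF above, applied to the two formats that appear in the sample data:

from datetime import datetime

def standardize_date(dob):
    # A four-digit leading year means the value is already YYYY-MM-DD
    if "-" in dob and len(dob.split("-")[0]) == 4:
        return datetime.strptime(dob, "%Y-%m-%d").strftime("%Y-%m-%d")
    # Otherwise assume DD-MMM-YY and rewrite it
    return datetime.strptime(dob, "%d-%b-%y").strftime("%Y-%m-%d")

print(standardize_date("1980-05-15"))  # -> 1980-05-15
print(standardize_date("15-May-80"))   # -> 1980-05-15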
3. Comparing Records
- Similarity scores: Uses two measures (a standalone sketch follows this list):
  - Jaro-Winkler: Measures how close two strings are (for example, "john" vs. "jon" ≈ 0.933, matching the output below); it rewards matching prefixes, which suits short name strings.
  - Levenshtein: Calculates similarity from the number of edits needed (for example, "smith" vs. "smyth" = 0.8); it is widely used for comparing address strings.
- Blocking: Pairs records only when they share a birth year and surname initial, to avoid comparing everyone to everyone.
- Result: A table of possible matches with similarity scores.
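To build intuition for the two scores, you can call jellyfish directly on the sample values (a minimal standalone sketch, no Spark required):

import jellyfish

# Jaro-Winkler: prefix-weighted string similarity in [0, 1]
print(jellyfish.jaro_winkler_similarity("john", "jon"))    # ~0.933
print(jellyfish.jaro_winkler_similarity("john", "j doe"))  # ~0.483, a weak match

# Normalized Levenshtein: 1 - (edit distance / longer length)
d = jellyfish.levenshtein_distance("smith", "smyth")       # 1 edit (i -> y)
print(1.0 - d / max(len("smith"), len("smyth")))           # 0.8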
4. Machine Learning
- Features: Combines similarity scores into a "features" vector.
- Training Data: A small set of examples tells the model what a match looks like:
name_sim | surname_sim | address_sim | label (1 = match, 0 = no match) |
---|---|---|---|
0.944 | 1.0 | 0.923 | 1 |
0.833 | 0.0 | 0.923 | 0 |
0.933 | 0.911 | 0.857 | 1 |
0.5 | 0.5 | 0.5 | 0 |
0.78 | 0.0 | 1.0 | 1 |
0.925 | 1.0 | 0.764 | 1 |
0.6 | 0.8 | 0.9 | 0 |
0.95 | 0.95 | 0.95 | 1 |
- Random Forest: A model learns from these examples to predict matches; the sketch below shows how to inspect what it learned.
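Once trained, the model can report which similarity score drives its decisions, which helps when deciding what to clean or engineer next. A small sketch using the model object trained in the script above:

# featureImportances is a vector aligned with the VectorAssembler input columns
for name, weight in zip(["name_sim", "surname_sim", "address_sim"],
                        model.featureImportances.toArray()):
    print(f"{name}: {weight:.3f}")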
5. Predictions Output
The following table shows the results of the PySpark record linkage process, linking account records to transactions. The Random Forest model predicts matches (1.0) or non-matches (0.0) based on similarity scores for names, surnames, and addresses.
Example Output
acct_id | tx_id | name_sim | surname_sim | address_sim | prediction |
---|---|---|---|---|---|
A002 | T001 | 0.93333334 | 1.0 | 0.73333335 | 1.0 |
A001 | T001 | 1.0 | 1.0 | 1.0 | 1.0 |
A002 | T002 | 0.51111114 | 0.0 | 1.0 | 0.0 |
A001 | T002 | 0.48333332 | 0.0 | 0.73333335 | 0.0 |
A003 | T003 | 1.0 | 0.8933333 | 0.78571427 | 1.0 |
A004 | T004 | 0.7809524 | 1.0 | 0.5882353 | 0.0 |
Note: This is just an example of the methodology/approach. If needed, we can add more stringent blocking rules or extra cleanup based on our needs; one easy tweak is sketched below.
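One such tweak: instead of the hard 0/1 prediction, keep the model's match probability and route borderline pairs to manual review. A hedged sketch continuing from the predictions DataFrame above (the 0.4/0.8 thresholds are illustrative assumptions):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# The probability column is a vector; element [1] is the probability of a match
match_prob = udf(lambda v: float(v[1]), DoubleType())

scored = predictions.withColumn("match_prob", match_prob(col("probability")))
review_queue = scored.filter((col("match_prob") > 0.4) & (col("match_prob") < 0.8))
review_queue.select("acct_id", "tx_id", "match_prob").show()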
Connecting to Customer 360 and Fraud Detection
- Customer 360: Linking "A001" to "T001" builds a full view of John Doe’s banking activity.
- Fraud Detection: If "T002" (J Doe) has a low surname similarity but matches otherwise, it might be either a fraud attempt or just a data-entry error. The model helps decide, and a simple filter like the one below can surface such cases for review.
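A minimal sketch of that filter, continuing from the predictions DataFrame above (the 0.5 threshold is an illustrative assumption):

from pyspark.sql.functions import col

# Pairs the model links despite a weak surname match are the "typo or fraud?" cases
suspicious = predictions.filter((col("prediction") == 1.0) & (col("surname_sim") < 0.5))
suspicious.select("acct_id", "tx_id", "name_sim", "surname_sim", "address_sim").show()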
This process scales to millions of records, making it practical for real banks. Happy learning! :-)