Real-Time Fraud Detection Using Apache Flink — Part 2

Every year, businesses lose billions of dollars to financial fraud. Modern financial transactions are fast: customers can complete a transaction in milliseconds, but so can fraudsters. This rapid pace leaves only a short trail of suspicious patterns, making traditional batch processing systems for feature engineering too slow to detect and stop fraud.
Effective fraud detection today requires continuously analyzing vast streams of data, identifying suspicious patterns across multiple time windows, and serving these real-time insights to the inference engine. Apache Flink addresses these problems with its powerful stream processing capabilities. By processing data in motion rather than in batches, Flink can aggregate key fraud signals in true real time, ensuring detection models receive up-to-the-moment insights to catch fraud as it happens. That makes it a game changer in the fight against financial fraud.
State Management: Flink maintains state that stores the transaction history for each account, allowing it to track patterns across multiple events and detect fraud.
Fault Tolerance: Flink uses checkpoints to take consistent snapshots of the system’s state at regular intervals. In the event of a failure, Flink reverts to the last completed checkpoint and continues processing, ensuring no transactions are missed (a configuration sketch follows this list).
Scalability and Low-Latency: Flink distributes workloads across multiple nodes in a cluster, enabling efficient parallel execution. By leveraging parallelism and dynamically assigning tasks across multiple processing nodes, Flink ensures low-latency processing even as data streams grow in size.
Event-Time Processing: Flink supports event-time processing with watermarking, allowing data to be processed based on when events actually occurred rather than when they arrived. This ensures accurate real-time analytics by handling out-of-order data effectively while minimizing delays.
Exactly-Once Processing: Flink provides exactly-once guarantees, ensuring that no data is duplicated or lost, even in the event of failures.
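Fault tolerance and exactly-once behavior both hinge on checkpointing, which must be enabled on the execution environment. A minimal sketch (the interval values are illustrative, not the author's settings):

// Take an exactly-once checkpoint every 10 seconds, with a minimum pause between checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);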
We are going to implement a fraud detection system that identifies fraud based on five key rules:
1. Transaction frequency: more than 10 transactions by one user within an hour
2. Transaction velocity: more than $2,000 spent within one minute
3. Distinct IPs: too many different IP addresses used within 30 minutes
4. Suspicious patterns: a small test transaction followed by a large one within 5 minutes
5. Geographic anomalies: physically impossible travel between consecutive transactions
First, we need to set up Apache Kafka and Apache Flink. You can follow the official documentation for Kafka and Flink to get started.
Let’s first set up the Maven dependencies. Ensure you have a Maven project with the necessary dependencies in your pom.xml:
<dependencies>
    <!-- Apache Flink streaming API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.20.0</version>
    </dependency>
    <!-- Flink Kafka connector (versioned separately from Flink core) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.3.0-1.20</version>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
</dependencies>
Adding these dependencies enables JSON processing and lets the application consume and produce Kafka messages from Apache Flink. Note that the Kafka connector is released separately from Flink core, so its version differs; you can find the current versions of these artifacts on the Maven Repository.
We can use the following commands to create the two new topics, transactions and fraud-alerts:
# Create transaction topic
kafka-topics.sh --create \
--topic transactions \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# Create alerts topic
kafka-topics.sh --create \
--topic fraud-alerts \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
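To confirm that both topics were created, we can list the topics on the broker:

kafka-topics.sh --list --bootstrap-server localhost:9092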
Let’s examine the high-level architecture and data flow before getting into the code.
Our system processes transaction events from a transactions Kafka topic. Each event is a JSON message with the following schema:
{
    "userId": "user123",
    "amount": 500.00,
    "timestamp": "2024-01-29T10:00:00",
    "latitude": 40.7128,
    "longitude": -74.0060,
    "ipAddress": "192.168.1.1"
}
First, let’s look at the primary Job configuration that creates our Flink pipeline:
public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("transactions")
                .setGroupId("fraud-detection-flink")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Configure Kafka sink (note: built with KafkaSink.builder(), not KafkaSource)
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("fraud-alerts")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
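The snippet stops at the sink; to complete main, the source and sink must be wired to the detector and the job executed. A minimal sketch, assuming the AdvancedFraudDetector shown later and keying the stream by the userId field of the JSON payload:

        // Read transactions, key by userId, run the fraud rules, and emit alerts
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "transactions-source")
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String json) throws Exception {
                        // Parsing per record for simplicity; a shared mapper would be more efficient
                        return new ObjectMapper().readTree(json).get("userId").asText();
                    }
                })
                .process(new AdvancedFraudDetector())
                .sinkTo(sink);

        env.execute("Fraud Detection Job");
    }
}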
This configures the Kafka integration for our Flink job. Kafka’s scalable, real-time messaging system allows continuous ingestion and distribution of transaction data, enabling fraudulent activity to be detected as it happens.
First, what is state management and why do we need it?
Detecting fraud requires comparing the current transaction with a user’s previous transactions. For example, we check whether the user made a transaction 2,000 km away just 10 minutes ago, whether this is their 11th transaction within the last hour, or whether they have used 5 different IP addresses within 30 minutes. Without state management, we would have to keep transaction history manually, query a database for every transaction, and handle data consistency and failures ourselves.
public class AdvancedFraudDetector extends KeyedProcessFunction<String, String, String> {
    // Per-user transaction history: timestamp -> transaction
    private transient MapState<Long, Transaction> transactionState;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Transaction> descriptor =
                new MapStateDescriptor<>(
                        "transaction-state",
                        TypeInformation.of(Long.class),
                        TypeInformation.of(new TypeHint<Transaction>() {})
                );
        transactionState = getRuntimeContext().getMapState(descriptor);
    }
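The rule checks below reference several threshold constants that aren’t shown in the original snippets. The values here are back-filled from the rule descriptions and sample alerts; only the frequency limit, velocity threshold, and window lengths are stated explicitly in the text, so treat the rest as assumptions:

    // Thresholds: values inferred from the rule descriptions; treat as assumptions
    private static final int MAX_TRANSACTIONS_PER_HOUR = 10;          // rule 1
    private static final double VELOCITY_THRESHOLD = 2000.0;          // rule 2: $2,000 per minute
    private static final long IP_CHECK_WINDOW = 30 * 60 * 1000L;      // rule 3: 30 minutes
    private static final int MAX_DISTINCT_IPS = 4;                    // rule 3: alert at 5+ IPs
    private static final long PATTERN_TIME_WINDOW = 5 * 60 * 1000L;   // rule 4: 5 minutes
    private static final double SMALL_TXN_THRESHOLD = 10.0;           // rule 4 (illustrative)
    private static final double LARGE_TXN_THRESHOLD = 500.0;          // rule 4 (illustrative)
    private static final double LOCATION_CHANGE_THRESHOLD_KM = 500.0; // rule 5 (illustrative)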
KeyedProcessFunction: processes transactions keyed by userId, so that each user’s data is handled independently.
MapState: stores each user’s transaction history, with the transaction timestamp as the key and the transaction details as the value.
Advantages: the state lives alongside the processing operator, so no external database lookup is needed per transaction; checkpointing makes it fault tolerant automatically; and because state is keyed, it scales naturally with the number of users.
Our system implements five fraud detection rules:
The first rule tracks transaction frequency within a one-hour window: if a user exceeds 10 transactions per hour, it triggers an alert. State management lets us maintain a rolling window of transactions here.
private FraudCheckResult checkTransactionFrequency(long currentTimestamp) throws Exception {
    int transactionsLastHour = 0;
    long oneHourAgo = currentTimestamp - (60 * 60 * 1000);
    Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
    while (iterator.hasNext()) {
        if (iterator.next().getKey() >= oneHourAgo) {
            transactionsLastHour++;
        }
    }
    if (transactionsLastHour > MAX_TRANSACTIONS_PER_HOUR) {
        return new FraudCheckResult(true,
                String.format("High frequency: %d transactions in last hour",
                        transactionsLastHour));
    }
    return new FraudCheckResult(false, "");
}
The second rule, the velocity check, monitors the rate of spending: if a user’s transactions exceed $2,000 within one minute, it indicates potential fraud. This helps catch rapid successions of high-value transactions.
private FraudCheckResult checkTransactionVelocity(Transaction currentTx, long currentTimestamp)
        throws Exception {
    double oneMinuteTotal = 0.0;
    long oneMinuteAgo = currentTimestamp - (60 * 1000);
    Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
    while (iterator.hasNext()) {
        Map.Entry<Long, Transaction> entry = iterator.next();
        if (entry.getKey() >= oneMinuteAgo) {
            oneMinuteTotal += entry.getValue().getAmount();
        }
    }
    // Include the current transaction as well (mirroring the distinct-IP check below)
    oneMinuteTotal += currentTx.getAmount();
    if (oneMinuteTotal > VELOCITY_THRESHOLD) {
        return new FraudCheckResult(true,
                String.format("High velocity: $%.2f in one minute", oneMinuteTotal));
    }
    return new FraudCheckResult(false, "");
}
The third rule detects account access from multiple IP addresses, which can indicate an account takeover, where fraudsters log in from different locations. It uses a 30-minute sliding window to track unique IP addresses.
private FraudCheckResult checkDistinctIPs(Transaction currentTx, long currentTimestamp)
        throws Exception {
    Set<String> distinctIPs = new HashSet<>();
    long thirtyMinutesAgo = currentTimestamp - IP_CHECK_WINDOW;
    Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
    while (iterator.hasNext()) {
        Map.Entry<Long, Transaction> entry = iterator.next();
        if (entry.getKey() >= thirtyMinutesAgo) {
            distinctIPs.add(entry.getValue().getIpAddress());
        }
    }
    distinctIPs.add(currentTx.getIpAddress());
    if (distinctIPs.size() > MAX_DISTINCT_IPS) {
        return new FraudCheckResult(true,
                String.format("Multiple IPs: %d distinct IPs in 30 minutes",
                        distinctIPs.size()));
    }
    return new FraudCheckResult(false, "");
}
The fourth rule looks for a common fraud pattern: small test transactions followed by a large amount. It helps detect card testing, where fraudsters check whether stolen card details are valid.
private FraudCheckResult checkTransactionPattern(Transaction currentTx, long currentTimestamp)
        throws Exception {
    if (currentTx.getAmount() > LARGE_TXN_THRESHOLD) {
        long fiveMinutesAgo = currentTimestamp - PATTERN_TIME_WINDOW;
        Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Transaction> entry = iterator.next();
            if (entry.getKey() >= fiveMinutesAgo) {
                Transaction tx = entry.getValue();
                if (tx.getAmount() < SMALL_TXN_THRESHOLD) {
                    return new FraudCheckResult(true,
                            String.format("Suspicious pattern: Small txn $%.2f followed by large txn $%.2f within 5 minutes",
                                    tx.getAmount(), currentTx.getAmount()));
                }
            }
        }
    }
    return new FraudCheckResult(false, "");
}
The fifth rule detects physically impossible travel patterns, such as consecutive transactions occurring too far apart within a short time window. It uses the Haversine formula to calculate the distance between transaction locations.
private FraudCheckResult checkGeographicAnomalies(Transaction currentTx, long currentTimestamp)
        throws Exception {
    // Find the most recent transaction before the current one
    Transaction previousTx = null;
    long previousTxTimestamp = 0;
    Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
    while (iterator.hasNext()) {
        Map.Entry<Long, Transaction> entry = iterator.next();
        if (entry.getKey() < currentTimestamp && entry.getKey() > previousTxTimestamp) {
            previousTx = entry.getValue();
            previousTxTimestamp = entry.getKey();
        }
    }
    if (previousTx != null) {
        double distance = calculateDistance(
                previousTx.getLatitude(), previousTx.getLongitude(),
                currentTx.getLatitude(), currentTx.getLongitude()
        );
        double timeDiff = (currentTimestamp - previousTxTimestamp) / (60.0 * 60 * 1000);
        // Guard against division by zero when two transactions share a timestamp
        double speedKmH = timeDiff > 0 ? distance / timeDiff : Double.POSITIVE_INFINITY;
        if (distance > LOCATION_CHANGE_THRESHOLD_KM && timeDiff < 2.0) {
            return new FraudCheckResult(true,
                    String.format("Impossible travel: %.0f km in %.1f hours (%.0f km/h)",
                            distance, timeDiff, speedKmH));
        }
    }
    return new FraudCheckResult(false, "");
}
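The calculateDistance helper isn’t shown in the original snippets. A standard Haversine implementation would look like this (a sketch, not the author’s exact code):

// Haversine great-circle distance in kilometres between two lat/lon points
private double calculateDistance(double lat1, double lon1, double lat2, double lon2) {
    final double EARTH_RADIUS_KM = 6371.0;
    double dLat = Math.toRadians(lat2 - lat1);
    double dLon = Math.toRadians(lon2 - lon1);
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
            + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
            * Math.sin(dLon / 2) * Math.sin(dLon / 2);
    return EARTH_RADIUS_KM * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
}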
Together, these rules improve fraud detection by identifying suspicious activity in real time, allowing quick action to stop fraud and safeguard customers.
Generating real-time alerts for suspicious transactions equips teams to act promptly and strengthen their defenses. Alerts also serve as an additional data source for deep dives, for designing new signals, and for identifying gaps in the existing solution.
Let’s see an example of how we can enable alerting in this Flink job:
if (fraudFlags > 0) {
    Alert alert = new Alert();
    alert.setUserId(currentTx.getUserId());
    alert.setTransaction(currentTx);
    alert.setFraudulent(true);
    alert.setFraudScore(calculateFraudScore(fraudFlags));
    alert.setReason(fraudReason.toString());
    collector.collect(objectMapper.writeValueAsString(alert));
}
The alert includes the userId, the offending transaction, a fraudulent flag, a computed fraud score, and a human-readable reason.
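calculateFraudScore isn’t shown in the original code. Judging from the sample alerts below, where a single triggered rule yields a score of 20, a plausible sketch assigns 20 points per triggered rule (an assumption, not the author’s confirmed logic):

// Assumption: each of the five rules contributes 20 points, capped at 100
private int calculateFraudScore(int fraudFlags) {
    return Math.min(fraudFlags * 20, 100);
}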
Consuming the fraud-alerts topic shows the alerts generated for each rule:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fraud-alerts
{"userId":"user123","fraudulent":true,"transaction":{"userId":"user123","amount":50.0,"timestamp":"2025-01-29T19:00:57","latitude":40.7128,"longitude":-74.006,"ipAddress":"192.168.1.5"},"reason":"Multiple IPs: 5 distinct IPs in 30 minutes","fraudScore":20}
{"userId":"user456","fraudulent":true,"transaction":{"userId":"user456","amount":100.0,"timestamp":"2025-01-29T19:30:57","latitude":40.7128,"longitude":-74.006,"ipAddress":"192.168.1.1"},"reason":"High frequency: 11 transactions in last hour","fraudScore":20}
{"userId":"user789","fraudulent":true,"transaction":{"userId":"user789","amount":1000.0,"timestamp":"2025-01-29T18:41:27","latitude":40.7128,"longitude":-74.006,"ipAddress":"192.168.1.1"},"reason":"High velocity: $2500.00 in one minute","fraudScore":20}
{"userId":"user101","fraudulent":true,"transaction":{"userId":"user101","amount":999.99,"timestamp":"2025-01-29T18:42:57","latitude":40.7128,"longitude":-74.006,"ipAddress":"192.168.1.1"},"reason":"Suspicious pattern: Small txn $9.99 followed by large txn $999.99 within 5 minutes","fraudScore":20}
{"userId":"user202","fraudulent":true,"transaction":{"userId":"user202","amount":200.0,"timestamp":"2025-01-29T19:10:57","latitude":34.0522,"longitude":-118.2437,"ipAddress":"192.168.1.1"},"reason":"Impossible travel: 3936 km in 0.5 hours (7871 km/h)","fraudScore":20}
As our Flink fraud detection job continues to process transactions, the TaskManager’s memory starts filling up with state data. Since we are using the default heap state backend rather than an external one (such as RocksDB), all state is kept on the JVM heap.
As users’ transaction histories grow, memory usage increases, putting pressure on the task heap, which can significantly impact performance. Careful memory management is important for optimizing Flink jobs, so we want to ensure that outdated data is removed from state periodically.
This can easily be achieved using Flink’s timer service, which we use to clean up outdated state at regular intervals:
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<String> collector)
        throws Exception {
    // Evict entries older than the longest rule window (one hour, for the frequency rule)
    long cutoff = timestamp - (60 * 60 * 1000);
    Iterator<Map.Entry<Long, Transaction>> iterator = transactionState.iterator();
    while (iterator.hasNext()) {
        if (iterator.next().getKey() < cutoff) {
            iterator.remove();
        }
    }
}
This timer ensures that outdated transactions are removed from state after they’re no longer needed for fraud detection.
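Note that onTimer only fires if a timer has been registered. A sketch of how that registration might look inside processElement, assuming a hypothetical CLEANUP_INTERVAL_MS constant:

// Schedule the next cleanup run for this key using a processing-time timer
long cleanupTime = context.timerService().currentProcessingTime() + CLEANUP_INTERVAL_MS;
context.timerService().registerProcessingTimeTimer(cleanupTime);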
Memory management and tuning are essential to writing performant, production-grade Flink applications. Detecting suspicious transactions within a certain latency threshold is a common requirement in most real-time systems, so allocating memory for the job is always a core consideration.
Let’s take a quick look at Flink’s memory model and its components:
The total process memory of Flink’s JVM processes includes both the memory used by the Flink application (total Flink memory) and the memory required by the JVM to run the process. Flink’s total memory consumption consists of JVM heap memory and off-heap memory, which can be either direct or native memory.
JobManager memory handles checkpoint coordination, job management, and metadata storage. It includes off-heap memory, used for network buffers and RPC communication between Flink components, and JVM heap memory, which stores internal data structures and job graphs.
TaskManager memory is allocated for managing state and executing tasks. It consists of managed memory (managed by Flink) for state backends such as RocksDB; task heap memory for transformations and, in our case, the MapState of transactions when using the heap state backend; network memory for shuffling data; and framework heap memory for Flink’s internal data structures.
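As a rough illustration, these components can be sized in the Flink configuration (flink-conf.yaml); the values below are placeholders, not tuned recommendations:

# Total process memory for each component
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m
# Fraction of Flink memory reserved as managed memory (used by RocksDB; idle with the heap backend)
taskmanager.memory.managed.fraction: 0.4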
What we saw above is a simple fraud detection system and isn’t suitable for production use. While we’ll cover production hardening in a future blog post, here are some quick considerations for moving to production on a Kubernetes cluster.
On a Kubernetes cluster, we can install the official Flink Kubernetes operator with the helm install command. Alternatively, the operator can be installed via ArgoCD to adopt a GitOps approach. The Helm chart exposes various parameters to customize the operator deployment for your environment; see the chart’s documentation for details.
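For reference, a typical install with Helm looks like the following (the operator version in the URL is an example; use the latest release):

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator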
Custom Flink images are created by extending the official Flink base image. These images package your application JAR and any extra dependencies your Flink job needs.
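A minimal Dockerfile sketch; the JAR name and path are illustrative:

FROM flink:1.20
# Place the application JAR where the FlinkDeployment's jarURI will reference it
RUN mkdir -p /opt/flink/usrlib
COPY target/fraud-detection-job.jar /opt/flink/usrlib/fraud-detection-job.jar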
The FlinkDeployment custom resource, defined in YAML, describes a Flink application. Its specification contains everything the operator needs to deploy and manage the deployment: the container image, JobManager and TaskManager configurations such as pod resources (CPU/memory), desired job state, and so on.
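A trimmed FlinkDeployment sketch, assuming the custom image built above (names, versions, and resource sizes are illustrative):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection
spec:
  image: my-registry/fraud-detection-flink:1.0  # hypothetical image
  flinkVersion: v1_20
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/fraud-detection-job.jar
    parallelism: 2
    upgradeMode: stateless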
By implementing five key detection rules (transaction frequency, velocity checks, distinct-IP monitoring, pattern analysis, and geographic analysis), this application demonstrates a preliminary approach to detecting suspicious transactions.
Flink’s core features, including stateful processing, event-time handling, and exactly-once processing, ensure accuracy and reliability in detecting suspicious activity. Future enhancements could incorporate more advanced techniques, such as statistical analysis, historical data analysis, graph-based algorithms, and anomaly detection models, to further improve detection accuracy.
In a follow-up blog post, we will explore Flink’s Pattern API to detect even more complex fraud patterns.