
Real Time Fraud Detection Using Apache Flink — Part 2

Kumar Sanjog
CEO & Co-Founder
10 mins read

Introduction

In a previous blog post we showcased how to get started with building a fraud detection application by creating fraud detection rules using the Java API.

Real Time Fraud Detection Using Apache Flink — Part 1

A simple Flink application to detect suspicious transactions

In this blog, we’ll explore the Pattern API to see how we can create fraud rules based on patterns. Although the Pattern API can be implemented in multiple ways, we’ll use SQL, as it lets us understand the nuances of the Pattern API with little to no programming knowledge.

Why the Pattern API?

In fraud detection, you often need to identify a series of suspicious events over time — such as a sequence of small transactions followed by an unusually large transaction. Apache Flink’s Pattern API makes it easy to detect these complex event patterns in real time. Using pattern matching, you can define rules that scan through continuous data streams to catch subtle behavioral cues indicative of fraud.

Individual patterns

An individual pattern is a basic sequence of events specified by certain conditions. Individual patterns serve as the building blocks for more complex pattern recognition scenarios. An individual pattern can either be a singleton or looping pattern. Singleton patterns accept one event whereas looping patterns accept one or more events.

For example, consider the individual pattern A B+, a simple way to represent an event A followed by one or more occurrences of event B.

Flink Event Pattern — Example 1

Here’s another example, C D? E — it’s similar to the example above, with the difference that the ? is used to denote an optional event (here, event D).

Flink Event Pattern — Example 2
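
If you are already thinking in SQL, these quantifiers map directly onto the PATTERN clause of the MATCH_RECOGNIZE syntax that we use later in this post. A few illustrative fragments (not complete queries):

PATTERN (A B+)     -- an A followed by one or more B events
PATTERN (C D? E)   -- a C, an optional D, then an E
PATTERN (B{2,5})   -- between two and five B events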

Simple patterns can be combined to create complex ones. For example, consider the pattern ((A B+) C D? E), which combines the two individual patterns we saw above.

A combined pattern (strict contiguity)

Flink supports three different forms of contiguity between events when combining patterns.

  1. Strict: the matching events must immediately follow each other, with no non-matching events in between. The example above uses strict contiguity (see the SQL sketch after this list).
  2. Relaxed: non-matching events in between the matching events are ignored.
  3. Non-deterministic relaxed: like relaxed contiguity, but it additionally allows matches that ignore some matching events.
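
As a minimal sketch of how such a combined pattern looks in the SQL API, the pattern ((A B+) C D? E) could be expressed with a MATCH_RECOGNIZE clause, which applies strict contiguity between the pattern variables. The events table, its event_type column, and the time attribute ts below are hypothetical placeholders:

SELECT matchEndTime
FROM events
MATCH_RECOGNIZE (
  ORDER BY ts                      -- must be a time attribute
  MEASURES E.ts AS matchEndTime    -- timestamp of the terminating event
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B+ C D? E)            -- A, one or more B, C, an optional D, then E
  DEFINE
    A AS A.event_type = 'A',
    B AS B.event_type = 'B',
    C AS C.event_type = 'C',
    D AS D.event_type = 'D',
    E AS E.event_type = 'E'
);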

Groups of Patterns

Groups of patterns take combination a step further by structuring multiple patterns under higher-level logic. For example, we can alter the pattern in the previous section into two groups of patterns as follows:

((A B+) | (C D? E))

This pattern can be read as either event A followed by one or more event B’s, or event C followed by an optional event D and terminating with event E. A match is generated if a sequence of events matches either of the patterns.

Match skip strategies

A match skip strategy tells Flink how to proceed once a match has been detected. Common options include

  • NO_SKIP: Every possible match will be emitted.
  • SKIP_TO_NEXT: Discards every partial match that started with the same event as the emitted match.
  • SKIP_PAST_LAST_EVENT: Discards every partial match that started after the match started but before it ended.
  • SKIP_TO_FIRST: Discards every partial match that started after the match started but before the first event of PatternName occurred.
  • SKIP_TO_LAST: Discards every partial match that started after the match started but before the last event of PatternName occurred.

These strategies prevent overlaps or missed matches, ensuring precise and efficient pattern detection.
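
When working through the SQL API, the skip strategy is chosen with the AFTER MATCH SKIP clause. A few illustrative fragments (not complete queries), assuming a pattern variable named A:

AFTER MATCH SKIP PAST LAST ROW   -- resume matching after the last row of the current match
AFTER MATCH SKIP TO NEXT ROW     -- resume at the row after the first row of the current match
AFTER MATCH SKIP TO FIRST A      -- resume at the first row mapped to pattern variable A
AFTER MATCH SKIP TO LAST A       -- resume at the last row mapped to pattern variable A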

How the Pattern API works on real-time events

The Pattern API lets you declare a sequence of events (or conditions) that must occur in a specific order. In the diagram below we define a pattern as A+ B, which means that one or more occurrences of event A (e.g., small transactions) are immediately followed by event B (e.g., a large transaction). This pattern can help flag fraudulent behaviour where an attacker tests the waters with small amounts before attempting a large-scale fraud.

Applying fraud patterns on events

The Pattern API can be implemented with multiple APIs like Java, Scala, SQL, etc. For the purpose of this blog we are going to use Flink’s SQL API. By using Flink SQL for pattern matching, you can define fraud detection logic in a declarative, concise, and readable manner. The enhanced readability makes it easier for engineers and analysts alike to understand and iterate if required.

A Fraud Detection System using the Pattern API

System Overview

Let’s take a quick look at the high-level architecture and flow of data before we dig deep.

System Overview

Fraud Detection Rules

Since this is a demonstration of the Pattern API in Flink, we create a few simple pattern-based rules:

1. Custom transaction pattern: In this case we spot suspicious transaction sequences as they happen, where small-amount transactions are immediately followed by a sudden large transfer between the same sender-receiver pair.

2. Rapid-Fire Transactions to Many Recipients pattern: Detect if a user sends transactions to more than 3 distinct recipients within 2 minutes, possibly indicating suspicious “money mule” behaviour. We flag these even if the transactions are small-value transactions, as keeping amounts small might be an attempt to avoid detection.

3. High Incoming, Low Retention pattern: The account received a high credit amount (above a threshold) but 90%+ of it was transferred out within 24 hours.

Setting Up

In the first blog we set up a fraud detection application which does the following, in order:

  1. Reads data from a Kafka topic
  2. Runs fraud rules on the stream of transaction data
  3. Generates alerts by publishing messages to a different topic

For the Pattern API we will be reusing most of this setup. We assume that there is already datagen code running which generates transaction data and publishes it to the transactions topic in Kafka.

First, let’s start by opening the SQL client. The SQL client can be opened by running the sql-client.sh command in the Job Manager container.

SQL Client in the Flink Job Manager container

Inputs

Similar to the previous blog, we assume that there is a Kafka topic called transactions which has a continuous stream of transaction data flowing into it. Each message in the topic is a JSON string containing transaction-related fields. An example transaction would be as follows:

{
 "transaction_id": "2b792051-509c-4472-95af-598a94f612fa",
 "user_id": 1003,
 "recipient_id": 9004,
 "amount": 1005,
 "transaction_type": "credit",
 "ts": "2025-02-03 17:01:12"
}

We can register the transactions input table by running the following command

CREATE TABLE transactions (
 transaction_id STRING,
 user_id BIGINT,
 recipient_id BIGINT,
 amount FLOAT,
 transaction_type STRING,
 ts TIMESTAMP(3),
 message_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
 proctime AS PROCTIME(),
 WATERMARK FOR message_ts AS message_ts - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'transactions',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'kafka:9094',
 'format' = 'json'
);

At this stage there are some points to consider:

1. Make sure you have the Kafka connector JAR corresponding to your Flink version in the lib folder.

2. Make sure that the Kafka broker address matches your Kafka setup. In this case the Kafka broker is reachable on the kafka:9094 host-port combination; this may vary according to your setup.

3. The transaction data has to match the table definition.

You can run a simple SELECT query to verify the table creation
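
For instance, a continuous query like the one below (a minimal sketch) should start streaming rows in the SQL client if the table was registered correctly and data is arriving on the topic:

SELECT transaction_id, user_id, recipient_id, amount, transaction_type, ts
FROM transactions;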

Sample transaction data

Fraud Detection Logic

Rule 1 — Custom Transaction Pattern

We’ll define our first pattern-based query to detect fraud as follows:

SELECT
  firstSmallTxnTime,
  lastSmallTxnTime,
  largeTxnTime,
  largeTxnAmount
FROM transactions
MATCH_RECOGNIZE (
  PARTITION BY user_id, recipient_id
  ORDER BY message_ts
  MEASURES
    FIRST(A.ts) AS firstSmallTxnTime,
    LAST(A.ts) AS lastSmallTxnTime,
    B.ts AS largeTxnTime,
    B.amount AS largeTxnAmount
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A+ B)
  DEFINE
    A AS A.amount < 1,
    B AS B.amount > 1000
);

Note that the field by which to order has to be a time attribute and has to have a watermark. The watermark helps handle messages which arrive with a delay. In this case the watermark was set to 5 seconds when we created the transactions table, so at most we accept messages that arrive up to 5 seconds late.

The image below illustrates how to interpret the pattern in the code above

Understanding a custom transaction pattern

Running the above query results in a Flink job which keeps outputting one record per pattern match. If the query ran successfully, you should see an output similar to this:

Flagged event(s) based on the defined rule

Rule 2 — Rapid Fire Transactions to Many Recipients

Here we specify 5 occurrences within 5 minutes as the threshold for rapid-fire transactions:

SELECT
 user_id,
 smallTxnCount,
 firstSmallTxnTime,
 lastSmallTxnTime
FROM transactions
MATCH_RECOGNIZE (
 PARTITION BY user_id
 ORDER BY message_ts
 MEASURES
     COUNT(A.recipient_id) AS smallTxnCount,
     
     FIRST(A.ts) AS firstSmallTxnTime,
     LAST(A.ts) AS lastSmallTxnTime
     
 ONE ROW PER MATCH
 AFTER MATCH SKIP PAST LAST ROW
 PATTERN (A{5,}?) -- At least 5 occurrences of A
 WITHIN INTERVAL '5' MINUTE
 DEFINE
     A AS A.amount < 10
);

Flagged event(s) based on the defined rule

Rule 3 — High Incoming, Low Retention Pattern

The pattern here ends with B+? instead of just B+. Currently, greedy quantifiers at the end of a pattern are not allowed, so by adding the ? we make the quantifier reluctant and let the engine know when to end the pattern match.

SELECT
 depositSum,
 depositCount,
 withdrawalSum,
 withdrawalCount,
 depositStartTime,
 depositEndTime,
 withdrawalStartTime,
 withdrawalEndTime
FROM transactions
MATCH_RECOGNIZE (
 PARTITION BY user_id
 ORDER BY message_ts
 MEASURES
     SUM(A.amount) AS depositSum,
     COUNT(A.amount) AS depositCount,
     FIRST(A.ts) AS depositStartTime,
     LAST(A.ts) AS depositEndTime,
    
     SUM(B.amount) AS withdrawalSum,
     COUNT(B.amount) AS withdrawalCount,
     FIRST(B.ts) AS withdrawalStartTime,
     LAST(B.ts) AS withdrawalEndTime
 ONE ROW PER MATCH
 AFTER MATCH SKIP PAST LAST ROW
 PATTERN (A+ B+?) WITHIN INTERVAL '1' HOUR
 DEFINE
     A AS A.transaction_type = 'credit',
     B AS B.transaction_type = 'debit'
          AND SUM(A.amount) > 500
          AND SUM(B.amount) > 0.8 * SUM(A.amount)
);

Flagged event(s) based on the defined rule

Outputs

The next step is to save our outputs to a new topic called fraudulent_transactions. We can do this by first creating a table in the Flink SQL client corresponding to how the fraudulent transactions output should look.

CREATE TABLE fraudulent_transactions (
 transaction_id STRING,
 ts TIMESTAMP(3),
 fraud_type STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'fraudulent_transactions',
 'properties.bootstrap.servers' = 'kafka:9094',
 'properties.group.id' = 'flink-fraudulent-consumer-group',
 'properties.auto.offset.reset' = 'earliest',
 'format' = 'json'
);

We can now insert the detected fraudulent transactions into the output table by simply running an insert query and changing our fields to match the table format. The updated insert query becomes

INSERT INTO fraudulent_transactions
SELECT
  transaction_id,
  ts,
  'custom_pattern' AS fraud_type
FROM transactions
MATCH_RECOGNIZE (
  PARTITION BY user_id, recipient_id
  ORDER BY message_ts
  MEASURES
    B.transaction_id AS transaction_id,
    B.ts AS ts
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A+ B)
  DEFINE
    A AS A.amount < 1,
    B AS B.amount > 1000
);

Fraud Transactions Topic

This query creates a continuously running job in the background. We’ll create the jobs corresponding to the other patterns as well.

INSERT INTO fraudulent_transactions
SELECT
  transaction_id,
  ts,
  'high_incoming_low_retention_pattern' as fraud_type
FROM
  transactions MATCH_RECOGNIZE (
 PARTITION BY user_id
 ORDER BY message_ts
 MEASURES
     LAST(B.transaction_id) AS transaction_id,
      LAST(B.ts) AS ts
 ONE ROW PER MATCH
 AFTER MATCH SKIP PAST LAST ROW
 PATTERN (A+ B+?) WITHIN INTERVAL '1' HOUR
 DEFINE
     A AS A.transaction_type = 'credit',
     B AS B.transaction_type = 'debit'
          AND SUM(A.amount) > 500
          AND SUM(B.amount) > 0.8 * SUM(A.amount)
);

INSERT INTO fraudulent_transactions
SELECT
  transaction_id,
  ts,
  'rapid_small_transactions_pattern' as fraud_type
FROM transactions
MATCH_RECOGNIZE (
 PARTITION BY user_id
 ORDER BY message_ts
 MEASURES     
     LAST(A.transaction_id) AS transaction_id,
     LAST(A.ts) AS ts
     
 ONE ROW PER MATCH
 AFTER MATCH SKIP PAST LAST ROW
 PATTERN (A{5,}?) -- At least 5 occurrences of A
 WITHIN INTERVAL '5' MINUTE
 DEFINE
     -- 'A' represents small transactions
     A AS A.amount < 10
);

We can verify the results of the jobs by running a simple SELECT on the fraudulent_transactions table.
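
For instance, a query along these lines (again, just a sketch) streams the flagged events as they are written to the topic:

SELECT transaction_id, ts, fraud_type
FROM fraudulent_transactions;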

Ingesting flagged events from Rule 1 to `fraudulent_transactions`
Ingesting flagged events from Rule 2 to `fraudulent_transactions`
Ingesting flagged events from Rule 3 to `fraudulent_transactions`
Flagged transaction events based on all 3 rules

Thus, we have successfully created pattern-based fraud detection rules which output potential fraud events to a Kafka topic.

Possible Improvements

Build More Complex Patterns Using ML and Historical Data

Incorporating machine learning algorithms and historical data allows for refined thresholds, predictive indicators, and the discovery of sophisticated fraud patterns that might go unnoticed by simpler rules. Future enhancements could include more advanced techniques like statistical analysis, historical data analysis, graph-based algorithms, and anomaly detection models to further improve detection accuracy.

Incorporate External Sources During Pattern Recognition

Finally, we should look at integrating additional data streams or external services into our detection pipeline. This might include IP reputation data from other sources, user profiling information, or features from external online stores. Combining multiple data sources will help create a more robust and accurate fraud detection system.

Explore the Java CEP API

While Flink SQL’s MATCH_RECOGNIZE clause is powerful for many scenarios, it can be challenging to implement highly specialized logic — such as dynamic thresholds, complex calculations, or external data lookups — using a purely declarative approach. The Java CEP API offers finer-grained control over state, timers, and event sequencing, making it well-suited for nuanced or highly customised fraud detection workflows.

Conclusion

Apache Flink provides a powerful, flexible foundation for real-time fraud detection. By combining robust stream processing with pattern recognition, teams can quickly flag suspicious activity in a continuous flow of transactions. Whether you’re capturing simple sequences of small and large transactions or building sophisticated, ML-driven rules across multiple data streams, Flink’s event-time processing and stateful analytics enable a scalable, low-latency solution. As fraudulent behaviors continue to evolve, leveraging Flink’s pattern recognition capabilities allows you to stay ahead of threats and maintain trust in your platform.


Article written by Kumar Sanjog

Kumar Sanjog is the Co-Founder of CansoAI, a platform revolutionizing machine learning systems with faster workflows and real-time decisioning. With years of experience in AI and automation, Kumar is passionate about empowering businesses to unlock the full potential of data-driven insights.

