
Getting started with dynamic Flink CEP#

Ververica Cloud enables you to run complex event processing (CEP) with Flink and to modify CEP rules on the fly in DataStream deployments. This article explains how to build a Flink CEP setup that automatically loads updated rules and applies them to data from upstream Kafka, all on a fully managed Flink environment.

Background Context#

Advertisers typically allocate budgets when they display ads on e-commerce sites. Consider a scenario where an advertiser’s ad cost is determined by the number of ad clicks. If this advertiser falls victim to fraudulent online traffic, their budget will deplete rapidly, causing the ad to be taken down sooner than planned. This not only jeopardizes the advertiser’s interests but might also lead to future grievances and disagreements.

To counteract these types of attacks, it’s crucial to swiftly pinpoint malicious traffic and implement countermeasures, like barring deceitful users or alerting the advertisers, to safeguard user rights and interests. Moreover, unforeseen variables, such as endorsements from celebrities or viral trends, can trigger abrupt shifts in traffic. In these instances, the rules that discern malicious traffic need to be flexibly adjusted to prevent unintentional harm to genuine users.

This section shows how dynamic Flink CEP addresses the challenges mentioned above. User behavior logs are stored in Amazon MSK (Managed Streaming for Apache Kafka), and a Flink CEP deployment processes that Kafka data. The deployment periodically polls the rule table in an Amazon RDS for MySQL database, retrieves the most recent rules set by policy creators, and uses these rules for event matching. Depending on the events that match these rules, the Flink CEP deployment either sends alerts or writes the relevant details to other databases.

In our illustration, once the Flink CEP deployment is running, Rule 1 is added: if three consecutive events have an action value of 0 and the next event still does not have an action value of 1, a product has been viewed three times in a row without a purchase. After events are matched and processed according to Rule 1, the rule's pattern is updated to accommodate increased traffic: five consecutive events with an action value of 0 or 2, again followed by an event whose action value is not 1. A second rule, with the same pattern as the original Rule 1, is added to test features such as multi-rule support, and further rules can be added in the same way.

Prerequisites#

You have created the upstream and downstream storage instances:

  • Amazon MSK

  • Amazon RDS for MySQL

Step 1: Prepare test data#

Create an upstream Kafka topic#

Create an upstream Kafka topic named demo_topic to store simulated user behavior logs in your Amazon MSK.
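
If you prefer to create the topic programmatically, the following is a minimal sketch using the Kafka AdminClient. The broker address, partition count, and replication factor are placeholders to adapt to your cluster; depending on how authentication is configured, MSK may also require additional TLS/SASL client properties.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateDemoTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "YOUR_KAFKA_BROKERS");
        try (AdminClient admin = AdminClient.create(props)) {
            // Partition count and replication factor are illustrative values only.
            NewTopic topic = new NewTopic("demo_topic", 1, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}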

Prepare an Amazon RDS for MySQL database#

Create a rule table named rds_demo to record the rules that you want to use in your Flink CEP deployment.

On the SQLConsole tab, enter the following statements and click Execute:

CREATE DATABASE cep_demo_db;
USE cep_demo_db;

CREATE TABLE rds_demo (
    `id` VARCHAR(64),
    `version` INT,
    `pattern` VARCHAR(4096),
    `function` VARCHAR(512)
);

Each row in the rule table represents one rule: the id and version fields identify the rule and its version, the pattern field describes the pattern object as defined by the Flink CEP API, and the function field specifies how to process a sequence of events that matches the pattern.

Step 2: Configure the Amazon RDS for MySQL instance with a “0.0.0.0/0” whitelist#

Add 0.0.0.0/0 to the whitelist of your Amazon RDS for MySQL instance so that the Flink CEP deployment can connect to the database.

Step 3: Develop a Flink CEP draft and start the deployment for the draft#

Note

All the code discussed in this article is available for download from the GitHub repository. The following steps walk you through creating a Flink CEP draft and starting a deployment for it.

  1. Configure the repositories that need to be used by the pom.xml file in a Maven project.

  2. Add flink-cep as a project dependency to the pom.xml file for your Flink CEP draft.

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-cep</artifactId>
    <version>{insert-version}</version>
    <scope>provided</scope>
</dependency>
  3. Create a Kafka source that consumes the demo_topic topic, as sketched below.
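
A sketch of what the Kafka source might look like, using the Flink Kafka connector. EventDeSerializationSchema is a hypothetical stand-in for a schema that parses the comma-separated log lines into Event objects (the demo repository ships its own equivalent), and the kafkaBrokers, inputTopic, and inputTopicGroup variables correspond to the main arguments described later in this step.

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

KafkaSource<Event> kafkaSource =
        KafkaSource.<Event>builder()
                .setBootstrapServers(kafkaBrokers)    // from --kafkaBrokers
                .setTopics(inputTopic)                // from --inputTopic, e.g. demo_topic
                .setGroupId(inputTopicGroup)          // from --inputTopicGroup
                .setStartingOffsets(OffsetsInitializer.latest())
                // Hypothetical schema that turns each record into an Event POJO.
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(
                                new EventDeSerializationSchema()))
                .build();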

  4. Configure the CEP.dynamicPatterns() method.

To facilitate the dynamic modification of CEP rules and enable matching using various rules, Ververica Cloud introduces the CEP.dynamicPatterns() method. Here’s the syntax for this method:

public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
    DataStream<T> input,
    PatternProcessorDiscovererFactory<T> discovererFactory,
    TimeBehaviour timeBehaviour,
    TypeInformation<R> outTypeInfo)

The following parameters are required when you call this method. You can configure and modify them based on your business requirements.

  • DataStream<T> input: The input event stream.

  • PatternProcessorDiscovererFactory<T> discovererFactory: The factory object, which constructs a PatternProcessorDiscoverer used to obtain the most recent rules. In this example, it constructs PatternProcessor interfaces.

  • TimeBehaviour timeBehaviour: The time attribute that defines how the Flink CEP deployment processes events. TimeBehaviour.ProcessingTime processes events based on the processing time; TimeBehaviour.EventTime processes events based on the event time.

  • TypeInformation<R> outTypeInfo: The type information of the output stream.

For background on standard Flink concepts such as DataStream, TimeBehaviour, and TypeInformation, see the Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and the Class TypeInformation<T> documentation.

This section focuses on the PatternProcessor interface. A PatternProcessor comprises a Pattern, which defines the criteria for event matching, and a PatternProcessFunction, which defines what to do with a matched sequence of events, for example sending alert notifications. It also carries id and version fields that uniquely identify the pattern processor. In short, a PatternProcessor specifies both a rule and how the Flink CEP deployment reacts when that rule is triggered. For more information, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).
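
For illustration, a pattern processor could be implemented along these lines. This is a minimal sketch: the accessor names (getId(), getVersion(), getPattern(), getPatternProcessFunction()) follow the description above rather than quoting the library interface verbatim; see FLIP-200 for the authoritative definition.

// Sketch of a PatternProcessor implementation; method names are assumptions
// based on the id/version/Pattern/PatternProcessFunction description above.
public class DemoPatternProcessor implements PatternProcessor<Event> {

    private final String id;        // unique rule ID, e.g. "1"
    private final int version;      // rule version, e.g. 1
    private final Pattern<Event, ?> pattern;

    public DemoPatternProcessor(String id, int version, Pattern<Event, ?> pattern) {
        this.id = id;
        this.version = version;
        this.pattern = pattern;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public int getVersion() {
        return version;
    }

    @Override
    public Pattern<Event, ?> getPattern() {
        return pattern;
    }

    @Override
    public PatternProcessFunction<Event, String> getPatternProcessFunction() {
        // Defines what to do with a matched sequence, e.g. emit an alert string.
        return new DemoPatternProcessFunction<>();
    }
}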

The PatternProcessorDiscovererFactory interface creates a PatternProcessorDiscoverer, which fetches updates for the pattern processors. In this example, an abstract class is used that scans external storage at regular intervals. The following sample code starts a timer that periodically checks external storage for pattern processor updates:

public abstract class PeriodicPatternProcessorDiscoverer<T>
        implements PatternProcessorDiscoverer<T> {

    ...
    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<T> patternProcessorManager) {
        // Periodically discovers the pattern processor updates.
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        if (arePatternProcessorsUpdated()) {
                            List<PatternProcessor<T>> patternProcessors = null;
                            try {
                                patternProcessors = getLatestPatternProcessors();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                        }
                    }
                },
                0,
                intervalMillis);
    }

    ...
}

Ververica Cloud provides the JDBCPeriodicPatternProcessorDiscoverer interface to retrieve the latest rules from any database that supports the Java Database Connectivity (JDBC) protocol. It takes the following parameters:

  • jdbcUrl: The JDBC URL used to connect to the database.

  • jdbcDriver: The name of the database driver class.

  • tableName: The name of the rule table in the database.

  • initialPatternProcessors: The initial pattern processors, used until updates are pulled from the database. If the rule table in the database is empty, the default pattern processor is used.

  • intervalMillis: The interval at which the database is polled, in milliseconds.
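
Put together, constructing the discoverer factory with these parameters might look as follows. This is a sketch: the driver class name and interval are illustrative values, and the same five constructor arguments appear in the CepDemo example below.

PatternProcessorDiscovererFactory<Event> discovererFactory =
        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                "jdbc:mysql://YOUR_DB_URL:3306/cep_demo_db",  // jdbcUrl
                "com.mysql.cj.jdbc.Driver",                   // jdbcDriver
                "rds_demo",                                   // tableName
                null,                                         // initialPatternProcessors
                3000L);                                       // intervalMillis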

The following sample code provides an example. In this example, the events matched by the Flink CEP deployment are displayed in the TaskManager logs of Flink.

// import ......
public class CepDemo {

    public static void main(String[] args) throws Exception {

        ......
        // DataStream Source
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");

        env.setParallelism(1);
        // Key the stream by userId and productionId.
        // Note: only events with the same key are evaluated together for a match.
        KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                source.keyBy(
                        new KeySelector<Event, Tuple2<Integer, Integer>>() {

                            @Override
                            public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                                return Tuple2.of(value.getId(), value.getProductionId());
                            }
                        });

        SingleOutputStreamOperator<String> output =
            CEP.dynamicPatterns(
            keyedStream,
            new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS),
            TimeBehaviour.ProcessingTime,
            TypeInformation.of(new TypeHint<String>() {}));

        output.print();
        // Compile and submit the job
        env.execute("CEPDemo");
    }
}

Note

In the demo code, the input stream is keyed by the userId and productionId fields before CEP.dynamicPatterns() is called. As a result, only events that share the same userId and productionId values are matched against the rules; events with different key values are never matched together.

  5. Upload a JAR file and create a JAR deployment in the Ververica Cloud console. For more information, see the how-to create a deployment doc.

You must download a test JAR file for the subsequent operations. Configure the following parameters when you create the deployment.

Note

In this example, no data is stored in the upstream Kafka source, and the rule table in the database is empty. Therefore, no output is returned after you run the deployment.

  • Deployment Type: Select JAR.

  • Deployment Name: Enter the name of the JAR deployment that you want to create.

  • Engine Version: Select an engine version.

  • JAR URL: Upload your JAR file or the test JAR file.

  • Entry Point Class: Set the value to com.ververica.cep.demo.CepDemo.

  • Entry Point Main Arguments: If you use your own JAR file and it contains the upstream and downstream storage configurations, this is not required. If you use the test JAR, pass the following arguments:

    --kafkaBrokers YOUR_KAFKA_BROKERS
    --inputTopic YOUR_KAFKA_TOPIC
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME
    --jdbcIntervalMs 3000

The arguments are as follows:

  • kafkaBrokers: the addresses of the Kafka brokers.

  • inputTopic: the name of the Kafka topic.

  • inputTopicGroup: the Kafka consumer group.

  • jdbcUrl: the JDBC URL of the database. Use standard username and password authentication, with a password that contains only letters and digits.

  • tableName: the name of the destination table.

  • jdbcIntervalMs: the interval at which the database is polled, in milliseconds.

Note

Change these values to match your own upstream and downstream storage instances. A parameter value cannot exceed 1024 characters in length. We recommend that you avoid complex parameters, that is, values that include line breaks, spaces, or other special characters; a value should contain only letters and digits. If you need to pass complex parameters, use a dependency file.

  6. In the Parameters section of the Configuration tab on the Deployments page, enter the following code in the Other Configuration field:

kubernetes.application-mode.classpath.include-user-jar: 'true'
classloader.resolve-order: parent-first
  7. On the Deployments page of the Ververica Cloud console, find the desired deployment and click Start in the Actions column.

Step 4: Add a rule#

Once the Flink CEP deployment is initiated, implement version 1 of Rule 1: When three successive events have an action value of 0, and the subsequent event does not have an action value of 1, it signifies that there hasn’t been a purchase even after three continuous product visits.

  1. Add a rule that is dynamically updated in Amazon RDS for MySQL.

Merge the JSON string that defines the rule with the field values such as id, version, and function, and then execute an INSERT INTO statement to insert the row into the rule table in the Amazon RDS for MySQL database.

INSERT INTO rds_demo (
    `id`,
    `version`,
    `pattern`,
    `function`
) values(
    '1',
    1,
    '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
    'com.ververica.cep.demo.dynamic.DemoPatternProcessFunction'
);

To keep the pattern field readable in the database, Ververica Cloud defines a set of rule descriptions in JSON format. For details, see JSON rule definitions in dynamic Flink CEP. The pattern field value in the preceding SQL statement is the serialized JSON string of a rule that captures the following pattern: three consecutive events have an action value of 0, and the next event still does not have an action value of 1.

Note

The EndCondition class in the following code specifies that the action value of the next event is still not 1.

  • The following code shows the syntax of the Pattern method:

Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new StartCondition("action == 0"))
        .timesOrMore(3)
        .followedBy("end")
        .where(new EndCondition());
  • The following sample code provides an example of the JSON string that defines a rule:

{
    "name": "end",
    "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": ["SINGLE"],
        "times": null,
        "untilCondition": null
    },
    "condition": null,
    "nodes": [
        {
            "name": "end",
            "quantifier": {
                "consumingStrategy": "SKIP_TILL_NEXT",
                "properties": ["SINGLE"],
                "times": null,
                "untilCondition": null
            },
            "condition": {
                "className": "com.ververica.cep.demo.condition.EndCondition",
                "type": "CLASS"
            },
            "type": "ATOMIC"
        },
        {
            "name": "start",
            "quantifier": {
                "consumingStrategy": "SKIP_TILL_NEXT",
                "properties": ["LOOPING"],
                "times": {
                    "from": 3,
                    "to": 3,
                    "windowTime": null
                },
                "untilCondition": null
            },
            "condition": {
                "expression": "action == 0",
                "type": "AVIATOR"
            },
            "type": "ATOMIC"
        }
    ],
    "edges": [
        {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
        }
    ],
    "window": null,
    "afterMatchStrategy": {
        "type": "SKIP_PAST_LAST_EVENT",
        "patternName": null
    },
    "type": "COMPOSITE",
    "version": 1
}
  2. Use a Kafka client to send messages to the demo_topic topic.

1,Ken,0,1,1662022777000
1,Ken,0,1,1662022778000
1,Ken,0,1,1662022779000
1,Ken,0,1,1662022780000

The messages in the demo_topic topic contain the following comma-separated fields:

  • id: The ID of the user.

  • username: The name of the user.

  • action: The action of the user. Valid values: 0 (the view operation), 1 (the purchase operation), 2 (the share operation).

  • product_id: The ID of the product.

  • event_time: The event time when the action was performed.
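
For reference, a minimal Event POJO matching these fields might look like the following sketch. The demo repository defines its own Event class, whose getters such as getId(), getProductionId(), and getEventTime() are used in the CepDemo code above; this version is illustrative only.

public class Event {
    private int id;            // user ID
    private String username;   // user name
    private int action;        // 0 = view, 1 = purchase, 2 = share
    private int productionId;  // product ID
    private long eventTime;    // event time in epoch milliseconds

    public Event() {}          // Flink POJOs need a public no-arg constructor

    public int getId() { return id; }
    public int getAction() { return action; }
    public int getProductionId() { return productionId; }
    public long getEventTime() { return eventTime; }
    // Setters omitted for brevity; a real Flink POJO also needs public
    // setters or public fields for serialization.
}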

  3. View the most recent rule in the JobManager logs and the matching results in the TaskManager logs.

  • On the Logs tab of the Job Manager tab, search for the keyword JDBCPeriodicPatternProcessorDiscoverer to find the most recent rule.

  • On the Log List tab of the Running Task Managers tab, find the log file whose name ends with .out, and search for the keyword A match for Pattern of (id, version): (1, 1) to find the matching results.

Step 5: Update the matching rules and check whether the updated rules take effect#

Once events have been matched and processed according to Rule 1, modify the pattern of Rule 1 to handle the increased traffic: five consecutive events with an action value of 0 or 2, followed by an event whose action value is not 1. Then add Rule 2, whose pattern is identical to the original Rule 1, to test features such as multi-rule support.

  1. Update the matching rules in Amazon RDS for MySQL.

Change action == 0 in the value of the StartCondition parameter to action == 0 || action == 2, and change the two values of the times parameter from 3 to 5. This is version 2 of Rule 1.
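
Expressed in the Pattern method syntax shown in Step 4, version 2 of Rule 1 corresponds roughly to the following sketch:

Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new StartCondition("action == 0 || action == 2"))
        .timesOrMore(5)
        .followedBy("end")
        .where(new EndCondition());

The following SQL statement inserts the corresponding JSON definition: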

INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`)
values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.ververica.cep.demo.dynamic.DemoPatternProcessFunction');

Add a rule whose id value is 2 as Rule 2. Its SQL statement is the same as that of version 1 of Rule 1: the StartCondition parameter is still set to action == 0, and the two values of the times parameter are 3 and 3.

INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`)
values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
  2. Send eight messages to the demo_topic topic to trigger a match.

The following eight messages provide an example:

1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,2,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,2,1,1662022777000

On the Log List tab of the Running Task Managers tab, find the log file whose name ends with .out, and view the matching results.

  • To search for the matching results generated based on version 2 of Rule 1, use the keyword A match for Pattern of (id, version): (1, 2).

  • To search for the matching results generated based on version 1 of Rule 2, use the keyword A match for Pattern of (id, version): (2, 1).

The matching results shown in the logs confirm the following:

  • According to Version 2 of Rule 1, the Flink CEP deployment identifies a series of events consisting of five successive actions with values of 0 or 2, followed by an action value different from 1. This confirms the successful implementation of the dynamically updated Rule 1.

  • As per Version 1 of Rule 2, the Flink CEP deployment spots two event sequences, each comprising three consecutive actions with a value of 0, succeeded by an action with a value not equal to 1. This confirms the effective addition of the dynamically introduced Rule 2.
