JSON rule definitions in dynamic Flink CEP

This article explains the principles of rule definitions in dynamic Flink Complex Event Processing (CEP), structured in the JSON format.

Intended audience

  • Platform developers specializing in risk control: Developers who understand dynamic Flink CEP can use the format described in this article to assess whether additional encapsulation is needed for their platform.

  • Risk control strategy personnel: People who understand risk control strategies but lack Java development experience can use this format to create CEP-based rules and apply them in live risk control deployments.

CEP rules defined in JSON format

An event sequence pattern can be visualized as a graph. In this graph, a node represents a pattern for specific events, and an edge connecting two nodes represents the event selection strategy that governs the transition from one pattern to the next when events are matched. Each graph can also be used as a child node of a larger graph, which enables pattern nesting. Ververica Cloud defines a set of JSON specifications to describe CEP rules and to simplify storing and adjusting them. The tables below describe each field in these specifications.

  • Node

    A node indicates a complete pattern. The following table describes the fields in a pattern.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| name | The name of the pattern. | STRING | Yes | A unique string. Note: The name of each node must be unique. |
| type | The type of the node. | ENUM(STRING) | Yes | For a node that includes a child pattern, the value of this field is COMPOSITE. For a node that does not include a child pattern, the value of this field is ATOMIC. |
| quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this article. |
| condition | The condition that an event must meet to match the pattern. | Dictionary | No | For more information, see "Condition" in this article. |
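As an illustration, a minimal ATOMIC node that combines the fields above might look like the following sketch. The structure mirrors the nodes in Example 1 later in this article; the node name and class name are taken from that example and stand in for your own values.

```json
{
  "name": "start",
  "type": "ATOMIC",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": ["SINGLE"],
    "times": null,
    "untilCondition": null
  },
  "condition": {
    "type": "CLASS",
    "className": "com.ververica.cep.demo.condition.StartCondition"
  }
}
```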

  • Quantifier

    A quantifier is used to describe how to match events with the pattern. For example, the quantifier property of the pattern "A*" is LOOPING and the consumingStrategy field of the pattern is set to SKIP_TILL_ANY.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| consumingStrategy | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY. For more information about the values and their meanings, see "Contiguity" in this article. |
| times | The number of times the pattern needs to be matched. | Dictionary | No | The values of from and to must be of the INTEGER data type. The unit of windowTime can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. Note: windowTime can be set to null, in which case it is written as "windowTime": null. Example: "times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } } |
| properties | The properties of the quantifier. | Array of enumeration strings | Yes | For more information about the values and their meanings, see "Properties of a quantifier" in this article. |
| untilCondition | The condition that is used to stop matching the pattern. Note: This field can be configured only for a pattern that uses the LOOPING quantifier. | Dictionary | No | For more information about the values and their meanings, see "Condition" in this article. |
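For illustration, a quantifier object that asks for exactly three matches within a 12-minute window might be sketched as follows. The times value is the one from the table above; pairing it with the TIMES property is an assumption based on the property descriptions in this article, so compare it with the JSON that your own deployment produces before relying on it.

```json
{
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": ["TIMES"],
  "times": {
    "from": 3,
    "to": 3,
    "windowTime": { "unit": "MINUTES", "size": 12 }
  },
  "untilCondition": null
}
```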

  • Condition

    Conditions are used to identify events that meet specific requirements. For example, if you want to identify users whose browsing duration is longer than 5 minutes, you can specify a condition that the browsing duration is longer than 5 minutes.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Valid values: CLASS: a custom condition. AVIATOR: a condition based on an aviator expression. GROOVY: a condition based on a Groovy expression. |
| … | Other custom fields that can be serialized. | | No | … |

The following conditions are supported:

  • Condition of the Class type

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
| className | The name of the class. | STRING | Yes | The full name of the class, such as com.ververica.cep.demo.StartCondition. |

  • Condition that includes a custom parameter

    When you use a condition of the Class type, you can specify only the className field, so parameters cannot be passed dynamically. To make conditions more expressive, dynamic Flink CEP supports CustomArgsCondition, a condition that accepts custom parameters. You configure the required parameters as a JSON array of strings, and CustomArgsCondition instances are constructed dynamically from them. This lets you update the parameters of a condition without modifying the Java code.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
| className | The name of the class. | STRING | Yes | The full name of the class, such as com.ververica.cep.demo.CustomMiddleCondition. |
| args | The custom parameter. | Array of strings | Yes | The value is an array of strings. |

  • Condition based on an aviator expression

    Aviator is an expression evaluation engine that dynamically compiles expressions into bytecode. For more information, see aviatorscript. With a condition based on an aviator expression, you can change the threshold of the condition in a running deployment without modifying and recompiling the Java code.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | STRING | Yes | Set the value to AVIATOR. |
| expression | An expression string. | STRING | Yes | This field specifies an expression string in a form similar to price > 10. The price variable is a field defined in Java code. You can change the value of the string in the database. For example, if you change the value of the string to price > 20, the price > 20 expression is dynamically loaded in a dynamic Flink CEP deployment to construct a new aviator condition to process subsequent events. |
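Based on the two fields above, an aviator condition might be written as in the following sketch. The price variable is the example field from the table and is assumed to exist on the event class used by your deployment.

```json
{
  "type": "AVIATOR",
  "expression": "price > 10"
}
```

Raising the threshold then only requires replacing the expression string, for example with price > 20.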

  • Condition based on a Groovy expression

    Groovy is a dynamic language based on the Java virtual machine (JVM) platform. For more information about the Groovy syntax, see Syntax. Dynamic Flink CEP allows you to use Groovy expressions to define conditions. This way, the thresholds of conditions can be dynamically modified.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | STRING | Yes | Set the value to GROOVY. |
| expression | An expression string. | STRING | Yes | This field specifies an expression string in a form similar to price > 5.0 && name.contains("mid"). The variables, such as price and name, are fields defined in Java code. You can change the value of the string in the database. For example, if you change the value of the string to price > 20 && name.contains("end"), the new Groovy string is dynamically loaded in a dynamic Flink CEP deployment to construct a new Groovy condition to process subsequent events. |
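A Groovy condition built from the fields above might look like the following sketch. Because the expression is stored inside a JSON string, the double quotation marks around the Groovy string literal must be escaped with a backslash. The price and name variables are the example fields from the table and are assumed to exist on your event class.

```json
{
  "type": "GROOVY",
  "expression": "price > 5.0 && name.contains(\"mid\")"
}
```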

  • Edge

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| source | The name of the source pattern. | STRING | Yes | N/A. |
| target | The name of the destination pattern. | STRING | Yes | N/A. |
| type | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT. For more information about the values and their meanings, see "Contiguity" in this article. |
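As a sketch, an edge that connects a pattern named start to a pattern named middle with relaxed contiguity could be written as follows. The pattern names are illustrative and must match the name values of nodes defined in the same graph.

```json
{
  "source": "start",
  "target": "middle",
  "type": "SKIP_TILL_NEXT"
}
```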

  • GraphNode extends Node

    A GraphNode indicates a complete pattern sequence. Each node of a GraphNode indicates an independent pattern. Each edge of a GraphNode indicates how to transition from one pattern to another when events are matched.

    A GraphNode is considered a subclass of Node and can be used as a Node in a larger GraphNode. This way, GroupPattern that indicates pattern nesting is supported. Compared with Node, GraphNode provides the following additional types of fields:

    • The nodes and edges fields that describe the structure of a graph.

    • The window field that describes the time window policy in a graph and the afterMatchSkipStrategy field that describes the skipping strategy used after event matching.

    The following table describes the fields of GraphNode.

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| name | The name of the composite pattern. | STRING | Yes | A unique string. Note: The name of each graph must be unique. |
| type | The type of the node. | ENUM(STRING) | Yes | Set the value to COMPOSITE. |
| version | The version of the JSON format that is used by the graph. | INTEGER | Yes | Default value: 1. |
| nodes | The child patterns that are nested in the pattern. | Array of nodes | Yes | The value of this field is an array. The array cannot be empty. |
| edges | The connection relationships between the nested child patterns. | Array of edges | Yes | The value of this field is an array. The array can be empty. |
| window | If the type of the window is FIRST_AND_LAST, this field indicates the maximum time interval between the start and end of a complete match of the composite pattern. If the type of the window is PREVIOUS_AND_CURRENT, this field indicates the maximum time interval between the matches of two adjacent child patterns. | Dictionary | No | The unit can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. The size can be of the LONG or INTEGER data type. Example: "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } } |
| afterMatchSkipStrategy | The skipping strategy after all events in the graph are matched. | Dictionary | Yes | For more information, see "AfterMatchSkipStrategy" in this article. |
| quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this article. |
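To contrast with the FIRST_AND_LAST example in the table, the following sketch shows a window of the PREVIOUS_AND_CURRENT type that allows at most 30 seconds between the matches of two adjacent child patterns. The 30-second value is illustrative; the fragment is wrapped in braces only so that it is valid JSON on its own.

```json
{
  "window": {
    "type": "PREVIOUS_AND_CURRENT",
    "time": { "unit": "SECONDS", "size": 30 }
  }
}
```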

  • AfterMatchSkipStrategy

| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the strategy. | ENUM(STRING) | Yes | Valid values: NO_SKIP: returns each successful match in the output. This is the default value. SKIP_TO_NEXT: discards all partial matches that start with the same event. SKIP_PAST_LAST_EVENT: discards all partial matches that start between the beginning and end of the match. SKIP_TO_FIRST: discards all partial matches that start between the beginning of the match and the first occurrence of the event named patternName. SKIP_TO_LAST: discards all partial matches that start between the beginning of the match and the last occurrence of the event named patternName. For more information, see After Match Skip Strategy. |
| patternName | The name of the pattern that uses the strategy. | STRING | No | A unique string. |
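As a sketch, a strategy object that skips to the first event matched by a pattern named middle could look like the following. The pattern name is illustrative and is assumed to match a node in the same graph. For strategies that do not refer to a pattern, such as NO_SKIP, Example 1 in this article sets patternName to null.

```json
{
  "type": "SKIP_TO_FIRST",
  "patternName": "middle"
}
```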

  • Contiguity

| Valid value | Description |
| --- | --- |
| STRICT | Strict contiguity. Unmatched events cannot appear between matched events. |
| SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events can appear between matched events. The unmatched events are ignored. |
| SKIP_TILL_ANY | Non-deterministic relaxed contiguity. This value relaxes contiguity further: additional matches that ignore some matching events are also allowed. |
| NOT_NEXT | The event that immediately follows a matched event cannot be the specified event. |
| NOT_FOLLOW | The specified event cannot appear at any point after a matched event. |

For more information, see FlinkCEP - Complex event processing for Flink.

  • Properties of a quantifier

| Valid value | Description |
| --- | --- |
| SINGLE | The pattern occurs only once. |
| LOOPING | The pattern is a looping pattern and may occur multiple times. This quantifier is similar to the asterisk (*) and plus sign (+) in regular expressions. |
| TIMES | The pattern occurs a specified number of times. |
| GREEDY | The greedy matching strategy is used to match the pattern to obtain the maximum number of matches. |
| OPTIONAL | The pattern is optional. |
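Properties can be combined in one array; Example 1 in this article uses SINGLE together with OPTIONAL for its start pattern. As a further sketch, a looping pattern that is both optional and greedy might be expressed as shown below. This combination and the order of the values are assumptions derived from the table above, not output captured from a deployment.

```json
{
  "properties": ["LOOPING", "OPTIONAL", "GREEDY"]
}
```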

Example 1: Use a common pattern

This example describes how to use dynamic Flink CEP to adjust marketing strategies for the customers that meet the following conditions in a 10-minute time window during a real-time e-commerce promotional event:

  • Obtained coupons for a venue.

  • Added items to their shopping carts three or more times.

  • Did not complete the payments.

In the following sample code, the condition for obtaining coupons for a venue is defined as StartCondition, the condition for adding items to the shopping cart is defined as MiddleCondition, and the condition related to payment completion is defined as EndCondition. The following pattern is abstracted: In a 10-minute time window, an event that meets StartCondition occurs once, an event that meets MiddleCondition occurs three or more times, and no event that meets EndCondition occurs. The event that meets StartCondition is optional. The following sample code shows the Java code that describes the pattern in this example.

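// Within 10 minutes: an optional "start" event, then three or more
// "middle" events, and no "end" event afterwards.
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()                  // the start event may be absent
            .followedBy("middle")        // relaxed contiguity (SKIP_TILL_NEXT)
            .where(new MiddleCondition())
            .timesOrMore(3)              // three or more occurrences
            .notFollowedBy("end")        // no payment event may follow (NOT_FOLLOW)
            .where(new EndCondition())
            .within(Time.minutes(10));   // time window for the whole match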

The following sample code shows the JSON-formatted code that describes the pattern in this example.

{
"name": "end",
"quantifier": {
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": [
    "SINGLE"
  ],
  "times": null,
  "untilCondition": null
},
"condition": null,
"nodes": [
  {
    "name": "end",
    "quantifier": {
      "consumingStrategy": "SKIP_TILL_NEXT",
      "properties": [
        "SINGLE"
      ],
      "times": null,
      "untilCondition": null
    },
    "condition": {
      "className": "com.ververica.cep.demo.condition.EndCondition",
      "type": "CLASS"
    },
    "type": "ATOMIC"
  },
  {
    "name": "middle",
    "quantifier": {
      "consumingStrategy": "SKIP_TILL_NEXT",
      "properties": [
        "LOOPING"
      ],
      "times": {
        "from": 3,
        "to": 3,
        "windowTime": null
      },
      "untilCondition": null
    },
    "condition": {
      "className": "com.ververica.cep.demo.condition.MiddleCondition",
      "type": "CLASS"
    },
    "type": "ATOMIC"
  },
  {
    "name": "start",
    "quantifier": {
      "consumingStrategy": "SKIP_TILL_NEXT",
      "properties": [
        "SINGLE",
        "OPTIONAL"
      ],
      "times": null,
      "untilCondition": null
    },
    "condition": {
      "className": "com.ververica.cep.demo.condition.StartCondition",
      "type": "CLASS"
    },
    "type": "ATOMIC"
  }
],
"edges": [
  {
    "source": "middle",
    "target": "end",
    "type": "NOT_FOLLOW"
  },
  {
    "source": "start",
    "target": "middle",
    "type": "SKIP_TILL_NEXT"
  }
],
"window": {
  "type": "FIRST_AND_LAST",
  "time": {
    "unit": "MINUTES",
    "size": 10
  }
},
"afterMatchStrategy": {
  "type": "NO_SKIP",
  "patternName": null
},
"type": "COMPOSITE",
"version": 1
}

Example 2: Use a condition that includes a custom parameter in a pattern

This example describes how to apply different marketing strategies to customers of different classes during a real-time e-commerce promotional event. For example, you can send marketing text messages to customers of Class A, send coupons to customers of Class B, and take no marketing action for other customers.

You can meet these requirements with an ordinary condition of the Class type. However, if you later want to adjust the marketing strategies, for example to also send coupons to customers of Class C, you must rewrite the deployment code, recompile it, and run the deployment again.

To simplify the operation, you can use the condition that includes a custom parameter. After you define in the code how the strategies depend on the passed parameter, you only need to change the value of the parameter in the database. The passed parameter is the args field of the condition. For example, you can change ["A", "B"] to ["A", "B", "C"] to dynamically update the marketing strategies.

The following sample code shows the condition that is initially defined in the pattern.

{
  "condition": {
    "args": [
      "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
  }
}

To extend the strategy to customers of Class C, change only the value of the args field. The following sample code shows the updated condition.

{
  "condition": {
    "args": [
      "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
  }
}

For more information about how to use the condition that includes a custom parameter in specific business scenarios, see Demo.
