JSON rule definitions in dynamic Flink CEP
This article explains the principles of rule definitions in dynamic Flink Complex Event Processing (CEP), structured in the JSON format.
Intended audience
Platform developers specializing in risk control: Those developers who possess an understanding of dynamic Flink CEP can leverage the format provided in this article to assess whether additional encapsulation is necessary according to the platform’s needs.
Risk control strategy personnel: Individuals who have knowledge specific to risk control strategies but lack Java development experience can utilize this format to create new CEP-based rules and implement them in live risk control deployments.
CEP rules defined in JSON format
An event sequence pattern can be visualized as a graph. Each node of the graph represents a pattern for specific events, and each edge that connects two nodes represents the event selection strategy that governs the transition from one pattern to the next during matching. A graph can itself be used as a child node of a larger graph, which enables pattern nesting. Ververica Cloud defines a set of JSON specifications to describe CEP rules and to simplify storing and modifying them. The tables below describe each field in these specifications.
- Node
A node indicates a complete pattern. The following table describes the fields in a pattern.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
name | The name of the pattern. | STRING | Yes | A unique string. Note: The name of each node must be unique. |
type | The type of the node. | ENUM(STRING) | Yes | For a node that includes a child pattern, the value of this field is COMPOSITE. For a node that does not include a child pattern, the value of this field is ATOMIC. |
quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see “Quantifier” in this article. |
condition | The condition. | Dictionary | No | For more information, see “Condition” in this article. |
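As a minimal sketch of how these fields fit together, the following fragment describes an atomic node that must match exactly once. The node name start is illustrative, the class name is the sample one used for CLASS conditions later in this article, and the optional quantifier fields (times, untilCondition) are left out on the assumption that omitting them is accepted.

```json
{
  "name": "start",
  "type": "ATOMIC",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": ["SINGLE"]
  },
  "condition": {
    "type": "CLASS",
    "className": "com.ververica.cep.demo.StartCondition"
  }
}
```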
- Quantifier
A quantifier describes how events are matched against the pattern. For example, for the pattern "A*", the quantifier property is LOOPING and the consumingStrategy field is set to SKIP_TILL_ANY.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
consumingStrategy | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY. For more information about the values and their meanings, see “Contiguity” in this article. |
times | The number of times the pattern needs to be matched. | Dictionary | No | |
properties | The properties of the quantifier. | Array of enumeration strings | Yes | For more information about the values and their meanings, see “Properties of a quantifier” in this article. |
untilCondition | The condition that is used to stop matching the pattern. Note: This field can be configured only for a pattern that uses the LOOPING quantifier. | Dictionary | No | For more information about the values and their meanings, see “Condition” in this article. |
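The following fragment is a sketch of a looping quantifier with a stop condition, not output taken from a real deployment. It assumes that "two or more times" is encoded the same way Example 1 in this article encodes timesOrMore(3), that is, LOOPING plus a times dictionary whose from and to are both set to the lower bound. The untilCondition expression is hypothetical, and the sketch assumes that any condition type described in “Condition” is accepted there.

```json
{
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_ANY",
    "properties": ["LOOPING"],
    "times": {
      "from": 2,
      "to": 2,
      "windowTime": null
    },
    "untilCondition": {
      "type": "AVIATOR",
      "expression": "price > 100"
    }
  }
}
```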
- Condition
Conditions are used to identify events that meet specific requirements. For example, if you want to identify users whose browsing duration is longer than 5 minutes, you can specify a condition that the browsing duration is longer than 5 minutes.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the condition. | ENUM(STRING) | Yes | Valid values: CLASS: a custom condition. AVIATOR: a condition based on an Aviator expression. GROOVY: a condition based on a Groovy expression. |
… | Other custom fields that can be serialized. | | No | … |
The following conditions are supported:
- Condition of the Class type
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
className | The name of the class. | STRING | Yes | The full name of the class, such as com.ververica.cep.demo.StartCondition. |
- Condition that includes a custom parameter
When you use a condition of the Class type, you can specify only the className parameter. Parameters cannot be dynamically passed. To improve the expression capabilities of conditions, dynamic Flink CEP supports the CustomArgsCondition condition, which includes a custom parameter. This way, you can configure required parameters in the CustomArgsCondition condition in the form of an array of JSON strings and dynamically construct CustomArgsCondition instances. In this case, you can dynamically update the parameters of the condition without the need to modify the Java code.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
className | The name of the class. | STRING | Yes | The full name of the class, such as com.ververica.cep.demo.CustomMiddleCondition. |
args | The custom parameter. | Array of strings | Yes | The value is an array of strings. |
- Condition based on an aviator expression
Aviator is an expression evaluation engine that dynamically compiles expressions into bytecode. For more information, see aviatorscript. You can use a condition based on an aviator expression in a deployment to dynamically modify the threshold of the condition. This way, you do not need to modify the Java code to recompile and run the code.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the condition. | STRING | Yes | Set the value to AVIATOR. |
expression | An expression string. | STRING | Yes | This field specifies an expression string in a form similar to price > 10. The price variable is a field defined in Java code. You can change the value of the string in the database. For example, if you change the value of the string to price > 20, the price > 20 expression is dynamically loaded in a dynamic Flink CEP deployment to construct a new Aviator condition that processes subsequent events. |
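Put together, the two fields above give a condition fragment like the following. It uses the price > 10 expression from the table and wraps it in the condition key that a node uses.

```json
{
  "condition": {
    "type": "AVIATOR",
    "expression": "price > 10"
  }
}
```

Raising the threshold then means changing only the expression string, for example to "price > 20", in the stored rule.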
- Condition based on a Groovy expression
Groovy is a dynamic language based on the Java virtual machine (JVM) platform. For more information about the Groovy syntax, see Syntax. Dynamic Flink CEP allows you to use Groovy expressions to define conditions. This way, the thresholds of conditions can be dynamically modified.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the condition. | STRING | Yes | Set the value to GROOVY. |
expression | An expression string. | STRING | Yes | This field specifies an expression string in a form similar to price > 5.0 && name.contains("mid"). The variables, such as price and name, are fields defined in Java code. You can change the value of the string in the database. For example, if you change the value of the string to price > 20 && name.contains("end"), the new Groovy string is dynamically loaded in a dynamic Flink CEP deployment to construct a new Groovy condition that processes subsequent events. |
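The Groovy expression from the table can be embedded in a condition fragment as sketched below. Because the expression is itself a JSON string, the double quotation marks inside it must be escaped with a backslash.

```json
{
  "condition": {
    "type": "GROOVY",
    "expression": "price > 5.0 && name.contains(\"mid\")"
  }
}
```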
- Edge
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
source | The name of the source pattern. | STRING | Yes | N/A. |
target | The name of the destination pattern. | STRING | Yes | N/A. |
type | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT. For more information about the values and their meanings, see “Contiguity” in this article. |
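As a small illustration, the following fragment connects three patterns with two edges. The pattern names are illustrative. Example 1 in this article shows that followedBy is serialized as SKIP_TILL_NEXT and notFollowedBy as NOT_FOLLOW. By analogy, this sketch assumes that NOT_NEXT corresponds to notNext in the Java API.

```json
{
  "edges": [
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    },
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_NEXT"
    }
  ]
}
```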
- GraphNode extends Node
A GraphNode indicates a complete pattern sequence. Each node of a GraphNode indicates an independent pattern. Each edge of a GraphNode indicates how to transition from one pattern to another to match events.
A GraphNode is considered a subclass of Node and can be used as a Node in a larger GraphNode. This way, GroupPattern that indicates pattern nesting is supported. Compared with Node, GraphNode provides the following additional types of fields:
The nodes and edges fields that describe the structure of a graph.
The window field that describes the time window policy in a graph and the afterMatchSkipStrategy field that describes the skipping strategy used after event matching.
The following table describes the fields of GraphNode.
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
name | The name of the composite pattern. | STRING | Yes | A unique string. Note: The name of each graph must be unique. |
type | The type of the node. | ENUM(STRING) | Yes | Set the value to COMPOSITE. |
version | The version of the JSON format that is used by the graph. | INTEGER | Yes | Default value: 1. |
nodes | The child patterns that are nested in the pattern. | Array of nodes | Yes | The value of this field is an array. The array cannot be empty. |
edges | The connection relationships between the nested child patterns. | Array of edges | Yes | The value of this field is an array. The array can be empty. |
window | If the type of the window is FIRST_AND_LAST, this field indicates the maximum time interval between the start and end of a complete match of the composite pattern. If the type of the window is PREVIOUS_AND_CURRENT, this field indicates the maximum time interval between the matches of two adjacent child patterns. | Dictionary | No | |
afterMatchSkipStrategy | The skipping strategy after all events in the graph are matched. | Dictionary | Yes | For more information, see “AfterMatchSkipStrategy” in this article. |
quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see “Quantifier” in this article. |
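The table does not spell out the shape of the window dictionary. The fragment below is a sketch that copies the type and time structure used in Example 1 of this article and switches the type to PREVIOUS_AND_CURRENT, so that at most 30 seconds may pass between the matches of two adjacent child patterns. The SECONDS unit is an assumption made by analogy with the MINUTES value shown in Example 1.

```json
{
  "window": {
    "type": "PREVIOUS_AND_CURRENT",
    "time": {
      "unit": "SECONDS",
      "size": 30
    }
  }
}
```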
- AfterMatchSkipStrategy
Field | Description | Data type | Required | Remarks |
---|---|---|---|---|
type | The type of the strategy. | ENUM(STRING) | Yes | Valid values: NO_SKIP: returns each successful match in the output. This is the default value. SKIP_TO_NEXT: discards all partial matches that start with the same event. SKIP_PAST_LAST_EVENT: discards all partial matches that start between the beginning and end of the match. SKIP_TO_FIRST: discards all partial matches that start between the beginning of the match and the first occurrence of the event named PatternName. SKIP_TO_LAST: discards all partial matches that start between the beginning of the match and the last occurrence of the event named PatternName. For more information, see After Match Skip Strategy. |
patternName | The name of the pattern that uses the strategy. | STRING | No | A unique string. |
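The following sketch shows a strategy that refers to a pattern by name. The pattern name middle is illustrative. The key is spelled afterMatchStrategy here because that is the spelling used in the JSON of Example 1 in this article.

```json
{
  "afterMatchStrategy": {
    "type": "SKIP_TO_LAST",
    "patternName": "middle"
  }
}
```

For a strategy that does not refer to a pattern, such as NO_SKIP, Example 1 sets patternName to null.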
- Contiguity
Valid value | Description |
---|---|
STRICT | Strict contiguity. Unmatched events cannot appear between matched events. |
SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events can appear between matched events. The unmatched events are ignored. |
SKIP_TILL_ANY | Non-deterministic relaxed contiguity. This value relaxes contiguity further by allowing additional matches that ignore some matching events. |
NOT_NEXT | The event that immediately follows a matched event cannot be the specified event. |
NOT_FOLLOW | The specified event cannot appear anywhere after a matched event. |
For more information, see FlinkCEP - Complex event processing for Flink.
- Properties of a quantifier
Valid value | Description |
---|---|
SINGLE | The pattern occurs only once. |
LOOPING | The pattern is a looping pattern and may occur multiple times. This quantifier is similar to the asterisk (*) and plus sign (+) in regular expressions. |
TIMES | The pattern occurs a specified number of times. |
GREEDY | The greedy matching strategy is used to match the pattern to obtain the maximum number of matches. |
OPTIONAL | The pattern is optional. |
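Properties can be combined in the properties array, as Example 1 in this article does with SINGLE and OPTIONAL. The fragment below is a sketch of an optional pattern that, when present, occurs between two and four times. It assumes that a bounded range is expressed through the from and to fields of the times dictionary shown in Example 1, and that TIMES and OPTIONAL can be combined in the same way.

```json
{
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": ["TIMES", "OPTIONAL"],
    "times": {
      "from": 2,
      "to": 4,
      "windowTime": null
    },
    "untilCondition": null
  }
}
```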
Example 1: Use a common pattern
This example describes how to use dynamic Flink CEP to adjust marketing strategies for the customers that meet the following conditions in a 10-minute time window during a real-time e-commerce promotional event:
Obtained coupons for a venue.
Added items to their shopping carts at least three times.
Did not complete the payments.
In the following sample code, the condition for obtaining coupons for a venue is defined as StartCondition, the condition for adding items to the shopping cart is defined as MiddleCondition, and the condition related to payment completion is defined as EndCondition. The following pattern is abstracted: In a 10-minute time window, an event that meets StartCondition occurs once, an event that meets MiddleCondition occurs three or more times, and no event that meets EndCondition occurs. The event that meets StartCondition is optional. The following sample code shows the Java code that describes the pattern in this example.
```java
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
        .where(new StartCondition())
        .optional()
        .followedBy("middle")
        .where(new MiddleCondition())
        .timesOrMore(3)
        .notFollowedBy("end")
        .where(new EndCondition())
        .within(Time.minutes(10));
```
The following sample code shows the JSON-formatted code that describes the pattern in this example.
```json
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}
```
Example 2: Use a condition that includes a custom parameter in a pattern
This example describes how to apply different marketing strategies to customers of different classes during a real-time e-commerce promotional event. For example, you can send marketing-related text messages to customers of Class A, send coupons to customers of Class B, and take no marketing action for other customers.
You can define an ordinary condition of the CLASS type in your deployment to meet these requirements. However, if you later want to adjust the marketing strategies, for example to also send coupons to customers of Class C, you must rewrite the deployment code and then recompile and rerun the deployment.
To simplify this operation, you can use a condition that includes a custom parameter. After you define in the code how the strategies depend on the passed parameter, you need only change the value of the passed parameter in the database. The value of the passed parameter is the value of the args field of the condition. For example, you can change ["A", "B"] to ["A", "B", "C"] to update the marketing strategies dynamically.
The following sample code shows the condition that is initially defined in the pattern.
```json
{
  "condition": {
    "args": [
      "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
  }
}
```
To extend the strategies to customers of Class C, change the value of the args field of the preceding condition in the database. The following sample code shows the updated condition.
```json
{
  "condition": {
    "args": [
      "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
  }
}
```
For more information about how to use the condition that includes a custom parameter in specific business scenarios, see Demo.