MSK
Create Private Connection for MSK
Important
This feature does not support MSK Serverless. For provisioned MSK clusters, only the “PlainText” and “TLS” access control methods are supported; the “IAM” method is not currently supported.
Prerequisites
Obtain the broker endpoints of an existing MSK cluster that you own and that your Flink jobs will access.
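MSK typically reports broker endpoints as a comma-separated bootstrap broker string of host:port pairs, for example the BootstrapBrokerString or BootstrapBrokerStringTls field returned by `aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn>`. The following is a minimal sketch, assuming that string format, of splitting it into one host and port per line; the function name `split_bootstrap_brokers` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split an MSK bootstrap broker string such as
# "b-1.XXX.amazonaws.com:9092,b-2.XXX.amazonaws.com:9092" into
# one "host port" line per broker.
split_bootstrap_brokers() {
  local brokers=$1 entry
  local IFS=','   # split the string on commas only
  for entry in $brokers; do
    # Text before the last ':' is the host, text after it is the port.
    printf '%s %s\n' "${entry%:*}" "${entry##*:}"
  done
}

# Example:
#   split_bootstrap_brokers "b-1.XXX.amazonaws.com:9092,b-2.XXX.amazonaws.com:9092"
#   b-1.XXX.amazonaws.com 9092
#   b-2.XXX.amazonaws.com 9092
```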
Step 1: Allow inbound traffic to the MSK brokers
Add an inbound rule to a security group attached to the MSK cluster so that network traffic from the Network Load Balancers (NLBs) can reach the MSK brokers. Use the following settings:
Protocol: TCP
Port range: the broker port that clients connect to, 9092 for plaintext or 9094 for TLS encryption
Source: the CIDR block of the MSK cluster’s VPC, from which the IP addresses of the NLB network mappings are assigned
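Because only the PlainText and TLS access methods are supported, the port in the rule follows directly from the cluster's access method. A minimal sketch of that mapping, using the ports listed above and rejecting IAM; the function name `broker_port_for` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map an MSK client access method to the broker port
# used in the inbound rule (9092 for plaintext, 9094 for TLS).
broker_port_for() {
  # Compare case-insensitively so "TLS", "tls" and "PlainText" all work.
  case "$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]')" in
    plaintext) echo 9092 ;;
    tls) echo 9094 ;;
    iam)
      echo "IAM access control is not supported for private connections" >&2
      return 1 ;;
    *)
      echo "unknown access method: $1" >&2
      return 1 ;;
  esac
}

# Example: BROKER_PORT=$(broker_port_for TLS)   # BROKER_PORT is now 9094
```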
Using the console
Log in to the AWS Management Console (https://console.aws.amazon.com/msk/home?region=us-west-1#/home) and open the Amazon MSK console.
Note the target cluster’s VPC, then open one of its security groups: Clusters -> target MSK cluster -> Properties -> Networking settings -> Security groups applied.
On the Inbound rules tab, click Edit inbound rules. Add a rule with the TCP protocol, the broker port, and the CIDR block of the MSK cluster’s VPC as the source.
Using the AWS CLI
Specify the following variables in the script:
MSK_NAME: name of the MSK cluster
BROKER_PORT: broker port (9092 for plaintext, 9094 for TLS)
MSK_SG_ID: ID of the security group chosen from the query results
MSK_NAME=demo-kafka
BROKER_PORT=9092
# query the MSK cluster's security groups
# (JMESPath queries are quoted so shells such as zsh do not treat [..] as a glob)
aws kafka list-clusters \
--cluster-name-filter "$MSK_NAME" \
--query 'ClusterInfoList[0].BrokerNodeGroupInfo.SecurityGroups[*]'
# select one security group from the query results
MSK_SG_ID=sg-XXXX
# get the VPC ID of the chosen security group
MSK_VPC_ID=$(aws ec2 describe-security-groups \
--group-ids "$MSK_SG_ID" \
--query 'SecurityGroups[0].VpcId' --output text)
echo MSK_VPC_ID = $MSK_VPC_ID
# get the CIDR block of that VPC
MSK_CIDR=$(aws ec2 describe-vpcs \
--vpc-ids "$MSK_VPC_ID" \
--query 'Vpcs[0].CidrBlock' --output text)
echo MSK_CIDR = $MSK_CIDR
# add the inbound rule to the chosen security group
aws ec2 authorize-security-group-ingress \
--group-id "$MSK_SG_ID" \
--protocol tcp \
--port "$BROKER_PORT" \
--cidr "$MSK_CIDR"
Step 2: Create target groups
Create one target group per broker; each NLB forwards traffic to its broker through this target group.
Using the console
Log in to the AWS Management Console (https://console.aws.amazon.com/msk/home?region=us-west-1#/home) and open the Amazon MSK console.
Check and record every broker’s availability zone and IP address:
Clusters -> target MSK cluster -> Properties -> Brokers -> Broker details
In Preferences, make the Client subnets and Client VPC IP address columns visible.
Get the Availability Zone and Availability Zone ID from the subnet information, for example:
<endpoint: b-1.XXX.amazonaws.com, az: us-west-1a, ip: 172.31.17.190>; <endpoint: b-2.XXX.amazonaws.com, az: us-west-1c, ip: 172.31.6.49>
Open the Amazon EC2 console -> Load Balancing -> Target Groups (https://console.aws.amazon.com/ec2/home?region=us-west-1#TargetGroups).
For each broker, use Create target group to create a target group with the IP addresses target type, the TCP protocol, the broker port, and the same VPC as the MSK cluster. When registering targets, add the broker’s IP address to the target list and click Include as pending below.
Using the AWS CLI
Specify the following variables in the script:
TARGETGROUP_NAME_PREFIX: name prefix for target group
MSK_NAME: name of MSK cluster from Step 1
BROKER_PORT: broker port from Step 1
MSK_VPC_ID: MSK cluster’s VPC from Step 1
TARGETGROUP_NAME_PREFIX=demo-targetgroup-
MSK_NAME=${MSK_NAME:=demo-kafka} # input from Step 1
BROKER_PORT=${BROKER_PORT:=9092} # input from Step 1
MSK_VPC_ID=${MSK_VPC_ID:=vpc-XXX} # exec result from Step 1
MSK_ARN=$(aws kafka list-clusters \
--cluster-name-filter "$MSK_NAME" \
--query 'ClusterInfoList[0].ClusterArn' --output text)
echo MSK_ARN = $MSK_ARN
MSK_BROKER_NUM=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "length(NodeInfoList)")
echo MSK_BROKER_NUM = $MSK_BROKER_NUM
echo "Creating $MSK_BROKER_NUM target groups for MSK"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
BROKER_ID=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.BrokerId")
echo BROKER_ID = $BROKER_ID
BROKER_IP=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.ClientVpcIpAddress" --output text)
echo BROKER_IP = $BROKER_IP
# one IP-type target group per broker, named <prefix><broker id>
TARGETGROUP_ARN=$(aws elbv2 create-target-group \
--name "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
--protocol TCP \
--port "$BROKER_PORT" \
--target-type ip \
--vpc-id "$MSK_VPC_ID" \
--query 'TargetGroups[0].TargetGroupArn' --output text)
echo TARGETGROUP_ARN = $TARGETGROUP_ARN
aws elbv2 register-targets \
--target-group-arn "$TARGETGROUP_ARN" \
--targets Id="$BROKER_IP"
echo "Created target group $TARGETGROUP_NAME_PREFIX$BROKER_ID"
aws elbv2 describe-target-groups \
--names "$TARGETGROUP_NAME_PREFIX$BROKER_ID"
done
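The script names each target group after the broker ID. The MSK API models BrokerId as a double, so the CLI may print it as, for example, 1.0. Elastic Load Balancing limits target group and load balancer names to 32 characters, allows only alphanumeric characters and hyphens, and rejects names that begin or end with a hyphen, so a name such as demo-targetgroup-1.0 would be refused. A minimal sketch of normalizing the ID and checking the name before calling create-target-group; both helper names are hypothetical. If you adopt this normalization, apply it the same way in Steps 3 and 4, which look up resources by the same names.

```shell
#!/usr/bin/env bash
# Hypothetical helper: drop a trailing ".0" that appears when the numeric
# BrokerId is printed as a floating-point value ("1.0" -> "1").
normalize_broker_id() {
  local id=$1
  printf '%s\n' "${id%.0}"
}

# Hypothetical helper: check a target group or load balancer name against
# the ELB naming rules: 1-32 characters, only alphanumerics and hyphens,
# and no leading or trailing hyphen.
is_valid_elb_name() {
  local name=$1
  [ "${#name}" -ge 1 ] && [ "${#name}" -le 32 ] || return 1
  case "$name" in
    -*|*-) return 1 ;;              # leading or trailing hyphen
    *[!A-Za-z0-9-]*) return 1 ;;    # any character outside [A-Za-z0-9-]
  esac
  return 0
}

# Example, inside the Step 2 loop:
#   BROKER_ID=$(normalize_broker_id "$BROKER_ID")
#   is_valid_elb_name "$TARGETGROUP_NAME_PREFIX$BROKER_ID" || exit 1
```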
Step 3: Create NLBs
Create an NLB for each broker that forwards traffic to the corresponding target group created in Step 2.
Using the console
Open the Amazon EC2 console -> Load Balancing -> Load Balancers (https://console.aws.amazon.com/ec2/home?region=us-west-1#LoadBalancers).
Create a Network Load Balancer with the internal scheme, the IPv4 IP address type, and the same VPC as the MSK cluster. In the network mappings, select the Availability Zone where the corresponding broker’s subnet is allocated (this applies while cross-zone load balancing is off for the NLB); for demonstration, we use the broker’s own subnet. Add a listener with the TCP protocol and the broker port that forwards to the corresponding target group created in Step 2.
Check the target groups’ health status and wait until the targets are healthy.
Using the AWS CLI
Specify the following variables in the script:
NLB_NAME_PREFIX: name prefix for NLB
BROKER_PORT: broker port from Step 1
TARGETGROUP_NAME_PREFIX: name prefix for target groups from Step 2
MSK_ARN: ARN of the MSK cluster from Step 2
MSK_BROKER_NUM: number of brokers from Step 2
NLB_NAME_PREFIX=demo-nlb-
BROKER_PORT=${BROKER_PORT:=9092} # input from Step 1
TARGETGROUP_NAME_PREFIX=${TARGETGROUP_NAME_PREFIX:=demo-targetgroup-} # input from Step 2
MSK_ARN=${MSK_ARN:=arn:aws:kafka:XXX} # exec result from Step 2
MSK_BROKER_NUM=${MSK_BROKER_NUM:=2} # exec result from Step 2
echo "Creating $MSK_BROKER_NUM NLBs for MSK"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
BROKER_ID=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.BrokerId")
echo BROKER_ID = $BROKER_ID
BROKER_SUBNET=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.ClientSubnet" --output text)
echo BROKER_SUBNET = $BROKER_SUBNET
# describe-target-groups takes --names (plural)
TARGETGROUP_ARN=$(aws elbv2 describe-target-groups \
--names "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
--query 'TargetGroups[0].TargetGroupArn' --output text)
echo TARGETGROUP_ARN = $TARGETGROUP_ARN
# internal NLB in the broker's own subnet
NLB_ARN=$(aws elbv2 create-load-balancer \
--name "$NLB_NAME_PREFIX$BROKER_ID" \
--scheme internal \
--type network \
--subnets "$BROKER_SUBNET" \
--query 'LoadBalancers[0].LoadBalancerArn' --output text)
echo NLB_ARN = $NLB_ARN
aws elbv2 create-listener \
--load-balancer-arn "$NLB_ARN" \
--protocol TCP \
--port "$BROKER_PORT" \
--default-actions Type=forward,TargetGroupArn="$TARGETGROUP_ARN"
echo "Created NLB $NLB_NAME_PREFIX$BROKER_ID"
aws elbv2 describe-load-balancers \
--names "$NLB_NAME_PREFIX$BROKER_ID"
done
echo "Checking health for NLB target groups"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
BROKER_ID=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.BrokerId")
TARGETGROUP_ARN=$(aws elbv2 describe-target-groups \
--names "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
--query 'TargetGroups[0].TargetGroupArn' --output text)
aws elbv2 describe-target-health \
--target-group-arn "$TARGETGROUP_ARN"
done
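The health check loop above prints each target group's status once. To wait until the targets are healthy, the check can be wrapped in a retry loop. A minimal sketch, assuming a caller-supplied check command; `wait_until` and `targets_healthy` are hypothetical names, and the attempt count and interval in the example are arbitrary.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a check command until it succeeds, at most
# MAX_ATTEMPTS times, sleeping INTERVAL seconds between attempts.
# Usage: wait_until MAX_ATTEMPTS INTERVAL command [args...]
wait_until() {
  local max_attempts=$1 interval=$2 attempt
  shift 2
  for ((attempt = 1; attempt <= max_attempts; attempt++)); do
    if "$@"; then
      return 0
    fi
    # Do not sleep after the final failed attempt.
    if ((attempt < max_attempts)); then
      sleep "$interval"
    fi
  done
  return 1
}

# Hypothetical check: succeed only if the group has targets and all of
# them report the "healthy" state.
targets_healthy() {
  local states
  states=$(aws elbv2 describe-target-health \
    --target-group-arn "$1" \
    --query 'TargetHealthDescriptions[*].TargetHealth.State' --output text)
  [ -n "$states" ] && [ -z "$(printf '%s\n' $states | grep -v '^healthy$')" ]
}

# Example: wait_until 30 10 targets_healthy "$TARGETGROUP_ARN"
```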
Step 4: Create endpoint services
Create a VPC endpoint service for each NLB created in Step 3, and allow the Ververica Cloud (VVC) principal to connect to it.
Using the console
Open the Amazon VPC console -> Endpoint services (https://console.aws.amazon.com/vpc/home?region=us-west-1#EndpointServices).
For each NLB, use Create endpoint service to create an endpoint service with the Network Load Balancer type, and set Require acceptance for endpoint to Off.
After the service is created, open the Allow principals tab, click Allow principals, and add the VVC_ROLE_ARN principal.
Note
The VVC_ROLE_ARN is static and has the format arn:aws:iam::<vvc-account-id>:role/pyxis, where the VVC account ID is 794031221915.
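Because the ARN follows a fixed template, it can be assembled from the account ID instead of being typed by hand. A minimal sketch, assuming the format in the note above; the function name `vvc_role_arn` is hypothetical, and the check only verifies that the input is 12 digits, the length of an AWS account ID.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build arn:aws:iam::<vvc-account-id>:role/pyxis
# from a 12-digit AWS account ID.
vvc_role_arn() {
  local account_id=$1
  case "$account_id" in
    [0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]) ;;  # exactly 12 digits
    *)
      echo "not a 12-digit account ID: $account_id" >&2
      return 1 ;;
  esac
  printf 'arn:aws:iam::%s:role/pyxis\n' "$account_id"
}

# Example: VVC_ROLE_ARN=$(vvc_role_arn 794031221915)
# VVC_ROLE_ARN is then arn:aws:iam::794031221915:role/pyxis
```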
Using the AWS CLI
Specify the following variables in the script:
VVC_ROLE_ARN: ARN used by VVC to create VPC endpoint to access endpoint services
MSK_ARN: ARN of MSK cluster from Step 2
MSK_BROKER_NUM: number of brokers from Step 2
NLB_NAME_PREFIX: name prefix for NLB from Step 3
VVC_ROLE_ARN=arn:aws:iam::794031221915:role/pyxis
MSK_ARN=${MSK_ARN:=arn:aws:kafka:XXX} # exec result from Step 2
MSK_BROKER_NUM=${MSK_BROKER_NUM:=2} # exec result from Step 2
NLB_NAME_PREFIX=${NLB_NAME_PREFIX:=demo-nlb-} # input from Step 3
echo "Creating endpoint services for MSK brokers' NLBs"
# header row of the DOMAIN-NAME -> SERVICE-NAME mapping table
TBL=(DOMAIN-NAME,SERVICE-NAME)
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
BROKER_ENDPOINT=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.Endpoints[0]" --output text)
echo BROKER_ENDPOINT = $BROKER_ENDPOINT
BROKER_ID=$(aws kafka list-nodes \
--cluster-arn "$MSK_ARN" \
--query "NodeInfoList[$i].BrokerNodeInfo.BrokerId")
echo BROKER_ID = $BROKER_ID
NLB_ARN=$(aws elbv2 describe-load-balancers \
--names "$NLB_NAME_PREFIX$BROKER_ID" \
--query 'LoadBalancers[0].LoadBalancerArn' --output text)
echo NLB_ARN = $NLB_ARN
SVC_ID=$(aws ec2 create-vpc-endpoint-service-configuration \
--network-load-balancer-arns "$NLB_ARN" \
--no-acceptance-required \
--query 'ServiceConfiguration.ServiceId' --output text)
echo SVC_ID = $SVC_ID
aws ec2 modify-vpc-endpoint-service-permissions \
--service-id "$SVC_ID" \
--add-allowed-principals "[\"$VVC_ROLE_ARN\"]"
# describe-vpc-endpoint-service-configurations takes --service-ids (plural)
SVC_NAME=$(aws ec2 describe-vpc-endpoint-service-configurations \
--service-ids "$SVC_ID" \
--query 'ServiceConfigurations[0].ServiceName' --output text)
echo SVC_NAME = $SVC_NAME
TBL+=("$BROKER_ENDPOINT,$SVC_NAME")
done
echo "Created endpoint services and their corresponding broker domain names"
printf "%s\n" "${TBL[@]}" | column -t -s,
Step 5: Create a private connection in the VVC Console
Go to the VVC Console and select Security -> Private Connection -> Create Connection.
Add the mappings printed at the end of Step 4: enter each SERVICE-NAME as the Service_Name and the corresponding DOMAIN-NAME as the Private_Endpoint.

Important
The Private_Endpoint value should not include a port number.
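If a broker endpoint was copied from a bootstrap broker string (host:port) rather than taken from the Step 4 table, remove the port before entering it as the Private_Endpoint. A minimal sketch; `strip_port` is a hypothetical helper name, and it only strips the suffix when everything after the last colon is a number.

```shell
#!/usr/bin/env bash
# Hypothetical helper: remove a trailing ":<port>" from a broker endpoint
# so it can be used as the Private_Endpoint value.
strip_port() {
  local endpoint=$1
  local suffix=${endpoint##*:}
  # Strip only when a colon is present and the text after the last one is all digits.
  if [ "$suffix" != "$endpoint" ] && [ -n "$suffix" ] && [ -z "${suffix//[0-9]/}" ]; then
    printf '%s\n' "${endpoint%:*}"
  else
    printf '%s\n' "$endpoint"
  fi
}

# Example: strip_port b-1.XXX.amazonaws.com:9094   # prints b-1.XXX.amazonaws.com
```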