MSK

Create Private Connection for MSK

Important

This feature does not support serverless MSK. For provisioned MSK clusters, only the “PlainText” and “TLS” access control methods are supported; the “IAM” method is currently not supported.

Prerequisites

  • Obtain the broker endpoints of an existing MSK cluster that you own and that your Flink jobs will access.
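To collect the broker endpoints from the command line, the sketch below assumes the AWS CLI is configured for the cluster’s Region and that MSK_ARN holds the cluster ARN (Step 2 shows one way to look it up). `aws kafka get-bootstrap-brokers` returns a comma-separated `host:port` string; the hypothetical helper `bootstrap_hosts` splits it into one broker host name per line.

```shell
#!/usr/bin/env bash
# bootstrap_hosts (hypothetical helper): split an MSK bootstrap string such as
# "b-1.example:9092,b-2.example:9092" into one host name per line.
bootstrap_hosts() {
  local IFS=','
  local entry
  for entry in $1; do
    printf '%s\n' "${entry%:*}"   # drop the trailing ":port"
  done
}

# Example (requires AWS credentials; MSK_ARN is assumed to be set):
#   BOOTSTRAP=$(aws kafka get-bootstrap-brokers --cluster-arn "$MSK_ARN" \
#     --query BootstrapBrokerString --output text)   # BootstrapBrokerStringTls for TLS
#   bootstrap_hosts "$BOOTSTRAP"
```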

Step 1: Allow inbound traffic to MSK brokers

Add an inbound rule to one of the MSK cluster’s security groups so that traffic from the Network Load Balancers (NLBs) can reach the MSK brokers.

  • Protocol: TCP

  • Port range: the broker port that clients connect to, 9092 for plaintext or 9094 for TLS

  • Source: the CIDR block of the MSK cluster’s VPC, from which the NLBs’ network-mapping IP addresses are assigned
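The port rule above can be captured in a small helper. `broker_port` is a hypothetical name used only for illustration; it maps the two supported access methods to their default broker ports and rejects anything else, such as IAM, which this feature does not support.

```shell
#!/usr/bin/env bash
# broker_port (hypothetical helper): map an access method to the MSK broker port.
broker_port() {
  case "$1" in
    plaintext) echo 9092 ;;
    tls)       echo 9094 ;;
    *)
      echo "unsupported access method: $1 (only plaintext and tls are supported)" >&2
      return 1
      ;;
  esac
}

BROKER_PORT=$(broker_port plaintext)
echo "BROKER_PORT = $BROKER_PORT"
```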

Using the console

  1. Log in to the AWS Management Console (https://console.aws.amazon.com/msk/home?region=us-west-1#/home) and open the Amazon MSK console.

  2. Note the target cluster’s VPC and open one of its security groups: Clusters -> target MSK cluster -> Properties -> Networking settings -> Security groups applied.

  3. On the Inbound rules tab, click Edit inbound rules and add a rule with protocol TCP, the broker port, and the MSK VPC’s CIDR as the source.

Using the AWS CLI

Specify the following variables in the script:

  • MSK_NAME: name of MSK cluster

  • BROKER_PORT: broker port (9092 for plaintext, 9094 for TLS)

  • MSK_SG_ID: ID of the security group chosen from the query results

MSK_NAME=demo-kafka
BROKER_PORT=9092


# list the MSK cluster's security groups
aws kafka list-clusters \
  --cluster-name-filter "$MSK_NAME" \
  --query 'ClusterInfoList[0].BrokerNodeGroupInfo.SecurityGroups[*]'
# pick one security group ID from the query results
MSK_SG_ID=sg-XXXX


# get the related VPC ID
MSK_VPC_ID=$(aws ec2 describe-security-groups \
  --group-ids "$MSK_SG_ID" \
  --query 'SecurityGroups[0].VpcId' --output text)
echo MSK_VPC_ID = "$MSK_VPC_ID"
# get the VPC's CIDR block
MSK_CIDR=$(aws ec2 describe-vpcs \
  --vpc-ids "$MSK_VPC_ID" \
  --query 'Vpcs[0].CidrBlock' --output text)
echo MSK_CIDR = "$MSK_CIDR"

# add the inbound rule to the chosen security group
aws ec2 authorize-security-group-ingress \
  --group-id "$MSK_SG_ID" \
  --protocol tcp \
  --port "$BROKER_PORT" \
  --cidr "$MSK_CIDR"
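Because the rule’s source is the VPC CIDR, it only admits the NLBs if their private IP addresses fall inside that range. The sketch below is a pure-Bash containment check under that assumption; `ip_to_int` and `cidr_contains` are hypothetical helpers, and the fallback CIDR 172.31.0.0/16 (a typical default-VPC range) is an assumed example, not a value from your account. The broker IPs are the sample values listed in Step 2.

```shell
#!/usr/bin/env bash
# ip_to_int (hypothetical helper): convert a dotted IPv4 address to an integer.
ip_to_int() {
  local IFS=. a b c d
  read -r a b c d <<< "$1"
  echo $(( (a << 24) | (b << 16) | (c << 8) | d ))
}

# cidr_contains (hypothetical helper): succeed if IPv4 address $2 lies in CIDR $1.
cidr_contains() {
  local net=${1%/*} bits=${1#*/}
  local mask=$(( bits == 0 ? 0 : (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF ))
  (( ($(ip_to_int "$2") & mask) == ($(ip_to_int "$net") & mask) ))
}

MSK_CIDR=${MSK_CIDR:-172.31.0.0/16}   # assumed example; Step 1 sets the real value
for ip in 172.31.17.190 172.31.6.49; do
  if cidr_contains "$MSK_CIDR" "$ip"; then
    echo "$ip is inside $MSK_CIDR"
  else
    echo "$ip is outside $MSK_CIDR" >&2
  fi
done
```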

Step 2: Create target groups

Create one target group per broker; each NLB created in Step 3 forwards traffic to its broker through one of these target groups.

Using the console

  1. Log in to the AWS Management Console (https://console.aws.amazon.com/msk/home?region=us-west-1#/home) and open the Amazon MSK console.

  2. Record each broker’s Availability Zone and IP address:

    1. Go to Clusters -> target MSK cluster -> Properties -> Brokers -> Broker details.

    2. In Preferences, make the Client subnets and Client VPC IP address columns visible.

    3. Look up each broker’s Availability Zone and Availability Zone ID from its subnet information, for example: <endpoint: b-1.XXX.amazonaws.com, az: us-west-1a, ip: 172.31.17.190>; <endpoint: b-2.XXX.amazonaws.com, az: us-west-1c, ip: 172.31.6.49>

  3. Open the Amazon EC2 console and go to Load Balancing -> Target Groups (https://console.aws.amazon.com/ec2/home?region=us-west-1#TargetGroups).

  4. For each broker, choose Create target group and create a target group with target type IP addresses, protocol TCP, the broker port, and the same VPC as the MSK cluster. When registering targets, enter that broker’s IP address and choose Include as pending below.

Using the AWS CLI

Specify the following variables in the script:

  • TARGETGROUP_NAME_PREFIX: name prefix for target group

  • MSK_NAME: name of MSK cluster from Step 1

  • BROKER_PORT: broker port from Step 1

  • MSK_VPC_ID: MSK cluster’s VPC from Step 1

TARGETGROUP_NAME_PREFIX=demo-targetgroup-
MSK_NAME=${MSK_NAME:=demo-kafka}  # input from Step 1
BROKER_PORT=${BROKER_PORT:=9092}  # input from Step 1
MSK_VPC_ID=${MSK_VPC_ID:=vpc-XXX}  # exec result from Step 1


MSK_ARN=$(aws kafka list-clusters \
  --cluster-name-filter "$MSK_NAME" \
  --query 'ClusterInfoList[0].ClusterArn' --output text)
echo MSK_ARN = "$MSK_ARN"
MSK_BROKER_NUM=$(aws kafka list-nodes \
  --cluster-arn "$MSK_ARN" \
  --query 'length(NodeInfoList)')
echo MSK_BROKER_NUM = "$MSK_BROKER_NUM"

echo "Creating $MSK_BROKER_NUM target groups for MSK"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
  # BrokerId may be returned as a number such as 1.0; drop the fractional part,
  # because target group names may contain only letters, digits and hyphens
  BROKER_ID=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.BrokerId" --output text)
  BROKER_ID=${BROKER_ID%.*}
  echo BROKER_ID = "$BROKER_ID"
  BROKER_IP=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.ClientVpcIpAddress" --output text)
  echo BROKER_IP = "$BROKER_IP"

  # IP-type TCP target group in the MSK VPC, with the broker as its only target
  TARGETGROUP_ARN=$(aws elbv2 create-target-group \
    --name "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
    --protocol TCP \
    --port "$BROKER_PORT" \
    --target-type ip \
    --vpc-id "$MSK_VPC_ID" \
    --query 'TargetGroups[0].TargetGroupArn' --output text)
  echo TARGETGROUP_ARN = "$TARGETGROUP_ARN"
  aws elbv2 register-targets \
    --target-group-arn "$TARGETGROUP_ARN" \
    --targets Id="$BROKER_IP"
  echo "Created target group $TARGETGROUP_NAME_PREFIX$BROKER_ID"
  aws elbv2 describe-target-groups \
    --names "$TARGETGROUP_NAME_PREFIX$BROKER_ID"
done
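Elastic Load Balancing accepts target group and load balancer names of at most 32 characters made of letters, digits, and hyphens, not starting or ending with a hyphen (as we understand the ELB naming rules). MSK may report `BrokerId` as a number such as `1.0`, so a name built directly from it can be rejected. The hypothetical helper `resource_name` below normalizes the broker ID and checks the result before any `create-*` call.

```shell
#!/usr/bin/env bash
# resource_name (hypothetical helper): build an ELB-safe name from a prefix and
# an MSK broker ID, e.g. "demo-targetgroup-" + "1.0" -> "demo-targetgroup-1".
resource_name() {
  local prefix=$1 broker_id=${2%.*}   # drop any fractional part such as ".0"
  local name="$prefix$broker_id"
  local re='^[A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?$'
  if (( ${#name} > 32 )) || [[ ! $name =~ $re ]]; then
    echo "invalid ELB resource name: $name" >&2
    return 1
  fi
  printf '%s\n' "$name"
}

resource_name demo-targetgroup- 1.0   # prints demo-targetgroup-1
resource_name demo-nlb- 2             # prints demo-nlb-2
```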

Step 3: Create NLBs

Create an NLB for each broker that forwards to the corresponding target group created in Step 2.

Using the console

  1. Open the Amazon EC2 console and go to Load Balancing -> Load Balancers (https://console.aws.amazon.com/ec2/home?region=us-west-1#LoadBalancers).

  2. For each broker, create a Network Load Balancer with scheme Internal, IP address type IPv4, and the same VPC as the MSK cluster. In Network mappings, select the Availability Zone of the corresponding broker’s subnet; this is required because cross-zone load balancing stays off, so the NLB only forwards to targets in the zones it is mapped to. This example maps each NLB to its broker’s own subnet. Add a TCP listener on the broker port that forwards to the corresponding target group created in Step 2.

  3. Check the target groups’ health status and wait until all targets are healthy.

Using the AWS CLI

Specify the following variables in the script:

  • NLB_NAME_PREFIX: name prefix for NLB

  • BROKER_PORT: broker port from Step 1

  • TARGETGROUP_NAME_PREFIX: name prefix for target group from Step 2

  • MSK_ARN: ARN of MSK cluster from Step 2

  • MSK_BROKER_NUM: number of brokers, from Step 2

NLB_NAME_PREFIX=demo-nlb-
BROKER_PORT=${BROKER_PORT:=9092}  # input from Step 1
TARGETGROUP_NAME_PREFIX=${TARGETGROUP_NAME_PREFIX:=demo-targetgroup-}  # input from Step 2
MSK_ARN=${MSK_ARN:=arn:aws:kafka:XXX}  # exec result from Step 2
MSK_BROKER_NUM=${MSK_BROKER_NUM:=2}  # exec result from Step 2


echo "Creating $MSK_BROKER_NUM NLBs for MSK"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
  # drop any fractional part of BrokerId (e.g. 1.0 -> 1) to match the Step 2 names
  BROKER_ID=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.BrokerId" --output text)
  BROKER_ID=${BROKER_ID%.*}
  echo BROKER_ID = "$BROKER_ID"
  BROKER_SUBNET=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.ClientSubnet" --output text)
  echo BROKER_SUBNET = "$BROKER_SUBNET"
  TARGETGROUP_ARN=$(aws elbv2 describe-target-groups \
    --names "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
    --query 'TargetGroups[0].TargetGroupArn' --output text)
  echo TARGETGROUP_ARN = "$TARGETGROUP_ARN"

  # internal NLB mapped to the broker's own subnet
  NLB_ARN=$(aws elbv2 create-load-balancer \
    --name "$NLB_NAME_PREFIX$BROKER_ID" \
    --scheme internal \
    --type network \
    --subnets "$BROKER_SUBNET" \
    --query 'LoadBalancers[0].LoadBalancerArn' --output text)
  echo NLB_ARN = "$NLB_ARN"
  # TCP listener on the broker port, forwarding to the broker's target group
  aws elbv2 create-listener \
    --load-balancer-arn "$NLB_ARN" \
    --protocol TCP \
    --port "$BROKER_PORT" \
    --default-actions Type=forward,TargetGroupArn="$TARGETGROUP_ARN"
  echo "Created NLB $NLB_NAME_PREFIX$BROKER_ID"
  aws elbv2 describe-load-balancers \
    --names "$NLB_NAME_PREFIX$BROKER_ID"
done

echo "Checking health for NLB target groups"
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
  BROKER_ID=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.BrokerId" --output text)
  BROKER_ID=${BROKER_ID%.*}
  TARGETGROUP_ARN=$(aws elbv2 describe-target-groups \
    --names "$TARGETGROUP_NAME_PREFIX$BROKER_ID" \
    --query 'TargetGroups[0].TargetGroupArn' --output text)
  aws elbv2 describe-target-health \
    --target-group-arn "$TARGETGROUP_ARN"
done
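Targets usually need a short while to pass health checks after an NLB is created. One hedged option is the AWS CLI waiter `aws elbv2 wait target-in-service`; another is the hypothetical helper `all_healthy` below, which reads one target state per line and succeeds only if every state is `healthy`, so the `describe-target-health` output can be polled in a loop. TARGETGROUP_ARN is assumed to hold one of the ARNs looked up above.

```shell
#!/usr/bin/env bash
# all_healthy (hypothetical helper): read target states, one per line, and
# succeed only if at least one state was read and all of them are "healthy".
all_healthy() {
  local state seen=0
  while read -r state; do
    [[ -z $state ]] && continue
    seen=1
    [[ $state == healthy ]] || return 1
  done
  (( seen ))
}

# Example polling loop (requires AWS credentials; TARGETGROUP_ARN is assumed set):
#   until aws elbv2 describe-target-health \
#       --target-group-arn "$TARGETGROUP_ARN" \
#       --query 'TargetHealthDescriptions[].TargetHealth.State' --output text \
#       | tr '\t' '\n' | all_healthy; do
#     echo "waiting for targets to become healthy..."; sleep 15
#   done
#
# Or block on the built-in waiter instead:
#   aws elbv2 wait target-in-service --target-group-arn "$TARGETGROUP_ARN"
```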

Step 4: Create endpoint services

Create a VPC endpoint service for each NLB created in Step 3, and allow the Ververica Cloud (VVC) principal to connect to it.

Using the console

  1. Open the Amazon VPC console and go to Endpoint services (https://console.aws.amazon.com/vpc/home?region=us-west-1#EndpointServices).

  2. For each NLB, choose Create endpoint service, select the Network load balancer type and the NLB, and set Require acceptance for endpoint to Off.

  3. After each service is created, open its Allow principals tab, choose Allow principals, and add the VVC_ROLE_ARN principal.

Note

VVC_ROLE_ARN is a fixed value of the form arn:aws:iam::<vvc-account-id>:role/pyxis, where the VVC account ID is:

  • account ID: 794031221915

Using the AWS CLI

Specify the following variables in the script:

  • VVC_ROLE_ARN: ARN of the role that VVC uses to create VPC endpoints to the endpoint services

  • MSK_ARN: ARN of MSK cluster from Step 2

  • MSK_BROKER_NUM: number of brokers, from Step 2

  • NLB_NAME_PREFIX: name prefix for NLB from Step 3

VVC_ROLE_ARN=arn:aws:iam::794031221915:role/pyxis
MSK_ARN=${MSK_ARN:=arn:aws:kafka:XXX}  # exec result from Step 2
MSK_BROKER_NUM=${MSK_BROKER_NUM:=2}  # exec result from Step 2
NLB_NAME_PREFIX=${NLB_NAME_PREFIX:=demo-nlb-}  # input from Step 3


echo "Creating endpoint service for MSK brokers' NLBs"
TBL=("DOMAIN-NAME,SERVICE-NAME")
for ((i=0; i<$MSK_BROKER_NUM; i++)); do
  BROKER_ENDPOINT=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.Endpoints[0]" --output text)
  echo BROKER_ENDPOINT = "$BROKER_ENDPOINT"
  BROKER_ID=$(aws kafka list-nodes \
    --cluster-arn "$MSK_ARN" \
    --query "NodeInfoList[$i].BrokerNodeInfo.BrokerId" --output text)
  BROKER_ID=${BROKER_ID%.*}  # e.g. 1.0 -> 1, matching the NLB names from Step 3
  echo BROKER_ID = "$BROKER_ID"
  NLB_ARN=$(aws elbv2 describe-load-balancers \
    --names "$NLB_NAME_PREFIX$BROKER_ID" \
    --query 'LoadBalancers[0].LoadBalancerArn' --output text)
  echo NLB_ARN = "$NLB_ARN"

  # endpoint service in front of the NLB; endpoint connections are auto-accepted
  SVC_ID=$(aws ec2 create-vpc-endpoint-service-configuration \
    --network-load-balancer-arns "$NLB_ARN" \
    --no-acceptance-required \
    --query 'ServiceConfiguration.ServiceId' --output text)
  echo SVC_ID = "$SVC_ID"
  # allow the VVC role to create VPC endpoints for this service
  aws ec2 modify-vpc-endpoint-service-permissions \
    --service-id "$SVC_ID" \
    --add-allowed-principals "$VVC_ROLE_ARN"
  SVC_NAME=$(aws ec2 describe-vpc-endpoint-service-configurations \
    --service-ids "$SVC_ID" \
    --query 'ServiceConfigurations[0].ServiceName' --output text)
  echo SVC_NAME = "$SVC_NAME"
  TBL+=("$BROKER_ENDPOINT,$SVC_NAME")
done

echo "Created endpoint services and their corresponding broker domain name"
printf "%s\n" "${TBL[@]}" | column -t -s,
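To confirm that the VVC principal was registered on each endpoint service, the allowed principals can be read back with `aws ec2 describe-vpc-endpoint-service-permissions`. In this sketch the membership test is factored into the hypothetical helper `principal_allowed`, so it can be exercised without AWS access; SVC_ID is assumed to hold a service ID produced by the loop above.

```shell
#!/usr/bin/env bash
VVC_ROLE_ARN=arn:aws:iam::794031221915:role/pyxis

# principal_allowed (hypothetical helper): succeed if $1 equals any later argument.
principal_allowed() {
  local want=$1 p
  shift
  for p in "$@"; do
    [[ $p == "$want" ]] && return 0
  done
  return 1
}

# Example (requires AWS credentials; SVC_ID is assumed set):
#   read -r -a PRINCIPALS <<< "$(aws ec2 describe-vpc-endpoint-service-permissions \
#     --service-id "$SVC_ID" --query 'AllowedPrincipals[].Principal' --output text)"
#   if principal_allowed "$VVC_ROLE_ARN" "${PRINCIPALS[@]}"; then
#     echo "VVC principal allowed on $SVC_ID"
#   else
#     echo "VVC principal missing on $SVC_ID" >&2
#   fi
```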

Step 5: Create private connection in VVC Console

  1. Go to the VVC Console and select Security -> Private Connection -> Create Connection.

  2. For each broker, add the Service_Name (SERVICE-NAME) to Private_Endpoint (DOMAIN-NAME) mapping from Step 4.


Important

The Private_Endpoint value must not include a port number.
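As a quick pre-check for this rule, the hypothetical helper `check_private_endpoint` below rejects a Private_Endpoint value that still carries a `:port` suffix (for example, one copied from a bootstrap broker string) and suggests the host name without the port. It is a sketch only; the VVC Console performs its own validation.

```shell
#!/usr/bin/env bash
# check_private_endpoint (hypothetical helper): fail if a Private_Endpoint value
# includes a port, and suggest the value with the port removed.
check_private_endpoint() {
  local endpoint=$1
  if [[ $endpoint == *:* ]]; then
    echo "Private_Endpoint must not include a port; use: ${endpoint%%:*}" >&2
    return 1
  fi
}

check_private_endpoint b-1.XXX.amazonaws.com && echo "OK"
```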