Commit 8c3d011: "address reviewer's comments"
showuon committed Dec 16, 2024 (parent: 086a862)

Showing 3 changed files with 128 additions and 73 deletions.

ha-example/README.md (82 additions, 44 deletions)

# Flink cluster High Availability configuration

A Flink cluster deployment consists of operators, job managers, and task managers.
However, the deployment is not configured for high availability (HA) by default; additional configuration is needed to achieve it.

## Flink Kubernetes Operator

The Flink Kubernetes operator manages the Flink cluster Deployments.
It supports high availability by adding a standby operator instance and using leader election to ensure that only one instance is _in charge_ of the Flink cluster Deployments at any one time.
To enable leader election, we need to add the following two mandatory parameters to the Kubernetes operator's configuration.

```
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
```
The lease name must be unique in the deployed namespace.

When installing the Flink Kubernetes operator using `helm`, we need to add these 2 parameters to the existing default configuration string.
This can be done via the command line using the `--set` flag:
```
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set podSecurityContext=null \
--set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
--set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
--set replicas=2 \
--set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.slf4j.factory.class\:\ org.apache.flink.metrics.slf4j.Slf4jReporterFactory
kubernetes.operator.metrics.reporter.slf4j.interval\:\ 5\ MINUTE
kubernetes.operator.reconcile.interval:\ 15\ s
kubernetes.operator.observer.progress-check.interval:\ 5\ s
kubernetes.operator.leader-election.enabled:\ true
kubernetes.operator.leader-election.lease-name:\ flink-operator-lease" \
-n flink
```
Note: The `replicas` configuration sets the total number of Flink Kubernetes operator replicas you want.
This includes the leader plus the number of standby instances.
One standby (two replicas in total) is usually sufficient.
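
If you later want to change the number of operator instances, the replica count can be bumped with a `helm upgrade` (a sketch, assuming the release name, chart, and `flink` namespace used above; `--reuse-values` keeps the rest of the installed configuration unchanged):
```
helm upgrade flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --reuse-values \
  --set replicas=3 \
  -n flink
```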

After running the install command above, you should see two instances of the Flink Kubernetes operator running:
```
kubectl get pod -n flink | grep flink-kubernetes-operator
NAME READY STATUS RESTARTS AGE
...

kubectl get lease -n flink
NAME                   HOLDER                                     AGE
flink-operator-lease   flink-kubernetes-operator-6cd86cc8-g298v   45h
```

You can test the high availability fail-over between the instances by deleting the leader operator pod.
You should then see that the lease holder changes to the other instance (or to the newly created instance if the creation happens quickly enough).
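
For example, one way to exercise the fail-over from the command line (a sketch, assuming the lease name and `flink` namespace used above):
```
# Read the current leader from the lease, delete that pod, then watch the holder change
LEADER=$(kubectl get lease flink-operator-lease -n flink -o jsonpath='{.spec.holderIdentity}')
kubectl delete pod "$LEADER" -n flink
kubectl get lease flink-operator-lease -n flink --watch
```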

## Flink Job Manager

The Job Manager ensures consistency during recovery across Task Managers.
For the Job Manager itself to recover consistently,
an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”),
as well as information needed to elect and lock which Job Manager is the leader (to avoid split-brain situations).

In order to configure the Job Managers in your Flink cluster for high availability, you need to add the following settings to the `flinkConfiguration` section of your `FlinkDeployment` CR:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:v0.0.1
  flinkVersion: v1_19
  flinkConfiguration:
    # job manager HA settings
    high-availability.type: KUBERNETES
    high-availability.storageDir: s3://test/ha
```
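
The `high-availability.*` settings above give the Job Manager somewhere to store its recovery metadata; to also keep a standby Job Manager running alongside the leader, the `FlinkDeployment` additionally needs more than one Job Manager replica. A minimal sketch (this is what produces the two `recommendation-app` job manager pods in the example later in this document):
```yaml
spec:
  jobManager:
    replicas: 2
```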
## Task Manager

[Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/) is Flink’s primary fault-tolerance mechanism, wherein a snapshot of your job’s state is persisted periodically to some durable location.
In the case of failure of a Task running your job's code, Flink will restart the Task from the most recent checkpoint and resume processing.
Although not strictly related to HA of the Flink cluster, it is important to enable checkpointing in production deployments to ensure fault tolerance.
Checkpointing is not enabled by default; you can enable it by setting the checkpointing interval in the `FlinkDeployment` CR like this:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:v0.0.1
  flinkVersion: v1_19
  flinkConfiguration:
    # checkpointing settings
    execution.checkpointing.interval: 1min
    state.checkpoints.dir: s3://test/cp
```
The settings above will checkpoint the Task state every minute under the S3 path provided.

## Example: Making the `recommendation-app` fault tolerant and highly available

Here, we will use the [recommendation-app](../recommendation-app) as an example to demonstrate Job Manager HA.

1. Install the Flink Kubernetes operator with leader election enabled like this:
```
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set podSecurityContext=null \
--set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
--set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
--set replicas=2 \
--set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.metrics.reporter.slf4j.factory.class\:\ org.apache.flink.metrics.slf4j.Slf4jReporterFactory
kubernetes.operator.metrics.reporter.slf4j.interval\:\ 5\ MINUTE
kubernetes.operator.reconcile.interval:\ 15\ s
kubernetes.operator.observer.progress-check.interval:\ 5\ s
kubernetes.operator.leader-election.enabled:\ true
kubernetes.operator.leader-election.lease-name:\ flink-operator-lease" \
-n flink
```
2. Follow the [guide](minio-install/README.md) to deploy a local S3-compatible storage service using minio and create a bucket named `test`.
3. Replace the `MINIO_POD_ID` in `recommendation-app-HA/flink-deployment-ha.yaml` with the output of this command:
```
kubectl get pod minio -n flink --template={{.status.podIP}}
```
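Note that with the Deployment-based minio install, the pod name is generated rather than being literally `minio`, so a label selector is a more reliable way to look up the pod IP. The substitution itself can also be scripted (a sketch; the `app=minio` label comes from the example manifest, and the file path is the one used in this step):
```
MINIO_POD_IP=$(kubectl get pod -l app=minio -n flink -o jsonpath='{.items[0].status.podIP}')
sed -i "s/MINIO_POD_ID/${MINIO_POD_IP}/" recommendation-app-HA/flink-deployment-ha.yaml
```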
4. Deploy the `FlinkDeployment` CR with HA configured:
```
kubectl apply -f recommendation-app-HA/flink-deployment-ha.yaml -n flink
```
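While the operator reconciles the CR, you can watch its status (a sketch, assuming the CR name `recommendation-app` from this example; the `flinkdeployment` resource type comes from the operator's CRD):
```
kubectl get flinkdeployment recommendation-app -n flink -w
```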
5. There should be 2 recommendation-app job manager pods and 1 task manager pod deployed:
```
kubectl get pod -l app=recommendation-app -n flink
NAME READY STATUS RESTARTS AGE
recommendation-app-76b6854f98-4qcz4 1/1 Running 0 3m59s
recommendation-app-76b6854f98-9zb24 1/1 Running 0 3m59s
recommendation-app-taskmanager-1-1 1/1 Running 0 2m5s
```
6. Browse the minio console to make sure the metadata of the job manager is uploaded to s3://test/ha
7. Find out which job manager pod is the leader
Check the pod logs: if it is the standby job manager, the log will stop at the lines below, waiting to become the leader.
```
kubectl logs -n flink recommendation-app-76b6854f98-4qcz4 --tail=2 -f
2024-12-11 08:31:22,729 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for flink/recommendation-app-cluster-config-map, watching id:3c7e23f2-fcaa-4f47-8623-fa1ebe9609ea
2024-12-11 08:31:22,729 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for flink/recommendation-app-cluster-config-map, watching id:ea191b50-a1ce-43c6-9bfc-a61f739fa8b3
```
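The HA metadata itself is kept in the cluster ConfigMap mentioned in the log lines above, so listing the ConfigMaps is another quick way to confirm HA is active (the name is taken from the watch messages above):
```
kubectl get configmap -n flink | grep recommendation-app
```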
8. Delete the leader pod of the job manager, and monitor the logs of the standby pod
Keep the `kubectl logs` command from step 7 running, and delete the leader pod in another terminal
```
...
2024-12-11 08:50:16,524 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is granted leadership with session id 87a6a102-c77b-4ec2-b263-b20a60921c9e.
```
You should see that leadership has changed to the other pod.
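If you prefer not to read the whole log stream, grepping for the leadership message shown above also works (the pod name is whichever standby instance you were following):
```
kubectl logs -n flink recommendation-app-76b6854f98-4qcz4 | grep "granted leadership"
```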
9. Make sure the checkpointing file is successfully uploaded onto s3://test/cp via the minio console
10. Monitor the sink topic in Kafka by running the console consumer:
```
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.recommended.products --from-beginning
...
user-75,128,"2024-12-11 09:00:25"
```
It will emit results over time. Keep this terminal open; we'll check it again later.
11. Delete the task manager pod
```
kubectl delete pod/recommendation-app-taskmanager-1-1 -n flink
pod "recommendation-app-taskmanager-1-1" deleted
```
12. Make sure the newly created task manager pod is loading the checkpoint
```
kubectl logs recommendation-app-taskmanager-2-1 -n flink -f | grep test/cp
...
2024-12-11 09:02:55,572 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Finished restoring from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/ea11d00d-c79b-49d4-b1ff-a73ad01b7197, ea11d00d-c79b-49d4-b1ff-a73ad01b7197 [48167 bytes]}.
...
```
13. Check the sink topic consumer output; it should continue from minutes ago, not from the beginning:
```
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.recommended.products --from-beginning
...
```

ha-example/minio-install/README.md (19 additions, 11 deletions)

# minio installation

This folder contains an example minio deployment yaml, and the instructions are based on the [minio documentation](https://min.io/docs/minio/kubernetes/upstream/index.html).

1. Deploy minio with the default configuration
```
kubectl apply -f minio.yaml -n flink
```
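Before port-forwarding, you can check that the minio pod is up (the `app=minio` label comes from the example manifest):
```
kubectl get pod -l app=minio -n flink
```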
2. Access the MinIO S3 Console
```
kubectl port-forward deployment/minio 9000 9090 -n flink
```
3. Create a bucket via MinIO S3 Console

Open the console at `http://localhost:9090`, log in with the default credentials (`minioadmin`/`minioadmin`), and create a bucket named `test`.

Click on the `Object Browser` to view the files in the buckets.

After minio is deployed and the bucket is created, the Flink configuration can be set like this in the `FlinkDeployment` CR:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:v0.0.1
  flinkVersion: v1_19
  flinkConfiguration:
    # minio setting
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    s3.endpoint: http://MINIO_POD_ID:9000
    s3.path.style.access: "true"
    high-availability.storageDir: s3://test/ha
    state.checkpoints.dir: s3://test/cp
```
Note:
1. This assumes there is a bucket named `test` in minio.
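
Because the configuration above points at the pod IP (`http://MINIO_POD_ID:9000`), the endpoint changes whenever the Deployment recreates the pod. One option (not used in this example, and purely a hedged sketch) is to expose minio through a Service and point `s3.endpoint` at its stable DNS name, e.g. `http://minio.flink.svc:9000`:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: minio
spec:
  selector:
    app: minio
  ports:
    - name: api
      port: 9000
    - name: console
      port: 9090
```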

ha-example/minio-install/minio.yaml (27 additions, 18 deletions)

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: minio
  name: minio
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: quay.io/minio/minio:latest
          command:
            - /bin/bash
            - -c
          args:
            - minio server /data --console-address :9090
          volumeMounts:
            - mountPath: /data
              name: localvolume # Corresponds to the `spec.volumes` Persistent Volume
      volumes:
        - name: localvolume
          hostPath: # MinIO generally recommends using locally-attached volumes
            path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node
            type: DirectoryOrCreate # The path to the last directory must exist
