Add flink k8s mode

This commit is contained in:
Zheng, Qi 2024-05-29 10:38:56 +08:00 committed by volcano
parent 11fffa6039
commit 0b96a19526
11 changed files with 779 additions and 5 deletions

47
demos/flink/Dockerfile Normal file

@ -0,0 +1,47 @@
FROM ubuntu:20.04
LABEL maintainer="Qi Zheng <huaiqing.zq@antgroup.com>"

# Make a RUN step fail when any command of a pipeline fails, so a failed key
# download (wget ... | apt-key add -) is not masked by the right-hand command.
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

# Install SGX DCAP and Occlum runtime
# Build-time only (ARG instead of ENV): silences the apt-key usage warning
# without leaking the variable into the runtime environment of the container.
ARG APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
# Package versions; override with --build-arg to pick another release.
ARG PSW_VERSION=2.20.100.4
ARG DCAP_VERSION=1.17.100.4
ARG OCCLUM_VERSION=0.30.1
# Everything is done in one layer so the apt lists removed at the end never
# end up in the image. apt-get is used because apt's CLI is not meant for scripts.
RUN apt-get update && DEBIAN_FRONTEND="noninteractive" apt-get install -y --no-install-recommends gnupg wget ca-certificates jq && \
    echo 'deb [arch=amd64] https://download.01.org/intel-sgx/sgx_repo/ubuntu focal main' | tee /etc/apt/sources.list.d/intel-sgx.list && \
    wget -qO - https://download.01.org/intel-sgx/sgx_repo/ubuntu/intel-sgx-deb.key | apt-key add - && \
    echo 'deb [arch=amd64] https://occlum.io/occlum-package-repos/debian focal main' | tee /etc/apt/sources.list.d/occlum.list && \
    wget -qO - https://occlum.io/occlum-package-repos/debian/public.key | apt-key add - && \
    apt-get update && DEBIAN_FRONTEND="noninteractive" apt-get install -y --no-install-recommends \
        libsgx-launch=$PSW_VERSION-focal1 \
        libsgx-epid=$PSW_VERSION-focal1 \
        libsgx-quote-ex=$PSW_VERSION-focal1 \
        libsgx-urts=$PSW_VERSION-focal1 \
        libsgx-enclave-common=$PSW_VERSION-focal1 \
        libsgx-uae-service=$PSW_VERSION-focal1 \
        libsgx-ae-pce=$PSW_VERSION-focal1 \
        libsgx-ae-qe3=$DCAP_VERSION-focal1 \
        libsgx-ae-id-enclave=$DCAP_VERSION-focal1 \
        libsgx-ae-qve=$DCAP_VERSION-focal1 \
        libsgx-dcap-ql=$DCAP_VERSION-focal1 \
        libsgx-pce-logic=$DCAP_VERSION-focal1 \
        libsgx-qe3-logic=$DCAP_VERSION-focal1 \
        libsgx-dcap-default-qpl=$DCAP_VERSION-focal1 \
        libsgx-dcap-quote-verify=$DCAP_VERSION-focal1 \
        occlum-runtime=$OCCLUM_VERSION-1 \
        gettext openjdk-11-jdk \
    && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Entry script executed when the container starts.
COPY docker-entrypoint.sh /

# Flink distribution plus the packaged Occlum instance, both under /opt/flink.
RUN mkdir -p /opt/flink
COPY flink-1.15.2 /opt/flink
# ADD (not COPY) so that the local tarball is unpacked into the target directory.
ADD occlum_instance_k8s/occlum_instance_k8s.tar.gz /opt/flink

# Runtime environment: Flink home, the JDK installed above, and the Occlum/Flink tools on PATH.
ENV FLINK_HOME=/opt/flink \
    JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 \
    PATH="/opt/occlum/build/bin:/usr/local/occlum/bin:/opt/flink/bin:$PATH"

WORKDIR /opt/flink
ENTRYPOINT ["/docker-entrypoint.sh"]

@ -1,5 +1,8 @@
# Run Flink on Occlum # Run Flink on Occlum
This document describes how to run the Flink job manager and task manager in Occlum.
To start a Flink K8S cluster in Occlum, please refer to [kubernetes](./kubernetes/).
### Preinstall dependencies ### Preinstall dependencies
Related dependencies: openjdk-11 Related dependencies: openjdk-11
``` ```

@ -5,6 +5,7 @@ BLUE='\033[1;34m'
NC='\033[0m' NC='\033[0m'
RPC_BIND_PORT=8089 RPC_BIND_PORT=8089
OCCLUM_USER_SPACE_SIZE=8GB
build_instance() { build_instance() {
postfix=$1 postfix=$1
@ -12,7 +13,7 @@ build_instance() {
occlum new occlum_instance_$postfix occlum new occlum_instance_$postfix
cd occlum_instance_$postfix cd occlum_instance_$postfix
new_json="$(jq '.resource_limits.user_space_size = "1MB" | new_json="$(jq '.resource_limits.user_space_size = "1MB" |
.resource_limits.user_space_max_size = "8GB" | .resource_limits.user_space_max_size = "OCCLUM_USER_SPACE_SIZE" |
.resource_limits.kernel_space_heap_size="1MB" | .resource_limits.kernel_space_heap_size="1MB" |
.resource_limits.kernel_space_heap_max_size="256MB" | .resource_limits.kernel_space_heap_max_size="256MB" |
.resource_limits.max_num_of_threads = 256 | .resource_limits.max_num_of_threads = 256 |
@ -27,7 +28,32 @@ build_instance() {
# Copy JVM and class file into Occlum instance and build # Copy JVM and class file into Occlum instance and build
rm -rf image rm -rf image
copy_bom -f ../flink.yaml --root image --include-dir /opt/occlum/etc/template copy_bom -f ../flink.yaml --root image --include-dir /opt/occlum/etc/template
# Use hostfs for flink conf in k8s mode
if [ "$postfix" == "k8s" ]; then
# Increase user space size for k8s mode
OCCLUM_USER_SPACE_SIZE=16GB
rm -rf image/opt/flink*/conf/*
new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/opt/flink/conf", "type": "hostfs","source": "/opt/flink/conf-copy"}]')" && \
echo "${new_json}" > Occlum.json
# use host secrets
mkdir -p image/var/run/secrets
new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/var/run/secrets", "type": "hostfs","source": "/var/run/secrets-copy"}]')" && \
echo "${new_json}" > Occlum.json
# k8s pod template
mkdir -p image/opt/flink/pod-template
new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/opt/flink/pod-template", "type": "hostfs","source": "/opt/flink/pod-template-copy"}]')" && \
echo "${new_json}" > Occlum.json
fi
# Update user size
sed -i "s/OCCLUM_USER_SPACE_SIZE/$OCCLUM_USER_SPACE_SIZE/g" Occlum.json
occlum build occlum build
occlum package --debug
cd .. cd ..
} }
@ -35,7 +61,13 @@ update_flink_conf() {
echo "rest.port: $RPC_BIND_PORT" >> flink-1.15.2/conf/flink-conf.yaml echo "rest.port: $RPC_BIND_PORT" >> flink-1.15.2/conf/flink-conf.yaml
} }
update_flink_conf
build_instance jobmanager if [ "$1" == "k8s" ]; then
# flink job manager and taks manager use the same occlum instance echo "do occlum instance build for k8s mode"
cp -rf occlum_instance_jobmanager occlum_instance_taskmanager build_instance k8s
else
update_flink_conf
build_instance jobmanager
# flink job manager and task manager use the same occlum instance
cp -rf occlum_instance_jobmanager occlum_instance_taskmanager
fi

154
demos/flink/docker-entrypoint.sh Executable file

@ -0,0 +1,154 @@
#!/usr/bin/env bash
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Names of the sub-commands recognised by the dispatch at the end of this script.
COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# When the caller supplied no JobManager address (unset or empty), fall back to
# the fully-qualified hostname of this container.
: "${JOB_MANAGER_RPC_ADDRESS:=$(hostname -f)}"

# Flink configuration file that the helper functions of this script modify.
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
# Print the command prefix used to run a Flink launcher as the "flink" user.
#
# Output (stdout):
#   ""               when not running as root, or when privileges cannot be
#                    dropped (helper binary or "flink" user missing)
#   "su-exec flink"  on images that ship /sbin/su-exec (Alpine)
#   "gosu flink"     otherwise
#
# The result is spliced unquoted into `exec $(drop_privs_cmd) ...`, so an empty
# output simply runs the launcher as the current user.
drop_privs_cmd() {
    if [ "$(id -u)" != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    fi

    local helper
    if [ -x /sbin/su-exec ]; then
        # Alpine
        helper=su-exec
    else
        # Others
        helper=gosu
    fi

    # An image without the helper or without a "flink" account would otherwise
    # fail on exec; keep running as the current user and say so on stderr
    # (stdout is reserved for the command prefix).
    if ! command -v "${helper}" > /dev/null 2>&1 || ! id flink > /dev/null 2>&1; then
        echo "WARNING: cannot drop privileges (${helper} or user 'flink' not available); running as $(id -un)." >&2
        return 0
    fi

    echo "${helper} flink"
}
# Enable the built-in plugins listed in ENABLE_BUILT_IN_PLUGINS.
#
# ENABLE_BUILT_IN_PLUGINS is a ';'-separated list of jar file names located in
# ${FLINK_HOME}/opt. Each jar is symlinked into its own directory
# ${FLINK_HOME}/plugins/<jar name without .jar>. A listed jar that does not
# exist terminates the script with status 1.
copy_plugins_if_required() {
    # Nothing requested: leave the plugin directory untouched.
    [ -n "$ENABLE_BUILT_IN_PLUGINS" ] || return 0

    echo "Enabling required built-in plugins"
    local plugin_src plugin_dst
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}
        plugin_src="${FLINK_HOME}/opt/${target_plugin}"
        plugin_dst="${FLINK_HOME}/plugins/${plugin_name}"

        # The per-plugin directory is created before the jar is checked.
        mkdir -p "${plugin_dst}"
        if [ -e "${plugin_src}" ]; then
            ln -fs "${plugin_src}" "${plugin_dst}"
            echo "Successfully enabled ${target_plugin}"
        else
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        fi
    done
}
# Set a single "key: value" entry in ${CONF_FILE}.
#
# Arguments:
#   $1  option name (e.g. jobmanager.rpc.address)
#   $2  value to store
#
# An existing line that starts with "<option>:" is overwritten in place;
# otherwise the entry is appended to the end of the file.
set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option
    escaped_option=$(printf '%s' "${option}" | sed -e 's/\./\\./g')

    # Escape the characters that are special on the replacement side of sed's
    # s command (backslash, the "/" delimiter and "&") so that values such as
    # URIs are written verbatim instead of breaking or altering the command.
    local replacement
    replacement=$(printf '%s' "${option}: ${value}" | sed -e 's|[\\/&]|\\&|g')

    # either override an existing entry, or append a new one
    if grep -q -E "^${escaped_option}:" "${CONF_FILE}"; then
        # Anchored like the grep above: only the line that *starts* with the
        # key is replaced, never another key that merely ends with it.
        sed -i -e "s/^${escaped_option}:.*/${replacement}/" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}
# Write the container-specific settings into ${CONF_FILE}.
#
# Reads:
#   JOB_MANAGER_RPC_ADDRESS            stored as jobmanager.rpc.address
#   TASK_MANAGER_NUMBER_OF_TASK_SLOTS  optional; stored as taskmanager.numberOfTaskSlots
#   FLINK_PROPERTIES                   optional; appended verbatim to the file
#
# Finally the whole file is passed through envsubst so that environment
# variable references inside it are expanded.
prepare_configuration() {
    # Values are quoted so that whitespace or glob characters in them reach
    # set_config_option as a single, unmodified argument.
    set_config_option jobmanager.rpc.address "${JOB_MANAGER_RPC_ADDRESS}"
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}"
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi

    # envsubst cannot edit in place: write to a temporary file and move it
    # over the original only when the substitution succeeded.
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}
# Append libjemalloc to LD_PRELOAD unless DISABLE_JEMALLOC is set to something
# other than "false".
#
# The library is looked up first in the multiarch directory of the current
# machine (uname -m), then in the x86_64 directory. When neither file exists a
# warning is printed and LD_PRELOAD is left unchanged.
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" != "false" ]; then
        return 0
    fi

    JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
    JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"

    # First existing candidate wins.
    local candidate
    for candidate in "$JEMALLOC_PATH" "$JEMALLOC_FALLBACK"; do
        if [ -f "$candidate" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$candidate
            return 0
        fi
    done

    # Name each looked-up path once in the warning.
    if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
        MSG_PATH=$JEMALLOC_PATH
    else
        MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
    fi
    echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
}
# One-time container setup before the requested command is started.
maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

# Dispatch on the first argument. For a known sub-command the remaining
# arguments are forwarded to the matching Flink launcher, which replaces this
# shell (exec); anything else is executed as given (pass-through mode).
args=("$@")
case "$1" in
    help)
        printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
        printf " Or $(basename "$0") help\n\n"
        printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
        exit 0
        ;;
    jobmanager)
        args=("${args[@]:1}")
        echo "Starting Job Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
        ;;
    "${COMMAND_STANDALONE}")
        args=("${args[@]:1}")
        echo "Starting Job Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
        ;;
    "${COMMAND_HISTORY_SERVER}")
        args=("${args[@]:1}")
        echo "Starting History Server"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
        ;;
    taskmanager)
        args=("${args[@]:1}")
        echo "Starting Task Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
        ;;
esac

# Running command in pass-through mode
# exec $(drop_privs_cmd) "${args[@]}"
# Do not run with flink user to avoid permission issue in Occlum hostfs mount
exec "${args[@]}"

@ -0,0 +1,103 @@
# Deploy Flink in K8S
There are several ways to deploy Flink on Kubernetes, such as [native kubernetes deployment](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/resource-providers/native_kubernetes/) and [Flink Kubernetes Operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/). This tutorial shows how to use the kubernetes operator deployment.
## Prerequisites
* A Kubernetes cluster with at least one node.
* The `kubectl` command line tool is installed and configured to connect to your Kubernetes cluster.
* The `helm` command line tool is also installed and configured to connect to your Kubernetes cluster.
### Install the Flink Kubernetes Operator
Just follow the [quick start](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/try-flink-kubernetes-operator/quick-start/) to install the Flink Kubernetes Operator.
## Build Flink K8S docker image
First, please make sure `docker` is installed on your host. Then start the Occlum container (using version `latest-ubuntu20.04` as an example) as below.
```
$ sudo docker run --rm -itd --network host \
-v $(which docker):/usr/bin/docker -v /var/run/docker.sock:/var/run/docker.sock \
occlum/occlum:latest-ubuntu20.04
```
All of the following steps are run inside the above container.
### Build
Just run the script [build.sh](./build.sh). It builds a docker image for Flink K8S.
```bash
Build Occlum Flink container images for k8s deployment.
usage: build.sh [OPTION]...
-r <container image registry> the container image registry
-g <tag> container image tag
-h <usage> usage help
```
For example, if you want to build the image named `demo/occlum_flink:0.1`, just run
```bash
$ ./build.sh -r demo -g 0.1
```
Notice, during the build process, a customized [flink-console.sh](./flink-console.sh) is used to replace the original one. Users could refer to the script for details.
Once the build is done, you can push the image for next steps -- [Deploy](#deploy).
## Deploy
Based on the original example yaml files in the [flink-kubernetes-operator repository](https://github.com/apache/flink-kubernetes-operator/tree/release-1.8/examples), the following customized example yaml files are provided.
* [basic.yaml](./basic.yaml)
* [basic-session-deployment-and-job.yaml](./basic-session-deployment-and-job.yaml)
* [basic-session-deployment-only.yaml](./basic-session-deployment-only.yaml)
* [basic-session-job-only.yaml](./basic-session-job-only.yaml)
They have the same meaning as their original counterparts, apart from some SGX/Occlum related customizations.
You can deploy each of them.
Just notice the **image** in the yaml file should be the one you built before.
### Examples
#### Basic Application Deployment example
This is a simple deployment defined by a minimal deployment file.
The configuration contains the following:
- Defines the job to run
- Assigns the resources available for the job
- Defines the parallelism used
To run the job submit the yaml file using kubectl:
```bash
kubectl apply -f basic.yaml
```
#### Basic Session Deployment example
This example shows how to create a basic Session Cluster and then how to submit specific jobs to this cluster if needed.
##### Without jobs
The Flink Deployment could be created without any jobs.
In this case the Flink jobs could be created later by submitting the jobs
separately.
To create a Flink Deployment with the specific resources without any jobs run the following command:
```bash
kubectl apply -f basic-session-deployment-only.yaml
```
##### Adding jobs
If the Flink Deployment was created by `basic-session-deployment-only.yaml`, a new job can be added
with the following command:
```bash
kubectl apply -f basic-session-job-only.yaml
```
##### Creating Deployment and Jobs together
Alternatively the Flink Deployment and the Flink Session Job configurations can be submitted together.
To try this out, run the following command:
```bash
kubectl apply -f basic-session-deployment-and-job.yaml
```

@ -0,0 +1,83 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Document 1 of 3: the session cluster. This FlinkDeployment has no `job`
# section; jobs are attached by the FlinkSessionJob documents further down.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-session-deployment-example
spec:
# Container image to run; change it to the image you built and pushed.
image: occlum_flink:0.1
flinkVersion: v1_15
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
serviceAccount: flink
# Pod customisation for the SGX resources and the device-plugin directory.
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
resources:
# requests and limits carry the same SGX values.
# 21474836480 = 20 * 1024^3 (presumably bytes of EPC — confirm against the SGX device plugin).
requests:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
limits:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
# Optional: uncomment to set the OCCLUM_LOG_LEVEL environment variable.
# env:
# - name: OCCLUM_LOG_LEVEL
# value: "off"
volumes:
# Host directory backing the device-plugin volume mounted above.
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
---
# Document 2 of 3: a job submitted to the session cluster named in deploymentName.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job-example
spec:
deploymentName: basic-session-deployment-example
job:
# NOTE(review): the jar is a 1.16.1 example build while flinkVersion above is v1_15 — confirm compatibility.
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
parallelism: 2
upgradeMode: stateless
---
# Document 3 of 3: a second job for the same session cluster; the entry class is given explicitly.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job-example2
spec:
deploymentName: basic-session-deployment-example
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar
parallelism: 2
upgradeMode: stateless
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

@ -0,0 +1,59 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Session cluster only: this FlinkDeployment has no `job` section, so jobs are
# submitted separately (a FlinkSessionJob whose deploymentName is this name).
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-session-deployment-only-example
spec:
# Container image to run; change it to the image you built and pushed.
image: occlum_flink:0.1
flinkVersion: v1_15
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
# Pod customisation for the SGX resources and the device-plugin directory.
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
resources:
# requests and limits carry the same SGX values.
# 21474836480 = 20 * 1024^3 (presumably bytes of EPC — confirm against the SGX device plugin).
requests:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
limits:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
# Optional: uncomment to set the OCCLUM_LOG_LEVEL environment variable.
# env:
# - name: OCCLUM_LOG_LEVEL
# value: "off"
volumes:
# Host directory backing the device-plugin volume mounted above.
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins

@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# A job for an already existing session cluster; deploymentName must match the
# metadata.name of the FlinkDeployment it is submitted to.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job-only-example
spec:
deploymentName: basic-session-deployment-only-example
job:
# Example jar fetched from Maven Central.
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
parallelism: 4
upgradeMode: stateless

@ -0,0 +1,79 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Deployment that carries its own job: the `job` section below names the jar to run.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
spec:
# Container image to run; change it to the image you built and pushed.
image: occlum_flink:0.1
flinkVersion: v1_15
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
# Disabled sample: a pod template overriding the main container arguments.
# podTemplate:
# spec:
# containers:
# # Do not change the main container name
# - name: flink-main-container
# args:
# - bash
# - -c
# - 'kubernetes-jobmanager.sh kubernetes-application '
taskManager:
resource:
memory: "2048m"
cpu: 2
# This pod template sets OCCLUM_LOG_LEVEL to "off" in the main container.
podTemplate:
spec:
containers:
- name: flink-main-container
env:
- name: OCCLUM_LOG_LEVEL
value: "off"
job:
# local:// URI, i.e. a path inside the container rather than a download.
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: stateless
# Pod customisation for the SGX resources and the device-plugin directory.
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
resources:
# requests and limits carry the same SGX values.
# 21474836480 = 20 * 1024^3 (presumably bytes of EPC — confirm against the SGX device plugin).
requests:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
limits:
sgx.intel.com/epc: 21474836480
sgx.intel.com/enclave: 1
sgx.intel.com/provision: 1
# Optional: uncomment to set the OCCLUM_LOG_LEVEL environment variable.
# env:
# - name: OCCLUM_LOG_LEVEL
# value: "off"
volumes:
# Host directory backing the device-plugin volume mounted above.
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins

60
demos/flink/kubernetes/build.sh Executable file

@ -0,0 +1,60 @@
#!/bin/bash
# Stop at the first command that fails.
set -e
# Absolute directory containing this script, and its parent directory
# (the build below runs from the parent).
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
top_dir=$(dirname "${script_dir}")
# Defaults for the image name; overridden by the -r and -g options.
registry="demo"
tag="latest"
# Print the command-line help text to stdout and terminate the script with
# exit status 0.
function usage {
cat << EOM
Build Occlum Flink container images for k8s deployment.
usage: $(basename "$0") [OPTION]...
-r <container image registry> the container image registry
-g <tag> container image tag
-h <usage> usage help
EOM
exit 0
}
# Parse the command-line options into the global variables `registry` and `tag`.
#
# Options:
#   -r <registry>  container image registry
#   -g <tag>       container image tag
#   -h             print the usage text and exit
#
# An unknown option, or -r/-g without a value, is reported on stderr and
# terminates the script with status 1 instead of being silently ignored.
function process_args {
    # getopts keeps its position in OPTIND; a local copy makes every call
    # start from the first argument.
    local option OPTIND=1
    # The leading ":" selects silent error reporting: getopts then hands
    # "?" (unknown option) and ":" (missing argument) to the case below.
    while getopts ":r:g:h" option; do
        case "${option}" in
            r) registry=${OPTARG};;
            g) tag=${OPTARG};;
            h) usage;;
            :)
                echo "Option -${OPTARG} requires an argument." >&2
                echo "Run '$(basename "$0") -h' for usage." >&2
                exit 1
                ;;
            \?)
                echo "Unknown option: -${OPTARG}" >&2
                echo "Run '$(basename "$0") -h' for usage." >&2
                exit 1
                ;;
        esac
    done
}
# Main flow: parse the options, prepare Flink and the Occlum instance in the
# parent directory of this script, then build the container image there.
process_args "$@"

echo ""
echo "############################"
echo "Build Occlum Flink container image for k8s deployment"
echo " Container images registry: ${registry}"
echo " Container images tag: ${tag}"
echo ""

# Quoted so that a checkout path containing spaces still works.
pushd "${top_dir}"

echo "Install openjdk 11 first ..."
./preinstall_deps.sh

echo "Download Flink ..."
./download_flink.sh
# Replace the stock launcher with the customized one from this directory.
cp ./kubernetes/flink-console.sh ./flink-1.15.2/bin/

echo "Build Occlum instance ..."
./build_occlum_instance.sh k8s

echo ""
echo "Build Occlum container image ..."
docker build \
    -f Dockerfile \
    -t "${registry}/occlum_flink:${tag}" .

echo "Build is done"
popd

@ -0,0 +1,126 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"

# The first argument names the service; all remaining arguments are kept as an
# array and handed to the Java entry class later on.
SERVICE=$1
ARGS=("${@:2}")

# Resolve the absolute directory of this script and source config.sh from it.
bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

# Map the service name to the Java class to start; unknown names abort.
case $SERVICE in
    taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;
    historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;
    zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;
    standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;
    standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
        ;;
    kubernetes-session)
        CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
        ;;
    kubernetes-application)
        CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
        ;;
    kubernetes-taskmanager)
        CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
        ;;
    *)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
        ;;
esac
# Bookkeeping before launch: class path, pid file (guarded by a lock when
# flock is installed) and the logging options passed to the JVM.
# constructFlinkClassPath, FLINK_PID_DIR, FLINK_LOG_DIR and FLINK_CONF_DIR are
# not defined in this script — presumably provided by the sourced config.sh.
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
# Fall back to the current user name as the identity used in file names.
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
# One pid file per identity and service.
pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
mkdir -p "$FLINK_PID_DIR"
# The lock needs to be released after use because this script is started foreground
# flock_exist holds the exit status of the lookup: 0 means flock is available.
command -v flock >/dev/null 2>&1
flock_exist=$?
if [[ ${flock_exist} -eq 0 ]]; then
# Open the pid directory on file descriptor 200 and lock that descriptor.
exec 200<"$FLINK_PID_DIR"
flock 200
fi
# Remove the pid file when all the processes are dead
if [ -f "$pid" ]; then
# all_dead stays 0 only when none of the recorded pids answers kill -0;
# it is set to 1 as soon as one process is found alive.
all_dead=0
while read each_pid; do
# Check whether the process is still running
kill -0 $each_pid > /dev/null 2>&1
[[ $? -eq 0 ]] && all_dead=1
done < "$pid"
[ ${all_dead} -eq 0 ] && rm $pid
fi
# Instance index used in the log file name: number of lines already in the
# pid file, or 0 when the file does not exist.
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
# JVM system properties naming the log file and the console logging configurations.
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
echo "Starting $SERVICE as a console application on host $HOSTNAME."
# Add the current process id to pid file
# (errors from the append are discarded)
echo $$ >> "$pid" 2>/dev/null
# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
[[ ${flock_exist} -eq 0 ]] && flock -u 200
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
echo "################"
# Trace the remaining commands so the preparation steps and the final command
# line show up in the container log.
set -x

# Create the *-copy directories that the Occlum instance mounts through hostfs
# (secrets, Flink configuration and pod template).
if [ -d /var/run/secrets ]; then
    cp -rf /var/run/secrets /var/run/secrets-copy
else
    # create dir anyway to avoid hostfs mount error
    mkdir -p /var/run/secrets-copy
fi

cp -rf conf conf-copy

if [ -d pod-template ]; then
    cp -rf pod-template pod-template-copy
else
    # create dir anyway to avoid hostfs mount error
    mkdir -p pod-template-copy
fi

# occlum run must be started from inside the instance directory; stop here
# instead of launching from the wrong place when the directory is missing.
cd occlum_instance_k8s || exit 1

# Original (non-Occlum) launch command, kept for reference:
# exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
exec occlum run /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Dos.name=Linux $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"