diff --git a/demos/flink/Dockerfile b/demos/flink/Dockerfile new file mode 100644 index 00000000..e31fdc80 --- /dev/null +++ b/demos/flink/Dockerfile @@ -0,0 +1,47 @@ +FROM ubuntu:20.04 +LABEL maintainer="Qi Zheng " + +# Install SGX DCAP and Occlum runtime +ENV APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1 +ARG PSW_VERSION=2.20.100.4 +ARG DCAP_VERSION=1.17.100.4 +ARG OCCLUM_VERSION=0.30.1 +RUN apt update && DEBIAN_FRONTEND="noninteractive" apt install -y --no-install-recommends gnupg wget ca-certificates jq && \ + echo 'deb [arch=amd64] https://download.01.org/intel-sgx/sgx_repo/ubuntu focal main' | tee /etc/apt/sources.list.d/intel-sgx.list && \ + wget -qO - https://download.01.org/intel-sgx/sgx_repo/ubuntu/intel-sgx-deb.key | apt-key add - && \ + echo 'deb [arch=amd64] https://occlum.io/occlum-package-repos/debian focal main' | tee /etc/apt/sources.list.d/occlum.list && \ + wget -qO - https://occlum.io/occlum-package-repos/debian/public.key | apt-key add - && \ + apt update && apt install -y --no-install-recommends \ + libsgx-launch=$PSW_VERSION-focal1 \ + libsgx-epid=$PSW_VERSION-focal1 \ + libsgx-quote-ex=$PSW_VERSION-focal1 \ + libsgx-urts=$PSW_VERSION-focal1 \ + libsgx-enclave-common=$PSW_VERSION-focal1 \ + libsgx-uae-service=$PSW_VERSION-focal1 \ + libsgx-ae-pce=$PSW_VERSION-focal1 \ + libsgx-ae-qe3=$DCAP_VERSION-focal1 \ + libsgx-ae-id-enclave=$DCAP_VERSION-focal1 \ + libsgx-ae-qve=$DCAP_VERSION-focal1 \ + libsgx-dcap-ql=$DCAP_VERSION-focal1 \ + libsgx-pce-logic=$DCAP_VERSION-focal1 \ + libsgx-qe3-logic=$DCAP_VERSION-focal1 \ + libsgx-dcap-default-qpl=$DCAP_VERSION-focal1 \ + libsgx-dcap-quote-verify=$DCAP_VERSION-focal1 \ + occlum-runtime=$OCCLUM_VERSION-1 \ + gettext openjdk-11-jdk \ + && \ + apt clean && \ + rm -rf /var/lib/apt/lists/* + +COPY docker-entrypoint.sh / + +RUN mkdir -p /opt/flink +COPY flink-1.15.2 /opt/flink +ADD occlum_instance_k8s/occlum_instance_k8s.tar.gz /opt/flink + +ENV FLINK_HOME=/opt/flink +ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 +ENV PATH="/opt/occlum/build/bin:/usr/local/occlum/bin:/opt/flink/bin:$PATH" + +WORKDIR /opt/flink +ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/demos/flink/README.md b/demos/flink/README.md index c1f09c89..360f461a 100644 --- a/demos/flink/README.md +++ b/demos/flink/README.md @@ -1,5 +1,8 @@ # Run Flink on Occlum +This is for how to run Flink job manager and task manager in Occlum. +For how to start Flink K8S cluster in Occlum, please refer to [kubernetes](./kubernetes/). + ### Preinstall dependencies Related dependencies: openjdk-11 ``` diff --git a/demos/flink/build_occlum_instance.sh b/demos/flink/build_occlum_instance.sh index e17d918a..17852934 100755 --- a/demos/flink/build_occlum_instance.sh +++ b/demos/flink/build_occlum_instance.sh @@ -5,6 +5,7 @@ BLUE='\033[1;34m' NC='\033[0m' RPC_BIND_PORT=8089 +OCCLUM_USER_SPACE_SIZE=8GB build_instance() { postfix=$1 @@ -12,7 +13,7 @@ build_instance() { occlum new occlum_instance_$postfix cd occlum_instance_$postfix new_json="$(jq '.resource_limits.user_space_size = "1MB" | - .resource_limits.user_space_max_size = "8GB" | + .resource_limits.user_space_max_size = "OCCLUM_USER_SPACE_SIZE" | .resource_limits.kernel_space_heap_size="1MB" | .resource_limits.kernel_space_heap_max_size="256MB" | .resource_limits.max_num_of_threads = 256 | @@ -27,7 +28,32 @@ build_instance() { # Copy JVM and class file into Occlum instance and build rm -rf image copy_bom -f ../flink.yaml --root image --include-dir /opt/occlum/etc/template + + # Use hostfs for flink conf in k8s mode + if [ "$postfix" == "k8s" ]; then + # Increase user space size for k8s mode + OCCLUM_USER_SPACE_SIZE=16GB + + rm -rf image/opt/flink*/conf/* + new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/opt/flink/conf", "type": "hostfs","source": "/opt/flink/conf-copy"}]')" && \ + echo "${new_json}" > Occlum.json + + # use host secrets + mkdir -p image/var/run/secrets + new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/var/run/secrets", "type": "hostfs","source": "/var/run/secrets-copy"}]')" && \ + echo "${new_json}" > Occlum.json + + # k8s pod template + mkdir -p image/opt/flink/pod-template + new_json="$(cat Occlum.json | jq '.mount+=[{"target": "/opt/flink/pod-template", "type": "hostfs","source": "/opt/flink/pod-template-copy"}]')" && \ + echo "${new_json}" > Occlum.json + fi + + # Update user size + sed -i "s/OCCLUM_USER_SPACE_SIZE/$OCCLUM_USER_SPACE_SIZE/g" Occlum.json + occlum build + occlum package --debug cd .. } @@ -35,7 +61,13 @@ update_flink_conf() { echo "rest.port: $RPC_BIND_PORT" >> flink-1.15.2/conf/flink-conf.yaml } -update_flink_conf -build_instance jobmanager -# flink job manager and taks manager use the same occlum instance -cp -rf occlum_instance_jobmanager occlum_instance_taskmanager + +if [ "$1" == "k8s" ]; then + echo "do occlum instance build for k8s mode" + build_instance k8s +else + update_flink_conf + build_instance jobmanager + # flink job manager and taks manager use the same occlum instance + cp -rf occlum_instance_jobmanager occlum_instance_taskmanager +fi diff --git a/demos/flink/docker-entrypoint.sh b/demos/flink/docker-entrypoint.sh new file mode 100755 index 00000000..02961e3b --- /dev/null +++ b/demos/flink/docker-entrypoint.sh @@ -0,0 +1,154 @@ +#!/usr/bin/env bash + +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +COMMAND_STANDALONE="standalone-job" +COMMAND_HISTORY_SERVER="history-server" + +# If unspecified, the hostname of the container is taken as the JobManager address +JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)} +CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml" + +drop_privs_cmd() { + if [ $(id -u) != 0 ]; then + # Don't need to drop privs if EUID != 0 + return + elif [ -x /sbin/su-exec ]; then + # Alpine + echo su-exec flink + else + # Others + echo gosu flink + fi +} + +copy_plugins_if_required() { + if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then + return 0 + fi + + echo "Enabling required built-in plugins" + for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do + echo "Linking ${target_plugin} to plugin directory" + plugin_name=${target_plugin%.jar} + + mkdir -p "${FLINK_HOME}/plugins/${plugin_name}" + if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then + echo "Plugin ${target_plugin} does not exist. Exiting." + exit 1 + else + ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}" + echo "Successfully enabled ${target_plugin}" + fi + done +} + +set_config_option() { + local option=$1 + local value=$2 + + # escape periods for usage in regular expressions + local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g") + + # either override an existing entry, or append a new one + if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then + sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}" + else + echo "${option}: ${value}" >> "${CONF_FILE}" + fi +} + +prepare_configuration() { + set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS} + set_config_option blob.server.port 6124 + set_config_option query.server.port 6125 + + if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then + set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS} + fi + + if [ -n "${FLINK_PROPERTIES}" ]; then + echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}" + fi + envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}" +} + +maybe_enable_jemalloc() { + if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then + JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so" + JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so" + if [ -f "$JEMALLOC_PATH" ]; then + export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH + elif [ -f "$JEMALLOC_FALLBACK" ]; then + export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK + else + if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then + MSG_PATH=$JEMALLOC_PATH + else + MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK" + fi + echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead." + fi + fi +} + +maybe_enable_jemalloc + +copy_plugins_if_required + +prepare_configuration + +args=("$@") +if [ "$1" = "help" ]; then + printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n" + printf " Or $(basename "$0") help\n\n" + printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n" + exit 0 +elif [ "$1" = "jobmanager" ]; then + args=("${args[@]:1}") + + echo "Starting Job Manager" + + exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}" +elif [ "$1" = ${COMMAND_STANDALONE} ]; then + args=("${args[@]:1}") + + echo "Starting Job Manager" + + exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}" +elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then + args=("${args[@]:1}") + + echo "Starting History Server" + + exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}" +elif [ "$1" = "taskmanager" ]; then + args=("${args[@]:1}") + + echo "Starting Task Manager" + + exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}" +fi + +args=("${args[@]}") + +# Running command in pass-through mode +# exec $(drop_privs_cmd) "${args[@]}" +# Do not run with flink user to avoid permission issue in Occlum hostfs mount +exec "${args[@]}" diff --git a/demos/flink/kubernetes/README.md b/demos/flink/kubernetes/README.md new file mode 100644 index 00000000..d144bf70 --- /dev/null +++ b/demos/flink/kubernetes/README.md @@ -0,0 +1,103 @@ +# Deploy Flink in K8S + +There are several ways to deploy Flink on Kubernetes, such as [native kubernetes deployment](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/resource-providers/native_kubernetes/) and [Flink Kubernetes Operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/). This tutorial shows how to use the kubernetes operator deployment. + +## Prerequisites + +* A Kubernetes cluster with at least one node. +* The `kubectl` command line tool is installed and configured to connect to your Kubernetes cluster. +* The `helm` command line tool is also installed and configured to connect to your Kubernetes cluster. + +### Install the Flink Kubernetes Operator + +Just follow the [quick start](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/try-flink-kubernetes-operator/quick-start/) to install the Flink Kubernetes Operator. + +## Build Flink K8S docker image + +First, please make sure `docker` is installed successfully in your host. Then start the Occlum container (use version `latest-ubuntu20.04` for example) as below. +``` +$ sudo docker run --rm -itd --network host \ + -v $(which docker):/usr/bin/docker -v /var/run/docker.sock:/var/run/docker.sock \ + occlum/occlum:latest-ubuntu20.04 +``` + +All the following are running in the above container. + +### Build + +Just run the script [build.sh](./build.sh). It builds a docker image for Flink K8S. +```bash +Build Occlum Flink container images for k8s deployment. +usage: build.sh [OPTION]... + -r the container image registry + -g container image tag + -h usage help +``` +For example, if you want to build the image named `demo/occlum-flink:0.1`, just run +```bash +$ ./build.sh -r demo -g 0.1 +``` + +Notice, during the build process, a customized [flink-console.sh](./flink-console.sh) is used to replace the original one. Users could refer to the script for details. + +Once the build is done, you can push the image for next steps -- [Deploy](#deploy). + +## Deploy + +Based on the original yaml files in the [github](https://github.com/apache/flink-kubernetes-operator/tree/release-1.8/examples), below customized example yaml files are provided. + +* [basic.yaml](./basic.yaml) +* [basic-session-deployment-and-job.yaml](./basic-session-deployment-and-job.yaml) +* [basic-session-deployment-only. yaml](./basic-session-deployment-only.yaml) +* [basic-session-job-only.yaml](./basic-session-job-only.yaml) + +They have the same meaning just like their original counterparts besides some SGX/Occlum related customization settings. +You can deploy each of them. +Just notice the **image** in the yaml file should be the one you built before. + +### Examples + +#### Basic Application Deployment example + +This is a simple deployment defined by a minimal deployment file. +The configuration contains the following: +- Defines the job to run +- Assigns the resources available for the job +- Defines the parallelism used + +To run the job submit the yaml file using kubectl: +```bash +kubectl apply -f basic.yaml +``` + +#### Basic Session Deployment example + +This example shows how to create a basic Session Cluster and then how to submit specific jobs to this cluster if needed. + +##### Without jobs + +The Flink Deployment could be created without any jobs. +In this case the Flink jobs could be created later by submitting the jobs +separately. + +To create a Flink Deployment with the specific resources without any jobs run the following command: +```bash +kubectl apply -f basic-session-deployment-only.yaml +``` + +##### Adding jobs + +If the Flink Deployment is created by `basic-session-deployment-only.yaml` new job could be added +by the following command: +```bash +kubectl apply -f basic-session-job-only.yaml +``` + +##### Creating Deployment and Jobs together + +Alternatively the Flink Deployment and the Flink Session Job configurations can be submitted together. + +To try out this run the following command: +```bash +kubectl apply -f basic-session-deployment-and-job.yaml +``` diff --git a/demos/flink/kubernetes/basic-session-deployment-and-job.yaml b/demos/flink/kubernetes/basic-session-deployment-and-job.yaml new file mode 100644 index 00000000..0f6add87 --- /dev/null +++ b/demos/flink/kubernetes/basic-session-deployment-and-job.yaml @@ -0,0 +1,83 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-session-deployment-example +spec: + image: occlum_flink:0.1 + flinkVersion: v1_15 + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + serviceAccount: flink + podTemplate: + spec: + containers: + - name: flink-main-container + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + resources: + requests: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + limits: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + # env: + # - name: OCCLUM_LOG_LEVEL + # value: "off" + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins + + +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: basic-session-job-example +spec: + deploymentName: basic-session-deployment-example + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar + parallelism: 2 + upgradeMode: stateless + +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: basic-session-job-example2 +spec: + deploymentName: basic-session-deployment-example + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1.jar + parallelism: 2 + upgradeMode: stateless + entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample diff --git a/demos/flink/kubernetes/basic-session-deployment-only.yaml b/demos/flink/kubernetes/basic-session-deployment-only.yaml new file mode 100644 index 00000000..8dd0078c --- /dev/null +++ b/demos/flink/kubernetes/basic-session-deployment-only.yaml @@ -0,0 +1,59 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-session-deployment-only-example +spec: + image: occlum_flink:0.1 + flinkVersion: v1_15 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + podTemplate: + spec: + containers: + - name: flink-main-container + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + resources: + requests: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + limits: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + # env: + # - name: OCCLUM_LOG_LEVEL + # value: "off" + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins diff --git a/demos/flink/kubernetes/basic-session-job-only.yaml b/demos/flink/kubernetes/basic-session-job-only.yaml new file mode 100644 index 00000000..d43e33ed --- /dev/null +++ b/demos/flink/kubernetes/basic-session-job-only.yaml @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: basic-session-job-only-example +spec: + deploymentName: basic-session-deployment-only-example + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar + parallelism: 4 + upgradeMode: stateless + diff --git a/demos/flink/kubernetes/basic.yaml b/demos/flink/kubernetes/basic.yaml new file mode 100644 index 00000000..0745f559 --- /dev/null +++ b/demos/flink/kubernetes/basic.yaml @@ -0,0 +1,79 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-example +spec: + image: occlum_flink:0.1 + flinkVersion: v1_15 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + # podTemplate: + # spec: + # containers: + # # Do not change the main container name + # - name: flink-main-container + # args: + # - bash + # - -c + # - 'kubernetes-jobmanager.sh kubernetes-application ' + taskManager: + resource: + memory: "2048m" + cpu: 2 + podTemplate: + spec: + containers: + - name: flink-main-container + env: + - name: OCCLUM_LOG_LEVEL + value: "off" + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 2 + upgradeMode: stateless + podTemplate: + spec: + containers: + - name: flink-main-container + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + resources: + requests: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + limits: + sgx.intel.com/epc: 21474836480 + sgx.intel.com/enclave: 1 + sgx.intel.com/provision: 1 + # env: + # - name: OCCLUM_LOG_LEVEL + # value: "off" + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins \ No newline at end of file diff --git a/demos/flink/kubernetes/build.sh b/demos/flink/kubernetes/build.sh new file mode 100755 index 00000000..91102608 --- /dev/null +++ b/demos/flink/kubernetes/build.sh @@ -0,0 +1,60 @@ +#!/bin/bash +set -e + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +top_dir=$(dirname "${script_dir}") + +registry="demo" +tag="latest" + +function usage { + cat << EOM +Build Occlum Flink container images for k8s deployment. +usage: $(basename "$0") [OPTION]... + -r the container image registry + -g container image tag + -h usage help +EOM + exit 0 +} + +function process_args { + while getopts ":r:g:h" option; do + case "${option}" in + r) registry=${OPTARG};; + g) tag=${OPTARG};; + h) usage;; + esac + done +} + +process_args "$@" + +echo "" +echo "############################" +echo "Build Occlum Flink container image for k8s deployment" +echo " Container images registry: ${registry}" +echo " Container images tag: ${tag}" +echo "" + +pushd ${top_dir} +echo "Install openjdk 11 first ..." +./preinstall_deps.sh + +echo "Download Flink ..." +./download_flink.sh +cp ./kubernetes/flink-console.sh ./flink-1.15.2/bin/ + +echo "Build Occlum instance ..." +./build_occlum_instance.sh k8s + +echo "" +echo "Build Occlum container image ..." + +docker build \ + -f Dockerfile \ + -t ${registry}/occlum_flink:${tag} . + +echo "Build is done" + +popd diff --git a/demos/flink/kubernetes/flink-console.sh b/demos/flink/kubernetes/flink-console.sh new file mode 100755 index 00000000..07e76115 --- /dev/null +++ b/demos/flink/kubernetes/flink-console.sh @@ -0,0 +1,126 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Start a Flink service as a console application. Must be stopped with Ctrl-C +# or with SIGTERM by kill or the controlling process. +USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]" + +SERVICE=$1 +ARGS=("${@:2}") # get remaining arguments as array + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +case $SERVICE in + (taskexecutor) + CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner + ;; + + (historyserver) + CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer + ;; + + (zookeeper) + CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer + ;; + + (standalonesession) + CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint + ;; + + (standalonejob) + CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint + ;; + + (kubernetes-session) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint + ;; + + (kubernetes-application) + CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint + ;; + + (kubernetes-taskmanager) + CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner + ;; + + (*) + echo "Unknown service '${SERVICE}'. $USAGE." + exit 1 + ;; +esac + +FLINK_TM_CLASSPATH=`constructFlinkClassPath` + +if [ "$FLINK_IDENT_STRING" = "" ]; then + FLINK_IDENT_STRING="$USER" +fi + +pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid +mkdir -p "$FLINK_PID_DIR" +# The lock needs to be released after use because this script is started foreground +command -v flock >/dev/null 2>&1 +flock_exist=$? +if [[ ${flock_exist} -eq 0 ]]; then + exec 200<"$FLINK_PID_DIR" + flock 200 +fi +# Remove the pid file when all the processes are dead +if [ -f "$pid" ]; then + all_dead=0 + while read each_pid; do + # Check whether the process is still running + kill -0 $each_pid > /dev/null 2>&1 + [[ $? -eq 0 ]] && all_dead=1 + done < "$pid" + [ ${all_dead} -eq 0 ] && rm $pid +fi +id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0") + +FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}" +log="${FLINK_LOG_PREFIX}.log" + +log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") + +echo "Starting $SERVICE as a console application on host $HOSTNAME." + +# Add the current process id to pid file +echo $$ >> "$pid" 2>/dev/null + +# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file +[[ ${flock_exist} -eq 0 ]] && flock -u 200 + +# Evaluate user options for local variable expansion +FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) + +echo "################" +set -x +cp -rf /var/run/secrets /var/run/secrets-copy +cp -rf conf conf-copy +if [ -d pod-template ]; then + cp -rf pod-template pod-template-copy +else + # create dir anyway to avoid hostfs mount error + mkdir -p pod-template-copy +fi +cd occlum_instance_k8s +# exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" +exec occlum run /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Dos.name=Linux $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"