[demos] Update flink demo to version 1.15.2

Also make both flink job manager and task manager run in Occlum
This commit is contained in:
Qi Zheng 2024-04-28 15:52:23 +08:00 committed by volcano
parent f36d6d465c
commit efafd12d7c
8 changed files with 187 additions and 105 deletions

@ -766,16 +766,24 @@ jobs:
- name: Preinstall dependencies and download flink - name: Preinstall dependencies and download flink
run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && ./preinstall_deps.sh && ./download_flink.sh" run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && ./preinstall_deps.sh && ./download_flink.sh"
- name: Run jobmanager on host - name: Build Occlum instance
run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && SGX_MODE=SIM ./run_flink_jobmanager_on_host.sh" run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && SGX_MODE=SIM ./build_occlum_instance.sh"
- name: Run flink taskmanager - name: Run flink job manager on Occlum
run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && SGX_MODE=SIM ./run_flink_on_occlum_glibc.sh tm > flink.log 2>&1 &" run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && ./run_flink_on_occlum.sh jm"
- name: Run flink task manager on Occlum
run: |
sleep ${{ env.nap_time }};
docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && ./run_flink_on_occlum.sh tm > flink.log 2>&1 &"
- name: Run flink task - name: Run flink task
run: | run: |
sleep ${{ env.nap_time }}; sleep ${{ env.nap_time }};
docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && SGX_MODE=SIM ./run_flink_on_occlum_glibc.sh task" docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink && ./run_flink_on_occlum.sh task"
- name: Check flink job manager's log
if: ${{ always() }}
run: docker exec ${{ github.job }} bash -c "cd /root/occlum/demos/flink; cat occlum_instance_jobmanager/flink--standalonesession-0.log"
- name: Check flink task manager's log - name: Check flink task manager's log
if: ${{ always() }} if: ${{ always() }}

@ -6,21 +6,38 @@ Related dependencies: openjdk-11
./preinstall_deps.sh ./preinstall_deps.sh
``` ```
### Run the flink jobmanager ### Download flink
``` ```
./run_flink_jobmanager_on_host.sh ./download_flink.sh
``` ```
### Run the taskManager ### Build Occlum instance
``` ```
./run_flink_on_occlum_glibc.sh tm ./build_occlum_instance.sh
``` ```
### Run flink jobs example ### Run flink job manager on Occlum
``` ```
./run_flink_on_occlum_glibc.sh task ./run_flink_on_occlum.sh jm
```
Wait a while for job manager started successfully. You can check the log `occlum_instance_jobmanager/flink--standalonesession-0.log` for detail status.
### Run flink task manager on Occlum
Once the job manager is up, you can run the task manager.
```
./run_flink_on_occlum.sh tm
```
Wait a while for task manager started successfully. You can check the log `occlum_instance_taskmanager/flink--taskmanager-0.log` for detail status.
### Submit a flink job to occlum
You can submit an example flink job by using the following command:
```
./run_flink_on_occlum.sh task
``` ```
**Note:** **Note:**
1. If running the jobmanager in docker, please export the port 8081 and 6123 If running the jobmanager in docker, please export the port 8081 and 6123.
2. Step 2 may report warning for not finding shared objects. It doesn't matter. To avoid these warnings, you can **REPLACE the FIRST LINE** of config file `/opt/occlum/etc/template/occlum_elf_loader.config` with `/opt/occlum/glibc/lib/ld-linux-x86-64.so.2 /usr/lib/x86_64-linux-gnu:/lib/x86_64-linux-gnu:/usr/lib/jvm/java-11-openjdk-amd64/lib/server`.

@ -0,0 +1,41 @@
#!/bin/bash
set -e
BLUE='\033[1;34m'
NC='\033[0m'
RPC_BIND_PORT=8089
build_instance() {
postfix=$1
rm -rf occlum_instance*
occlum new occlum_instance_$postfix
cd occlum_instance_$postfix
new_json="$(jq '.resource_limits.user_space_size = "1MB" |
.resource_limits.user_space_max_size = "8GB" |
.resource_limits.kernel_space_heap_size="1MB" |
.resource_limits.kernel_space_heap_max_size="256MB" |
.resource_limits.max_num_of_threads = 256 |
.entry_points = [ "/usr/lib/jvm/java-11-openjdk-amd64/bin" ] |
.env.default = [ "LD_LIBRARY_PATH=/usr/lib/jvm/java-11-openjdk-amd64/lib/server:/usr/lib/jvm/java-11-openjdk-amd64/lib" ] |
.env.default = [ "FLINK_HOME=/opt/flink" ] |
.env.default = [ "JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64" ] |
.env.default = [ "HOME=/root" ] |
.env.untrusted += [ "TZ", "FLINK_CONF_DIR" ]' Occlum.json)" && \
echo "${new_json}" > Occlum.json
# Copy JVM and class file into Occlum instance and build
rm -rf image
copy_bom -f ../flink.yaml --root image --include-dir /opt/occlum/etc/template
occlum build
cd ..
}
update_flink_conf() {
echo "rest.port: $RPC_BIND_PORT" >> flink-1.15.2/conf/flink-conf.yaml
}
update_flink_conf
build_instance jobmanager
# flink job manager and taks manager use the same occlum instance
cp -rf occlum_instance_jobmanager occlum_instance_taskmanager

@ -1,8 +1,8 @@
#!/bin/bash #!/bin/bash
set -e set -e
rm -rf flink-1.10.1* rm -rf flink-1.15.2*
wget https://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.11.tgz wget https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
tar -xvzf flink-1.10.1-bin-scala_2.11.tgz tar -xvzf flink-1.15.2-bin-scala_2.12.tgz
echo "Download Flink Success" echo "Download Flink Success"

@ -3,16 +3,26 @@ includes:
- java-11-openjdk-amd64.yaml - java-11-openjdk-amd64.yaml
targets: targets:
# copy flink directory # copy flink directory
- target: /bin - target: /opt/flink
copy: copy:
- from: ../flink-1.10.1 - dirs:
# copy localtime - ../flink-1.15.2/
# add timezone
- target: /opt/occlum/glibc/share/
copy:
- dirs:
- /usr/share/zoneinfo
# etc files
- target: /etc - target: /etc
copy: copy:
- dirs:
- /etc/ssl
- files: - files:
- /etc/localtime - /etc/nsswitch.conf
# copy libnss_files # copy libnss_files
- target: /opt/occlum/glibc/lib - target: /opt/occlum/glibc/lib
copy: copy:
- files: - files:
- /opt/occlum/glibc/lib/libnss_files.so.2 - /opt/occlum/glibc/lib/libnss_files.so.2
- /opt/occlum/glibc/lib/libnss_dns.so.2
- /opt/occlum/glibc/lib/libresolv.so.2

@ -1,3 +0,0 @@
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
./flink-1.10.1/bin/jobmanager.sh start
echo -e "${BLUE}Flink jobmanager${NC}"

@ -0,0 +1,86 @@
#!/bin/bash
set -e
BLUE='\033[1;34m'
NC='\033[0m'
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
FLINK_BIND_PORT=8089
run_jobmanager() {
logfile="flink--standalonesession-0.log"
echo -e "${BLUE}occlum run JVM jobmanager${NC}"
echo -e "${BLUE}logfile=$logfile${NC}"
cd occlum_instance_jobmanager
occlum run /usr/lib/jvm/java-11-openjdk-amd64/bin/java \
-Dos.name=Linux -XX:ActiveProcessorCount=4 -Xmx800m -Xms800m \
-XX:MaxMetaspaceSize=256m -Dlog.file=/host/$logfile \
-Dlog4j.configuration=file:/opt/flink/conf/log4j.properties \
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties \
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml \
-classpath /opt/flink/lib/* org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint \
-D jobmanager.memory.off-heap.size=128mb \
-D jobmanager.memory.jvm-overhead.min=192mb \
-D jobmanager.memory.jvm-metaspace.size=256mb \
-D jobmanager.memory.jvm-overhead.max=192mb \
-D rest.bind-port=$FLINK_BIND_PORT \
-D rest.bind-address=0.0.0.0 \
--configDir /opt/flink/conf \
--executionMode cluster \
&
cd ..
}
run_taskmanager() {
logfile="flink--taskmanager-0.log"
echo -e "${BLUE}occlum run JVM taskmanager${NC}"
echo -e "${BLUE}logfile=$logfile${NC}"
cd occlum_instance_taskmanager
occlum run /usr/lib/jvm/java-11-openjdk-amd64/bin/java \
-Dos.name=Linux -XX:ActiveProcessorCount=2 -XX:+UseG1GC \
-Xmx600m -Xms600m -XX:MaxMetaspaceSize=256m \
-Dlog.file=/host/$logfile \
-Dlog4j.configuration=file:/opt/flink/conf/log4j.properties \
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties \
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml \
-classpath /opt/flink/lib/* org.apache.flink.runtime.taskexecutor.TaskManagerRunner \
--configDir /opt/flink/conf -D taskmanager.memory.network.min=128mb \
-D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b \
-D taskmanager.memory.jvm-metaspace.size=256mb -D external-resources=none \
-D taskmanager.memory.jvm-overhead.min=192mb \
-D taskmanager.memory.framework.off-heap.size=128mb \
-D taskmanager.memory.network.max=128mb \
-D taskmanager.memory.framework.heap.size=128mb \
-D taskmanager.memory.managed.size=256mb \
-D taskmanager.memory.task.heap.size=383mb \
-D taskmanager.numberOfTaskSlots=1 \
-D taskmanager.memory.jvm-overhead.max=192mb \
-D rest.bind-port=$FLINK_BIND_PORT \
-D rest.bind-address=0.0.0.0 \
&
cd ..
}
run_task() {
cd flink-1.15.2
./bin/flink run ./examples/streaming/WordCount.jar
cd ..
}
arg=$1
case "$arg" in
jm)
run_jobmanager
cd ../
;;
tm)
run_taskmanager
cd ../
;;
task)
run_task
cd ../
;;
esac

@ -1,77 +0,0 @@
#!/bin/bash
set -e
BLUE='\033[1;34m'
NC='\033[0m'
occlum_glibc=/opt/occlum/glibc/lib/
init_instance() {
# Init Occlum instance
postfix=$1
FLINK_LOG_PREFIX="/host/flink--$postfix-${id}"
log="${FLINK_LOG_PREFIX}.log"
out="./flink--$postfix-${id}.out"
rm -rf occlum_instance_$postfix && mkdir occlum_instance_$postfix
cd occlum_instance_$postfix
occlum init
new_json="$(jq '.resource_limits.user_space_size = "1MB" |
.resource_limits.user_space_max_size = "5500MB" |
.resource_limits.kernel_space_heap_size="1MB" |
.resource_limits.kernel_space_heap_max_size="64MB" |
.resource_limits.max_num_of_threads = 64 |
.process.default_heap_size = "128MB" |
.entry_points = [ "/usr/lib/jvm/java-11-openjdk-amd64/bin" ] |
.env.default = [ "LD_LIBRARY_PATH=/usr/lib/jvm/java-11-openjdk-amd64/lib/server:/usr/lib/jvm/java-11-openjdk-amd64/lib:/usr/lib/jvm/java-11-openjdk-amd64/../lib:/lib" ]' Occlum.json)" && \
echo "${new_json}" > Occlum.json
}
build_flink() {
# Copy JVM and class file into Occlum instance and build
rm -rf image
copy_bom -f ../flink.yaml --root image --include-dir /opt/occlum/etc/template
occlum build
}
run_taskmanager() {
init_instance taskmanager
build_flink
echo -e "${BLUE}occlum run JVM taskmanager${NC}"
echo -e "${BLUE}logfile=$log${NC}"
occlum run /usr/lib/jvm/java-11-openjdk-amd64/bin/java \
-Xmx800m -XX:-UseCompressedOops -XX:MaxMetaspaceSize=256m \
-XX:ActiveProcessorCount=2 \
-Dlog.file=$log \
-Dos.name=Linux \
-Dlog4j.configuration=file:/bin/conf/log4j.properties \
-Dlogback.configurationFile=file:/bin/conf/logback.xml \
-classpath /bin/lib/flink-table-blink_2.11-1.10.1.jar:/bin/lib/flink-table_2.11-1.10.1.jar:/bin/lib/log4j-1.2.17.jar:/bin/lib/slf4j-log4j12-1.7.15.jar:/bin/lib/flink-dist_2.11-1.10.1.jar org.apache.flink.runtime.taskexecutor.TaskManagerRunner \
--configDir /bin/conf \
-D taskmanager.memory.network.max=64mb \
-D taskmanager.memory.network.min=64mb \
-D taskmanager.memory.managed.size=128mb \
-D taskmanager.cpu.cores=1.0 \
-D taskmanager.memory.task.heap.size=256mb \
&
}
run_task() {
export FLINK_CONF_DIR=$PWD/flink-1.10.1/conf && \
./flink-1.10.1/bin/flink run ./flink-1.10.1/examples/streaming/WordCount.jar
}
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
arg=$1
case "$arg" in
tm)
run_taskmanager
cd ../
;;
task)
run_task
cd ../
;;
esac