Submitting dsub Jobs with ecFlow as a Regular User
Prerequisites
The regular user has permission to use ecFlow and has loaded the MPI environment variables, and Donau Scheduler has been installed and configured with this user. All of the following operations are performed after the regular user logs in to the operation node.
Creating Directories and Script Files
- Go to the actual ecFlow installation directory and create a new directory:
cd /path/to/ecFlow-5.5.2/bin
mkdir -p course/test/f3/
- Run the following command to start ecflow_server:
sh ecflow_start.sh
- If the Status field in the command output shows RUNNING, ecflow_server has started successfully. Check the Port field in the output for the port used by ecflow_server; in this example it is 7504:
...
Version                   Ecflow version(5.5.2) boost(1.72.0) compiler(gcc 10.3.1) protocol(JSON cereal 1.3.0) openssl(enabled)
Compiled on               Nov 26 2025 20:56:06
Status                    RUNNING
Host                      node29
Port                      7504
Up since                  2025-Nov-28 08:14:33
Job sub' interval         60s
ECF_HOME                  /home/donau_cli/ecflow_server
ECF_LOG                   /home/donau_cli/ecflow_server/node29.7504.ecf.log
ECF_CHECK                 /home/donau_cli/ecflow_server/node29.7504.check
ECF_SSL                   disabled
Check pt interval         120s
Check pt mode             CHECK_ON_TIME
Check pt save time alarm  20s
Number of Suites          0
Request's per 1,5,15,30,60 min
Restart server            1
Ping                      1
Get full definition       15
Sync                      7
Sync full                 18
Task init                 4
Task abort                4
Load definition           12
Begin                     8
Node delete               10
stats cmd                 1
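As an optional sanity check (a minimal sketch that is not part of the original procedure), the running server can also be pinged from Python through the ecflow client API; the host localhost and port 7504 are taken from the example output above and should be replaced with your actual values:

# Connectivity check sketch: ping the ecflow_server started above.
# "localhost" and "7504" come from the example output; adjust them as needed.
import ecflow

ci = ecflow.Client("localhost", "7504")
try:
    ci.ping()                     # raises RuntimeError if the server is unreachable
    print("ecflow_server is reachable on port 7504")
except RuntimeError as e:
    print("ping failed:", e)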
- Go to the newly created directory:
cd course/test/f3/
Create a t66.ecf file in it and write the following content (the submitted job can be modified as needed; the example below runs the test.sh script):
%include "/path/to/ecFlow-5.5.2/bin/course/head.h" dsub -N 20 --mpi hmpi /path/to/ecFlow-5.5.2/bin/course/test/f3/test.sh %include "/path/to/ecFlow-5.5.2/bin/course/tail.h"
- Create a test.sh file and write the following content into it:
#!/bin/bash
sleep 10
echo "ecflow test OK"
- Go back up two directory levels:
cd ../..
Create an hpc_test1.py file and write the following content into it:
#!/usr/bin/env python3
import os
from pathlib import Path
from ecflow import Defs, Suite, Task, Family, Edit, Trigger, Event, Complete, Meter, Time, Day, Date

def create_f3():
    return Family(
        "f3",
        Edit(SLEEP=20),
        Task("t66"))

home = os.path.abspath(os.path.join("/path/to/ecFlow-5.5.2/bin/", "course"))
ld_lib = "/path/to/ecFlow-5.5.2/lib/"
defs = Defs(
    Suite('test',
          Edit(ECF_INCLUDE=home, ECF_HOME=home, LD_LIBRARY_PATH=ld_lib),
          create_f3()))
defs.save_as_defs(str(Path(os.path.join("/path/to/ecFlow-5.5.2/bin/", "course"), "test.def")))
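Optionally, the generated definition can be validated before it is loaded into the server. The sketch below is an illustrative addition (not part of the original procedure) that can be appended to hpc_test1.py after defs has been built; defs.check() reports trigger and limit errors, and defs.check_job_creation() verifies that each task's .ecf script (such as t66.ecf) and its %include files can be located:

# Optional validation sketch, to be run after 'defs' has been constructed above.
print("Checking trigger expressions and limits:")
print(defs.check())               # an empty string means no problems were found
print("Checking job creation (.ecf scripts and %include files):")
print(defs.check_job_creation())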
- Create a client.py file and write the following content into it (replace the port number with the actual one):
from pathlib import Path
import ecflow

home = "/path/to/ecFlow-5.5.2/bin/course"
print("Load the in memory definition(defs) into the server")
ci = ecflow.Client('localhost', 7504)
ci.sync_local()
defs = ci.get_defs()
# If the test suite already exists on the server, delete it first
if defs and defs.find_suite("test"):
    print("Suite 'test' already exists, deleting...")
    ci.delete("/test")
    ci.sync_local()
else:
    print("Suite 'test' not found, safe to load")
# Load the new definition and begin the suite
ci.load(str(Path(home, "test.def")))
ci.begin_suite("test")
- Create a get_nodes.py file and write the following content into it (replace the port number with the actual one):
import ecflow

try:
    # Create the client
    ci = ecflow.Client("localhost", "7504")
    # Get the node tree suite definition as stored in the server.
    # The definition is retrieved and stored on the client 'ci'
    ci.sync_local()
    # Access the definition retrieved from the server
    defs = ci.get_defs()
    if defs is None:
        print("The server has no definition")
        exit(1)
    # Get all nodes (suites, families and tasks);
    # alternatively defs.get_all_tasks() returns tasks only
    task_vec = defs.get_all_nodes()
    # Iterate over the nodes and print path and state
    for task in task_vec:
        print(task.get_abs_node_path() + " " + str(task.get_state()))
except RuntimeError as e:
    print("Failed: ", str(e))
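get_nodes.py prints a one-shot snapshot of the node states. To watch the job run through to completion, a small polling loop over the same client API can be used. The sketch below is an illustrative addition (the script name monitor.py and the 10-second interval are arbitrary choices), not part of the original procedure:

# monitor.py (hypothetical name): poll the server until the test suite finishes.
import time
import ecflow

ci = ecflow.Client("localhost", "7504")      # replace the port with the actual one
while True:
    ci.sync_local()                          # refresh the local copy of the definition
    defs = ci.get_defs()
    suite = defs.find_suite("test") if defs else None
    if suite is None:
        print("Suite 'test' is not loaded on the server")
        break
    state = str(suite.get_state())
    print("/test is", state)
    if state in ("complete", "aborted"):     # stop once the suite has finished
        break
    time.sleep(10)                           # poll every 10 seconds (arbitrary choice)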
- Create the header file head.h:
#!/bin/bash
set -e          # stop the shell on first error
set -u          # fail when using an undefined variable
set -x          # echo script lines as they are executed
set -o pipefail # fail if last(rightmost) command exits with a non-zero status

# Defines the variables that are needed for any communication with ECF
export ECF_PORT=%ECF_PORT%    # The server port number
export ECF_HOST=%ECF_HOST%    # The host name where the server is running
export ECF_NAME=%ECF_NAME%    # The name of this current task
export ECF_PASS=%ECF_PASS%    # A unique password
export ECF_TRYNO=%ECF_TRYNO%  # Current try number of the task
export ECF_RID=$$             # record the process id. Also used for zombie detection

# Define the path where to find ecflow_client.
# Make sure client and server use the *same* version.
# Important when there are multiple versions of ecFlow
export PATH=/path/to/ecFlow-5.5.2/bin:$PATH
export PATH=/opt/batch/cli/bin:$PATH
export CCS_CLI_HOME=/opt/batch/cli

# Tell ecFlow we have started
ecflow_client --init=$$

# Define an error handler
ERROR() {
    set +e                      # Clear -e flag, so we don't fail
    wait                        # wait for background process to stop
    ecflow_client --abort=trap  # Notify ecFlow that something went wrong, using 'trap' as the reason
    trap 0                      # Remove the trap
    exit 0                      # End the script
}

# Trap any calls to exit and errors caught by the -e flag
trap ERROR 0

# Trap any signal that may cause the script to fail
trap '{ echo "Killed by a signal"; ERROR ; }' 1 2 3 4 5 6 7 8 9 10 12 13 15
- Create the tail file tail.h:
wait                      # wait for background process to stop
ecflow_client --complete  # Notify ecFlow of a normal end
trap 0                    # Remove all traps
exit 0                    # End the shell
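The three Python scripts above are intended to be run in order: hpc_test1.py generates test.def, client.py loads it into the server and begins the suite, and get_nodes.py prints the node states. The sketch below (a hypothetical helper named run_all.py, not part of the original procedure) simply automates that order, using the same placeholder course directory as the rest of this section:

# run_all.py (hypothetical name): run the scripts created above in order.
import subprocess

course_dir = "/path/to/ecFlow-5.5.2/bin/course"   # same placeholder path as above
for script in ("hpc_test1.py", "client.py", "get_nodes.py"):
    print("Running", script)
    # check=True stops the sequence if any script exits with a non-zero status
    subprocess.run(["python3", script], cwd=course_dir, check=True)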
Switching to the User to Submit and Run the Job and Observing the Job Run
Viewing the Run Results
- Query the node status in ecFlow by running the get_nodes.py script created above, for example:
python3 get_nodes.py
The output is as follows:
/test complete
/test/f3 complete
/test/f3/t66 complete
- Find the corresponding job ID and check the job run result with the Donau Scheduler job query command (for example, djob):
The output is as follows:
...
ID  NAME     STATE      USER        ACCOUNT  QUEUE    START_TIME           END_TIME             EXEC_NODES
17  default  SUCCEEDED  superadmin  default  default  2025/12/04 20:50:39  2025/12/04 20:50:39  node-238
...