算子下推开发
- 使用注解@HafTarget标记需要下推的函数。
如下所示,使用注解@HafTarget表示该函数为需要下推到卸载节点运算的函数。
package com.haf.test.testapp; import com.huawei.haf.annotations.HafTarget; public class TestAppMain { private String ip; private int val; public TestAppMain(String ip) { val = 0; this.ip = ip; } @HafTarget(target = "127.0.0.1") public void jarOffloadVoidTest() { System.out.println("void test"); } @HafTarget(target = "ip", to = {"a", "b"}, app = "add", exception = Exception.class) public int jarOffloadAddTest(int a, int b) { System.out.println("call add func"); return a + b; } @HafTarget(target = "ip", to = {"a", "b"}, from = "val", timeout = "300", runInHostIfFailed = false, exception = Exception.class, app = "add") public int jarOffload(int a, int b) throws Exception { System.out.println(""); val++; return a + b; } public static void main(String[] args) { String addIp; int numA = 0; int numB = 0; try { addIp = args[0]; numA = Integer.valueOf(args[1]).intValue(); numB = Integer.valueOf(args[2]).intValue(); } catch (Exception e) { System.out.println(e.fillInStackTrace()); return; } TestAppMain testAppMain = new TestAppMain(addIp); System.out.println(testAppMain.jarOffloadAddTest(numA, numB)); testAppMain.jarOffloadVoidTest(); try { testAppMain.jarOffload(numA, numB); } catch (Exception e) { System.out.println(e.fillInStackTrace()); return; } } }
表1 @HafTarget注解支持字段 字段
说明
是否必填
target
表示卸载节点IP,函数将被下推到指定IP的节点上运行。
target支持一个IP地址或一个类型为String的成员变量名,用户需保证该成员变量的值是一个合法IP地址。
×
to
表示下推函数的参数名。
×
from
表示函数运行结束之后,需要返回的变量名。
×
timeout
表示函数下推运行的超时时间,默认超时时间为1800s。timeout支持一个数字字符串或一个类型为int的成员变量名,用户需保证该成员变量的值是一个合法int整数。
×
runInHostIfFailed
表示如果下推失败,是否在主机节点运行该函数,默认为true,即如果函数下推失败,默认在主机节点运行函数。如果runInHostIfFailed的值设置为false,表示下推失败不在主机节点运行该函数,因此会抛出异常,需要使用exception指定抛出异常的类型。
×
exception
表示如果函数下推失败,抛出的异常类型,当runInHostIfFailed设为false时,表示需要抛出异常。
√
app
表示当前应用的名字。对于同一个JAR包项目,如果有多个函数需要使用@HafTarget注解标记下推,仅需有一个注解指定应用名。
×
- 配置pom文件,项目打包成一个整体的JAR包。如下所示,使用maven-assembly-plugin插件,将项目所有依赖打包在一起。
<build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <archive> <manifest> <mainClass>com.haf.test.testapp.TestAppMain</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
- 开发序列化器。
函数下推的过程中,是使用FST序列化当前对象this和to指定的参数,如果存在FST无法直接序列化的对象类型,需自行根据FST写序列化器,并使用注解@HafSerializer指定序列化器。
如下所示,假设类Page是FST不支持的序列化对象。
public class Page { private String name; private int age; public Page(String name, int age) { this.name = name; this.age = age; } public int getAge() { return age; } public String getName() { return name; } public void setAge(int age) { this.age = age; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Page{" + "name='" + name + '\'' + ", age=" + age + '}'; } }
针对类Page创建FST的序列化器仅需继承FST的类FSTBasicObjectSerializer,并重写writeObject和instantiate方法。
public class PageSerializer extends FSTBasicObjectSerializer { @Override public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo, FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException { if (toWrite instanceof Page) { Page page = (Page) toWrite; out.writeInt(page.getAge()); out.writeStringUTF(page.getName()); } } @Override public Object instantiate(Class objectClass, FSTObjectInput in, FSTClazzInfo serializationInfo, FSTClazzInfo.FSTFieldInfo referencee, int streamPosition) throws Exception { int age = in.readInt(); String name = in.readStringUTF(); return new Page(name, age); } }
对于写好的序列化器,使用注解@HafSerializer使用该序列化器。如下所示:
@HafSerializer(clazz = Page.class, serializer = PageSerializer.class) @HafTarget(target = "127.0.0.1") public Page getPage() { Page page = new Page("test", 0); return page; }
注解@HafSerializer包含两个字段:
- clazz表示需要使用序列化器的类。
- serializer表示使用的自定义序列化器。
- 编译项目。