使用 Java 创建输入节点

开始之前

创建新的 Java 项目

您可在 工作台 中创建 Java 节点。 要完成这点,您必须按照以下所述新建一个 Java 项目:
  1. 切换到 Java 透视图。
  2. 单击文件 > 新建 > 项目。 从左边的菜单中选择 Java,然后从右边的菜单中选择 Java 项目
  3. 为该项目命名。

    显示“Java 设置”面板。

  4. 选择“库”选项卡,然后单击添加外部 JAR
  5. 选择 install_dir\classes\jplugin2.jar。
  6. 按照其他选项卡上的提示定义任何其他构建设置。
  7. 单击完成
然后,您可以在此项目中开发用于您的 Java 节点的源代码。

声明输入节点类

任何实现 MbInputNodeInterface 且包含在代理的 LIL 路径中的类都在代理中注册为输入节点。当您实现 MbInputNodeInterface 时,您还需要实现这个类的 run 方法。run 方法表示消息流的开始,包含表达消息的数据并将消息在流中向下传播。当线程根据您指定的线程技术模型变为可用时,代理调用 run 方法。

例如,要声明输入节点类:

package com.ibm.jplugins;

import com.ibm.broker.plugin.*;

public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
...
您可以在工作台中做到这点,如下所示:
  1. 单击文件 > 新建> 类
  2. 将包与类名字段设置为相应的值。
  3. 删除“超类”文本字段中的文本然后单击浏览按钮
  4. 选择MbInputNode
  5. 单击“接口”文本字段旁的添加按钮,然后选择MbInputNodeInterface
  6. 单击完成

定义节点构造函数

当实例化节点时,将调用用户的节点类的构造函数。这是您创建节点的终端并初始化属性的任何缺省值的地方。

输入节点有许多与它关联的输出终端,但通常没有任何输入终端。当实例化节点时,使用 createOutputTerminal 方法将输出终端添加到节点。例如,要创建带有三个输出终端的节点:

public BasicInputNode() throws MbException
{
	createOutputTerminal ("out");
	createOutputTerminal ("failure");
	createOutputTerminal ("catch");
   setAttribute ("firstParserClassName","myParser");
attributeVariable = "none";
}

将外部数据接收到缓冲区

输入节点可以使用与任何其他 Java 程序相同的方法从任何类型的外部源接收数据(例如,文件系统、队列或数据库),只要从节点的输出是正确的格式即可。

提供一个输入缓冲区(或位流)以包含输入数据,并将它与消息对象关联起来。使用 MbInputNode 类的 createMessage 方法可以从字节数组中创建消息,然后从此消息生成有效的消息集合。要了解这些方法的详细信息,请参阅 JavaAPI。例如,要从文件读取输入:

  1. 创建输入流以从文件读取:
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. 创建输入文件的字节数组大小:
    byte[] buffer = new byte[inputStream.available()];
  3. 从文件读取到字节数组中:
    inputStream.read(buffer);
  4. 关闭输入流:
    inputStream.close();
  5. 创建放入队列的消息:
    MbMessage msg = createMessage(buffer);
  6. 创建新的消息集合以保存此消息:
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
      MbMessageAssembly newAssembly =
         new MbMessageAssembly(assembly, msg);

传播消息

当您创建了消息集合,您可以将它传播到节点的终端中的一个。

例如,要将消息集合传播到“out”终端:
    MbOutputTerminal out = getOutputTerminal("out");out.propagate(newAssembly);
要删除消息:
msg.clearMessage();

要清空分配给该消息树的内存,调用最后一个 try/catch 程序段中的 clearMessage() 函数。

控制线程技术和事务性

代理基础结构处理事务问题(例如,在消息处理完成时,控制任何 WebSphere MQ 或工作的数据库单元的提交)。但是,从用户定义的节点修改的资源不需要处在代理的事务控制之下。

从每个消息流维护的线程的池分配每个消息流线程,且在 run 方法中启动执行。

用户定义的节点使用返回值表明事务是否成功,控制事务是提交还是回滚,并控制线程何时返回到池。代理基础捕获任何未处理的意外情况,并且事务发生回滚。

使用以下适当的返回值确定事务和线程的行为:

MbInputNode.SUCCESS_CONTINUE
事务已提交且代理使用相同的线程再次调用 run 方法。
MbInputNode.SUCCESS_RETURN
事务已提交且线程返回线程池,假设它不是此消息流的唯一线程。
MbInputNode.FAILURE_CONTINUE
事务已回滚且代理使用相同的线程再次调用 run 方法。
MbInputNode.FAILURE_RETURN
事务已回滚且线程返回线程池,假设它不是此消息流的唯一线程。
MbInputNode.TIMEOUT
当等待输入数据到达时,run 方法不能无限期地阻塞。当流被用户代码阻塞时,您就无法关闭或重新配置代理。run 方法必须通过从 run 方法返回才能定期地产生对代理的控制。 如果在特定的周期(例如,5 秒)后还没有接收到输入数据,方法应该返回 TIMEOUT 返回码。假设不需要重新配置或关闭代理,会立即再次调用输入节点的 run 方法。
要创建多线程消息流,您在创建消息后,但在消息传播到输出终端之前调用 dispatchThread 方法。这确保当其他线程处理消息时,只有一个线程正在等待数据。从线程池得到新的线程,最多为消息流的 additionalInstances 属性指定的最大限制。例如:
public int run( MbMessageAssembly assembly ) throws MbException
{
    byte[] data = getDataWithTimeout();  // user supplied method
                                                                              // returns null if timeout
    if( data == null )
        return TIMEOUT;

    MbMessage msg = createMessage( data );
    msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
    MbMessageAssembly newAssembly =
              new MbMessageAssembly( assembly, msg );

    dispatchThread();

    getOutputTerminal( "out" ).propagate( newAssembly );

    return SUCCESS_RETURN;
}

声明节点名称

您需要声明节点的名称,因为它将由工作台标识。所有节点名称必须以“Node”结尾。您使用下列方法声明此名称:

public static String getNodeName()
{
      return "BasicInputNode";
}
如果没有声明此方法,Java API 框架使用下列规则创建缺省节点名称:
  • 类名附加到软件包名称。
  • 点被除去,且软件包和类名的每部分的第一个字母是大写的。
例如,缺省情况下,以下类指派节点名称“ComIbmPluginsamplesBasicInputNode”:
package com.ibm.pluginsamples;
public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
   ...

声明属性

您使用与 Java Bean 属性相同的方法声明节点属性。您负责编写属性的 getter 和 setter 方法,且 API 框架使用 Java 自省规则推断属性名。例如,如果您声明下列两个方法:

private String attributeVariable;

public String getFirstAttribute()
{
    return attributeVariable;
}

publc void setFirstAttribute(String value)
{
    attributeVariable = value;
}

代理推断此节点有一个称为 firstAttribute 的属性。此名称从 get 或 set 方法的名称派生而来,而不是从任何内部类成员变量名称派生而来。属性只能以字符串的形式显示,因此您必须在 get 或 set 方法中将任何数字类型转换为字符串或将字符串转换为数字类型。例如,下列方法定义称为 timeInSeconds 的属性:

int seconds;

public String getTimeInSeconds()
{
    return Integer.toString(seconds);
}

public void setTimeInSeconds(String value)
{
    seconds = Integer.parseInt(value);
}

实现节点功能

如果已描述的那样,代理调用 run 方法以创建输入消息。此方法应该提供输入节点的所有处理功能。

覆盖缺省消息解析器属性(可选)

输入节点实现通常确定什么消息解析器在开始的时候解析输入消息。例如,原语 MQInput 节点规定需要 MQMD 解析器解析 MQMD 头。用户定义的输入节点可以通过使用缺省包含的下列属性(您可以覆盖它们)选择合适的头或消息解析器:

rootParserClassName
定义解析用户定义的输入节点支持的消息格式的根解析器的名称。它缺省为 GenericRoot,提供的根解析器导致代理在一起分配和链接解析器。 代理不太会需要修改这个属性值。
firstParserClassName
定义第一个解析器的名称,它可能是负责解析位流的解析器的链。它缺省为 XML
messageDomainProperty
定义解析输入消息所需的消息解析器的名称的可选属性。支持的值与 MQInput 节点支持的值相同。(请参阅MQInput 节点,获取有关 MQInput 节点的更多信息。)
messageSetProperty
消息集字段中定义消息集标识或消息集名称的可选属性(仅当 MRM 解析器由 messageDomainProperty 属性指定时)。
messageTypeProperty
MessageType 字段中定义消息的标识的可选属性(仅当 MRM 解析器由 messageDomainProperty 属性指定时)。
messageFormatProperty
消息格式字段中定义消息的格式的可选属性(仅当 MRM 解析器由 messageDomainProperty 属性指定时)。

删除节点实例

当发生下列情况之一时,将删除节点的实例:
  • 关闭代理。
  • 除去包含节点的节点或消息流,然后重新部署配置。
在节点删除期间,节点可能想要被通知,以便它可以执行任何清除操作(例如,关闭套接字)。如果节点实现可选的 onDelete 方法,代理仅在删除节点之前调用此方法。

您按如下所示实现 onDelete 方法:

public void onDelete()
{
    // perform node cleanup if necessary
}
相关信息
Java API
声明 | 商标 | 下载 | 书库 | 支持 | 反馈
Copyright IBM Corporation 1999, 2006 最后一次更新时间:2006/08/14
as09950_