本示例消息流包含两个配置为侦听不同 TCP/IP 端口的 SCADAInput 节点。当在没有错误的情况下收到消息时,输入节点会将它传播到 Compute 节点,该节点对输入消息内容进行处理并生成多条输出消息。Compute 节点将输出消息传播到 Publication 节点,该节点将它们发布到使用 WebSphere MQ 协议的已注册订户。流中的任何错误将传播到 MQOutput 节点,该节点在队列上记录错误消息,稍后对其进行处理。
Telemetry 客户机生成事件,例如指示状态更改,或确认它们仍然是活动的。这些客户机编程为对事件进行批处理,并且每 15 分钟将这些事件发送到代理。消息流用于从批处理消息抽取单独的事件消息并对它们进行发布。
已使用 ESQL 配置了 Compute 节点,将批处理的输入消息分离到各单独事件消息中。WHILE 循环在批处理中迭代每条消息。MQMD 被从输入消息复制到每条输出消息中。有关字段被从输入树复制到输出树中。每条输出消息都是 JMSText 消息,通过在 MQRFH2 头内的 usr 文件夹中设置“用户”属性而构建。每条消息都由 PROPAGATE 语句传递到 Publication 节点。
此处是一个包含两个事件的输入批处理消息的示例:
<?xml version="1.0" encoding="UTF-8"?> <!-- edited by Mary Bright --> <events d_tstamp="20040417103118"> <StateChange topic="LCUnit/12345/StateChange" d_tstamp="20040417104439" i_state="1" i_old_state="0"> <![CDATA[Changing state from 'Starting' to 'Payload' because 'The startup routine is complete']]> </StateChange> <Heartbeat topic="LCUnit/12345/Heartbeat" d_tstamp="20040417105126" i_state="1"> <d_tstamp>20040417104948</d_tstamp> <i_state>1</i_state> </Heartbeat> </events>
以下显示了处理此格式消息的 ESQL 模块:
CREATE COMPUTE MODULE messageflow_Compute CREATE FUNCTION Main() RETURNS BOOLEAN BEGIN DECLARE BatchTime CHAR; SET BatchTime = InputRoot.XML.events.d_tstamp; DECLARE Count INTEGER CARDINALITY(InputRoot.XML.events.*[]); DECLARE I INTEGER 2; WHILE I <= Count DO SET OutputRoot.Properties.MessageDomain = 'XML'; SET OutputRoot.XML = NULL; SET OutputRoot.MQMD = InputRoot.MQMD; SET OutputRoot.MQRFH2.CodedCharSetId = 1208; SET OutputRoot.MQRFH2.(MQRFH2.Field)Format = 'MQSTR '; SET OutputRoot.MQRFH2.(MQRFH2.Field)NameValueCCSID = 1208; SET OutputRoot.MQRFH2.psc.Topic = InputRoot.XML.events.*[I].topic; SET OutputRoot.MQRFH2.usr.*[] = InputRoot.XML.events.*[I].(XML.Attribute)*[]; SET OutputRoot.MQRFH2.usr.b_time = BatchTime; SET OutputRoot.XML.Body.Text = InputRoot.XML.events.*[I].(XML.CDataSection)*' SET I = I + 1; PROPAGATE; END WHILE; RETURN FALSE; END; END MODULE;
该消息流具有两个输入节点,以增强消息的处理能力。您可以使用任意数量的输入节点。还可以更改消息流属性附加实例来增加为消息流提供服务的进程数量。如果有数以百计的客户机,您可能会发现,必须在两个或更多的代理上处理大量消息负载。使用这些方法中的一项或多项可以为消息处理找到一个可以接受的级别。
此示例只显示了一种可以用来处理 Telemetry 客户机消息的方法。请更改此消息流或创建新的消息流来满足您自己的 Telemetry 需求。