このメッセージ・フローの例には、別々の TCP/IP ポートを listen するよう 構成された 2 つの SCADAInput ノードが含まれています。 メッセージがエラーなしで受信されると、入力ノードは、そのメッセージを Compute ノードに伝搬し、 入力メッセージの内容が操作され、多くの出力メッセージが生成されます。 Compute ノードは出力メッセージを Publication ノードに伝搬し、 出力メッセージは WebSphere MQ プロトコルを使用する登録済みサブスクライバーに パブリッシュされます。 フロー内にエラーがあれば MQOutput ノードに伝搬され、後で処理するためにエラー・メッセージがキューに記録されます。
Telemetry クライアントは、例えば、状態の変化を示したり、 クライアントがまだアクティブであることを確認したりするために、イベントを生成します。 クライアントは、複数のイベントを 1 回分にまとめて 15 分ごとにブローカーに送信するようプログラムされています。 メッセージ・フローは、バッチ・メッセージから個々のイベント・メッセージを抽出してパブリッシュするよう設計されています。
Compute ノードは、バッチ処理された入力メッセージを個々のイベント・メッセージに分割する ESQL で構成されています。WHILE ループは、バッチ内の各メッセージで繰り返されます。MQMD が、入力メッセージから各出力メッセージにコピーされます。関連するフィールドが、入力ツリーから出力ツリーにコピーされます。各出力メッセージは、JMSText メッセージです。これは、MQRFH2ヘッダー内の usr フォルダーで「ユーザー」プロパティーを設定することによって構築されます。各メッセージは、PROPAGATE ステートメントによって、Publication ノードに渡されます。
以下に示すのは、2 つのイベントを含む入力バッチ・メッセージの例です。
<?xml version="1.0" encoding="UTF-8"?> <!-- edited by Mary Bright --> <events d_tstamp="20040417103118"> <StateChange topic="LCUnit/12345/StateChange" d_tstamp="20040417104439" i_state="1" i_old_state="0"> <![CDATA[Changing state from 'Starting' to 'Payload' because 'The startup routine is complete']]> </StateChange> <Heartbeat topic="LCUnit/12345/Heartbeat" d_tstamp="20040417105126" i_state="1"> <d_tstamp>20040417104948</d_tstamp> <i_state>1</i_state> </Heartbeat> </events>
このフォーマットのメッセージを処理する ESQL モジュールは、 以下のとおりです。
CREATE COMPUTE MODULE messageflow_Compute CREATE FUNCTION Main() RETURNS BOOLEAN BEGIN DECLARE BatchTime CHAR; SET BatchTime = InputRoot.XML.events.d_tstamp; DECLARE Count INTEGER CARDINALITY(InputRoot.XML.events.*[]); DECLARE I INTEGER 2; WHILE I <= Count DO SET OutputRoot.Properties.MessageDomain = 'XML'; SET OutputRoot.XML = NULL; SET OutputRoot.MQMD = InputRoot.MQMD; SET OutputRoot.MQRFH2.CodedCharSetId = 1208; SET OutputRoot.MQRFH2.(MQRFH2.Field)Format = 'MQSTR '; SET OutputRoot.MQRFH2.(MQRFH2.Field)NameValueCCSID = 1208; SET OutputRoot.MQRFH2.psc.Topic = InputRoot.XML.events.*[I].topic; SET OutputRoot.MQRFH2.usr.*[] = InputRoot.XML.events.*[I].(XML.Attribute)*[]; SET OutputRoot.MQRFH2.usr.b_time = BatchTime; SET OutputRoot.XML.Body.Text = InputRoot.XML.events.*[I].(XML.CDataSection)*' SET I = I + 1; PROPAGATE; END WHILE; RETURN FALSE; END; END MODULE;
このメッセージ・フローには、メッセージ処理能力を向上させるために、2 つの入力ノードがあります。入力ノードは、いくつ使用しても構いません。さらに、メッセージ・フロー・プロパティー「追加インスタンス」 を変更して、メッセージ・フローにサービスを提供するプロセスの数を増やすこともできます。数百のクライアントがある場合、複数のブローカーで高いメッセージ・ロードを処理しなければならない場合があります。このような手法を 1 つ以上使用して、メッセージ処理の許容レベルを見出してください。
この例は、Telemetry クライアント・メッセージを処理するための 1 つの方法を示しているに過ぎません。 独自の Telemetry 要件に合わせて、このメッセージ・フローを変更するか、新しいメッセージ・フローを作成してください。