Das Aggregationsbeispiel stellt eine einfache Aggregationsoperation bestehend aus vier Schritten dar, für die die Knoten zur Aggregationssteuerung, für Aggregationsanforderungen und für Aggregationsantworten verwendet werden. Es enthält drei Nachrichtenflüsse ('FanOut', 'RequestReplyApp' und 'FanIn'), um eine Aggregation in vier Schritten zu implementieren.
Dieser Fluss ruft die eingehende Anforderungsnachricht ab, erstellt vier verschiedene Anforderungsnachrichten, sendet diese auf eine Request/Reply-Anforderung hin und startet die Überwachung der Aggregationsoperation:
Beachten Sie, dass das Steuerterminal des Knotens zur Aggregationssteuerung nicht festnetzgebunden ist und der Transaktionsmodus des MQEmpfangsknotens auf Yes (Ja) gesetzt ist.
Weitere Informationen zu den Gründen für dieses Design finden Sie unter Erweitern des Aggregationsbeispiels.
Der Fluss 'FanOut' enthält die Knoten zur Aggregationssteuerung und für Aggregationsanforderungen, mit denen die Verarbeitung der Aggregation gestartet wird. Der Knoten zur Aggregationssteuerung gibt die Anforderungsnachricht auf jeder der vier Verzweigungen, die mit dessen Ausgangsterminal verbunden sind, (in unbestimmter Reihenfolge) weiter. Jede Verzweigung besitzt den Rechenknoten "BuildRequest", um die individuelle Anforderung zu erstellen. Die folgende ESQL wird im Rechenknoten 'BuildRequest1' verwendet:
CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;
Die CopyQuarter-Prozedur kopiert die Nachrichtenheader der Eingabenachricht und extrahiert dann ein Viertel der Elemente der <SaleList>. Im dargestellten Beispiel gibt es acht Elemente der <SaleList>, da jede Anforderungsnachricht zwei davon enthält. Die ESQL für diese Prozedur lautet wie folgt:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
IN output REFERENCE,
IN jumps INTEGER)
BEGIN
CALL CopyMessageHeaders(input, output);
CREATE LASTCHILD OF output DOMAIN 'XML';
CREATE LASTCHILD OF output.XML NAME 'SaleEnvelope';
DECLARE xmlIn REFERENCE TO input.XML.SaleEnvelope;
DECLARE xmlOut REFERENCE TO output.XML.SaleEnvelope;
IF LASTMOVE(xmlOut) <> TRUE THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('could not create output message');
END IF;
DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
DECLARE quarter INTEGER invoices/4;
IF invoices <> (quarter*4) THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('not divisible by 4', invoices);
END IF;
IF jumps > 3 THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('too many jumps', jumps);
END IF;
DECLARE count INTEGER 1;
DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
WHILE count <= quarter DO
SET xmlOut.SaleList[count] = copyRef;
MOVE copyRef NEXTSIBLING;
SET count = count + 1;
END WHILE;
END;
Zuerst wird der Status der Eingaben überprüft (die Anzahl der Elemente der <SaleList> muss durch vier teilbar sein, und das erforderliche Viertel wird über 0, 1, 2 oder 3 ausgewählt), bevor die entsprechende Anzahl an Elementen der <SaleList> von der Eingabenachricht in die Ausgabenachricht kopiert werden.
Die CopyMessageHeaders-Prozedur, wie sie in der CopyQuarter-Prozedur genannt wird, basiert auf der bereitgestellten Standardprozedur CopyMessageHeaders, die in der erstellten ESQL für einen neuen Rechenknoten zur Verfügung gestellt wird. Um die Wiederverwendung zu maximieren, wurde diese Prozedur in den Bereich der ESQL-Datei verschoben, so dass alle Rechenknoten dieselbe Prozedur aufrufen konnten.
Diese Verschiebung hat zur Folge, dass eine Änderung bei der Prozedur notwendig wird. Innerhalb eines Rechenknotens hat die OutputRoot-Referenz besondere Eigenschaften, die automatisch sicherstellen, dass Domäneninformationen beibehalten werden, wenn Nachrichtenbaumstrukturelemente vom InputRoot in den OutputRoot kopiert werden. Jedoch wird in diesem Fall OutputRoot als Referenz an eine externe Prozedur weitergeleitet. Daher muss die Domäneninformation explizit beibehalten werden. Dies wird durch Hinzufügen des Befehls CREATE LASTCHILD erreicht:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;
Nachdem der Rechenknoten "BuildRequest" die Anforderungsnachricht erstellt hat, indem der Rechenknoten auf Pass LocalEnvironment and Message gesetzt wurde, wird diese durch einen MQSendeknoten an die Warteschlange AGGR_SAMPLE_REQUEST ausgegeben. (Der Einfachheit halber werden in diesem Muster alle vier Anforderungen in dieselbe Warteschlange eingereiht. Für eine echte Anwendung ist dieser Fall jedoch unrealistisch.) Jeder Knoten für Aggregationsanforderungen besitzt einen als Konfigurationsparameter angegebenen Ordnernamen, der vom Knoten für Aggregationsantworten beim Hinzufügen der verschiedenen Antworten zur zusammengefassten Antwortnachricht verwendet wird. "Request1" verwendet das erste Viertel der Eingabenachricht, "Request2" das zweite usw.
Die MQSendeknoten werden gesetzt, um AGGR_SAMPLE_REPLY als ReplyTo-Warteschlange auf den Anforderungsnachrichten festzulegen. Dieser Vorgang wird vom Nachrichtenfluss 'RequestReplyApp' verwendet.
Nachdem alle vier Anforderungsnachrichten ausgegeben wurden, speichert der Knoten zur Aggregationssteuerung den Status der Aggregation intern im Broker. Um dies zu erreichen, werden folgende Schritte durchgeführt:
Es gibt andere Möglichkeiten, dies zu erreichen. Weitere Informationen hierzu finden Sie unter Erweitern des Aggregationsbeispiels.
Dieser ganze Fluss muss unter einer Transaktion durchgeführt werden, wobei 'transactionMode' im MQEmpfangsknoten auf YES (Ja) gesetzt werden sollte, da es höchst effizient ist, wenn die letzte Operation (das Speichern des Aggregationsoperationsstatus) beendet ist, bevor Antworten empfangen werden.
Dieser Fluss wird zur Simulation von Back-End-Serviceanwendungen verwendet, die normalerweise die Anforderungsnachrichten der Aggregationsoperation verarbeiten. In einem echten System können dies andere Nachrichtenflüsse oder bestehende Anwendungen sein. Dieser Komplexitätsgrad ist für das Aggregationsbeispiel jedoch nicht erforderlich, deshalb enthält der Fluss nur das, was für eine korrekte Request/Reply-Verarbeitung erforderlich ist. Dieser Fluss liest von derselben Warteschlange, in die die MQSendeknoten im Fluss 'FanOut' schreiben, und er gibt Daten an die Warteschlange aus, von der der Empfangsknoten im Fluss 'FanIn' liest. Er stellt eine Nachrichtenaustauschbrücke zwischen zwei Flüssen zur Verfügung. Die Nachrichten werden in ihrer Warteschlange für zu beantwortende Nachrichten eingereiht (wie von den MQSendeknoten im Fluss 'FanOut' gesetzt).
Der Fluss 'RequestReplyApp' wird mit drei zusätzlichen Instanzen in der BAR-Datei angegeben, woraus sich insgesamt vier Threads ergeben.
Dadurch wird sichergestellt, dass alle vier Anforderungen so schnell wie möglich verarbeitet werden.
Dieser Fluss empfängt alle Antworten vom Fluss 'RequestReplyApp' und fasst diese in einer einzelnen Ausgabenachricht zusammen.
Die Ausgabenachricht vom Knoten für Aggregationsantworten kann nicht von einem MQSendeknoten ausgegeben werden. Deshalb wird
ein Rechenknoten hinzugefügt, der die Daten in ein Format bringt, mit dem sie in eine Warteschlange geschrieben werden können.
Der Nachrichtenfluss 'FanIn' hat aus demselben Grund wie der Fluss 'RequestReplyApp' auch drei zusätzliche Instanzen. Die ersten drei eingehenden Antworten werden intern vom Broker gespeichert, und der gespeicherte Status der Aggregation wird aktualisiert. Wenn die vierte Antwort verarbeitet wird, werden die drei gespeicherten Antworten extrahiert, und alle vier Antwortnachrichten werden in einer Ausgabenachricht zusammengefasst. Diese Nachricht befindet sich nicht in einem Status, in dem sie an eine Warteschlange ausgegeben werden kann. Deshalb ruft der Rechenknoten "BuildReply" die folgende ESQL auf, um dies zu korrigieren:
CREATE COMPUTE MODULE FanIn_BuildReply
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputRoot.Properties = InputRoot.Properties;
CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID;
SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
CREATE LASTCHILD OF OutputRoot DOMAIN 'XML';
CREATE LASTCHILD OF OutputRoot.XML NAME 'ComIbmAggregateReplyBody';
DECLARE next INTEGER 1;
DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
DECLARE repliesOut REFERENCE TO OutputRoot.XML.ComIbmAggregateReplyBody;
WHILE next <= 4 DO -- 4-way aggregation
CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
SET repliesOut.*[next].SaleEnvelope = repliesIn.XML.SaleEnvelope;
MOVE repliesIn NEXTSIBLING;
SET next = next + 1;
END WHILE;
RETURN TRUE;
END;
END MODULE;
Die ESQL fügt eine rudimentäre MQMD-Struktur hinzu, bevor sie die Daten von ComIbmAggregateReplyBody in der Eingabenachricht in eine XML-Baumstruktur in der Ausgabenachricht kopiert. Dabei werden die Aggregat-IDs und -ordner beibehalten. Die Reihenfolge der Antworten ist nicht angegeben.
Die Testnachricht, die als Treiber des Aggregationsnachrichtenflusses verwendet wird, ist eine eindeutige XML-Nachricht, die Rechnungsdetails für einen Kunden enthält. Sie enthält Daten mit einer Größe von etwa 8 KB in acht einzelnen <SaleList>-Elementen.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>