El ejemplo de Agregación muestra una sencilla operación de agregación de cuatro formas, utilizando los nodos Aggregate Control (agregar control), Request (petición) y Reply (respuesta). Contiene tres flujos de mensajes para implementar una agregación de cuatro formas: FanOut, RequestReplyApp y FanIn.
Es el flujo que toma el mensaje de petición entrante, genera cuatro mensajes de petición distintos, los envía tras una petición/respuesta e inicia el seguimiento de la operación de agregación:
Recuerde que el terminal de control del nodo AggregateControl no está conectado y que la Modalidad de transacción del
nodo
MQInput está establecida en Sí.
Encontrará más información sobre las razones por las que se ha elegido este diseño en el apartado
Ampliación del ejemplo de Agregación.
El flujo de mensajes FanOut contiene los nodos Aggregate Control y Request que se utilizan para iniciar el proceso de agregación. El nodo Aggregate Control propaga el mensaje de petición a cada una de las cuatro ramas conectadas a su terminal de salida (Out) en un orden no definido. Cada rama tiene un nodo Compute "BuildRequest" (crear petición) para generar la petición individual. En el nodo BuildRequest1 Compute se utiliza el siguiente SQL:
CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;
El procedimiento CopyQuarter (copiar cuarto) copia las cabeceras de mensajes del mensaje de entrada y después extrae un cuarta parte de los elementos de la <SaleList>. En el ejemplo que se facilita, hay ocho elementos de la <SaleList> (listas de ventas) y, por lo tanto, cada mensaje de petición contendrá dos de ellos. El ESQL para este procedimiento es el siguiente:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
IN output REFERENCE,
IN jumps INTEGER)
BEGIN
CALL CopyMessageHeaders(input, output);
CREATE LASTCHILD OF output DOMAIN 'XML';
CREATE LASTCHILD OF output.XML NAME 'SaleEnvelope';
DECLARE xmlIn REFERENCE TO input.XML.SaleEnvelope;
DECLARE xmlOut REFERENCE TO output.XML.SaleEnvelope;
IF LASTMOVE(xmlOut) <> TRUE THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('no se ha podido crear el mensaje de salida');
END IF;
DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
DECLARE quarter INTEGER invoices/4;
IF invoices <> (quarter*4) THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('not divisible by 4', invoices);
END IF;
IF jumps > 3 THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('demasiados vuelcos', jumps);
END IF;
DECLARE count INTEGER 1;
DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
WHILE count <= quarter DO
SET xmlOut.SaleList[count] = copyRef;
MOVE copyRef NEXTSIBLING;
SET count = count + 1;
END WHILE;
END;
Es necesario hacer algunas verificaciones iniciales sobre el estado de las entradas (el número de elementos de la <SaleList> ha de ser divisible por cuatro y el cuarto requerido se selecciona mediante 1, 2 ó 3) antes de copiar el número adecuado de elementos de <SaleList> desde el mensaje de entrada en el mensaje de salida.
El procedimiento CopyMessageHeaders (copiar cabeceras de mensajes), como se le llama en el procedimiento CopyQuarter (copiar cuarto), se basa en el procedimiento estándar CopyMessageHeaders suministrado que procede del ESQL generado para un nuevo nodo Compute. Para maximizar la reutilización, éste se ha trasladado al ámbito del archivo ESQL, de forma que todos los nodos Compute puedan llamar al mismo procedimiento.
Este nuevo ámbito tiene una implementación importante y requiere un cambio en el procedimiento. Dentro del nodo Compute, la referencia OutputRoot (raíz de salida) tiene propiedades especiales que aseguran automáticamente que la información del dominio se conserva cuando los tres elementos del mensaje se copian desde InputRoot (raíz de entrada) a OutputRoot. Sin embargo, en este caso, OutputRoot se pasa como referencia a un procedimiento externo y, por lo tanto, la información del dominio ha de conservarse explícitamente. Esto se lleva a cabo añadiendo el mandato CREATE LASTCHILD:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;
Después de que el nodo Compute "BuildRequest" haya generado el mensaje de petición estableciendo el nodo Compute en Pass LocalEnvironment and Message, un nodo MQOutput efectuará su salida a la cola AGGR_SAMPLE_REQUEST. (Para que este ejemplo sea más sencillo, se han transferido las cuatro peticiones a la misma cola, pero es probable que esto no resulte realista si se trata de una aplicación real.) Cada nodo Aggregate Request tiene especificado un FolderName como parámetro de configuración, que utiliza el nodo Aggregate Reply cuando añade las distintas respuestas al mensaje de respuesta agregado. La "Request1" (petición1) utiliza el primer cuarto del mensaje de entrada, la "Request2" el segundo cuarto, y así sucesivamente.
Los nodos MQOutput se establecen para especificar AGGR_SAMPLE_REPLY como cola de respuestas en los mensajes de petición -- esto lo utiliza el flujo de mensajes RequestReplyApp.
Cuando se realiza la salida de los cuatro mensajes, el nodo Aggregate Control almacena el estado internamente el estado de la agregación en el intermediario. Para ello, se produce lo siguiente:
Hay otras formas de llevar esto a cabo -- consulte el apartado Ampliación del ejemplo donde encontrará información más detallada.
Todo este flujo de mensajes ha de realizarse bajo una transacción, con transactionMode establecido en YES en el nodo MQInput, debido a que es mejor que la última operación (el almacenamiento del estado de la operación de agregación) haya terminado antes de recibir las respuestas.
Este flujo se utiliza para simular las aplicaciones de servicio de fondo que normalmente procesarían los mensajes procedentes de la operación de agregación. En un sistema real, podrían ser otros flujos de mensajes o aplicaciones existentes, pero este nivel de complejidad no es necesario para el ejemplo de Agregación -- de este modo, el flujo contiene lo mínimo necesario un proceso correcto de petición/respuesta. Este flujo lee de la misma cola en la que graban los nodos MQOutput del flujo FanOut, y realiza la salida para la cola en la que el nodo de entrada de FanIn lee -- proporciona un puente de mensajería entre los dos flujos de mensajes. Los mensajes de transfieren a su cola de respuestas (como establecen los nodos MQOutput en el flujo FanOut).
El flujo de mensajes RequestReplyApp se especifica con tres instancias adicionales en el archivo
bar, lo que produce cuatro hebras en total. Esto asegura que las cuatro peticiones se procesen tan rápidamente como sea posible.
Este flujo recibe todas las respuestas del flujo de mensajes RequestReplyApp y las agrega en un solo mensaje de salida. Un nodo MQOutput no puede efectuar la salida del mensaje de salida del nodo
Aggregate Reply, por lo que se añade un nodo Compute para convertir los datos a un formato con el que puedan
grabarse en una cola.
El flujo de mensajes de abanico de entrada (FanIn) tiene también tres instancias adicionales por los mismos motivos que el flujo de mensajes RequestReplyApp. Las tres primeras respuestas entrantes las almacena internamente el intermediario y el estado de agregación almacenado se actualiza. Cuando se procesa la cuarta respuesta, las tres respuestas almacenadas se extraen y los cuatro mensajes de respuesta se incorporan a un mensaje de salida. El estado de ese mensaje no le permite ser transferido a una cola, por lo que el nodo "BuildReply" Compute invoca el siguiente ESQL para rectificar este estado:
CREATE COMPUTE MODULE FanIn_BuildReply
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputRoot.Properties = InputRoot.Properties;
CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID;
SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
CREATE LASTCHILD OF OutputRoot DOMAIN 'XML';
CREATE LASTCHILD OF OutputRoot.XML NAME 'ComIbmAggregateReplyBody';
DECLARE next INTEGER 1;
DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
DECLARE repliesOut REFERENCE TO OutputRoot.XML.ComIbmAggregateReplyBody;
WHILE next <= 4 DO -- 4-way aggregation
CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
SET repliesOut.*[next].SaleEnvelope = repliesIn.XML.SaleEnvelope;
MOVE repliesIn NEXTSIBLING;
SET next = next + 1;
END WHILE;
RETURN TRUE;
END;
END MODULE;
El ESQL añade un MQMD rudimentario antes de copiar los datos de ComIbmAggregateReplyBody del mensaje de entrada a un árbol XML del mensaje de salida, mientras mantiene las carpetas y los identificadores de la petición de agregación. El orden de las respuestas no se especifica.
El mensaje de prueba que se utiliza para dirigir el flujo del mensaje de agregación es un mensaje XML sencillo que contiene detalles de facturación para un cliente. Contiene aproximadamente 8 KB de datos, en ocho elementos distintos de <SaleList>.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>