A amostra Agregação demonstra uma operação de agregação simples de quatro vias, utilizando os nós Aggregate Control, Request e Reply. Ela contém três fluxos de mensagens para implementar uma agregação de quatro vias: FanOut, RequestReplyApp e FanIn.
Esse é o fluxo que utiliza a mensagem de pedido que chega, gera quatro mensagens de pedido diferentes, as envia no pedido/resposta e inicia o rastreio da operação de agregação:
Observe que o terminal Controle do nó AggregateControl não está conectado e que o Modo de Transação do nó MQInput está definido como Sim.
Você pode saber mais sobre as razões desse design em Estendendo a Amostra Agregação.
O fluxo de FanOut contém os nós Aggregate Control e Request, que são utilizados para iniciar o processamento de agregação. O nó Aggregate Control propaga a mensagem de pedido para cada uma das quatro ramificações conectadas a seu terminal Saída (em nenhuma ordem definida). Cada ramificação tem um nó "BuildRequest" Compute para gerar o pedido individual. O seguinte ESQL é utilizado no nó BuildRequest1 Compute:
CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;
O procedimento CopyQuarter copia os cabeçalhos da mensagem de entrada e, em seguida, extrai um quarto dos elementos <SaleList>. No exemplo fornecido, existem oito elementos <SaleList>, portanto, cada mensagem de pedido conterá dois deles. O ESQL para esse procedimento é o seguinte:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
IN output REFERENCE,
IN jumps INTEGER)
BEGIN
CALL CopyMessageHeaders(input, output);
CREATE LASTCHILD OF output DOMAIN 'XML';
CREATE LASTCHILD OF output.XML NAME 'SaleEnvelope';
DECLARE xmlIn REFERENCE TO input.XML.SaleEnvelope;
DECLARE xmlOut REFERENCE TO output.XML.SaleEnvelope;
IF LASTMOVE(xmlOut) <> TRUE THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('could not create output message');
END IF;
DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
DECLARE quarter INTEGER invoices/4;
IF invoices <> (quarter*4) THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('not divisible by 4', invoices);
END IF;
IF jumps > 3 THEN
THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('too many jumps', jumps);
END IF;
DECLARE count INTEGER 1;
DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
WHILE count <= quarter DO
SET xmlOut.SaleList[count] = copyRef;
MOVE copyRef NEXTSIBLING;
SET count = count + 1;
END WHILE;
END;
Existe uma verificação inicial do status das entradas (o número de elementos <SaleList> deve ser divisível por quatro e o quarto requerido é selecionado por 0, 1, 2 ou 3) antes do número adequado de elementos <SaleList> ser copiado da mensagem de entrada para a mensagem de saída.
O procedimento CopyMessageHeaders, conforme chamado no procedimento CopyQuarter, baseia-se no procedimento CopyMessageHeaders padrão fornecido no ESQL gerado para um novo nó Compute. Para maximizar a reutilização, isso foi movido para o escopo do arquivo ESQL, para que todos os nós Compute pudessem chamar o mesmo procedimento.
Esse novo escopo tem uma implicação importante, necessitando uma alteração no procedimento. Em um nó Compute, a referência OutputRoot tem propriedades especiais que asseguram automaticamente que as informações do domínio sejam preservadas quando os elementos da árvore de mensagens forem copiados de InputRoot para OutputRoot. No entanto, nesse caso, OutputRoot é transmitido como uma referência para um procedimento externo, portanto, as informações do domínio devem ser explicitamente preservadas. Isso é executado pela inclusão do comando CREATE LASTCHILD:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;
Depois do nó "BuildRequest" Compute ter gerado a mensagem de pedido definindo o nó Compute para Pass LocalEnvironment and Message, ele é colocado por um nó MQOutput na fila AGGR_SAMPLE_REQUEST. (Para simplificar esse exemplo, os quatro pedidos são colocados na mesma fila, mas isso provavelmente não é realístico para um aplicativo real). Cada nó Aggregate Request tem um FolderName especificado como um parâmetro de configuração, que é utilizado pelo nó Aggregate Reply ao anexar as várias respostas na mensagem de resposta agregada. "Request1" utiliza o primeiro quarto da mensagem de entrada, "Request2" o segundo quarto e assim por diante.
Os nós MQOutput são definidos para especificar AGGR_SAMPLE_REPLY como a fila ReplyTo nas mensagens de pedido - isso é utilizado pelo fluxo de mensagens RequestReplyApp.
Depois das quatro mensagens de pedido serem geradas, o nó Aggregate Control armazena o estado da agregação internamente no servidor intermediário. As seguintes etapas ocorrerão para executar isso:
Existem outras maneiras de executar isso - consulte Estendendo a Amostra para obter detalhes adicionais.
Esse fluxo inteiro deve ser feito sob uma transação, com transactionMode definido como YES no nó MQInput, porque será mais eficiente se a última operação (o armazenamento do estado de operação de agregação) for concluída antes de todas as respostas serem recebidas.
Utilize esse fluxo para simular os aplicativos de serviço backend que normalmente processariam as mensagens de pedido da operação de agregação. Em um sistema real, elas poderiam ser outros fluxos de mensagens ou aplicativos existentes, mas esse nível de complexidade não é requerido para a amostra Agregação - portanto, o fluxo contém o mínimo requerido para o processamento correto de pedido/resposta. Esse fluxo lê a partir da mesma fila em que os nós MQOutput no fluxo FanOut são gravados e é colocado na fila a partir da qual o nó de entrada no fluxo FanIn é lido - ela fornece uma ponte de sistema de mensagens entre os dois fluxos. As mensagens são colocadas em sua fila de resposta (conforme definido pelos nós MQOutput no fluxo FanOut).
O fluxo RequestReplyApp é especificado com três instâncias adicionais no arquivo bar, resultando em quatro encadeamentos no total. Isso assegura que os quatro pedidos sejam processados o mais rápido possível.
Esse fluxo recebe todas as respostas do fluxo RequestReplyApp
e as agrega em uma única mensagem de saída. A mensagem de saída do nó Aggregate Reply não pode ser gerada por um nó MQOutput, portanto, um nó Compute é incluído para colocar os dados em um formato no qual eles possam ser gravados em uma fila.
O fluxo de mensagens FanIn também pode ter três instâncias adicionais, pelas mesmas razões do fluxo RequestReplyApp. As três primeiras respostas que chegam são armazenadas internamente pelo servidor intermediário e o estado da agregação armazenada é atualizado. Quando a quarta resposta é processada, as três respostas armazenadas são extraídas e as quatro mensagens de resposta são construídas em uma mensagem de saída. Essa mensagem não fica em um estado no qual possa ser colocada em uma fila, portanto o nó "BuildReply" Compute chama o seguinte ESQL para retificar isso:
CREATE COMPUTE MODULE FanIn_BuildReply
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputRoot.Properties = InputRoot.Properties;
CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID;
SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
CREATE LASTCHILD OF OutputRoot DOMAIN 'XML';
CREATE LASTCHILD OF OutputRoot.XML NAME 'ComIbmAggregateReplyBody';
DECLARE next INTEGER 1;
DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
DECLARE repliesOut REFERENCE TO OutputRoot.XML.ComIbmAggregateReplyBody;
WHILE next <= 4 DO -- 4-way aggregation
CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
SET repliesOut.*[next].SaleEnvelope = repliesIn.XML.SaleEnvelope;
MOVE repliesIn NEXTSIBLING;
SET next = next + 1;
END WHILE;
RETURN TRUE;
END;
END MODULE;
O ESQL inclui um MQMD rudimentar antes de copiar os dados de ComIbmAggregateReplyBody na mensagem de entrada para uma árvore XML na mensagem de saída, enquanto mantém os identificadores e pastas do pedido agregado. A ordem das respostas não é especificada.
A mensagem de teste utilizada para conduzir o fluxo de mensagens de agregação é uma mensagem XML direta que contém detalhes de fatura para um cliente. Ela contém aproximadamente 8 KB de dados, em oito elementos <SaleList> separados.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>