This topic explains how to modify the simple aggregation flow provided to fit different requirements. You might also find this helpful if you are porting existing aggregation flows from Version 5 of the product and earlier.
The Control terminal of the Aggregate Control node is used to propagate the control message containing the status and tracking information for a particular aggregation operation. You can use it in three ways:
Factors you wish to bear in mind when choosing one of the options are:
In previous releases of the product option 1 was only allowed if
the timeout configuration setting of the Aggregate Control node was set
to zero, that is, aggregation operations never timed out. In Version 6
of the product, option 1 is the optimum solution in all cases - whether
aggregation timeouts are used or not. It uses the least amount of CPU to store the
aggregation state and simplifies the fan-out flow.
Also in Version 6 of the product, flows that use options 2 and 3 will
transparently default to option 1. In other words, any connections to the
Control terminal of the Aggregate Control node will be ignored. This is to
maximise efficiency of aggregation flows from previous versions of the
product, without rewriting those flows. This behaviour can be reversed so
that control messages are sent to the control terminal, if it is connected,
by exporting the following environment variable to the broker's startup environment:
MQSI_AGGR_COMPAT_MODE=ON
Option 2 is functionally equivalent to option 1 with regard to timeout
processing, but there is an additional processing overhead in the message
propagation from the Control terminal and the subsequent processing in the
Aggregate Reply node. You can avoid this overhead by not connecting the
Control terminal of the Aggregate Control node, as in option 1.
Option 3 is the most complex of the choices available, and you should be aware that choosing this option has implications regarding the performance and behaviour of message aggregation. For the processing in the aggregation mechanism to be most efficient, the Aggregate Reply node in the Fan-in message flow should have received information from the Aggregate Control node in the Fan-out message flow before receiving any messages on its In terminal.
In some situations this may not be the case, for example where the
processing performed on a fanned-out request message is very fast,
and at the same time the delivery of the message from the Control terminal
of the Aggregate Control node is delayed by processing in the Compute node.
This can potentially cause an issue in the processing of the Aggregate
Reply node.
However, the aggregation will still complete correctly if
Unknown Message Timeout on the Aggregate Reply node is not zero. In this case,
unknown replies are stored and then re-processed after the number of timeout
period, and at this point they are consolidated in the control
state information. The aggregation operation
still completes correctly after the delay caused by the unknown message timeout expiring.
If you set the Unknown Message Timeout to zero, replies
that come in ahead of the control message will be propagated directly
to the Unknown terminal of the Aggregate Reply node and will not be
consolidated with the rest of the aggregation data.
Option 3 would look similar to this truncated image:
The AddMqmd Compute node will contain ESQL similar to this to allow the
control message to be written to a queue:
CREATE COMPUTE MODULE AggregationOut_AddMqmd
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputRoot = InputRoot;
CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID;
SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
RETURN TRUE;
END;
END MODULE;
You should set the Transaction Mode of the MQInput node on the fan-out flow to Yes to avoid the possibility of reply messages being received by the fan-in flow before the control information has been stored by the Aggregate Control node.
When the fan-out flow is not run under transaction control, each aggregate fan-out request message written by the MQOutput nodes is eligible to be processed immediately by the application being invoked. Depending on the response from this application, the reply might be received by the Aggregate Reply node before the control information has been stored.
The same problem can occur if the Control terminal of the Aggregate Control node is wired to a Compute node and an MQOutput node, that is, the fan-in and fan-out flows sit in different message flows. Even if the fan-out is done under a transaction, the extent of the transaction is the writing of the message by the MQOutput node, so replies can still be processed as unknown while the Control branch of the fan-in flow is processing. This is discussed under option 3 in the section above.
In both cases the aggregation will still complete correctly if Unknown Message Timeout on the Aggregate Reply node is not zero. Unknown replies are stored and then re-processed after the number of seconds specified, and at this point they are consolidated in the control state information. The aggregation operation will still complete correctly after the delay caused by the unknown message timeout expiring. If you set the Unknown Message Timeout to zero, replies that come in ahead of the control message will be propagated directly to the Unknown terminal of the Aggregate Reply node and will not be consolidated with the rest of the aggregation data.
To summarize both this section and the previous one, the most efficient design for an Aggregation fan-out flow should ensure that:
If you adopt these two design recommendations, it will ensure that the timeout issues of the type described above will not occur. However, if for some reason it is not possible to follow these recommendations, you can still set the Unknown Message Timeout parameter on the Aggregate Reply node to a non-zero value to ensure that the aggregation operations complete successfully.
The Aggregate Reply node has three non-error output terminals: Out, Unknown, and Timeout. It is important to understand when and why messages are sent from these different terminals, paying particular attention to transactional context. The transactional context can either be owned by the MQInput node (this will occur when an incoming reply message triggers output from the Aggregate Reply node) or it can be owned by the Aggregate Reply node itself.
Where it is owned by the Aggregate Reply node, the transaction context has the usual semantics, but transactional control is centred on the Aggregate Reply node rather than the MQInput node. The implications of this are that an error produced later in the message flow will cause the Aggregate Reply node to roll back and propagate a message to its Catch terminal, rather than the MQInput node. Messages propagated from the Aggregate Reply node within the context of its own transactions are not directly supplied from the MQInput node; the Aggregate Reply node is the input node for this flow invocation.
The transactional behaviour of the Aggregate Reply node is controlled by its Transaction Mode configuration parameter. You should make sure that this setting and the setting on the MQInput node are the same, to ensure that all output from the Aggregate Reply node is under the same level of transactional control.
The following situations are under the transactional control of the MQInput node:
The following situations are under the transactional control of the Aggregate Reply node:
The Aggregate Reply node has two input terminals: In and Control.
If you use both of these terminals, remembering that the use of the Control terminal is optional,
the most efficient way to supply data to the Aggregate Reply node is to
have a single MQInput node for the fan-in flow followed by a Filter node.
The Filter node is used to route an incoming message to the In or
Control terminals of the Aggregate Reply node as appropriate. Use this
approach rather than using two MQInput nodes in the message flow: one for
the In terminal and one for the Control terminal. The fan-in flow needs
to look like this:
The Filter node will need an ESQL module similar to the one shown
below to ensure that the messages are routed to the appropriate
terminal of the Aggregate Reply node:
CREATE FILTER MODULE FanIn_Filter
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
IF Root.XML.ComIbmAggregateControlNode IS NULL THEN
RETURN TRUE; -- wired to In
ELSE
RETURN FALSE; -- wired to Control
END IF;
END;
END MODULE;
You should use a single MQInput node because there is no means of specifying how any additional threads (made available by the use of additional instances) should be distributed between the two MQInput nodes. Traffic on the Aggregate Reply node's In terminal is likely to be higher, therefore it would be most useful to have more threads running in its input node, but there is no way to configure this. It is therefore possible for the node to be starved of threads, backing up reply messages and stalling the aggregation mechanism.
This discussion only applies if the Control terminal of the Aggregate Control node is wired to output to a queue. By not wiring the Control terminal you can overcome the issues discussed in this section.
As a last resort, if the above solution cannot be implemented for some reason, then you can force the MQInput node that is reading control messages to run single-threaded. You can accomplish this under the Advanced panel of the MQInput node configuration, by setting Order Mode to be By Queue Order and by selecting Logical Order. This frees up all of the configured additional instances to be used by the other MQInput node. You should do this only as a last resort because the performance of the first MQInput node will be severely limited.