Before you start
Read the topic Creating an input node in C.
An input node can receive data from any type of external source, such as a file system or FTP connections, provided the output from the node is in the correct format. For connections to queues or databases, use the built-in nodes and the API calls supplied, principally because the built-in nodes are already set up for error handling. Do not use the MQGET or MQPUT calls for direct access to WebSphere MQ queues.
{ static char* functionName = (char *)"_Input_run()"; void* buffer; CciTerminal* terminalObject; int buflen = 4096; int rc = CCI_SUCCESS; int rcDispatch = CCI_SUCCESS; buffer = readFromDevice(&buflen); cniSetInputBuffer(&rc, message, buffer, buflen); } /*propagate etc*/
An input node has a responsibility to perform appropriate end-of-message processing when a message has been propagated through a message flow. Specifically, the input node needs to cause any transactions to be committed or rolled back, and return threads to the thread pool.
Each message flow thread is allocated from a pool of threads that is maintained for each message flow, and starts execution in the cniRun function. You determine the behavior of a thread using the cniDispatchThread utility function, together with the appropriate return value.
From the cniRun function, you can call the cniDispatchThread utility function to cause another thread to start executing the cniRun function. The most appropriate time to execute another thread is directly after you have established that there could be data available for the function to process on the new thread.
The term transaction is used generically to describe either a globally coordinated transaction, or a broker-controlled transaction. Globally coordinated transactions are coordinated by either WebSphere MQ as an XA-compliant Transaction Manager, or Resource Recovery Service (RRS) on z/OS. WebSphere Message Broker controls transactions by committing (or rolling back) any database resources, and then committing any WebSphere MQ units of work. However, if a user-defined node is used, the broker cannot automatically commit any resource updates. The user-defined node uses return values to indicate whether a transaction has been successful, and to control whether transactions are committed or rolled-back. The broker infrastructure catches any unhandled exceptions, and rolls back the transaction.
The following table describes each of the supported return values, the effect that each one has on any transactions, and what the broker does with the current thread.
Return value | Affect on transaction | Broker action on the thread |
---|---|---|
CCI_SUCCESS_CONTINUE | Committed | Calls the same thread again in the cniRun function. |
CCI_SUCCESS_RETURN | Committed | Returns the thread to the thread pool. |
CCI_FAILURE_CONTINUE | Rolled back | Calls the same thread again in the cniRun function. |
CCI_FAILURE_RETURN | Rolled back | Returns the thread to the thread pool. |
CCI_TIMEOUT | Not applicable. The function periodically times out while it is waiting for an input message. | Calls the same thread again in the cniRun function. |
{ ... cniDispatchThread(&rcDispatch, ((NODE_CONTEXT_ST *)context)->nodeObject); ... if (rcDispatch == CCI_NO_THREADS_AVAILABLE) return CCI_SUCCESS_CONTINUE; else return CCI_SUCCESS_RETURN; }
Before you propagate a message, decide what message flow data you want to propagate, and which terminal is to receive the data.
The terminalObject is derived from a list that the user-defined node maintains.
if (terminalObject) { if (cniIsTerminalAttached(&rc, terminalObject)) { if (rc == CCI_SUCCESS) { cniPropagate(&rc, terminalObject, localEnvironment, exceptionList, message); } }
In the above example, the cniIsTerminalAttached function is used to test whether the message can be propagated to the specified terminal. If you do not use the cniIsTerminalAttached function, and the terminal is not attached to another node by a connector, the message is not propagated and no warning message is returned. Use the cniIsTerminalAttached function to prevent this error occurring.