Extending the capability of a Java input node

Before you start

Ensure that you have read and understood the following topics:

After you have created a user-defined node, the following functions are available:
  1. Receiving external data into a buffer
  2. Propagating the message
  3. Controlling threading and transactionality
  4. Handling exceptions

Receiving external data into a buffer

An input node can receive data from any type of external source, such as a file system, a queue or a database, in the same way as any other Java program, as long as the output from the node is in the correct format.

You provide an input buffer (or bit stream) to contain the input data, and associate it with a message object. You create a message from a byte array by using the createMessage method of the MbInputNode class, and then generate a valid message assembly from this message. For details of these methods, see the Java API. For example, to read the input from a file:

  1. Create an input stream to read from the file:
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. Create a byte array the size of the input file:
    byte[] buffer = new byte[inputStream.available()];
  3. Read from the file into the byte array:
    inputStream.read(buffer);
  4. Close the input stream:
    inputStream.close();
  5. Create a message to put on the queue:
    MbMessage msg = createMessage(buffer);
  6. Finalize the message, then create a new message assembly to hold it:
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
    MbMessageAssembly newAssembly =
         new MbMessageAssembly(assembly, msg);
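
Steps 1 to 4 size the buffer with available() and fill it with a single read call. The java.io.InputStream contract does not guarantee that either call covers the whole file. The following is a minimal sketch of an alternative that uses java.nio.file.Files.readAllBytes, which reads the complete file and closes it. The class name FileBufferReader and the method name readWholeFile are hypothetical and are not part of the broker API; the returned array would be passed to createMessage as in step 5.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the broker API.
final class FileBufferReader {

    private FileBufferReader() {
    }

    // Reads the whole file into a byte array. The file is closed
    // before the method returns, even if reading fails.
    static byte[] readWholeFile(String fileName) throws IOException {
        Path path = Paths.get(fileName);
        return Files.readAllBytes(path);
    }
}
```

Inside the node, the call might look like byte[] buffer = FileBufferReader.readWholeFile("myfile.msg"); followed by the createMessage call shown above.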

Propagating the message

When you have created a message assembly, you can then propagate it to one of the node's terminals.

For example, to propagate the message assembly to the "out" terminal:
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);

Controlling threading and transactionality

The broker infrastructure handles transaction issues such as controlling the commit of any WebSphere MQ or database unit of work when message processing has completed. However, the broker cannot automatically commit resource updates that are made by a user-defined node.

Each message flow thread is allocated from a pool of threads maintained for each message flow, and starts execution in the run method.

The user-defined node uses return values to indicate whether a transaction has been successful, to control whether transactions are committed or rolled back, and to control when the thread is returned to the pool. Any unhandled exceptions are caught by the broker infrastructure, and the transaction is rolled back.

You determine the behavior of transactions and threads by returning one of the following values from the run method:

MbInputNode.SUCCESS_CONTINUE
The transaction is committed and the broker calls the run method again using the same thread.
MbInputNode.SUCCESS_RETURN
The transaction is committed and the thread is returned to the thread pool, assuming that it is not the only thread for this message flow.
MbInputNode.FAILURE_CONTINUE
The transaction is rolled back and the broker calls the run method again using the same thread.
MbInputNode.FAILURE_RETURN
The transaction is rolled back and the thread is returned to the thread pool, assuming that it is not the only thread for this message flow.
MbInputNode.TIMEOUT
The run method must not block indefinitely while waiting for input data to arrive. While the flow is blocked by user code, you cannot shut down or reconfigure the broker. The run method must yield control to the broker periodically by returning. If input data has not been received after a certain period (for example, 5 seconds), the method should return with the TIMEOUT return code. Assuming that the broker does not need to reconfigure or shut down, the input node's run method is called again immediately.

To create multithreaded message flows, you call the dispatchThread method after a message has been created, but before the message is propagated to an output terminal. This ensures that only one thread is waiting for data while other threads are processing the message. New threads are obtained from the thread pool up to the maximum limit specified by the additionalInstances attribute of the message flow. For example:

public int run( MbMessageAssembly assembly ) throws MbException
{
  byte[] data = getDataWithTimeout();  // user supplied method
                                       // returns null if timeout
  if( data == null )
    return TIMEOUT;

  MbMessage msg = createMessage( data );
  msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly =
       new MbMessageAssembly( assembly, msg );

  dispatchThread();

  getOutputTerminal( "out" ).propagate( newAssembly );

  return SUCCESS_RETURN;
}
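
The getDataWithTimeout method in the example above is user supplied. One possible way to write it, shown here only as a sketch, is a hand-off queue that a separate reader fills and that the run method polls with a bounded wait, so that the method returns null when nothing arrives in time. The class name TimedInputBuffer and its offer method are hypothetical and are not part of the broker API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the broker API.
final class TimedInputBuffer {

    private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<>();
    private final long timeoutMillis;

    TimedInputBuffer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called by whatever code receives data from the external source.
    void offer(byte[] data) {
        pending.add(data);
    }

    // Waits at most timeoutMillis for data and returns null on timeout,
    // so that the caller can return the TIMEOUT code to the broker.
    byte[] getDataWithTimeout() {
        try {
            return pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;                        // treat interruption like a timeout
        }
    }
}
```

A node that follows the 5 second guideline above could construct the buffer with new TimedInputBuffer(5000) and call getDataWithTimeout at the start of each run invocation.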

Handling exceptions

You use the MbException class to catch and access exceptions. The getNestedExceptions method of the MbException class returns an array of exception objects representing the children of an exception in the broker exception list. Each element returned specifies its exception type. An empty array is returned if an exception has no children. The following code sample shows an example of the usage of the MbException class.

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    try
      {

        // plug-in functionality

      }
    catch(MbException ex)
      {
        traverse(ex, 0);

        throw ex; // if re-throwing, it must be the original exception that was caught
      }
  }

  void traverse(MbException ex, int level)
  {
    if(ex != null)
      {
        // Do whatever action here
        System.out.println("Level: " + level);
        System.out.println(ex.toString());
        System.out.println("traceText:  " + ex.getTraceText());

        // traverse the hierarchy
        MbException e[] = ex.getNestedExceptions();
        int size = e.length;
        for(int i = 0; i < size; i++)
          {
            traverse(e[i], level + 1);
          }
      }
  }

Refer to the Javadoc for more details of using the MbException class.
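
The recursive walk in the traverse method can also collect the hierarchy into a single indented string, for example for one log entry, instead of printing each level. Because MbException is available only with the broker runtime, the following sketch uses a hypothetical stand-in class, NestedError, whose getNestedExceptions method mirrors the call used above. Both NestedError and ExceptionTreeFormatter are assumptions made for illustration and are not part of the broker API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MbException so that the sketch runs outside the broker.
final class NestedError {

    private final String text;
    private final List<NestedError> children = new ArrayList<>();

    NestedError(String text) {
        this.text = text;
    }

    NestedError addChild(NestedError child) {
        children.add(child);
        return this;
    }

    String getText() {
        return text;
    }

    // Returns an empty array when the exception has no children.
    NestedError[] getNestedExceptions() {
        return children.toArray(new NestedError[0]);
    }
}

final class ExceptionTreeFormatter {

    private ExceptionTreeFormatter() {
    }

    // Builds one line per exception, indented by two spaces per level.
    static String format(NestedError root) {
        StringBuilder out = new StringBuilder();
        append(root, 0, out);
        return out.toString();
    }

    private static void append(NestedError ex, int level, StringBuilder out) {
        if (ex == null) {
            return;
        }
        for (int i = 0; i < level; i++) {
            out.append("  ");
        }
        out.append(ex.getText()).append('\n');
        // Depth-first walk over the children, as in traverse above.
        for (NestedError child : ex.getNestedExceptions()) {
            append(child, level + 1, out);
        }
    }
}
```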

You can develop a user-defined message processing or output node in such a way that it can access all current exceptions. For example, to catch database exceptions you can use the MbSQLStatement class. The setThrowExceptionOnDatabaseError method of this class sets the value of the throwExceptionOnDatabaseError attribute, which determines broker behavior when it encounters a database error. When the attribute is set to true, a database error causes an exception to be thrown, which the user-defined extension can catch and handle. When it is set to false, as in the following sample, the extension checks the SQL code after the statement has run.

The following code sample shows an example of how to use the MbSQLStatement class.

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    MbMessage newMsg = new MbMessage(assembly.getMessage());
    MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

    String table = 
       assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();

    MbSQLStatement state = createSQLStatement( "dbName", 
       "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

    state.setThrowExceptionOnDatabaseError(false);
    state.setTreatWarningsAsErrors(true);

    state.select( assembly, newAssembly );

    int sqlCode = state.getSQLCode();
    if(sqlCode != 0)
      {
        // Do error handling here

        System.out.println("sqlCode = " + sqlCode);
        System.out.println("sqlNativeError = " + state.getSQLNativeError());
        System.out.println("sqlState = " + state.getSQLState());
        System.out.println("sqlErrorText = " + state.getSQLErrorText());
      }

    // Propagate the assembly that received the result of the SELECT
    getOutputTerminal("out").propagate(newAssembly);
  }

Related information
Java API