Before you start
An input node can receive data from any type of external source, such as a file system, a queue or a database, in the same way as any other Java program, as long as the output from the node is in the correct format.
You provide an input buffer (or bit stream) to contain input data, and associate it with a message object. You create a message from a byte array using the createMessage method of the MbInputNode class, and then generate a valid message assembly from this message. For details of these methods, see the Java API. For example, to read the input from a file:
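As a minimal sketch of the buffer-filling step only, assuming the input is a file small enough to hold in memory, the bytes can be read with the standard java.nio.file API. FileInputBuffer and its maxBytes limit are hypothetical names introduced for this illustration; they are not part of the broker API. The returned array is the kind of byte array that would then be passed to createMessage.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration; not part of the broker API.
final class FileInputBuffer {
    private FileInputBuffer() {}

    /**
     * Reads the whole file into a byte array.
     * Rejects files larger than maxBytes so that one oversized file
     * cannot exhaust the memory of the message flow thread.
     */
    static byte[] read(Path file, long maxBytes) {
        try {
            long size = Files.size(file);
            if (size > maxBytes) {
                throw new IllegalArgumentException(
                    "File is " + size + " bytes; limit is " + maxBytes);
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            // Rethrow unchecked so the caller can decide how to report the failure.
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside a run method, the call might look like byte[] data = FileInputBuffer.read(path, limit); where path and limit are values that the node obtains from its own configuration.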
When you have created a message assembly, you can then propagate it to one of the node's terminals.
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);
The broker infrastructure handles transaction issues such as controlling the commit of any WebSphere MQ or database unit of work when message processing has completed. However, if a user-defined node is used, any resource updates cannot be automatically committed by the broker.
Each message flow thread is allocated from a pool of threads maintained for each message flow, and starts execution in the run method.
The user-defined node uses return values to indicate whether a transaction has been successful, to control whether transactions are committed or rolled back, and to control when the thread is returned to the pool. Any unhandled exceptions are caught by the broker infrastructure, and the transaction is rolled back.
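The two decisions described above (commit or roll back, keep the thread or release it) combine into four outcomes. The following toy model is only a way to picture that combination; RunOutcome and its constant names are stand-ins invented for this sketch, and the actual return constants should be taken from the Java API for MbInputNode.

```java
// Toy model only: RunOutcome and its constant names are stand-ins for this sketch,
// not the broker's return constants.
enum RunOutcome {
    COMMIT_AND_CONTINUE(true, true),
    COMMIT_AND_RETURN(true, false),
    ROLLBACK_AND_CONTINUE(false, true),
    ROLLBACK_AND_RETURN(false, false);

    final boolean commit;     // true: commit the unit of work; false: roll it back
    final boolean keepThread; // true: run is called again; false: thread goes back to the pool

    RunOutcome(boolean commit, boolean keepThread) {
        this.commit = commit;
        this.keepThread = keepThread;
    }

    /** Describes what the model assumes happens after run returns this outcome. */
    String describe() {
        return (commit ? "commit" : "roll back") + ", then "
            + (keepThread ? "call run again on the same thread"
                          : "return the thread to the pool");
    }
}
```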
You determine the behavior of transactions and threads using an appropriate return value from the following:
public int run( MbMessageAssembly assembly ) throws MbException
{
  byte[] data = getDataWithTimeout();  // user supplied method
                                       // returns null if timeout
  if( data == null )
    return TIMEOUT;

  MbMessage msg = createMessage( data );
  msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly = new MbMessageAssembly( assembly, msg );

  dispatchThread();

  getOutputTerminal( "out" ).propagate( newAssembly );

  return SUCCESS_RETURN;
}
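The getDataWithTimeout method in the example above is supplied by the user. One possible shape for it, sketched under the assumption that some other thread feeds buffers into an in-memory queue, uses the standard BlockingQueue.poll call, which returns null when the wait expires. TimedSource, offer, and the timeout value are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical data source for illustration; not part of the broker API.
final class TimedSource {
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long timeoutMillis;

    TimedSource(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called by whatever produces the data, for example a reader thread. */
    void offer(byte[] data) {
        queue.add(data);
    }

    /** Returns the next buffer, or null if none arrives within the timeout. */
    byte[] getDataWithTimeout() {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;                        // treat interruption like a timeout
        }
    }
}
```

Bounding the wait means that the calling method never blocks indefinitely; when no data arrives it receives null and can return to its caller.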
You use the MbException class to catch and access exceptions. The getNestedExceptions method of the MbException class returns an array of exception objects representing the children of an exception in the broker exception list. Each element returned specifies its exception type. An empty array is returned if an exception has no children. The following code sample shows an example of the usage of the MbException class.
public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
{
  try
  {
    // plug-in functionality
  }
  catch(MbException ex)
  {
    traverse(ex, 0);
    throw ex; // if re-throwing, it must be the original exception that was caught
  }
}

void traverse(MbException ex, int level)
{
  if(ex != null)
  {
    // Do whatever action here
    System.out.println("Level: " + level);
    System.out.println(ex.toString());
    System.out.println("traceText: " + ex.getTraceText());

    // traverse the hierarchy
    MbException e[] = ex.getNestedExceptions();
    int size = e.length;
    for(int i = 0; i < size; i++)
    {
      traverse(e[i], level + 1);
    }
  }
}
Refer to the Javadoc for more details of using the MbException class.
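The traversal above prints as it goes. A variant that collects one line per exception is sometimes easier to log or to check in a test. The sketch below uses NestedError, a stand-in type invented here so that the example compiles without the broker libraries; only the shape of its getNestedExceptions method (an array that is empty when there are no children) is modeled on the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for MbException so that the sketch compiles without the broker libraries.
final class NestedError extends Exception {
    private final NestedError[] children;

    NestedError(String message, NestedError... children) {
        super(message);
        this.children = children;
    }

    // Same shape as described above: an empty array when there are no children.
    NestedError[] getNestedExceptions() {
        return children.clone();
    }
}

final class ExceptionTree {
    private ExceptionTree() {}

    /** Returns one line per exception, indented two spaces per nesting level. */
    static List<String> describe(NestedError root) {
        List<String> lines = new ArrayList<>();
        collect(root, 0, lines);
        return lines;
    }

    // Depth-first walk: record the parent first, then each child one level deeper.
    private static void collect(NestedError ex, int level, List<String> out) {
        if (ex == null) {
            return;
        }
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < level; i++) {
            line.append("  ");
        }
        out.add(line.append(ex.getMessage()).toString());
        for (NestedError child : ex.getNestedExceptions()) {
            collect(child, level + 1, out);
        }
    }
}
```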
You can develop a user-defined message processing or output node in such a way that it can access all current exceptions. For example, to catch database exceptions you can use the MbSQLStatement class. This class sets the value of the 'throwExceptionOnDatabaseError' attribute, which determines broker behavior when it encounters a database error. When it is set to true, if an exception is thrown, it can be caught and handled by the user-defined extension.
The following code sample shows an example of how to use the MbSQLStatement class.
public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
{
  MbMessage newMsg = new MbMessage(assembly.getMessage());
  MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

  String table = assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();

  MbSQLStatement state = createSQLStatement( "dbName",
     "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

  state.setThrowExceptionOnDatabaseError(false);
  state.setTreatWarningsAsErrors(true);

  state.select( assembly, newAssembly );

  int sqlCode = state.getSQLCode();
  if(sqlCode != 0)
  {
    // Do error handling here
    System.out.println("sqlCode = " + sqlCode);
    System.out.println("sqlNativeError = " + state.getSQLNativeError());
    System.out.println("sqlState = " + state.getSQLState());
    System.out.println("sqlErrorText = " + state.getSQLErrorText());
  }

  getOutputTerminal("out").propagate(assembly);
}