始める前に
入力ノードは、ノードからの出力が正しい形式になっている限り、 どんなタイプの外部ソース (ファイル・システム、キュー、またはデータベースなど) からでも、 他の Java プログラムと同じようにデータを受信することができます。
ユーザーは入力データを格納する入力バッファー (またはビット・ストリーム) を準備し、これをメッセージ・オブジェクトと関連付けます。 MbInputNode クラスの createMessage メソッドを使用してバイト・アレイからメッセージを作成し、 このメッセージから有効なメッセージ・アセンブリーを生成します。 これらのメソッドについての詳細は、com/ibm/broker/plugin/package-overview.htmlを参照してください。 たとえば、ファイルから入力データを読み取るには、次のようにします。
メッセージ・アセンブリーを作成したら、それをノードのターミナルの 1 つに伝搬することができます。
MbOutputTerminal out = getOutputTerminal("out"); out.propagate(newAssembly);
ブローカー・インフラストラクチャーは、 メッセージ処理の完了時の WebSphere MQ やデータベースの作業単位のコミットの制御などのトランザクション問題を処理します。 しかし、ユーザー定義のノードが使用されている場合、 ブローカーはリソースの更新を自動的にコミットすることはできません。
各メッセージ・フロー・スレッドは、 各メッセージ・フローごとに維持されているスレッドのプールから割り振られ、 run メソッドで実行を開始します。
ユーザー定義ノードが戻り値を使用して、トランザクションが正常かどうかを示し、 トランザクションをコミットするかロールバックするか、 また、いつスレッドをプールに戻すかを制御します。 ブローカーのインフラストラクチャーが処理されない例外を検出した場合、 トランザクションはロールバックされます。
トランザクションとスレッド化の動作は、以下の適切な戻り値を使用して判断します。
public int run( MbMessageAssembly assembly ) throws MbException { byte[] data = getDataWithTimeout(); // user supplied method // returns null if timeout if( data == null ) return TIMEOUT; MbMessage msg = createMessage( data ); msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE ); MbMessageAssembly newAssembly = new MbMessageAssembly( assembly, msg ); dispatchThread(); getOutputTerminal( "out" ).propagate( newAssembly ); return SUCCESS_RETURN; }
mbException クラスを使用して、 例外をキャッチして例外に接続します。 mbException クラスは、 ブローカー例外リスト内の例外の子を表す例外オブジェクトの配列を戻します。 戻される各エレメントは、その例外タイプを指定します。 例外に子が存在しない場合には、空の配列が戻されます。 次のコード・サンプルは、MbException クラスの使用法の例を示しています。
public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException { try { // plug-in functionality } catch(MbException ex) { traverse(ex, 0); throw ex; // if re-throwing, it must be the original exception that was caught } } void traverse(MbException ex, int level) { if(ex != null) { // Do whatever action here System.out.println("Level: " + level); System.out.println(ex.toString()); System.out.println("traceText: " + ex.getTraceText()); // traverse the hierarchy MbException e[] = ex.getNestedExceptions(); int size = e.length; for(int i = 0; i < size; i++) { traverse(e[i], level + 1); } } }
mbException クラスの使用法の詳細については、javadoc を参照してください。
すべての現行の例外にアクセスできるような仕方で、ユーザー定義メッセージ処理ノードまたは出力ノードを開発できます。 たとえば、 データベース例外をキャッチするには MbSQLStatement クラスを使用できます。 このクラスは「throwExceptionOnDatabaseError」属性の値を設定します。 この属性はデータベース・エラーの発生時のブローカー動作を決定します。 true に設定すると、例外がスローされる場合、ユーザー定義拡張機能がキャッチして処理できます。
次のコード・サンプルは、 MbSQLStatement クラスの使用法の例を示しています。
public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException { MbMessage newMsg = new MbMessage(assembly.getMessage()); MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg); String table = assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName(); MbSQLStatement state = createSQLStatement( "dbName", "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" ); state.setThrowExceptionOnDatabaseError(false); state.setTreatWarningsAsErrors(true); state.select( assembly, newAssembly ); int sqlCode = state.getSQLCode(); if(sqlCode != 0) { // Do error handling here System.out.println("sqlCode = " + sqlCode); System.out.println("sqlNativeError = " + state.getSQLNativeError()); System.out.println("sqlState = " + state.getSQLState()); System.out.println("sqlErrorText = " + state.getSQLErrorText()); } getOutputTerminal("out").propagate(assembly); }