JTA( Transaction API)允许应用执行分布式事务处理--在两个或多个计算机资源上访问并且更新数据。
JDBC程序的JTA支持极大地增强了数据访问能力。
本文的目的是要提供一个关于的Java事务处理API(JTA)的高级的概述,以及与分布式事务相关的内容。一个事务处理定义了一个工作逻辑单元,要么彻底成功要么不产生任何结果。
一个分布式事务处理只是一个在两个或更多网络资源上访问和更新数据的事务处理,因此它在那些资源之间必然是等价的。
在本文中,我们主要关心的是如何处理关系数据库系统。
我们要讨论的分布式事务处理(DTP)中包含的组件是: 1 应用程序 2 应用程序服务器 3 事务管理程序 4 资源适配器 5 资源管理程序 在以后的内容中,我们将描述这些组件以及它们与JTA和数据库访问的关系。最好把分布式事务处理中包含的组件看作是独立的过程,而不是考虑它们在一个特定的中的位置。这些组件中的一些可以保存在单机中,或者也可在好几台机器之间分布。 下面例子中的图表可以显示在一台特定的电脑上的组件,但是这些操作之间的关系是必须首要考虑的。
最简单的例子:用于本地数据库事务处理的应用程序 关系数据库访问的最简单的形式仅仅包括 应用程序、资源管理程序 和 资源适配器。应用程序只不过是发送请求到数据库并且从数据库中获取数据的最终用户访问点
我们讨论的资源管理程序是一个关系数据库管理系统(RDBMS),比如Oracle或者SQL Server。所有的实际数据库管理都是由这个组件处理的。 资源适配器是外部空间之间的通信管道组件,或者是请求翻译器,在本例中,是应用程序和资源管理程序。在我们的讨论中,这是一个JDBC驱动程序。 下面的描述是资源管理程序本地事务处理的一个描述,也就是说,一个事务处理被被限制在一个特定的企业数据库。 应用程序发送一个用于JDBC驱动程序数据的请求,然后翻译这个请求并把它通过网络发送到数据库中。 数据库把数据发送回驱动程序,然后把翻译的结果发送回应用程序,如下图所示:
这个例子说明了在一个简化的系统中的基本的信息流;然而,今天的企业使用的应用程序服务器都添加了其他的组件到这个过程处理中。
应用程序服务器
应用程序服务器是事务处理操作的另一个组件。应用程序服务器处理大部分的应用程序操作并且获得最终用户应用程序的一些负载。基于前面的例子,我们可以看出应用程序服务器在事务处理上添加了另一个操作层:到目前为止,我们的例子说明了单个的本地事务处理,并且描述了分布式事务处理的五个组件中的四个。
第五个组件,事务管理程序只有当事务将要被分配的时候才会开始被考虑。
分布式事务处理和事务管理程序 像我们前面所提到的,一个分布式事务处理是一个在两个或更多资源上访问和更新数据的事务处理。 这些资源可以由好几个位于一个单独服务器上的不同的关系型数据库管理系统组成,比如说Oracle、SQL Server和Sybase;它们也可以包含存在于若干不同的服务器上的同一种数据库的若干个实例。在任何情况下,一个分布式事务处理包括各种的资源管理程序之间的协同作用。这个协同作用是事务管理函数。 事务管理程序负责作出要么提交(commit)要么退回(rollback)任何分布式事务处理的决定。一个提交决定应该导致一个成功的事务处理;而退回操作则是保持数据库中的数据不变。
JTA指定一个分布式事务处理中的事务管理程序和另一个组件之间的标准接口:应用程序,应用程序服务器和资源管理程序。 这个关系被显示在下面的图表中:
javax.transaction.UserTransaction方法开启一个全局事务并且使用调用线程与事务处理关联。
2—Transaction Manager—javax.transaction.TransactionManager接口允许应用程序服务器来控制代表正在管理的应用程序的事务范围。 3—XAResource—javax.transaction.xa.XAResource接口是一个基于X/Open CAE Specification的行业标准XA接口的Java映射。 注意,一个限制性环节是通过JDBC程序的XAResource接口的支持。JDBC驱动程序必须支持两个正常的JDBC交互作用:应用程序和/或应用程序服务器,而且以及JTA的XAResource部分。 编写应用程序水平代码的开发者不会关心分布式事务处理管理的细节。这是分布式事务处理基本结构的工作—应用程序服务器、事务管理程序和JDBC驱动程序。应用程序代码中唯一的需要注意的就是当连接处于一个分布式事务范围内的时候,不应该调用一个会影响事务边界的方法。特别的是,一个应用程序不应该调用Connection方法commit、rollback和setAutoCommit(true),因为它们将破坏分布式事务的基本结构管理。
分布式事务处理 事务管理程序是分布式事务基本结构的基本组件;然而JDBC驱动程序和应用程序服务器组件应该具备下面的特征: 驱动程序应该实现JDBC 2.0应用程序接口,包括Optional Package接口XADataSource和XAConnection以及JTA接口XAResource。 应用程序服务器应该提供一个DataSource类,用来实现与分布式事务基本结的交互以及一个连接池模块(用于改善性能)。 分布式事务处理的第一步就是应用程序要发送一个事务请求到事务管理程序。虽然最后的commit/rollback决定把事务作为一个简单的逻辑单元来对待,但是仍然可能会包括许多事务分支。一个事务分支与一个到包含在分布式事务中的每个资源管理程序相关联。因此,到三个不同的关系数据库管理的请求需要三个事务分支。每个事务分支必须由本地资源管理程序提交或者返回。事务管理程序控制事务的边界,并且负责最后决定应该提交或者返回的全部事务。 这个决定由两个步骤组成,称为Two - Phase Commit Protocol。 在第一步骤中,事务管理程序轮询所有包含在分布式事务中的资源管理程序(关系数据库管理)来看看哪个可以准备提交。如果一个资源管理程序不能提交,它将不响应,并且把事务的特定部分返回,以便数据不被修改。 在第二步骤中,事务管理程序判断否定响应的资源管理程序中是否有能够返回整个事务的。如果没有否定响应的话,翻译管理程序提交整个事务并且返回结果到应用程序中。 开发事项管理程序代码的开发者必须与所有三个JTA接口有关:UserTransaction、TransactionManager和XAResource,这三个接口都被描述在 Sun JTA specification中。JDBC驱动程序开发者只需要关心XAResource接口。这个接口是允许一个资源管理程序参与事务的行业标准X/Open XA协议的Java映射。连接XAResource接口的驱动程序组件负责在事务管理程序和资源管理程序之间担任"翻译"的任务。下面的章节提供了XAResource调用的例子。
JDBC驱动和XAResource
为了简化XAResource的说明,这些例子说明了一个应用程序在不包含应用程序服务器和事项管理程序的情况下应该如何使用JTA。 基本上,这些例子中的应用程序也担任应用程序服务器和事项管理程序的任务。大部分的企业使用事务管理程序和应用程序服务器,因为它们能够比一个应用程序更能够高效地管理分布式事务。 然而遵循这些例子,一个应用程序开发者可以测试在JDBC程序中的JTA支持的健壮性。而且有一些例子可能不是工作在某个特定的数据库上,这是因为关联在数据库上的一些内在的问题。 在使用JTA之前,你必须首先实现一个Xid类用来标识事务(在普通情况下这将由事务管理程序来处理)。Xid包含三个元素:formatID、gtrid(全局事务标识符)和bqual(分支修饰词标识符)。 formatID通常是零,这意味着你将使用OSI CCR(Open Systems Interconnection Commitment, Concurrency和Recovery 标准)来命名。如果你要是用另外一种格式,那么formatID应该大于零。-1值意味着Xid为无效。 gtrid和bqual可以包含64个字节二进制码来分别标识全局事务和分支事务。唯一的要求是gtrid和bqual必须是全局唯一的。此外,这可以通过使用指定在OSI CCR中的命名规则规范来完成。 下面的例子说明Xid的实现: import javax.transaction.xa.*; public class MyXid implements Xid{ protected int formatId; protected byte gtrid[]; protected byte bqual[]; public MyXid() { } public MyXid(int formatId, byte gtrid[], byte bqual[]) { this.formatId = formatId; this.gtrid = gtrid; this.bqual = bqual; } public int getFormatId() { return formatId; } public byte[] getBranchQualifier() { return bqual; } public byte[] getGlobalTransactionId() { return gtrid; }} |
public DataSource getDataSource() throws SQLException { SQLServerDataSource xaDS = new com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource(); xaDS.setDataSourceName("SQLServer"); xaDS.setServerName("server"); xaDS.setPortNumber(1433); xaDS.setSelectMethod("cursor"); return xaDS;} |
XADataSource xaDS; XAConnection xaCon;XAResource xaRes;Xid xid;Connection con;Statement stmt;int ret;xaDS = getDataSource();xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password");xaRes = xaCon.getXAResource();con = xaCon.getConnection();stmt = con.createStatement();xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});try { xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid); if (ret == XAResource.XA_OK) { xaRes.commit(xid, false); }}catch (XAException e) { e.printStackTrace();}finally { stmt.close(); con.close(); xaCon.close();} |
xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)");xaRes.end(xid, XAResource.TMSUCCESS);ret = xaRes.prepare(xid);if (ret == XAResource.XA_OK) { xaRes.rollback(xid);} |
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xaRes.start(xid, XAResource.TMNOFLAGS);stmt.executeUpdate("insert into test_table values (100)");xaRes.end(xid, XAResource.TMSUSPEND);∥这个更新在事务范围之外完成,所以它不受XA返回影响。stmt.executeUpdate("insert into test_table2 values (111)");xaRes.start(xid, XAResource.TMRESUME);stmt.executeUpdate("insert into test_table values (200)");xaRes.end(xid, XAResource.TMSUCCESS);ret = xaRes.prepare(xid);if (ret == XAResource.XA_OK) { xaRes.rollback(xid);} |
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22});xaRes.start(xid1, XAResource.TMNOFLAGS);stmt.executeUpdate("insert into test_table1 values (100)");xaRes.end(xid1, XAResource.TMSUCCESS);xaRes.start(xid2, XAResource.TMNOFLAGS);ret = xaRes.prepare(xid1);if (ret == XAResource.XA_OK) { xaRes.commit(xid2, false);}stmt.executeUpdate("insert into test_table2 values (200)");xaRes.end(xid2, XAResource.TMSUCCESS);ret = xaRes.prepare(xid2);if (ret == XAResource.XA_OK) { xaRes.rollback(xid2);} |
xaDS = getDataSource(); xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password");xaRes1 = xaCon1.getXAResource();con1 = xaCon1.getConnection();stmt1 = con1.createStatement();xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02});xaRes1.start(xid1, XAResource.TMNOFLAGS);stmt1.executeUpdate("insert into test_table1 values (100)");xaRes1.end(xid, XAResource.TMSUCCESS);xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password");xaRes2 = xaCon1.getXAResource();con2 = xaCon1.getConnection();stmt2 = con1.createStatement();if (xaRes2.isSameRM(xaRes1)) { xaRes2.start(xid1, XAResource.TMJOIN); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid1, XAResource.TMSUCCESS);}else { xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03}); xaRes2.start(xid2, XAResource.TMNOFLAGS); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid2, XAResource.TMSUCCESS); ret = xaRes2.prepare(xid2); if (ret == XAResource.XA_OK) { xaRes2.commit(xid2, false); }}ret = xaRes1.prepare(xid1);if (ret == XAResource.XA_OK) { xaRes1.commit(xid1, false);} |
MyXid[] xids; xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN);for (int i=0; xids!=null && i try { xaRes.rollback(xids[i]); } catch (XAException ex) { try { xaRes.forget(xids[i]); } catch (XAException ex1) { System.out.println("rollback/forget failed: " + ex1.errorCode); }}} |