Tablestore入门手册--局部事务
局部事务介绍
表格存储提供的局部事务也可以称为是分区键事务:可以指定某个分区键下的操作是原子的,要么全部成功要么全部失败,并且所提供的隔离级别为串行化。也就是说表格存储的局部事务可以防止以下问题
- 脏读:事务之外的操作读到了尚未提交的写入
- 脏写:事务之外的写入覆盖了本事务尚未提交的写入
- 不可重复读:在事务中的多次对同一行数据的读操作读到了不同的值
- 更新丢失:本事务提交已提交之后被其他并行执行的事务所覆盖(与脏写不同,脏写是两个事务都没有提交时发生的)
局部事务基本使用流程如下图所示
Tablestore的局部事务在启动事务时或首先获取到分区键下的锁,所有后续对该分区键的写操作与启动事务操作都会被阻塞至原事务提交或者超时以保证操作的隔离性,有如下的一些特性:
- 在事务提交或者中止之前,不能有另外一个事务在同分区键下启动事务
- 在事务提交或中止之前,非本事务的写入将被阻塞或超时失败
- 在事务提交和中止之前,非本事务的读取操作无法读取到事务中未提交的写入,而本事务的读操作可以获取到本事务中的写入
局部事务的使用
事务的启动、提交与中止
启动事务
// 局部事务需要指定一个分区键(第一列主键) PrimaryKey transactionPK = new PrimaryKey(Collections.singletonList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)) )); StartLocalTransactionRequest startTransactionRequest = new StartLocalTransactionRequest(TABLE_NAME, transactionPK); StartLocalTransactionResponse startTransactionResponse = syncClient.startLocalTransaction(startTransactionRequest); // 启动事务成功后会返回一个transactionId,任意实现了TxnRequest抽象类的请求都可以通过setTransactionId方法指定事务 final String transactionId = startTransactionResponse.getTransactionID();
提交事务
// 提交事务,所有与该transactionId相关的写入操作将被永久地写入到Tablestore中 syncClient.commitTransaction(new CommitTransactionRequest(transactionId));
中止事务
// 中止事务,所有与改transactionId相关的写入操作将被回滚 syncClient.abortTransaction(new AbortTransactionRequest(transactionId));
基本用法
分别用同一个transactionId两次写入数据并提交,要么全部失败,要么全部失败
写入第一行数据
PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); // 设置事务ID,由启动事务返回 typeAPutRequest.setTransactionId(transactionId); syncClient.putRow(typeAPutRequest);
通过事务ID读取上面写入的数据
GetRowRequest getRowRequest = new GetRowRequest(); SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ); singleRowQueryCriteria.setMaxVersions(1); getRowRequest.setRowQueryCriteria(singleRowQueryCriteria); // 这边需要设置事务ID getRowRequest.setTransactionId(transactionId); GetRowResponse getRowResponse = syncClient.getRow(getRowRequest); // 可以正常获取到数据 Assert.assertNotNull(getRowResponse.getRow());
不通过事务ID读取
GetRowRequest getRowRequest = new GetRowRequest(); SingleRowQueryCriteria singleRowQueryCriteria = new SingleRowQueryCriteria( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ); singleRowQueryCriteria.setMaxVersions(1); getRowRequest.setRowQueryCriteria(singleRowQueryCriteria); GetRowResponse getRowResponse = syncClient.getRow(getRowRequest); // 不设置事务ID获取,由于Tablestore提供的隔离级别为串行化,这边不能读取到未提交的写入 Assert.assertNull(getRowResponse.getRow());
写入第二行数据
PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); typeBPutRequest.setTransactionId(transactionId); syncClient.putRow(typeBPutRequest);
提交
// 提交事务,提交事务后,其他读操作就可以获取到之前的写入 syncClient.commitTransaction(new CommitTransactionRequest(transactionId));
中止事务
// 中止事务,所有与改transactionId相关的写入操作将被回滚 syncClient.abortTransaction(new AbortTransactionRequest(transactionId));
启动事务后使用后其他操作非本事务的操作尝试写入
本示例展示的是一个在事务执行期间有另外一个同分区键的写入时的场景,由于在分区键下启动事务会直接锁定分区键下所有的写操作,在事务执行期间任何向同分区下的写入操作将被阻塞至事务提交或超时。下面的流程图展示了两个线程在通过事务写入的一些情景:
写入第一行
// put row A with transactionID PutRowRequest typeAPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); // set transactionId in startTransactionResponse typeAPutRequest.setTransactionId(transactionId); syncClient.putRow(typeAPutRequest);
写入第二行时不带事务ID,模拟非本事务的操作尝试写入,写入失败,返回的错误码为OTSRowOperationConflict
// put row B without transactionID PutRowRequest typeBPutRequest = new PutRowRequest(new RowPutChange( TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) )) ).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); // put without transactionId, due to PK_USE_ID is locked by another transaction, this action will fail try { syncClient.putRow(typeBPutRequest); Assert.fail(); } catch (TableStoreException e) { // ok Assert.assertEquals("OTSRowOperationConflict", e.getErrorCode()); }
Batch写入
使用batch写入第一行和第二行
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeA)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_a"))); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeB)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_b"))); batchWriteRowRequest.setTransactionId(transactionId); syncClient.batchWriteRow(batchWriteRowRequest);
使用batch写入第三行和第四行
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest(); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeC)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_c"))); batchWriteRowRequest.addRowChange(new RowPutChange(TABLE_NAME, new PrimaryKey(Arrays.asList( new PrimaryKeyColumn(PK_USER_ID, PrimaryKeyValue.fromString(userId)), new PrimaryKeyColumn(PK_TYPE, PrimaryKeyValue.fromString(typeD)) ))).addColumn(COLUMN_CONTENT, ColumnValue.fromString("content_d"))); batchWriteRowRequest.setTransactionId(transactionId); syncClient.batchWriteRow(batchWriteRowRequest);
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
使用 Python 合并多个格式一致的 Excel 文件
使用 Python 合并多个格式一致的 Excel 文件 一 问题描述 最近朋友在工作中遇到这样一个问题,她每天都要处理如下一批 Excel 表格:每个表格的都只有一个 sheet,表格的前两行为表格标题及表头,表格的最后一行是相关人员签字。最终目标是将每个表格的内容合并到一个 Excel 表格中,使之成为一张表格。在她未咨询我之前,每天复制粘贴这一类操作占用了她绝大部分时间。表格样式如下: 二 需求分析 根据她的描述,最终需求应该是这样的:在这一批表格中选取任意一个表格的前两行作为新表格的标题与表头,将这两行内容以嵌套列表的形式插入一个名为 data 空列表中。取每张表格的第3至倒数第二行,剔除空白行的内容。并将所有表格的内容以子列表的方式依次插入 data 列表中。任取一表格的最后一行以子列表的方式插入 data 列表中。最后将 data 列表的内容写入一个新的 Excel 表格中。 三 查阅资料 通过几分钟的上网查询,得出以下结论: 3.1 通过 xlrd 和 xlsxwriter 模块即可解决次需求; 3.2 之所以使用 xlrd 和 xlsxwriter 是因为: xlrd擅...
- 下一篇
Mpx 2.3.0 发布,初步支持输出到 Web 中运行
Mpx 2.3.0发布了。Mpx 是一款致力于提高小程序开发体验的增强型小程序框架,通过 Mpx,开发者能够以最先进的 web 开发体验(Vue + Webpack)开发生产性能深度优化的小程序。 此版本初步支持 Mpx 项目输出到 Web 中运行,使用脚手架初始化新项目时选择跨平台项目并选择输出到 Web 即可进行试用,不过目前输出到 Web 的能力尚不完善,仅能支持相对简单的项目或者配合条件编译使用。 详情查看更新说明: https://github.com/didi/mpx/releases/tag/v2.3.0
相关文章
文章评论
共有0条评论来说两句吧...