并发修改的情况下,保证数据的一致性。
模拟的场景
- 先从用户表中读取到用户的余额
- 在内存中对余额进行 `+1` 操作
- 把 `+1` 后的余额写到DB
建表SQL
-- Demo table: one row per user, holding a balance and a version counter.
CREATE TABLE `user` (
`id` int(11) NOT NULL COMMENT 'id',
`balance` int(255) DEFAULT NULL COMMENT '余额',
-- `version` is compared and incremented by the optimistic-lock UPDATE in Service.incrementCas.
`version` int(255) DEFAULT NULL COMMENT '版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_croatian_ci;
-- Seed the single row (id = 1, balance = 0, version = 0) that the demo increments.
-- The table is qualified with the `demo` schema, the same database named in Main's JDBC URL.
INSERT INTO `demo`.`user`(`id`, `balance`, `version`) VALUES (1, 0, 0);
Main
启动100个线程,对user表中id为1的记录的balance字段执行"读取、+1、写入"操作
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Demo entry point: starts {@link #THREAD_COUNT} threads that each perform one
 * read / +1 / write cycle on the balance of the user row handled by {@code Service}.
 *
 * <p>Each thread opens its own JDBC connection and calls {@code Service.increment}, the
 * lock-free variant, so lost updates are expected. Swap in another {@code Service} method
 * to compare locking strategies.
 */
public class Main {
    private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/demo?serverTimezone=GMT%2b8";
    private static final String USER = "root";
    private static final String PASS = "root";

    /** Number of concurrent worker threads started by {@link #main(String[])}. */
    private static final int THREAD_COUNT = 100;

    static {
        // Load the MySQL driver class explicitly; a failure is only reported, not rethrown.
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the worker threads and returns without waiting for them to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Service service = new Service();
        for (int x = 0; x < THREAD_COUNT; x++) {
            new Thread(() -> {
                // try-with-resources: the connection is owned by this thread and must be
                // closed once the increment finishes, otherwise every worker leaks one.
                try (Connection connection = DriverManager.getConnection(MYSQL_URL, USER, PASS)) {
                    service.increment(connection);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
Service
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Demonstrates several strategies for a concurrent read / +1 / write cycle on the
 * {@code balance} column of the {@code user} row whose id is {@link #USER_ID}.
 *
 * <p>Every method works on a caller-owned {@link Connection}: it switches the connection to
 * manual commit and {@code REPEATABLE_READ}, commits on success, rolls back if a
 * {@link SQLException} occurs, and never closes the connection. Statements and result sets
 * created here are always closed before the method returns.
 */
public class Service {
    private static final int USER_ID = 1;

    private static final String SELECT_BALANCE =
            "SELECT `balance` FROM `user` WHERE `id` = ?;";
    private static final String SELECT_BALANCE_FOR_UPDATE =
            "SELECT `balance` FROM `user` WHERE `id` = ? FOR UPDATE;";
    private static final String SELECT_BALANCE_AND_VERSION =
            "SELECT `balance` ,`version` FROM `user` WHERE `id` = ?;";
    private static final String UPDATE_BALANCE =
            "UPDATE `user` SET `balance` = ? WHERE `id` = ?;";
    private static final String UPDATE_BALANCE_IF_VERSION =
            "UPDATE `user` SET `balance` = ? ,`version` = `version` + 1 WHERE `id` = ? AND `version` = ?;";

    /**
     * JDK monitor lock: runs {@link #increment(Connection)} while holding this instance's
     * monitor, so callers that share this {@code Service} instance execute one at a time.
     *
     * @param connection caller-owned connection; not closed by this method
     * @throws SQLException if any database call fails
     */
    public synchronized void incrementSynchronized(Connection connection) throws SQLException {
        this.increment(connection);
    }

    /**
     * No locking: plain SELECT followed by UPDATE. Concurrent callers can read the same
     * balance and overwrite each other's result, so consistency is not guaranteed.
     *
     * @param connection caller-owned connection; not closed by this method
     * @throws SQLException if any database call fails
     */
    public void increment(Connection connection) throws SQLException {
        readIncrementWrite(connection, SELECT_BALANCE);
    }

    /**
     * Pessimistic database lock (despite the "Cas" in the name): reads the balance with
     * {@code SELECT ... FOR UPDATE} and writes the incremented value in the same transaction.
     *
     * @param connection caller-owned connection; not closed by this method
     * @throws SQLException if any database call fails
     */
    public void incrementCasLock(Connection connection) throws SQLException {
        readIncrementWrite(connection, SELECT_BALANCE_FOR_UPDATE);
    }

    /**
     * Optimistic lock (compare-and-set on the {@code version} column): reads balance and
     * version, then updates only if the version is unchanged, retrying until one UPDATE
     * affects a row.
     *
     * <p>Each attempt is committed whether or not it updated a row, so the next attempt
     * runs in a new transaction. Note that {@link ResultSet#getInt(String)} yields 0 for SQL NULL,
     * so a row whose {@code version} is NULL can never match the UPDATE's version check.
     *
     * @param connection caller-owned connection; not closed by this method
     * @throws SQLException if any database call fails, or if the user row does not exist
     *                      (retrying could never succeed in that case)
     */
    public void incrementCas(Connection connection) throws SQLException {
        connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        connection.setAutoCommit(false);
        try {
            while (true) {
                int balance;
                int version;
                // Read the current balance and the version it belongs to.
                try (PreparedStatement select = connection.prepareStatement(SELECT_BALANCE_AND_VERSION)) {
                    select.setInt(1, USER_ID);
                    try (ResultSet rows = select.executeQuery()) {
                        if (!rows.next()) {
                            // Without a row the UPDATE below matches nothing, forever.
                            throw new SQLException(
                                    "No user row with id " + USER_ID + "; cannot increment balance");
                        }
                        balance = rows.getInt("balance");
                        version = rows.getInt("version");
                    }
                }

                // Write balance + 1 only if nobody bumped the version in the meantime.
                int updatedRows;
                try (PreparedStatement update = connection.prepareStatement(UPDATE_BALANCE_IF_VERSION)) {
                    update.setInt(1, balance + 1);
                    update.setInt(2, USER_ID);
                    update.setInt(3, version);
                    updatedRows = update.executeUpdate();
                }

                connection.commit();
                if (updatedRows != 0) {
                    return;
                }
                // Version changed under us: loop and retry with fresh values.
            }
        } catch (SQLException e) {
            rollbackAfterFailure(connection, e);
            throw e;
        }
    }

    /**
     * Shared read / +1 / write transaction used by the non-CAS variants.
     * If the SELECT returns no row, the balance is treated as 0 and the UPDATE is still issued.
     */
    private void readIncrementWrite(Connection connection, String selectSql) throws SQLException {
        connection.setAutoCommit(false);
        connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        try {
            // Read the current balance from the DB.
            int balance = 0;
            try (PreparedStatement select = connection.prepareStatement(selectSql)) {
                select.setInt(1, USER_ID);
                try (ResultSet rows = select.executeQuery()) {
                    if (rows.next()) {
                        balance = rows.getInt("balance");
                    }
                }
            }

            // Write balance + 1 back to the DB.
            try (PreparedStatement update = connection.prepareStatement(UPDATE_BALANCE)) {
                update.setInt(1, balance + 1);
                update.setInt(2, USER_ID);
                update.executeUpdate();
            }

            connection.commit();
        } catch (SQLException e) {
            rollbackAfterFailure(connection, e);
            throw e;
        }
    }

    /** Rolls back the open transaction; a rollback failure is attached to {@code cause} as suppressed. */
    private static void rollbackAfterFailure(Connection connection, SQLException cause) {
        try {
            connection.rollback();
        } catch (SQLException rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}