并发修改的情况下,保证数据的一致性

并发修改的情况下,保证数据的一致性。

模拟的场景

  1. 先从用户表中读取到用户的余额
  2. 在内存中对余额进行+1操作
  3. 将+1后的余额写回DB

建表SQL

-- Demo table: one row per user holding a balance and a version counter
-- used by the optimistic-lock (CAS) example.
CREATE TABLE `user` (
  `id` int NOT NULL COMMENT 'id',
  -- NOT NULL DEFAULT 0: a NULL balance would be read as 0 by JDBC getInt(),
  -- hiding the difference between "no balance" and "zero balance".
  `balance` int NOT NULL DEFAULT 0 COMMENT '余额',
  -- NOT NULL DEFAULT 0: the CAS update filters on `version` = ?, and NULL never
  -- compares equal to anything, so a NULL version could never be updated.
  `version` int NOT NULL DEFAULT 0 COMMENT '版本号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_croatian_ci;

-- Seed the single row (id = 1) that every example increments.
INSERT INTO `demo`.`user`(`id`, `balance`, `version`) VALUES (1, 0, 0);

Main

启动100个线程,对user表中id为1的记录的balance字段并发执行"读取、+1、写入"操作。

import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Demo driver: starts 100 threads that each open their own JDBC connection and
 * run one read / +1 / write cycle against the {@code user} row through a shared
 * {@link Service} instance.
 *
 * <p>It calls the unlocked {@code Service.increment}, so concurrent runs may lose
 * updates; swap in one of the other {@code Service} methods to compare strategies.
 */
public class Main {

	private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/demo?serverTimezone=GMT%2b8";
	private static final String USER = "root";
	private static final String PASS = "root";

	/**
	 * Launches the worker threads. The threads are not joined; the JVM exits once
	 * all of them have finished.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Service service = new Service();
		for (int x = 0; x < 100; x++) {
			new Thread(() -> {
				// try-with-resources closes the connection even when the increment fails;
				// previously every thread leaked its connection.
				try (java.sql.Connection connection = DriverManager.getConnection(MYSQL_URL, USER, PASS)) {
					service.increment(connection);
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}).start();
		}
	}

	// Explicitly load the MySQL driver class before any connection is requested.
	static {
		try {
			Class.forName("com.mysql.cj.jdbc.Driver");
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}
}

Service

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Read / +1 / write strategies for the {@code balance} of the fixed user row
 * ({@code id = 1}), used to compare concurrency-control approaches.
 *
 * <p>Every method switches the supplied connection to manual commit and
 * REPEATABLE READ, commits on success, and rolls back before rethrowing on
 * failure. The caller owns the connection and is responsible for closing it.
 */
public class Service {

	private static final int USER_ID = 1;

	/**
	 * JVM monitor lock: runs {@link #increment(Connection)} while holding this
	 * instance's monitor, so only callers sharing the same {@code Service}
	 * instance are serialized.
	 *
	 * @param connection open connection to the demo database
	 * @throws SQLException if the row is missing or any JDBC call fails
	 */
	public synchronized void incrementSynchronized(Connection connection) throws SQLException {
		this.increment(connection);
	}

	/**
	 * No locking: a plain SELECT followed by an UPDATE. Concurrent callers can
	 * read the same balance and overwrite each other's write (lost update).
	 *
	 * @param connection open connection to the demo database
	 * @throws SQLException if the row is missing or any JDBC call fails
	 */
	public void increment(Connection connection) throws SQLException {
		connection.setAutoCommit(false);
		connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
		this.readThenWrite(connection, "SELECT `balance` FROM `user` WHERE `id` = ?;");
	}

	/**
	 * MySQL pessimistic lock: reads the balance with {@code SELECT ... FOR UPDATE}
	 * and writes it back in the same transaction.
	 *
	 * <p>Despite "Cas" in the name this is the pessimistic-lock variant; the name
	 * is kept so existing callers continue to compile.
	 *
	 * @param connection open connection to the demo database
	 * @throws SQLException if the row is missing or any JDBC call fails
	 */
	public void incrementCasLock(Connection connection) throws SQLException {
		connection.setAutoCommit(false);
		connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
		this.readThenWrite(connection, "SELECT `balance` FROM `user` WHERE `id` = ? FOR UPDATE;");
	}

	/**
	 * Optimistic lock (CAS on the {@code version} column): reads balance and
	 * version, then updates only if the version is unchanged, retrying until the
	 * update affects a row.
	 *
	 * @param connection open connection to the demo database
	 * @throws SQLException if the row is missing, its version is NULL (the
	 *         version predicate could never match, so retrying would never end),
	 *         or any JDBC call fails
	 */
	public void incrementCas(Connection connection) throws SQLException {

		connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
		// Loop-invariant, so it is set once instead of on every retry.
		connection.setAutoCommit(false);

		try {
			while (true) {
				int balance;
				int version;

				// Read the current balance and version.
				try (PreparedStatement select = connection.prepareStatement("SELECT `balance` ,`version` FROM `user` WHERE `id` = ?;")) {
					select.setInt(1, USER_ID);
					try (ResultSet resultSet = select.executeQuery()) {
						if (!resultSet.next()) {
							throw new SQLException("No row in `user` with id " + USER_ID);
						}
						balance = resultSet.getInt("balance");
						version = resultSet.getInt("version");
						// getInt maps NULL to 0, but `version` = 0 does not match NULL in SQL.
						if (resultSet.wasNull()) {
							throw new SQLException("`version` is NULL for user id " + USER_ID);
						}
					}
				}

				// Write balance + 1 only if nobody bumped the version in between.
				int updatedRows;
				try (PreparedStatement update = connection.prepareStatement("UPDATE `user` SET `balance` = ? ,`version` = `version` + 1 WHERE `id` = ? AND `version` = ?;")) {
					update.setInt(1, balance + 1);
					update.setInt(2, USER_ID);
					update.setInt(3, version);
					updatedRows = update.executeUpdate();
				}

				// Commit in both cases: it ends the transaction, so a retry starts a
				// new one and re-reads the row.
				connection.commit();

				if (updatedRows != 0) {
					return;
				}
				// Version conflict: loop and try again.
			}
		} catch (SQLException | RuntimeException e) {
			rollbackAfterFailure(connection, e);
			throw e;
		}
	}

	/**
	 * Shared body of the non-CAS variants: reads the balance with
	 * {@code selectSql}, writes balance + 1, and commits.
	 */
	private void readThenWrite(Connection connection, String selectSql) throws SQLException {
		try {
			int balance;
			try (PreparedStatement select = connection.prepareStatement(selectSql)) {
				select.setInt(1, USER_ID);
				try (ResultSet resultSet = select.executeQuery()) {
					if (!resultSet.next()) {
						// Previously this fell through and "updated" zero rows without any error.
						throw new SQLException("No row in `user` with id " + USER_ID);
					}
					balance = resultSet.getInt("balance");
				}
			}

			try (PreparedStatement update = connection.prepareStatement("UPDATE `user` SET `balance` = ? WHERE `id` = ?;")) {
				update.setInt(1, balance + 1);
				update.setInt(2, USER_ID);
				update.executeUpdate();
			}

			connection.commit();
		} catch (SQLException | RuntimeException e) {
			rollbackAfterFailure(connection, e);
			throw e;
		}
	}

	/**
	 * Rolls back the open transaction after {@code failure}; a rollback error is
	 * attached as suppressed so the original failure is the one reported.
	 */
	private static void rollbackAfterFailure(Connection connection, Exception failure) {
		try {
			connection.rollback();
		} catch (SQLException rollbackFailure) {
			failure.addSuppressed(rollbackFailure);
		}
	}
}

1 Like