MySQL重復(fù)數(shù)據(jù)處理的七種高效方法
1. 重復(fù)數(shù)據(jù)插入問(wèn)題分析
1.1 問(wèn)題本質(zhì)
在 MySQL 中,當(dāng)我們使用主鍵或唯一索引來(lái)確保數(shù)據(jù)唯一性時(shí),如果插入重復(fù)數(shù)據(jù),MySQL 會(huì)拋出類似這樣的異常:
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'xxx' for key 'xxx'
這個(gè)異常本質(zhì)上是數(shù)據(jù)庫(kù)告訴我們:"兄 dei,這條數(shù)據(jù)已經(jīng)存在了,別再塞了!"
唯一鍵定義:唯一鍵可以是主鍵或唯一索引,二者在觸發(fā)唯一性約束時(shí)行為一致。主鍵是特殊的唯一索引,區(qū)別在于主鍵不允許 NULL 值且一個(gè)表只能有一個(gè)主鍵,而唯一索引則可以有多個(gè)且允許 NULL 值。需注意,對(duì)于普通唯一索引,MySQL 將多個(gè) NULL 視為不同值,因此可以插入多條 NULL 唯一鍵的記錄;而主鍵則完全不允許 NULL。選擇約束類型時(shí)應(yīng)考慮字段是否允許為 NULL 的業(yè)務(wù)需求。
1.2 常見(jiàn)場(chǎng)景圖
2. 基礎(chǔ)解決方案:使用異常捕獲
最基礎(chǔ)的方案是使用 try-catch 捕獲異常,代碼如下:
public void insertUser(User user) { try { userMapper.insert(user); log.info("用戶數(shù)據(jù)插入成功"); } catch (org.springframework.dao.DuplicateKeyException e) { // Spring框架對(duì)SQLIntegrityConstraintViolationException的封裝 log.warn("用戶數(shù)據(jù)已存在: {}", user.getUsername()); // 可以選擇忽略或更新現(xiàn)有數(shù)據(jù) } catch (java.sql.SQLIntegrityConstraintViolationException e) { // 使用JDBC直接操作時(shí)可能遇到的原生異常 log.warn("用戶數(shù)據(jù)已存在(JDBC原生異常): {}", user.getUsername()); // 同樣可以處理重復(fù)數(shù)據(jù) } }
這種方法的缺點(diǎn)是:每次遇到重復(fù)數(shù)據(jù)都會(huì)產(chǎn)生一個(gè)異常,異常的創(chuàng)建和捕獲會(huì)帶來(lái)額外的性能開(kāi)銷,尤其在批量操作時(shí)性能損耗更明顯。
3. 改進(jìn)方案:預(yù)檢查+條件插入
一個(gè)改進(jìn)思路是先檢查數(shù)據(jù)是否存在,再?zèng)Q定插入或更新:
public void insertUserWithCheck(User user) { User existingUser = userMapper.selectByUsername(user.getUsername()); if (existingUser == null) { userMapper.insert(user); } else { // 處理重復(fù)數(shù)據(jù),比如更新或忽略 } }
這種方案的核心價(jià)值是減少數(shù)據(jù)庫(kù)異常拋出,而非保證數(shù)據(jù)唯一性。在并發(fā)環(huán)境下存在競(jìng)態(tài)條件:檢查和插入是兩個(gè)獨(dú)立操作,中間可能有其他事務(wù)插入相同數(shù)據(jù)。
解決競(jìng)態(tài)條件的正確方式:
- 必須結(jié)合數(shù)據(jù)庫(kù)唯一索引作為兜底保障
- 即使發(fā)生并發(fā)沖突,最終由數(shù)據(jù)庫(kù)約束保證數(shù)據(jù)唯一性
- 應(yīng)用層做好異常捕獲處理,保證業(yè)務(wù)流程正常進(jìn)行
在高并發(fā)場(chǎng)景下,可以考慮使用分布式鎖進(jìn)一步控制并發(fā)問(wèn)題,增加續(xù)租機(jī)制確保業(yè)務(wù)完成前鎖不會(huì)釋放:
@Transactional(rollbackFor = Exception.class) public void insertUserWithLock(User user) { // 獲取分布式鎖(采用Redisson實(shí)現(xiàn)自動(dòng)續(xù)租) String lockKey = "user_register:" + user.getUsername(); RLock lock = redissonClient.getLock(lockKey); try { // 嘗試獲取鎖,設(shè)置自動(dòng)續(xù)租(看門狗機(jī)制) boolean locked = lock.tryLock(5, 30, TimeUnit.SECONDS); if (locked) { User existingUser = userMapper.selectByUsername(user.getUsername()); if (existingUser == null) { userMapper.insert(user); } else { // 處理重復(fù)數(shù)據(jù) } } else { throw new BusinessException("操作頻繁,請(qǐng)稍后重試"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException("操作被中斷"); } finally { // 確保鎖釋放 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } }
4. 高效解決方案
4.1 INSERT IGNORE 語(yǔ)句
MySQL 提供了 INSERT IGNORE 語(yǔ)句,當(dāng)遇到重復(fù)數(shù)據(jù)時(shí)會(huì)自動(dòng)忽略錯(cuò)誤:
@Insert("INSERT IGNORE INTO user(username, email, password) VALUES(#{username}, #{email}, #{password})") int insertIgnore(User user);
執(zhí)行流程如下:
注意:受影響的行數(shù)是實(shí)際成功插入的行數(shù),被忽略的行不計(jì)入受影響的行數(shù)。這點(diǎn)在批量操作時(shí)尤為重要,返回值只反映實(shí)際插入的記錄數(shù),而非處理的總記錄數(shù)。
4.2 ON DUPLICATE KEY UPDATE 語(yǔ)句
如果需要在遇到重復(fù)時(shí)更新數(shù)據(jù),可以使用 ON DUPLICATE KEY UPDATE:
@Insert("INSERT INTO user(username, email, password, login_count) " + "VALUES(#{username}, #{email}, #{password}, #{loginCount}) " + "ON DUPLICATE KEY UPDATE " + "email = IF(email = VALUES(email), email, VALUES(email)), " + // 僅當(dāng)值變化時(shí)更新,避免無(wú)謂更新 "login_count = login_count + 1") // 累加操作必然更新 int insertOrUpdateLoginCount(User user);
這條語(yǔ)句會(huì)在遇到重復(fù)主鍵或唯一索引時(shí),執(zhí)行 UPDATE 操作而不是插入。
注意:使用IF(字段 = VALUES(字段), 字段, VALUES(字段))
可以避免"靜默更新"問(wèn)題——當(dāng)新值與舊值相同時(shí),MySQL 不會(huì)真正執(zhí)行更新操作,受影響的行數(shù)為 0。這種寫法確保只有在值真正變化時(shí)才更新。
受影響的行數(shù)意義:
- 1: 新插入記錄或更新了已有記錄(值發(fā)生變化)
- 0: 行被更新但值未變化
- 2: 合并了多個(gè)唯一索引沖突的記錄(較少見(jiàn))
4.3 REPLACE INTO 語(yǔ)句
REPLACE INTO 是另一種處理方式,它會(huì)先嘗試插入數(shù)據(jù),如果出現(xiàn)重復(fù)則刪除舊記錄,再插入新記錄:
@Insert("REPLACE INTO user(id, username, email, password) VALUES(#{id}, #{username}, #{email}, #{password})") int replaceUser(User user);
執(zhí)行過(guò)程:
重要風(fēng)險(xiǎn)提示:
- 如果表存在外鍵約束,刪除舊記錄可能觸發(fā)級(jí)聯(lián)刪除,導(dǎo)致關(guān)聯(lián)數(shù)據(jù)丟失
- 使用自增主鍵時(shí),每次 REPLACE 都會(huì)生成新的主鍵值,導(dǎo)致主鍵值跳躍
- 大量使用 REPLACE 會(huì)導(dǎo)致更頻繁的行刪除再插入,增加表碎片和鎖競(jìng)爭(zhēng)
適用場(chǎng)景:無(wú)外鍵依賴、無(wú)需保留歷史版本、完全覆蓋舊數(shù)據(jù)的場(chǎng)景。
5. 批量處理優(yōu)化
對(duì)于批量數(shù)據(jù)處理,逐條插入效率低下。下面是更安全的批量插入方案(避免 SQL 注入風(fēng)險(xiǎn)):
@Mapper public interface UserMapper { @Insert("<script>" + "INSERT INTO user(username, email, password) VALUES " + "<foreach collection='users' item='user' separator=','>" + "(#{user.username}, #{user.email}, #{user.password})" + "</foreach>" + " ON DUPLICATE KEY UPDATE " + "email = VALUES(email), " + "password = VALUES(password)" + "</script>") int batchInsertOrUpdate(@Param("users") List<User> users); @Insert("<script>" + "INSERT IGNORE INTO user(username, email, password) VALUES " + "<foreach collection='users' item='user' separator=','>" + "(#{user.username}, #{user.email}, #{user.password})" + "</foreach>" + "</script>") int batchInsertIgnore(@Param("users") List<User> users); }
使用 JdbcTemplate 時(shí)也要注意避免 SQL 注入:
public int batchInsertWithJdbcTemplate(List<User> users) { String sql = "INSERT INTO user(username, email, password) VALUES (?, ?, ?) " + "ON DUPLICATE KEY UPDATE email = VALUES(email)"; return jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { User user = users.get(i); ps.setString(1, user.getUsername()); ps.setString(2, user.getEmail()); ps.setString(3, user.getPassword()); } @Override public int getBatchSize() { return users.size(); } }).length; }
6. Spring Boot 整合方案
在 Spring Boot 項(xiàng)目中,我們可以結(jié)合 MyBatis 實(shí)現(xiàn)更優(yōu)雅的解決方案:
@Service public class UserService { @Autowired private UserMapper userMapper; /** * 插入用戶數(shù)據(jù),遇到重復(fù)則更新 */ @Transactional(rollbackFor = Exception.class) public boolean insertOrUpdateUser(User user) { return userMapper.insertOrUpdate(user) > 0; } /** * 批量插入用戶數(shù)據(jù),忽略重復(fù) */ @Transactional(rollbackFor = Exception.class) public int batchInsertIgnore(List<User> users) { if (CollectionUtils.isEmpty(users)) { return 0; } // 大批量數(shù)據(jù),分批處理避免事務(wù)過(guò)大 int batchSize = 500; // 根據(jù)實(shí)際數(shù)據(jù)大小和數(shù)據(jù)庫(kù)配置調(diào)整 int totalInserted = 0; for (int i = 0; i < users.size(); i += batchSize) { List<User> batch = users.subList(i, Math.min(users.size(), i + batchSize)); totalInserted += userMapper.batchInsertIgnore(batch); } return totalInserted; } } // UserMapper接口 public interface UserMapper { @Insert("INSERT INTO user(username, email, password) " + "VALUES(#{username}, #{email}, #{password}) " + "ON DUPLICATE KEY UPDATE " + "email = VALUES(email), password = VALUES(password)") int insertOrUpdate(User user); @Insert("<script>INSERT IGNORE INTO user(username, email, password) VALUES " + "<foreach collection='list' item='user' separator=','>" + "(#{user.username}, #{user.email}, #{user.password})" + "</foreach></script>") int batchInsertIgnore(@Param("list") List<User> users); }
7. 實(shí)用異常處理封裝
為了使代碼更健壯,我們可以封裝一個(gè)通用的異常處理工具:
public class MySqlExceptionHelper { // MySQL錯(cuò)誤碼常量 public static final int ER_DUP_ENTRY = 1062; // 重復(fù)鍵錯(cuò)誤碼 /** * 執(zhí)行可能出現(xiàn)重復(fù)鍵的數(shù)據(jù)庫(kù)操作 * @param operation 數(shù)據(jù)庫(kù)操作 * @param duplicateKeyHandler 重復(fù)鍵處理器 * @return 處理結(jié)果 */ public static <T> T executeWithDuplicateKeyHandling( Supplier<T> operation, Function<Exception, T> duplicateKeyHandler) { try { return operation.get(); } catch (DataAccessException e) { // 提取原始異常 Throwable cause = e.getCause(); if (cause instanceof SQLException) { SQLException sqlEx = (SQLException) cause; // 通過(guò)錯(cuò)誤碼判斷,而非不可靠的字符串匹配 if (sqlEx.getErrorCode() == ER_DUP_ENTRY) { // 調(diào)用重復(fù)鍵處理器 return duplicateKeyHandler.apply((Exception) cause); } } // 重新拋出其他異常 throw e; } } }
使用示例:
public boolean insertUserSafely(User user) { return MySqlExceptionHelper.executeWithDuplicateKeyHandling( // 正常插入邏輯 () -> { int rows = userMapper.insert(user); return rows > 0; }, // 重復(fù)鍵處理邏輯 ex -> { log.warn("用戶{}已存在,嘗試更新", user.getUsername()); int rows = userMapper.updateByUsername(user); return rows > 0; } ); }
8. 不同方案性能對(duì)比
下面是各種方案在不同場(chǎng)景下的性能對(duì)比(基于實(shí)際測(cè)試數(shù)據(jù)):
性能測(cè)試環(huán)境:MySQL 8.0, 16G 內(nèi)存, SSD 存儲(chǔ), 1 萬(wàn)條記錄,20%重復(fù)率
索引掃描對(duì)性能的影響:
INSERT IGNORE
和ON DUPLICATE KEY UPDATE
直接利用唯一索引的 B+樹(shù)結(jié)構(gòu)快速判斷重復(fù),僅需一次索引查找操作- 預(yù)檢查方案則需要額外的索引查詢和多次與數(shù)據(jù)庫(kù)交互,增加網(wǎng)絡(luò)延遲和查詢成本
- 當(dāng)使用唯一索引的前綴索引(如
CREATE UNIQUE INDEX idx_name ON user(username(20))
)時(shí),判斷重復(fù)只比較前 N 個(gè)字符,需確保前綴長(zhǎng)度足夠區(qū)分業(yè)務(wù)數(shù)據(jù)
事務(wù)隔離級(jí)別的影響: 在 RR(Repeatable Read)隔離級(jí)別下,預(yù)檢查方案可能讀到舊版本數(shù)據(jù),而在實(shí)際插入時(shí)才發(fā)現(xiàn)數(shù)據(jù)已被其他事務(wù)插入,導(dǎo)致出現(xiàn)明明檢查過(guò)卻仍觸發(fā)唯一鍵異常的問(wèn)題。而在 RC(Read Committed)隔離級(jí)別下,ON DUPLICATE KEY UPDATE
使用快照讀,可能減少鎖等待;而 RR 隔離級(jí)別下可能觸發(fā)間隙鎖,增加鎖范圍,進(jìn)一步影響并發(fā)性能。
9. 方案原理對(duì)比
各種方案在鎖機(jī)制、事務(wù)行為上存在顯著差異:
比較表:
方案 | 鎖行為 | 鎖范圍 | 事務(wù)復(fù)雜度 | 主鍵變化 | 并發(fā)友好度 |
---|---|---|---|---|---|
INSERT IGNORE | 只鎖沖突時(shí)不操作 | 最小 | 簡(jiǎn)單 | 不變 | 最高 |
ON DUPLICATE KEY UPDATE | 鎖已有行并更新 | 中等 | 中等 | 不變 | 中等 |
REPLACE INTO | 鎖已有行,刪除后再插入 | 最大 | 復(fù)雜(刪除+插入) | 自增主鍵會(huì)變化 | 最低 |
分布式鎖+預(yù)檢查 | 全局分布式鎖 | 跨服務(wù) | 高 | 不變 | 較低 |
在高并發(fā)寫入場(chǎng)景,INSERT IGNORE
的鎖競(jìng)爭(zhēng)最小,性能最優(yōu);而REPLACE INTO
可能導(dǎo)致更多的鎖等待和死鎖風(fēng)險(xiǎn)。
10. 應(yīng)用場(chǎng)景案例
10.1 用戶注冊(cè)場(chǎng)景
用戶注冊(cè)時(shí),需要確保用戶名或郵箱唯一:
@Service public class UserRegistrationService { @Autowired private UserMapper userMapper; public RegisterResult register(RegisterRequest request) { User user = new User(); user.setUsername(request.getUsername()); user.setEmail(request.getEmail()); user.setPassword(encryptPassword(request.getPassword())); user.setCreateTime(new Date()); try { // 使用INSERT IGNORE插入 int result = userMapper.insertIgnore(user); if (result > 0) { // 成功插入新用戶 return RegisterResult.success(); } else { // 用戶名已存在 // 查詢是否是用戶名沖突 User existingUser = userMapper.selectByUsername(user.getUsername()); if (existingUser != null) { return RegisterResult.usernameExists(); } else { // 可能是郵箱沖突 return RegisterResult.emailExists(); } } } catch (Exception e) { log.error("注冊(cè)異常", e); return RegisterResult.error("系統(tǒng)異常"); } } }
10.2 數(shù)據(jù)導(dǎo)入場(chǎng)景
批量導(dǎo)入用戶數(shù)據(jù),忽略重復(fù)記錄:
@Service public class DataImportService { @Autowired private UserMapper userMapper; @Autowired private MetricsService metricsService; // 監(jiān)控服務(wù) @Transactional(rollbackFor = Exception.class) public ImportResult importUsers(List<UserDTO> userDTOs) { ImportResult result = new ImportResult(); // 數(shù)據(jù)預(yù)處理和驗(yàn)證 List<User> validUsers = userDTOs.stream() .filter(this::isValidUserData) .map(this::convertToUser) .collect(Collectors.toList()); if (validUsers.isEmpty()) { result.setMessage("沒(méi)有有效數(shù)據(jù)"); return result; } // 分批處理,每批500條 int batchSize = 500; List<List<User>> batches = new ArrayList<>(); for (int i = 0; i < validUsers.size(); i += batchSize) { batches.add(validUsers.subList(i, Math.min(validUsers.size(), i + batchSize))); } int totalImported = 0; int totalDuplicated = 0; List<String> errors = new ArrayList<>(); for (List<User> batch : batches) { try { int batchCount = batch.size(); int imported = userMapper.batchInsertIgnore(batch); totalImported += imported; totalDuplicated += (batchCount - imported); // 記錄監(jiān)控指標(biāo) metricsService.recordMetrics( "user_import_success", imported, "user_import_duplicate", batchCount - imported, "user_import_duplicate_ratio", (batchCount - imported) * 100.0 / batchCount ); } catch (Exception e) { log.error("導(dǎo)入批次異常", e); errors.add("批次導(dǎo)入錯(cuò)誤: " + e.getMessage()); } } result.setTotalProcessed(validUsers.size()); result.setSuccessCount(totalImported); result.setDuplicateCount(totalDuplicated); result.setErrors(errors); return result; } }
10.3 分布式 ID 生成器場(chǎng)景
基于數(shù)據(jù)庫(kù)序列的分布式 ID 生成方案,確保生成的 ID 全局唯一:
@Service public class SequenceGenerator { @Autowired private JdbcTemplate jdbcTemplate; /** * 獲取指定業(yè)務(wù)類型的ID序列段 * @param type 業(yè)務(wù)類型 * @param step 步長(zhǎng)(一次獲取多少個(gè)ID) * @return 起始ID,應(yīng)用可在內(nèi)存中遞增使用 */ public long getNextIdBatch(String type, int step) { // 使用悲觀鎖確保并發(fā)安全 String selectSql = "SELECT current_id FROM id_generator WHERE type = ? FOR UPDATE"; Long currentId = jdbcTemplate.queryForObject(selectSql, Long.class, type); if (currentId == null) { // 首次使用,初始化序列 String insertSql = "INSERT INTO id_generator(type, current_id, step) VALUES(?, 0, ?)"; jdbcTemplate.update(insertSql, type, step); currentId = 0L; } // 更新序列值 String updateSql = "UPDATE id_generator SET current_id = current_id + ? WHERE type = ?"; jdbcTemplate.update(updateSql, step, type); // 返回當(dāng)前批次的起始ID return currentId; } } // ID生成器表結(jié)構(gòu) /* CREATE TABLE id_generator ( type VARCHAR(50) PRIMARY KEY, current_id BIGINT NOT NULL, step INT NOT NULL DEFAULT 1000, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); */
10.4 金融交易場(chǎng)景的冪等性設(shè)計(jì)
金融系統(tǒng)中,支付交易必須確保冪等性,避免重復(fù)扣款:
@Service public class PaymentService { @Autowired private TransactionMapper transactionMapper; @Autowired private AccountMapper accountMapper; /** * 執(zhí)行支付交易(冪等操作) * 通過(guò)訂單號(hào)+交易類型作為唯一鍵,確保同一筆交易只執(zhí)行一次 */ @Transactional(rollbackFor = Exception.class) public PaymentResult pay(String orderNo, String accountId, BigDecimal amount) { // 創(chuàng)建交易記錄(使用唯一約束確保冪等) TransactionDO transaction = new TransactionDO(); transaction.setOrderNo(orderNo); transaction.setType("PAYMENT"); transaction.setAccountId(accountId); transaction.setAmount(amount); transaction.setStatus("PROCESSING"); transaction.setCreateTime(new Date()); // 嘗試插入交易記錄,如果已存在則返回0 int affected = transactionMapper.insertIgnore(transaction); if (affected == 0) { // 交易已存在,查詢狀態(tài)返回 TransactionDO existingTx = transactionMapper.selectByOrderNoAndType(orderNo, "PAYMENT"); return new PaymentResult(existingTx.getStatus(), "交易已處理"); } try { // 執(zhí)行實(shí)際扣款邏輯(此部分必須保證原子性) boolean success = accountMapper.deductBalance(accountId, amount) > 0; // 更新交易狀態(tài) if (success) { transactionMapper.updateStatus(orderNo, "PAYMENT", "SUCCESS"); return new PaymentResult("SUCCESS", "支付成功"); } else { transactionMapper.updateStatus(orderNo, "PAYMENT", "FAILED"); return new PaymentResult("FAILED", "余額不足"); } } catch (Exception e) { // 異常情況更新交易狀態(tài) transactionMapper.updateStatus(orderNo, "PAYMENT", "ERROR"); throw e; // 向上拋出異常觸發(fā)事務(wù)回滾 } } } @Mapper public interface TransactionMapper { @Insert("INSERT IGNORE INTO transactions(order_no, type, account_id, amount, status, create_time) " + "VALUES(#{orderNo}, #{type}, #{accountId}, #{amount}, #{status}, #{createTime})") int insertIgnore(TransactionDO transaction); @Select("SELECT * FROM transactions WHERE order_no = #{orderNo} AND type = #{type}") TransactionDO selectByOrderNoAndType(@Param("orderNo") String orderNo, @Param("type") String type); @Update("UPDATE transactions SET status = #{status}, update_time = NOW() " + "WHERE order_no = #{orderNo} AND type = #{type}") int updateStatus(@Param("orderNo") String orderNo, @Param("type") String type, @Param("status") String status); }
10.5 實(shí)時(shí)數(shù)據(jù)同步場(chǎng)景
設(shè)備實(shí)時(shí)數(shù)據(jù)采集系統(tǒng),確保只保留每臺(tái)設(shè)備每個(gè)時(shí)間點(diǎn)的最新數(shù)據(jù):
@Service public class DeviceMetricsService { @Autowired private MetricsMapper metricsMapper; /** * 記錄設(shè)備實(shí)時(shí)指標(biāo)數(shù)據(jù) * 使用device_id+timestamp作為唯一鍵,確保同一時(shí)間點(diǎn)只保留最新數(shù)據(jù) */ public void recordMetric(String deviceId, Date timestamp, Double value, String metricType) { DeviceMetric metric = new DeviceMetric(); metric.setDeviceId(deviceId); metric.setTimestamp(timestamp); metric.setValue(value); metric.setMetricType(metricType); metric.setCreateTime(new Date()); // 使用REPLACE INTO確保只保留最新值 metricsMapper.replaceMetric(metric); } /** * 批量記錄設(shè)備指標(biāo)(高性能版本) */ public void batchRecordMetrics(List<DeviceMetric> metrics) { if (CollectionUtils.isEmpty(metrics)) { return; } // 分批處理,每批200條 int batchSize = 200; for (int i = 0; i < metrics.size(); i += batchSize) { List<DeviceMetric> batch = metrics.subList(i, Math.min(metrics.size(), i + batchSize)); metricsMapper.batchReplaceMetrics(batch); } } } @Mapper public interface MetricsMapper { @Insert("REPLACE INTO device_metrics(device_id, timestamp, metric_type, value, create_time) " + "VALUES(#{deviceId}, #{timestamp}, #{metricType}, #{value}, #{createTime})") int replaceMetric(DeviceMetric metric); @Insert("<script>" + "REPLACE INTO device_metrics(device_id, timestamp, metric_type, value, create_time) VALUES " + "<foreach collection='metrics' item='metric' separator=','>" + "(#{metric.deviceId}, #{metric.timestamp}, #{metric.metricType}, " + "#{metric.value}, #{metric.createTime})" + "</foreach>" + "</script>") int batchReplaceMetrics(@Param("metrics") List<DeviceMetric> metrics); }
11. 跨庫(kù)分表場(chǎng)景的去重方案
在分庫(kù)分表架構(gòu)中,數(shù)據(jù)被分散到不同的物理表中,單靠數(shù)據(jù)庫(kù)唯一索引無(wú)法跨庫(kù)保證唯一性:
實(shí)現(xiàn)示例:
@Service public class ShardingUserService { @Autowired private List<UserMapper> shardedMappers; // 不同分片的mapper @Autowired private ConsistentHash consistentHash; // 一致性哈希服務(wù) /** * 跨分片用戶注冊(cè),確保用戶名全局唯一 */ public RegisterResult registerWithSharding(RegisterRequest request) { // 1. 先查詢?nèi)治ㄒ凰饕?,確認(rèn)用戶名不存在 String username = request.getUsername(); // 使用分布式鎖防止并發(fā)插入 String lockKey = "user:register:" + username; try (RedisLockWrapper lock = new RedisLockWrapper(redissonClient, lockKey)) { if (!lock.tryLock(5, TimeUnit.SECONDS)) { return RegisterResult.busy(); } // 2. 檢查全局用戶名索引 if (usernameIndexMapper.exists(username)) { return RegisterResult.usernameExists(); } // 3. 生成全局唯一用戶ID String userId = SnowflakeIdGenerator.nextId(); // 4. 確定分片(使用一致性哈希算法) int shardIndex = consistentHash.getShardIndex(username); UserMapper targetMapper = shardedMappers.get(shardIndex); // 5. 插入用戶數(shù)據(jù)到對(duì)應(yīng)分片 User user = createUserFromRequest(request, userId); targetMapper.insert(user); // 6. 插入全局用戶名索引(使用INSERT IGNORE防止并發(fā)) UserNameIndex index = new UserNameIndex(username, userId, shardIndex); usernameIndexMapper.insertIgnore(index); return RegisterResult.success(userId); } catch (Exception e) { log.error("分片用戶注冊(cè)異常", e); return RegisterResult.error("系統(tǒng)異常"); } } } /** * 基于虛擬節(jié)點(diǎn)的一致性哈希實(shí)現(xiàn) */ @Component public class ConsistentHash { private final TreeMap<Long, Integer> virtualNodes = new TreeMap<>(); private final int numberOfReplicas; // 虛擬節(jié)點(diǎn)數(shù)量 private final int shardCount; // 實(shí)際分片數(shù) public ConsistentHash(@Value("${sharding.virtual-nodes:160}") int numberOfReplicas, @Value("${sharding.shard-count:4}") int shardCount) { this.numberOfReplicas = numberOfReplicas; this.shardCount = shardCount; // 初始化虛擬節(jié)點(diǎn) for (int i = 0; i < shardCount; i++) { addShard(i); } } private void addShard(int shardIndex) { for (int i = 0; i < numberOfReplicas; i++) { String nodeKey = shardIndex + "-" + i; long hash = hash(nodeKey); virtualNodes.put(hash, shardIndex); } } public int getShardIndex(String key) { if (virtualNodes.isEmpty()) { return 0; } long hash = hash(key); // 找到第一個(gè)大于等于hash的節(jié)點(diǎn) Map.Entry<Long, Integer> entry = virtualNodes.ceilingEntry(hash); // 如果沒(méi)有找到,則取第一個(gè)節(jié)點(diǎn) if (entry == null) { entry = virtualNodes.firstEntry(); } return entry.getValue(); } private long hash(String key) { // 使用MurmurHash獲得更均勻的哈希分布 return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong(); } }
12. 總結(jié)
下面表格全面總結(jié)了各種 MySQL 重復(fù)數(shù)據(jù)處理方案的特性和適用場(chǎng)景:
方案 | 優(yōu)點(diǎn) | 缺點(diǎn) | 適用場(chǎng)景 | 鎖粒度 | 事務(wù)復(fù)雜度 | 實(shí)現(xiàn)復(fù)雜度 | 維護(hù)成本 |
---|---|---|---|---|---|---|---|
try-catch 異常捕獲 | 實(shí)現(xiàn)簡(jiǎn)單,通用性強(qiáng) | 性能較低,異常開(kāi)銷大 | 單條插入,低頻操作 | 插入行 | 簡(jiǎn)單 | 低 | 低 |
預(yù)檢查+條件插入 | 邏輯清晰 | 存在并發(fā)問(wèn)題,需要額外查詢 | 單用戶操作,并發(fā)低場(chǎng)景 | 查詢+插入行 | 中等 | 中 | 中 |
INSERT IGNORE | 語(yǔ)法簡(jiǎn)單,性能最佳 | 無(wú)法獲知哪些記錄被忽略 | 只需插入不存在記錄場(chǎng)景 | 僅沖突行 | 簡(jiǎn)單 | 低 | 低 |
ON DUPLICATE KEY UPDATE | 一條語(yǔ)句完成插入或更新 | SQL 較長(zhǎng),需要指定更新字段 | 需要更新已存在記錄場(chǎng)景 | 已有行 | 中等 | 中 | 中(需維護(hù)更新字段) |
REPLACE INTO | 語(yǔ)法簡(jiǎn)單,總是保證最新數(shù)據(jù) | 會(huì)刪除并重建記錄,可能引發(fā)級(jí)聯(lián)刪除 | 需要完全覆蓋已有數(shù)據(jù)場(chǎng)景 | 舊行+新行 | 復(fù)雜(刪除+插入) | 低 | 中(需注意外鍵) |
批量插入方案 | 高性能,減少數(shù)據(jù)庫(kù)交互 | 實(shí)現(xiàn)較復(fù)雜 | 大批量數(shù)據(jù)導(dǎo)入場(chǎng)景 | 多行 | 較大 | 中 | 中 |
分布式鎖+唯一索引 | 從源頭避免重復(fù)數(shù)據(jù) | 實(shí)現(xiàn)復(fù)雜度高 | 分布式系統(tǒng),跨庫(kù)場(chǎng)景 | 全局分布式鎖 | 高(跨服務(wù)) | 高 | 高(需維護(hù)鎖服務(wù)) |
跨庫(kù)分表去重 | 支持分庫(kù)分表架構(gòu) | 實(shí)現(xiàn)極其復(fù)雜 | 大規(guī)模分布式系統(tǒng) | 分片+全局索引 | 極高 | 極高 | 極高(需分片路由邏輯) |
通過(guò)合理選擇和實(shí)現(xiàn)這些方案,我們可以有效解決 MySQL 中的重復(fù)數(shù)據(jù)處理問(wèn)題,提高系統(tǒng)的健壯性和性能。實(shí)際項(xiàng)目中,往往需要根據(jù)具體場(chǎng)景組合使用不同策略,例如高并發(fā)場(chǎng)景下可能同時(shí)使用分布式鎖、全局唯一 ID 和數(shù)據(jù)庫(kù)唯一索引作為多重保障。
這些方案各有優(yōu)劣,選擇時(shí)需考慮業(yè)務(wù)需求、數(shù)據(jù)量大小、并發(fā)級(jí)別和系統(tǒng)架構(gòu)等因素。在大多數(shù)場(chǎng)景下,使用INSERT IGNORE
和ON DUPLICATE KEY UPDATE
是既簡(jiǎn)單又高效的解決方案,而在分布式系統(tǒng)中,還需要加入全局唯一 ID 和分布式鎖等機(jī)制確保數(shù)據(jù)一致性。
以上就是MySQL重復(fù)數(shù)據(jù)處理的七種高效方法的詳細(xì)內(nèi)容,更多關(guān)于MySQL重復(fù)數(shù)據(jù)處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談MySQL數(shù)據(jù)查詢太多會(huì)OOM嗎
本文主要介紹了淺談MySQL數(shù)據(jù)查詢太多會(huì)OOM嗎?文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08mysql數(shù)據(jù)庫(kù)設(shè)置utf-8編碼的方法步驟
這篇文章主要介紹了mysql數(shù)據(jù)庫(kù)設(shè)置utf-8編碼的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08mysql導(dǎo)入導(dǎo)出數(shù)據(jù)的示例詳解
本文主要介紹了MySQL 導(dǎo)出和導(dǎo)入數(shù)據(jù)的幾種實(shí)現(xiàn)方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05MySQL下將一個(gè)表的數(shù)據(jù)插入到另外一個(gè)表的實(shí)現(xiàn)語(yǔ)句
開(kāi)發(fā)中,我們經(jīng)常需要將一個(gè)表的數(shù)據(jù)插入到另外一個(gè)表,有時(shí)還需要指定導(dǎo)入字段,雖然這個(gè)實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,但是還是會(huì)困擾許多新手,因此專門發(fā)一篇文章備查。2009-09-09MySQL Left JOIN時(shí)指定NULL列返回特定值詳解
我們有時(shí)會(huì)有這樣的應(yīng)用,需要在sql的left join時(shí),需要使值為NULL的列不返回NULL而時(shí)某個(gè)特定的值,比如0。這個(gè)時(shí)候,用is_null(field,0)是行不通的,會(huì)報(bào)錯(cuò)的,可以用ifnull實(shí)現(xiàn),但是COALESE似乎更符合標(biāo)準(zhǔn)2013-07-07MySQL數(shù)據(jù)庫(kù)導(dǎo)出與導(dǎo)入及常見(jiàn)錯(cuò)誤解決
MySQL數(shù)據(jù)庫(kù)導(dǎo)出與導(dǎo)入的過(guò)程中將會(huì)發(fā)生眾多不可預(yù)知的錯(cuò)誤,本文整理了一些常見(jiàn)錯(cuò)誤及相應(yīng)的解決方法,遇到類似情況的朋友可以參考下,希望對(duì)大家有所幫助2013-07-07Mysql?InnoDB?的內(nèi)存結(jié)構(gòu)詳情
這篇文章主要介紹了Mysql InnoDB的內(nèi)存結(jié)構(gòu)詳情,InnoDB存儲(chǔ)引擎的邏輯存儲(chǔ)結(jié)構(gòu)是什么呢,下面我們就一起進(jìn)入文章了解更多詳細(xì)內(nèi)容吧,感興趣的小伙伴可以參考一下2022-05-05SQL中日期與字符串互相轉(zhuǎn)換操作實(shí)例
我們經(jīng)常出于某種目的需要使用各種各樣的日期格式,當(dāng)然我們可以使用字符串操作來(lái)構(gòu)造各種日期格式,下面這篇文章主要給大家介紹了關(guān)于SQL中日期與字符串互相轉(zhuǎn)換操作的相關(guān)資料,需要的朋友可以參考下2022-10-10MySQL Daemon failed to start錯(cuò)誤解決辦法
這篇文章主要介紹了MySQL Daemon failed to start錯(cuò)誤解決辦法的相關(guān)資料,需要的朋友可以參考下2017-01-01