SpringBoot處理死信隊(duì)列的方法詳解
在項(xiàng)目開(kāi)發(fā)中,消息隊(duì)列是重要的組件,而死信隊(duì)列(Dead Letter Queue, DLQ)作為處理異常消息的關(guān)鍵機(jī)制,直接影響系統(tǒng)的穩(wěn)定性和可靠性。
當(dāng)消息因各種原因(如消費(fèi)失敗、消息過(guò)期、隊(duì)列已滿)無(wú)法正常處理時(shí),這些消息會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列。
本文將分享四種死信隊(duì)列處理方式。
一、原生消費(fèi)者處理方式
1.1 處理原理
最直接的死信隊(duì)列處理方式是針對(duì)死信隊(duì)列設(shè)置專(zhuān)門(mén)的消費(fèi)者,定期消費(fèi)并處理死信消息。
這種方式利用消息中間件(如RabbitMQ)的原生特性,通過(guò)配置死信交換機(jī)(Dead Letter Exchange, DLX)和死信隊(duì)列來(lái)收集異常消息,然后由專(zhuān)門(mén)的服務(wù)進(jìn)行消費(fèi)。
1.2 實(shí)現(xiàn)方式
1.2.1 配置死信隊(duì)列消費(fèi)者
@Component public class DeadLetterConsumer { private static final Logger logger = LoggerFactory.getLogger(DeadLetterConsumer.class); @RabbitListener(queues = "${rabbitmq.dead-letter-queue}") public void processDeadLetters(Message message, Channel channel) throws IOException { try { // 解析消息內(nèi)容 String messageContent = new String(message.getBody(), StandardCharsets.UTF_8); Map<String, Object> headers = message.getMessageProperties().getHeaders(); logger.info("Processing dead letter: {}", messageContent); // 獲取死信相關(guān)的元數(shù)據(jù) String originalExchange = getHeaderAsString(headers, "x-first-death-exchange"); String originalRoutingKey = getHeaderAsString(headers, "x-first-death-queue"); String reason = getHeaderAsString(headers, "x-first-death-reason"); logger.info("Original exchange: {}, Original queue: {}, Reason: {}", originalExchange, originalRoutingKey, reason); // 根據(jù)不同原因進(jìn)行處理 switch (reason) { case "rejected": handleRejectedMessage(messageContent, headers); break; case "expired": handleExpiredMessage(messageContent, headers); break; case "maxlen": handleMaxLengthMessage(messageContent, headers); break; default: handleUnknownReasonMessage(messageContent, headers); } // 確認(rèn)消息已處理 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("Dead letter processed successfully"); } catch (Exception e) { logger.error("Error processing dead letter", e); // 處理失敗,根據(jù)業(yè)務(wù)需要決定是否重新入隊(duì) boolean requeue = shouldRequeueOnError(e); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, requeue); } } private String getHeaderAsString(Map<String, Object> headers, String key) { return headers.containsKey(key) ? headers.get(key).toString() : "unknown"; } // 處理被拒絕的消息 private void handleRejectedMessage(String messageContent, Map<String, Object> headers) { logger.info("Handling rejected message"); // 記錄詳細(xì)日志 // 嘗試修復(fù)消息內(nèi)容或格式問(wèn)題 // 可能的處理策略:重新發(fā)送到原隊(duì)列、發(fā)送到特定修復(fù)隊(duì)列、存儲(chǔ)到數(shù)據(jù)庫(kù)等 } // 處理過(guò)期的消息 private void handleExpiredMessage(String messageContent, Map<String, Object> headers) { logger.info("Handling expired message"); // 評(píng)估消息是否仍然有價(jià)值 // 可能的處理策略:對(duì)于時(shí)效性業(yè)務(wù),可能直接丟棄;對(duì)于關(guān)鍵業(yè)務(wù),可能需要補(bǔ)償處理 } // 處理因隊(duì)列長(zhǎng)度限制而成為死信的消息 private void handleMaxLengthMessage(String messageContent, Map<String, Object> headers) { logger.info("Handling max length exceeded message"); // 考慮系統(tǒng)負(fù)載問(wèn)題 // 可能的處理策略:延遲重新發(fā)送、調(diào)整優(yōu)先級(jí)、觸發(fā)告警等 } // 處理未知原因的死信消息 private void handleUnknownReasonMessage(String messageContent, Map<String, Object> headers) { logger.info("Handling message with unknown reason"); // 進(jìn)行詳細(xì)分析和記錄 // 可能需要人工介入 } // 判斷是否應(yīng)該在處理出錯(cuò)時(shí)重新入隊(duì) private boolean shouldRequeueOnError(Exception e) { // 根據(jù)異常類(lèi)型決定是否重新入隊(duì) // 臨時(shí)性錯(cuò)誤(如網(wǎng)絡(luò)問(wèn)題)可能適合重新入隊(duì) // 永久性錯(cuò)誤(如數(shù)據(jù)格式問(wèn)題)可能不適合重新入隊(duì) return e instanceof TemporaryException; } // 示例異常類(lèi) private static class TemporaryException extends RuntimeException { public TemporaryException(String message) { super(message); } } }
1.2.2 死信處理服務(wù)
@Service public class DeadLetterService { private static final Logger logger = LoggerFactory.getLogger(DeadLetterService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private NotificationService notificationService; @Autowired private DeadLetterRepository deadLetterRepository; /** * 重新發(fā)送消息到原始隊(duì)列 */ public void resendToOriginalQueue(String messageContent, String originalExchange, String originalRoutingKey) { try { logger.info("Resending message to original queue: {}", originalRoutingKey); MessageProperties properties = new MessageProperties(); properties.setHeader("x-resent-from-dlq", true); properties.setHeader("x-resent-time", new Date()); Message message = new Message(messageContent.getBytes(), properties); rabbitTemplate.send(originalExchange, originalRoutingKey, message); logger.info("Message resent successfully"); } catch (Exception e) { logger.error("Failed to resend message", e); throw e; } } /** * 存儲(chǔ)死信消息到數(shù)據(jù)庫(kù) */ public void storeDeadLetter(String messageContent, Map<String, Object> headers, String reason) { try { logger.info("Storing dead letter to database"); DeadLetterEntity entity = new DeadLetterEntity(); entity.setMessageContent(messageContent); entity.setOriginalExchange(getHeaderAsString(headers, "x-first-death-exchange")); entity.setOriginalQueue(getHeaderAsString(headers, "x-first-death-queue")); entity.setReason(reason); entity.setTimestamp(new Date()); entity.setHeaders(convertHeadersToJson(headers)); deadLetterRepository.save(entity); logger.info("Dead letter stored successfully"); } catch (Exception e) { logger.error("Failed to store dead letter", e); } } /** * 發(fā)送告警通知 */ public void sendAlert(String messageContent, String reason) { try { logger.info("Sending alert for dead letter"); String alertMessage = String.format("Dead letter detected - Reason: %s, Content: %s", reason, messageContent); notificationService.sendAlert("Dead Letter Alert", alertMessage, AlertLevel.WARNING); logger.info("Alert sent successfully"); } catch (Exception e) { logger.error("Failed to send alert", e); } } private String getHeaderAsString(Map<String, Object> headers, String key) { return headers.containsKey(key) ? headers.get(key).toString() : "unknown"; } private String convertHeadersToJson(Map<String, Object> headers) { try { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(headers); } catch (Exception e) { logger.error("Failed to convert headers to JSON", e); return "{}"; } } }
1.3 處理策略
在原生消費(fèi)者處理方式中,常見(jiàn)的死信處理策略包括:
1. 分析與記錄:記錄死信消息的內(nèi)容、元數(shù)據(jù)和失敗原因,用于問(wèn)題追蹤和分析。
2. 重新發(fā)送:根據(jù)死信原因,可能選擇修復(fù)后重新發(fā)送到原隊(duì)列。例如:
// 重新發(fā)送到原隊(duì)列的示例 if (canBeRetried(messageContent, headers)) { String originalExchange = getHeaderAsString(headers, "x-first-death-exchange"); String originalRoutingKey = getHeaderAsString(headers, "x-first-death-queue"); deadLetterService.resendToOriginalQueue(messageContent, originalExchange, originalRoutingKey); }
3. 存儲(chǔ)與歸檔:將無(wú)法立即處理的死信存儲(chǔ)到數(shù)據(jù)庫(kù),便于后續(xù)分析或手動(dòng)處理。
4. 告警通知:對(duì)于重要的死信消息或死信數(shù)量異常增加的情況,發(fā)送告警通知。
5. 業(yè)務(wù)補(bǔ)償:對(duì)于某些業(yè)務(wù)場(chǎng)景,可能需要執(zhí)行補(bǔ)償操作:
// 業(yè)務(wù)補(bǔ)償處理示例 if (messageContent.contains("payment")) { try { PaymentInfo paymentInfo = objectMapper.readValue(messageContent, PaymentInfo.class); compensationService.handleFailedPayment(paymentInfo); } catch (Exception e) { logger.error("Failed to process compensation", e); } }
1.4 優(yōu)缺點(diǎn)與適用場(chǎng)景
優(yōu)點(diǎn):
- 實(shí)現(xiàn)簡(jiǎn)單,直接利用消息中間件的原生功能
- 與正常業(yè)務(wù)流程完全隔離,不影響主流程
- 可以靈活定制處理邏輯,針對(duì)不同類(lèi)型的死信采取不同策略
缺點(diǎn):
- 缺乏自動(dòng)重試機(jī)制,需要手動(dòng)實(shí)現(xiàn)
- 處理失敗后的進(jìn)一步處理相對(duì)復(fù)雜
- 需要額外維護(hù)死信隊(duì)列的消費(fèi)邏輯
適用場(chǎng)景:
- 死信消息需要特殊業(yè)務(wù)邏輯處理的場(chǎng)景
- 需要詳細(xì)記錄和分析死信原因的系統(tǒng)
- 對(duì)死信處理流程有細(xì)粒度控制需求的應(yīng)用
二、重試機(jī)制處理方式
2.1 處理原理
重試機(jī)制處理方式核心思想是將死信消息按照一定策略自動(dòng)重試,而不是立即進(jìn)入死信隊(duì)列。
通過(guò)Spring AMQP提供的重試框架,可以在消費(fèi)者端實(shí)現(xiàn)消息的多次重試,只有當(dāng)重試次數(shù)耗盡后,才將消息發(fā)送到死信隊(duì)列。
2.2 實(shí)現(xiàn)方式
2.2.1 配置重試機(jī)制
@Configuration public class RetryConfig { @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 配置并發(fā)消費(fèi)者 factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); // 設(shè)置手動(dòng)確認(rèn)模式 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 配置重試機(jī)制 factory.setAdviceChain(RetryInterceptorBuilder .stateless() .maxAttempts(5) // 最大重試次數(shù) .backOffOptions(1000, 2.0, 30000) // 初始間隔、乘數(shù)、最大間隔 .recoverer(new RejectAndDontRequeueRecoverer()) // 重試耗盡后的處理器 .build()); return factory; } /** * 自定義恢復(fù)策略:將重試失敗的消息發(fā)送到指定隊(duì)列 */ @Bean public MessageRecoverer customMessageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "retry.failed.exchange", "retry.failed.key"); } }
2.2.2 消息消費(fèi)者
@Component public class RetryAwareConsumer { private static final Logger logger = LoggerFactory.getLogger(RetryAwareConsumer.class); @RabbitListener(queues = "${rabbitmq.business-queue}", containerFactory = "rabbitListenerContainerFactory") public void processMessage(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { String payload = new String(message.getBody(), StandardCharsets.UTF_8); logger.info("Processing message: {}", payload); // 獲取當(dāng)前重試次數(shù) Object retryCountObj = message.getMessageProperties().getHeaders().get("x-retry-count"); int retryCount = retryCountObj != null ? (int) retryCountObj : 0; if (retryCount > 0) { logger.info("This is retry attempt #{}", retryCount); } // 模擬業(yè)務(wù)處理 processBusinessLogic(payload); // 處理成功,確認(rèn)消息 channel.basicAck(deliveryTag, false); logger.info("Message processed successfully"); } catch (TemporaryException e) { // 臨時(shí)性異常,適合重試 logger.warn("Temporary exception occurred, will retry: {}", e.getMessage()); // 拒絕消息并重新入隊(duì),觸發(fā)重試 channel.basicNack(deliveryTag, false, true); } catch (PermanentException e) { // 永久性異常,不適合重試 logger.error("Permanent exception occurred, no retry: {}", e.getMessage()); // 拒絕消息但不重新入隊(duì),消息將進(jìn)入死信隊(duì)列 channel.basicNack(deliveryTag, false, false); } catch (Exception e) { logger.error("Unexpected error", e); // 未預(yù)期的異常,拒絕消息但不重新入隊(duì) channel.basicNack(deliveryTag, false, false); } } private void processBusinessLogic(String payload) { // 模擬業(yè)務(wù)處理邏輯 if (payload.contains("temp_error")) { throw new TemporaryException("Temporary processing error"); } else if (payload.contains("perm_error")) { throw new PermanentException("Permanent processing error"); } // 正常處理... } // 示例異常類(lèi) private static class TemporaryException extends RuntimeException { public TemporaryException(String message) { super(message); } } private static class PermanentException extends RuntimeException { public PermanentException(String message) { super(message); } } }
2.2.3 自定義重試恢復(fù)器
public class CustomRecoverer implements MessageRecoverer { private static final Logger logger = LoggerFactory.getLogger(CustomRecoverer.class); private final RabbitTemplate rabbitTemplate; private final String failedExchange; private final String failedRoutingKey; private final DeadLetterService deadLetterService; public CustomRecoverer(RabbitTemplate rabbitTemplate, String failedExchange, String failedRoutingKey, DeadLetterService deadLetterService) { this.rabbitTemplate = rabbitTemplate; this.failedExchange = failedExchange; this.failedRoutingKey = failedRoutingKey; this.deadLetterService = deadLetterService; } @Override public void recover(Message message, Throwable cause) { Map<String, Object> headers = message.getMessageProperties().getHeaders(); String messageContent = new String(message.getBody(), StandardCharsets.UTF_8); // 記錄重試失敗信息 logger.warn("Message processing failed after retries: {}", cause.getMessage()); try { // 存儲(chǔ)失敗消息到數(shù)據(jù)庫(kù) deadLetterService.storeDeadLetter(messageContent, headers, "retry_exhausted"); // 添加失敗信息到消息頭 MessageProperties newProperties = new MessageProperties(); newProperties.copyProperties(message.getMessageProperties()); newProperties.setHeader("x-exception-message", cause.getMessage()); newProperties.setHeader("x-exception-type", cause.getClass().getName()); newProperties.setHeader("x-original-exchange", message.getMessageProperties().getReceivedExchange()); newProperties.setHeader("x-original-routing-key", message.getMessageProperties().getReceivedRoutingKey()); newProperties.setHeader("x-failed-at", new Date()); // 發(fā)送到失敗隊(duì)列 Message failedMessage = new Message(message.getBody(), newProperties); rabbitTemplate.send(failedExchange, failedRoutingKey, failedMessage); logger.info("Message sent to failure queue: {}", failedExchange); // 發(fā)送告警 deadLetterService.sendAlert(messageContent, "retry_exhausted"); } catch (Exception e) { logger.error("Error handling retry exhausted message", e); } } }
2.3 重試策略
在重試機(jī)制處理方式中,可以采用以下策略:
1. 指數(shù)退避重試:每次重試的間隔時(shí)間按指數(shù)增長(zhǎng),避免立即重試導(dǎo)致的資源浪費(fèi):
// 配置指數(shù)退避策略 .backOffOptions( 1000, // 初始重試間隔(毫秒) 2.0, // 間隔乘數(shù) 30000 // 最大間隔(毫秒) )
2. 區(qū)分異常類(lèi)型:根據(jù)異常類(lèi)型決定是否重試,避免對(duì)永久性錯(cuò)誤進(jìn)行無(wú)意義的重試:
// 可重試的異常類(lèi)型 RetryTemplate.builder() .retryOn(TemporaryNetworkException.class, ServiceUnavailableException.class) .notRetryOn(ValidationException.class, BusinessLogicException.class) .build();
3. 有狀態(tài)重試:在某些場(chǎng)景下,可能需要在重試之間保持狀態(tài):
RetryInterceptorBuilder .stateful() // 使用有狀態(tài)重試 .keyGenerator(message -> message.getMessageProperties().getMessageId()) // 使用消息ID作為重試鍵 .build();
4. 自定義恢復(fù)策略:當(dāng)重試耗盡后,根據(jù)業(yè)務(wù)需求執(zhí)行特定操作:
// 自定義恢復(fù)策略 .recoverer((message, cause) -> { // 記錄詳細(xì)日志 logger.error("Message processing failed after retries", cause); // 根據(jù)消息內(nèi)容和異常類(lèi)型決定后續(xù)處理 if (cause instanceof TemporaryNetworkException) { // 延遲后重新發(fā)送到原隊(duì)列 reEnqueueWithDelay(message, 60000); // 1分鐘后重試 } else { // 發(fā)送到死信隊(duì)列并通知運(yùn)維人員 sendToDeadLetterAndAlert(message, cause); } })
2.4 優(yōu)缺點(diǎn)與適用場(chǎng)景
優(yōu)點(diǎn):
- 提供自動(dòng)化的重試機(jī)制,減少人工干預(yù)
- 支持指數(shù)退避策略,避免頻繁重試導(dǎo)致的資源浪費(fèi)
- 可以針對(duì)不同類(lèi)型的異常采取不同的重試策略
- 靈活的恢復(fù)機(jī)制,可以定制重試耗盡后的處理邏輯
缺點(diǎn):
- 配置相對(duì)復(fù)雜
- 重試過(guò)程會(huì)占用消費(fèi)者線程資源
- 需要注意重試與業(yè)務(wù)冪等性的關(guān)系
- 重試過(guò)程中的狀態(tài)管理較為復(fù)雜
適用場(chǎng)景:
- 臨時(shí)性錯(cuò)誤頻發(fā)的環(huán)境(如網(wǎng)絡(luò)不穩(wěn)定)
- 需要精細(xì)控制重試策略的場(chǎng)景
- 對(duì)消息處理成功率有較高要求的業(yè)務(wù)
- 具有良好冪等性設(shè)計(jì)的系統(tǒng)
三、死信隊(duì)列重新入隊(duì)處理
3.1 處理原理
死信隊(duì)列重新入隊(duì)處理方式是一種更加靈活的策略,它不依賴于消費(fèi)端的重試機(jī)制,而是將死信消息收集到專(zhuān)門(mén)的隊(duì)列后,通過(guò)定時(shí)任務(wù)或手動(dòng)操作將這些消息重新發(fā)送到原始隊(duì)列或其他處理隊(duì)列。
這種方式特別適合需要額外處理或修復(fù)的死信消息。
3.2 實(shí)現(xiàn)方式
3.2.1 死信隊(duì)列處理服務(wù)
@Service public class DeadLetterRequeueService { private static final Logger logger = LoggerFactory.getLogger(DeadLetterRequeueService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private DeadLetterRepository deadLetterRepository; /** * 將死信消息重新入隊(duì)到原始隊(duì)列 */ public void requeueDeadLetter(Message message) { try { MessageProperties properties = message.getMessageProperties(); Map<String, Object> headers = properties.getHeaders(); // 獲取原始交換機(jī)和路由鍵 String originalExchange = getHeaderAsString(headers, "x-first-death-exchange"); String originalRoutingKey = getHeaderAsString(headers, "x-first-death-queue"); logger.info("Requeuing message to original destination: exchange={}, routingKey={}", originalExchange, originalRoutingKey); // 創(chuàng)建新的消息屬性,避免無(wú)限循環(huán) MessageProperties newProperties = new MessageProperties(); newProperties.setContentType(properties.getContentType()); newProperties.setContentEncoding(properties.getContentEncoding()); newProperties.setMessageId(UUID.randomUUID().toString()); // 添加重新入隊(duì)標(biāo)記和時(shí)間 newProperties.setHeader("x-requeued-from-dlq", true); newProperties.setHeader("x-requeued-time", new Date()); newProperties.setHeader("x-original-message-id", properties.getMessageId()); // 發(fā)送到原始隊(duì)列 Message newMessage = new Message(message.getBody(), newProperties); rabbitTemplate.send(originalExchange, originalRoutingKey, newMessage); logger.info("Message requeued successfully"); } catch (Exception e) { logger.error("Failed to requeue message", e); throw e; } } /** * 批量重新入隊(duì)死信消息 */ @Scheduled(fixedDelay = 300000) // 每5分鐘執(zhí)行一次 public void requeueBatchDeadLetters() { logger.info("Starting batch requeue process"); try { List<DeadLetterEntity> pendingDeadLetters = deadLetterRepository.findByStatusAndRetryCountLessThan( DeadLetterStatus.PENDING, 3); logger.info("Found {} dead letters pending for requeue", pendingDeadLetters.size()); for (DeadLetterEntity deadLetter : pendingDeadLetters) { try { // 構(gòu)建消息 MessageProperties properties = new MessageProperties(); properties.setContentType("application/json"); properties.setMessageId(UUID.randomUUID().toString()); properties.setHeader("x-requeued-from-dlq", true); properties.setHeader("x-requeued-time", new Date()); properties.setHeader("x-dead-letter-id", deadLetter.getId()); properties.setHeader("x-retry-count", deadLetter.getRetryCount() + 1); Message message = new Message(deadLetter.getMessageContent().getBytes(), properties); // 發(fā)送到原始隊(duì)列 rabbitTemplate.send(deadLetter.getOriginalExchange(), deadLetter.getOriginalRoutingKey(), message); // 更新?tīng)顟B(tài) deadLetter.setRetryCount(deadLetter.getRetryCount() + 1); deadLetter.setLastRetryTime(new Date()); deadLetter.setStatus(DeadLetterStatus.REQUEUED); deadLetterRepository.save(deadLetter); logger.info("Requeued dead letter: id={}", deadLetter.getId()); } catch (Exception e) { logger.error("Failed to requeue dead letter: id={}", deadLetter.getId(), e); // 更新失敗狀態(tài) deadLetter.setLastErrorMessage(e.getMessage()); if (deadLetter.getRetryCount() >= 2) { deadLetter.setStatus(DeadLetterStatus.FAILED); } deadLetterRepository.save(deadLetter); } } logger.info("Batch requeue process completed"); } catch (Exception e) { logger.error("Error in batch requeue process", e); } } private String getHeaderAsString(Map<String, Object> headers, String key) { return headers.containsKey(key) ? headers.get(key).toString() : ""; } }
3.2.2 REST API進(jìn)行手動(dòng)重新入隊(duì)
@RestController @RequestMapping("/api/dead-letters") public class DeadLetterController { private static final Logger logger = LoggerFactory.getLogger(DeadLetterController.class); @Autowired private DeadLetterRepository deadLetterRepository; @Autowired private DeadLetterRequeueService requeueService; /** * 獲取死信消息列表 */ @GetMapping public Page<DeadLetterEntity> getDeadLetters( @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "20") int size, @RequestParam(required = false) DeadLetterStatus status) { Pageable pageable = PageRequest.of(page, size, Sort.by("createdAt").descending()); if (status != null) { return deadLetterRepository.findByStatus(status, pageable); } else { return deadLetterRepository.findAll(pageable); } } /** * 手動(dòng)重新入隊(duì)單個(gè)死信消息 */ @PostMapping("/{id}/requeue") public ResponseEntity<Map<String, Object>> requeueDeadLetter(@PathVariable Long id) { try { logger.info("Manual requeue request for dead letter: id={}", id); DeadLetterEntity deadLetter = deadLetterRepository.findById(id) .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Dead letter not found")); // 構(gòu)建消息 MessageProperties properties = new MessageProperties(); properties.setContentType("application/json"); properties.setMessageId(UUID.randomUUID().toString()); properties.setHeader("x-requeued-from-dlq", true); properties.setHeader("x-requeued-time", new Date()); properties.setHeader("x-dead-letter-id", deadLetter.getId()); properties.setHeader("x-manually-requeued", true); properties.setHeader("x-retry-count", deadLetter.getRetryCount() + 1); Message message = new Message(deadLetter.getMessageContent().getBytes(), properties); // 發(fā)送到原始隊(duì)列 rabbitTemplate.send(deadLetter.getOriginalExchange(), deadLetter.getOriginalRoutingKey(), message); // 更新?tīng)顟B(tài) deadLetter.setRetryCount(deadLetter.getRetryCount() + 1); deadLetter.setLastRetryTime(new Date()); deadLetter.setStatus(DeadLetterStatus.REQUEUED); deadLetter.setManuallyRequeued(true); deadLetterRepository.save(deadLetter); logger.info("Dead letter manually requeued: id={}", id); Map<String, Object> response = new HashMap<>(); response.put("success", true); response.put("message", "Dead letter requeued successfully"); return ResponseEntity.ok(response); } catch (Exception e) { logger.error("Failed to manually requeue dead letter", e); Map<String, Object> response = new HashMap<>(); response.put("success", false); response.put("message", "Failed to requeue: " + e.getMessage()); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response); } } /** * 批量重新入隊(duì)多個(gè)死信消息 */ @PostMapping("/batch-requeue") public ResponseEntity<Map<String, Object>> batchRequeueDeadLetters(@RequestBody List<Long> ids) { logger.info("Batch requeue request for {} dead letters", ids.size()); int success = 0; int failed = 0; for (Long id : ids) { try { DeadLetterEntity deadLetter = deadLetterRepository.findById(id) .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Dead letter not found: " + id)); // 構(gòu)建和發(fā)送消息 // ... (與單個(gè)重新入隊(duì)類(lèi)似) success++; } catch (Exception e) { logger.error("Failed to requeue dead letter: id={}", id, e); failed++; } } Map<String, Object> response = new HashMap<>(); response.put("success", success); response.put("failed", failed); response.put("total", ids.size()); return ResponseEntity.ok(response); } }
3.3 重新入隊(duì)策略
在死信隊(duì)列重新入隊(duì)處理中,可以采用以下策略:
1. 選擇性重新入隊(duì):根據(jù)死信原因和業(yè)務(wù)需求,決定哪些消息需要重新入隊(duì):
// 選擇性重新入隊(duì)示例 public boolean shouldRequeue(DeadLetterEntity deadLetter) { // 如果是由于消息格式錯(cuò)誤導(dǎo)致的死信,可能不適合重新入隊(duì) if (deadLetter.getReason().equals("rejected") && deadLetter.getErrorMessage().contains("parse error")) { return false; } // 如果重試次數(shù)過(guò)多,不再重新入隊(duì) if (deadLetter.getRetryCount() >= 3) { return false; } // 對(duì)于過(guò)期消息,根據(jù)業(yè)務(wù)時(shí)效性判斷 if (deadLetter.getReason().equals("expired")) { long messageAge = System.currentTimeMillis() - deadLetter.getCreatedAt().getTime(); // 如果消息已經(jīng)超過(guò)1天,則不再重新入隊(duì) return messageAge < 24 * 60 * 60 * 1000; } return true; }
2. 延遲重新入隊(duì):不立即重新入隊(duì),而是按照一定的延遲策略:
// 延遲重新入隊(duì)示例 public void requeueWithDelay(DeadLetterEntity deadLetter) { // 計(jì)算延遲時(shí)間,使用指數(shù)退避策略 long delayMillis = (long) (Math.pow(2, deadLetter.getRetryCount()) * 1000); // 設(shè)置上限 delayMillis = Math.min(delayMillis, 30 * 60 * 1000); // 最多30分鐘 // 使用RabbitMQ的延遲插件或死信隊(duì)列實(shí)現(xiàn)延遲 MessageProperties properties = new MessageProperties(); properties.setExpiration(String.valueOf(delayMillis)); // 其他設(shè)置... // 發(fā)送到延遲隊(duì)列 rabbitTemplate.send("delay.exchange", "delay.key", new Message(deadLetter.getMessageContent().getBytes(), properties)); }
3. 批量重新入隊(duì):定期批量處理死信消息:
@Scheduled(cron = "0 0/30 * * * ?") // 每30分鐘執(zhí)行一次 public void scheduledBatchRequeue() { // 查找符合條件的死信消息 List<DeadLetterEntity> candidates = deadLetterRepository.findByStatusAndRetryCountLessThan( DeadLetterStatus.PENDING, 3); // 限制批次大小,避免一次處理太多 int batchSize = Math.min(candidates.size(), 100); // 處理批次 for (int i = 0; i < batchSize; i++) { try { requeueDeadLetter(candidates.get(i)); } catch (Exception e) { logger.error("Failed to requeue in batch", e); } } }
4. 修復(fù)后重新入隊(duì):對(duì)消息內(nèi)容進(jìn)行修復(fù)或轉(zhuǎn)換后重新入隊(duì):
// 修復(fù)后重新入隊(duì)示例 public void requeueWithFix(DeadLetterEntity deadLetter) { try { String originalContent = deadLetter.getMessageContent(); // 解析和修復(fù)消息內(nèi)容 JsonNode node = objectMapper.readTree(originalContent); // 執(zhí)行修復(fù)操作,例如添加缺失字段、修正格式等 // 創(chuàng)建修復(fù)后的消息 String fixedContent = objectMapper.writeValueAsString(node); // 記錄修復(fù)信息 deadLetter.setFixedContent(fixedContent); deadLetter.setFixNotes("Added missing fields and corrected format"); // 重新入隊(duì)修復(fù)后的消息 MessageProperties properties = new MessageProperties(); // 設(shè)置屬性... rabbitTemplate.send(deadLetter.getOriginalExchange(), deadLetter.getOriginalRoutingKey(), new Message(fixedContent.getBytes(), properties)); // 更新?tīng)顟B(tài) deadLetter.setStatus(DeadLetterStatus.FIXED_AND_REQUEUED); deadLetterRepository.save(deadLetter); } catch (Exception e) { logger.error("Failed to fix and requeue", e); deadLetter.setLastErrorMessage("Fix failed: " + e.getMessage()); deadLetterRepository.save(deadLetter); } }
3.4 優(yōu)缺點(diǎn)與適用場(chǎng)景
優(yōu)點(diǎn):
- 提供更靈活的死信處理機(jī)制,可以根據(jù)業(yè)務(wù)需求定制處理邏輯
- 支持手動(dòng)和自動(dòng)重新入隊(duì),適應(yīng)不同場(chǎng)景需求
- 可以對(duì)消息內(nèi)容進(jìn)行修復(fù)或轉(zhuǎn)換后再重新入隊(duì)
- 便于實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)補(bǔ)償流程
缺點(diǎn):
- 需要額外的存儲(chǔ)和管理機(jī)制
- 實(shí)現(xiàn)復(fù)雜度較高,需要考慮并發(fā)和冪等性問(wèn)題
- 可能引入延遲,影響實(shí)時(shí)性
- 需要額外的監(jiān)控和管理界面
適用場(chǎng)景:
- 需要人工干預(yù)和審核的死信處理流程
- 消息內(nèi)容可能需要修改或修復(fù)后重新處理
- 復(fù)雜業(yè)務(wù)場(chǎng)景下的失敗恢復(fù)
- 需要靈活控制重新入隊(duì)時(shí)機(jī)和策略的應(yīng)用
四、事件驅(qū)動(dòng)處理方式
4.1 處理原理
事件驅(qū)動(dòng)處理方式將死信隊(duì)列與事件系統(tǒng)集成,當(dāng)消息進(jìn)入死信隊(duì)列時(shí),系統(tǒng)發(fā)布相應(yīng)的事件,由專(zhuān)門(mén)的事件處理器根據(jù)業(yè)務(wù)規(guī)則進(jìn)行處理。
這種方式實(shí)現(xiàn)了死信處理與業(yè)務(wù)邏輯的解耦,使系統(tǒng)更加靈活和可擴(kuò)展。
4.2 實(shí)現(xiàn)方式
4.2.1 定義死信事件
public class DeadLetterEvent { private final String messageId; private final String originalQueue; private final String originalExchange; private final String originalRoutingKey; private final String reason; private final String content; private final Map<String, Object> headers; private final Date timestamp; // 構(gòu)造函數(shù)、getter方法等 public static DeadLetterEvent fromMessage(Message message) { MessageProperties properties = message.getMessageProperties(); Map<String, Object> headers = properties.getHeaders(); return new DeadLetterEvent( properties.getMessageId(), getHeaderAsString(headers, "x-first-death-queue"), getHeaderAsString(headers, "x-first-death-exchange"), getHeaderAsString(headers, "x-first-death-routing-key"), getHeaderAsString(headers, "x-first-death-reason"), new String(message.getBody(), StandardCharsets.UTF_8), headers, new Date() ); } private static String getHeaderAsString(Map<String, Object> headers, String key) { return headers.containsKey(key) ? headers.get(key).toString() : ""; } }
4.2.2 死信事件發(fā)布者
@Component public class DeadLetterConsumerAndPublisher { private static final Logger logger = LoggerFactory.getLogger(DeadLetterConsumerAndPublisher.class); @Autowired private ApplicationEventPublisher eventPublisher; @RabbitListener(queues = "${rabbitmq.dead-letter-queue}") public void consumeDeadLetter(Message message, Channel channel) throws IOException { try { logger.info("Received message in dead letter queue: {}", message.getMessageProperties().getMessageId()); // 創(chuàng)建并發(fā)布死信事件 DeadLetterEvent event = DeadLetterEvent.fromMessage(message); eventPublisher.publishEvent(event); // 確認(rèn)消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("Dead letter event published: {}", event.getMessageId()); } catch (Exception e) { logger.error("Error processing dead letter", e); // 處理失敗,可以選擇重新入隊(duì)或直接拒絕 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
4.2.3 死信事件處理器
@Component public class DeadLetterEventHandlers { private static final Logger logger = LoggerFactory.getLogger(DeadLetterEventHandlers.class); @Autowired private DeadLetterRepository deadLetterRepository; @Autowired private NotificationService notificationService; @Autowired private RabbitTemplate rabbitTemplate; /** * 處理所有死信事件 */ @EventListener public void handleDeadLetterEvent(DeadLetterEvent event) { logger.info("Handling dead letter event: {}", event.getMessageId()); // 記錄死信事件 DeadLetterEntity entity = new DeadLetterEntity(); entity.setMessageId(event.getMessageId()); entity.setOriginalQueue(event.getOriginalQueue()); entity.setOriginalExchange(event.getOriginalExchange()); entity.setOriginalRoutingKey(event.getOriginalRoutingKey()); entity.setReason(event.getReason()); entity.setMessageContent(event.getContent()); entity.setHeadersJson(convertHeadersToJson(event.getHeaders())); entity.setCreatedAt(event.getTimestamp()); entity.setStatus(DeadLetterStatus.NEW); deadLetterRepository.save(entity); logger.info("Dead letter event recorded: {}", event.getMessageId()); } /** * 處理由于拒絕導(dǎo)致的死信事件 */ @EventListener(condition = "#event.reason == 'rejected'") public void handleRejectedMessages(DeadLetterEvent event) { logger.info("Handling rejected message: {}", event.getMessageId()); try { // 根據(jù)業(yè)務(wù)規(guī)則處理被拒絕的消息 if (isTemporaryRejection(event)) { // 對(duì)于臨時(shí)性問(wèn)題導(dǎo)致的拒絕,可以稍后重試 scheduleRequeueAfterDelay(event, 60000); // 1分鐘后重試 } else { // 對(duì)于永久性問(wèn)題,可能需要告警和人工干預(yù) notificationService.sendAlert( "Permanent rejection", String.format("Message %s was permanently rejected", event.getMessageId()), AlertLevel.WARNING ); } } catch (Exception e) { logger.error("Error handling rejected message", e); } } /** * 處理由于過(guò)期導(dǎo)致的死信事件 */ @EventListener(condition = "#event.reason == 'expired'") public void handleExpiredMessages(DeadLetterEvent event) { logger.info("Handling expired message: {}", event.getMessageId()); try { // 分析消息內(nèi)容,判斷是否仍然有價(jià)值 if (isStillRelevant(event)) { // 如果消息仍然有價(jià)值,可以重新發(fā)送 requeueMessage(event); } else { // 否則可以記錄并忽略 logger.info("Expired message is no longer relevant: {}", event.getMessageId()); updateDeadLetterStatus(event.getMessageId(), DeadLetterStatus.IGNORED); } } catch (Exception e) { logger.error("Error handling expired message", e); } } /** * 處理由于隊(duì)列滿導(dǎo)致的死信事件 */ @EventListener(condition = "#event.reason == 'maxlen'") public void handleMaxLengthMessages(DeadLetterEvent event) { logger.info("Handling max length exceeded message: {}", event.getMessageId()); try { // 檢查系統(tǒng)負(fù)載情況 if (isSystemOverloaded()) { // 如果系統(tǒng)仍然過(guò)載,可以延遲重新入隊(duì) scheduleRequeueAfterDelay(event, 300000); // 5分鐘后重試 // 同時(shí)可能需要觸發(fā)告警 notificationService.sendAlert( "System overload", "Queue capacity exceeded, messages being delayed", AlertLevel.WARNING ); } else { // 否則可以嘗試立即重新入隊(duì) requeueMessage(event); } } catch (Exception e) { logger.error("Error handling max length message", e); } } // 輔助方法 private boolean isTemporaryRejection(DeadLetterEvent event) { // 根據(jù)消息內(nèi)容或頭信息判斷是否是臨時(shí)性拒絕 // 例如:網(wǎng)絡(luò)問(wèn)題、服務(wù)暫時(shí)不可用等 return event.getContent().contains("temporary") || event.getHeaders().containsKey("x-temporary-error"); } private boolean isStillRelevant(DeadLetterEvent event) { // 判斷過(guò)期消息是否仍然有價(jià)值 // 例如:基于消息類(lèi)型、創(chuàng)建時(shí)間和當(dāng)前業(yè)務(wù)狀態(tài) try { JsonNode contentNode = objectMapper.readTree(event.getContent()); if (contentNode.has("expiryTime")) { long expiryTime = contentNode.get("expiryTime").asLong(); return System.currentTimeMillis() < expiryTime; } } catch (Exception e) { logger.error("Error parsing message content", e); } // 默認(rèn)假設(shè)消息仍然有價(jià)值 return true; } private boolean isSystemOverloaded() { // 檢查系統(tǒng)負(fù)載情況 // 可以基于隊(duì)列深度、處理延遲等指標(biāo) // 這里簡(jiǎn)化實(shí)現(xiàn) return false; } private void requeueMessage(DeadLetterEvent event) { try { logger.info("Requeuing message: {}", event.getMessageId()); MessageProperties properties = new MessageProperties(); properties.setMessageId(UUID.randomUUID().toString()); properties.setHeader("x-original-message-id", event.getMessageId()); properties.setHeader("x-requeued-time", new Date()); properties.setHeader("x-original-reason", event.getReason()); Message message = new Message(event.getContent().getBytes(), properties); rabbitTemplate.send(event.getOriginalExchange(), event.getOriginalRoutingKey(), message); updateDeadLetterStatus(event.getMessageId(), DeadLetterStatus.REQUEUED); logger.info("Message requeued successfully: {}", event.getMessageId()); } catch (Exception e) { logger.error("Failed to requeue message", e); updateDeadLetterStatus(event.getMessageId(), DeadLetterStatus.REQUEUE_FAILED); } } private void scheduleRequeueAfterDelay(DeadLetterEvent event, long delayMillis) { // 使用調(diào)度器延遲執(zhí)行重新入隊(duì)操作 // 這里使用簡(jiǎn)化的實(shí)現(xiàn) logger.info("Scheduling requeue after {} ms for message: {}", delayMillis, event.getMessageId()); // 更新?tīng)顟B(tài)為待重新入隊(duì) updateDeadLetterStatus(event.getMessageId(), DeadLetterStatus.SCHEDULED_REQUEUE); // 實(shí)際應(yīng)用中可以使用定時(shí)任務(wù)或延遲隊(duì)列 // 這里使用簡(jiǎn)單的線程延遲模擬 new Thread(() -> { try { Thread.sleep(delayMillis); requeueMessage(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Scheduled requeue interrupted", e); } }).start(); } private void updateDeadLetterStatus(String messageId, DeadLetterStatus status) { try { deadLetterRepository.updateStatusByMessageId(messageId, status); } catch (Exception e) { logger.error("Failed to update dead letter status", e); } } private String convertHeadersToJson(Map<String, Object> headers) { try { return objectMapper.writeValueAsString(headers); } catch (Exception e) { logger.error("Failed to convert headers to JSON", e); return "{}"; } } }
4.2.4 特定業(yè)務(wù)領(lǐng)域的事件處理器
@Component public class OrderDeadLetterHandler { private static final Logger logger = LoggerFactory.getLogger(OrderDeadLetterHandler.class); @Autowired private OrderService orderService; @Autowired private PaymentService paymentService; /** * 處理訂單相關(guān)的死信事件 */ @EventListener(condition = "#event.originalQueue.startsWith('order')") public void handleOrderDeadLetters(DeadLetterEvent event) { logger.info("Handling order-related dead letter: {}", event.getMessageId()); try { // 解析訂單消息 JsonNode contentNode = objectMapper.readTree(event.getContent()); if (contentNode.has("orderId")) { String orderId = contentNode.get("orderId").asText(); String messageType = contentNode.has("type") ? contentNode.get("type").asText() : "unknown"; logger.info("Processing order dead letter: orderId={}, type={}", orderId, messageType); switch (messageType) { case "order_created": handleOrderCreationDeadLetter(orderId, contentNode); break; case "payment_processed": handlePaymentProcessedDeadLetter(orderId, contentNode); break; case "order_shipped": handleOrderShippedDeadLetter(orderId, contentNode); break; default: logger.warn("Unknown order message type: {}", messageType); // 可能需要人工干預(yù) notifyUnknownOrderMessageType(event, messageType); } } else { logger.warn("Order ID not found in message content"); } } catch (Exception e) { logger.error("Error handling order dead letter", e); } } private void handleOrderCreationDeadLetter(String orderId, JsonNode contentNode) { logger.info("Handling order creation dead letter: {}", orderId); try { // 查詢訂單狀態(tài) OrderStatus status = orderService.getOrderStatus(orderId); if (status == null) { // 訂單不存在,可能需要重新創(chuàng)建 logger.info("Order does not exist, recreating: {}", orderId); orderService.recreateOrderFromDeadLetter(contentNode); } else { logger.info("Order already exists: {}, status: {}", orderId, status); // 可能需要檢查訂單狀態(tài)是否與預(yù)期一致 } } catch (Exception e) { logger.error("Failed to handle order creation dead letter", e); // 可能需要人工干預(yù) } } private void handlePaymentProcessedDeadLetter(String orderId, JsonNode contentNode) { logger.info("Handling payment processed dead letter: {}", orderId); try { // 檢查支付狀態(tài) boolean paymentExists = paymentService.checkPaymentStatus(orderId); if (!paymentExists) { // 支付記錄不存在,需要重新處理 logger.info("Payment record not found, reprocessing: {}", orderId); paymentService.reprocessPaymentFromDeadLetter(contentNode); } else { logger.info("Payment already processed for order: {}", orderId); // 可能需要核對(duì)支付金額等信息 } } catch (Exception e) { logger.error("Failed to handle payment processed dead letter", e); } } private void handleOrderShippedDeadLetter(String orderId, JsonNode contentNode) { // 處理訂單發(fā)貨相關(guān)的死信消息 // ... } private void notifyUnknownOrderMessageType(DeadLetterEvent event, String messageType) { // 通知未知消息類(lèi)型 // ... } }
4.3 處理策略
在事件驅(qū)動(dòng)處理方式中,可以采用以下策略:
1. 業(yè)務(wù)領(lǐng)域劃分:根據(jù)業(yè)務(wù)領(lǐng)域組織事件處理器,使每個(gè)處理器專(zhuān)注于特定類(lèi)型的死信:
// 按業(yè)務(wù)領(lǐng)域組織事件處理器 @EventListener(condition = "#event.originalQueue.startsWith('payment')") public void handlePaymentDeadLetters(DeadLetterEvent event) { // 處理支付相關(guān)的死信 } @EventListener(condition = "#event.originalQueue.startsWith('inventory')") public void handleInventoryDeadLetters(DeadLetterEvent event) { // 處理庫(kù)存相關(guān)的死信 }
2. 基于原因的處理策略:根據(jù)死信產(chǎn)生的原因采取不同的處理策略:
// 基于原因的處理邏輯 @EventListener public void handleDeadLetterEvent(DeadLetterEvent event) { switch (event.getReason()) { case "rejected": // 處理被拒絕的消息 if (isTransientError(event)) { requeueAfterDelay(event, calculateBackoffDelay(event)); } else { logPermanentFailure(event); } break; case "expired": // 處理過(guò)期的消息 if (isTimeoutSensitive(event)) { // 對(duì)于時(shí)間敏感的消息,可能需要特殊處理 handleTimeoutSensitiveMessage(event); } else { // 對(duì)于不敏感的消息,可以重新入隊(duì) requeueMessage(event); } break; // 其他原因... } }
3. 業(yè)務(wù)狀態(tài)檢查與補(bǔ)償:在處理死信前,檢查相關(guān)業(yè)務(wù)狀態(tài),避免重復(fù)處理或執(zhí)行補(bǔ)償操作:
// 業(yè)務(wù)狀態(tài)檢查與補(bǔ)償 private void handleOrderPaymentDeadLetter(String orderId, JsonNode content) { // 查詢訂單當(dāng)前狀態(tài) OrderStatus currentStatus = orderService.getOrderStatus(orderId); // 獲取死信中的期望狀態(tài) String expectedStatus = content.get("expectedStatus").asText(); if (currentStatus.toString().equals(expectedStatus)) { // 狀態(tài)已經(jīng)是期望的狀態(tài),說(shuō)明消息已被處理 logger.info("Order {} already in expected status: {}", orderId, expectedStatus); return; } // 檢查是否可以從當(dāng)前狀態(tài)轉(zhuǎn)換到期望狀態(tài) if (canTransitionState(currentStatus, OrderStatus.valueOf(expectedStatus))) { // 執(zhí)行狀態(tài)轉(zhuǎn)換 orderService.updateOrderStatus(orderId, OrderStatus.valueOf(expectedStatus)); } else { // 狀態(tài)轉(zhuǎn)換不合法,可能需要執(zhí)行補(bǔ)償操作 performCompensatingActions(orderId, currentStatus, expectedStatus); } }
4. 異步處理與回調(diào):對(duì)于復(fù)雜的處理邏輯,可以采用異步處理模式:
// 異步處理死信事件 @Async @EventListener public CompletableFuture<Void> handleDeadLetterEventAsync(DeadLetterEvent event) { return CompletableFuture.runAsync(() -> { try { // 復(fù)雜的處理邏輯 processComplexDeadLetter(event); } catch (Exception e) { logger.error("Async processing failed", e); } }); }
4.4 優(yōu)缺點(diǎn)與適用場(chǎng)景
優(yōu)點(diǎn):
- 高度解耦,使死信處理邏輯與消息消費(fèi)邏輯分離
- 靈活的事件處理機(jī)制,可以基于各種條件路由事件
- 易于擴(kuò)展,可以添加新的事件處理器而不影響現(xiàn)有邏輯
- 適合復(fù)雜業(yè)務(wù)場(chǎng)景,可以實(shí)現(xiàn)細(xì)粒度的處理策略
缺點(diǎn):
- 事件處理流程可能變得復(fù)雜,難以追蹤
- 需要額外的事件發(fā)布和訂閱機(jī)制
- 可能導(dǎo)致事件風(fēng)暴,特別是在高并發(fā)場(chǎng)景
- 異步處理可能帶來(lái)一致性挑戰(zhàn)
適用場(chǎng)景:
- 復(fù)雜的業(yè)務(wù)系統(tǒng),需要針對(duì)不同類(lèi)型的死信采取不同策略
- 微服務(wù)架構(gòu),死信處理需要跨多個(gè)服務(wù)協(xié)調(diào)
- 需要高度定制化的死信處理流程
- 系統(tǒng)具有良好的事件驅(qū)動(dòng)架構(gòu)基礎(chǔ)
五、方案對(duì)比
處理方式 | 復(fù)雜度 | 靈活性 | 實(shí)時(shí)性 | 可靠性 | 適用場(chǎng)景 |
---|---|---|---|---|---|
原生消費(fèi)者處理 | 低 | 中 | 高 | 中 | 簡(jiǎn)單業(yè)務(wù)場(chǎng)景,需要直接處理死信 |
重試機(jī)制處理 | 中 | 高 | 高 | 高 | 臨時(shí)性錯(cuò)誤頻發(fā)的環(huán)境,需要自動(dòng)重試 |
重新入隊(duì)處理 | 高 | 高 | 中 | 高 | 需要人工干預(yù)或修復(fù)后重新處理的場(chǎng)景 |
事件驅(qū)動(dòng)處理 | 高 | 極高 | 中 | 高 | 復(fù)雜業(yè)務(wù)系統(tǒng),需要跨服務(wù)協(xié)調(diào)處理 |
六、總結(jié)
死信隊(duì)列是消息中間件系統(tǒng)中的重要安全網(wǎng),通過(guò)合理的處理策略,可以提高系統(tǒng)的可靠性和健壯性。
在實(shí)際應(yīng)用中,可能需要結(jié)合多種方式,構(gòu)建一個(gè)全面的死信處理框架。
一個(gè)設(shè)計(jì)良好的死信處理系統(tǒng)不僅能夠提高消息處理的可靠性,還能為問(wèn)題排查和系統(tǒng)監(jiān)控提供寶貴數(shù)據(jù)。
到此這篇關(guān)于SpringBoot處理死信隊(duì)列的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot處理死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring 定時(shí)任務(wù)@Scheduled 注解中的 Cron 表達(dá)式詳解
Cron 表達(dá)式是一種用于定義定時(shí)任務(wù)觸發(fā)時(shí)間的字符串表示形式,它由七個(gè)字段組成,分別表示秒、分鐘、小時(shí)、日期、月份、星期和年份,這篇文章主要介紹了Spring 定時(shí)任務(wù)@Scheduled 注解中的 Cron 表達(dá)式,需要的朋友可以參考下2023-07-07Mybatisplus實(shí)現(xiàn)JSON處理器的示例代碼
Mybatisplusjson是基于Mybatisplus開(kāi)發(fā)的一個(gè)json工具庫(kù),本文主要介紹了Mybatisplus實(shí)現(xiàn)JSON處理器的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03關(guān)于Java限流功能的簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要介紹了關(guān)于Java限流功能的簡(jiǎn)單實(shí)現(xiàn),在Java中,限流是一種常見(jiàn)的技術(shù)手段,用于控制系統(tǒng)的訪問(wèn)速率,以保護(hù)系統(tǒng)免受過(guò)載和濫用,需要的朋友可以參考下2023-07-07Maven?依賴沖突調(diào)解與版本控制問(wèn)題記錄
這篇文章主要介紹了Maven?依賴沖突調(diào)解與版本控制問(wèn)題記錄,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2025-04-04Java中的關(guān)鍵字_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
關(guān)鍵字也稱為保留字,是指Java語(yǔ)言中規(guī)定了特定含義的標(biāo)示符。對(duì)于保留字,用戶只能按照系統(tǒng)規(guī)定的方式使用,不能自行定義2017-04-04從lombok的val和var到JDK的var關(guān)鍵字方式
這篇文章主要介紹了從lombok的val和var到JDK的var關(guān)鍵字方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Springboot結(jié)合Mybatis-Plus實(shí)現(xiàn)業(yè)務(wù)撤銷(xiāo)回滾功能
本文介紹了如何在Springboot結(jié)合Mybatis-Plus實(shí)現(xiàn)業(yè)務(wù)撤銷(xiāo)回滾功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-12-12HashMap底層數(shù)據(jù)結(jié)構(gòu)詳細(xì)解析
這篇文章主要介紹了HashMap底層數(shù)據(jù)結(jié)構(gòu)詳細(xì)解析,HashMap作為開(kāi)發(fā)中常用的數(shù)據(jù)結(jié)構(gòu),也是面試中經(jīng)常被問(wèn)的知識(shí)點(diǎn),因此作為開(kāi)發(fā)者應(yīng)該盡可能多的理解其底層的數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2023-11-11Java中鍵盤(pán)輸入的幾種常見(jiàn)方式小結(jié)
本文主要介紹了Java中鍵盤(pán)輸入的幾種常見(jiàn)方式小結(jié),主要是三種方式IO流、Scanner類(lèi)、BufferedReader寫(xiě)入,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09