package com.boke.khaos.sdk.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.boke.khaos.sdk.exception.KhaosSDKException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    private Map<String, AbstractMQMsgProcessor> mqMsgProcessorServiceMap = new ConcurrentHashMap();

    private void consumeMsgForTag(String str, String str2, List<MessageExt> list) throws KhaosSDKException {
        MQMsgProcessor selectConsumeService = selectConsumeService(str, str2);
        if (selectConsumeService == null) {
            logger.error("根据Topic = {}和Tag = {} 没有找到对应的处理消息的服务", str, str2);
            throw new KhaosSDKException("没有找到对应的处理消息的服务");
        }
        logger.info("根据Topic = {} 和Tag = {} 路由到的服务为 = {}，开始调用处理消息", new Object[]{str, str2, selectConsumeService.getClass().getName()});
        MQConsumeResult handle = selectConsumeService.handle(str, str2, list);
        if (handle == null) {
            throw new KhaosSDKException("HANDLE_RESULT_NULL");
        }
        if (!handle.isSuccess()) {
            throw new KhaosSDKException(JSON.toJSONString(handle));
        }
        logger.info("消息处理成功 result = {} ", JSON.toJSONString(handle));
    }

    private MQMsgProcessor selectConsumeService(String str, String str2) {
        Iterator<Map.Entry<String, AbstractMQMsgProcessor>> it = this.mqMsgProcessorServiceMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AbstractMQMsgProcessor> next = it.next();
            MQConsumeService mQConsumeService = (MQConsumeService) next.getValue().getClass().getAnnotation(MQConsumeService.class);
            if (mQConsumeService == null) {
                logger.error("消费者服务 = {} 上没有添加MQConsumeService注解", next.getValue().getClass().getName());
            } else {
                if (mQConsumeService.topic().equals(str)) {
                    String[] tags = mQConsumeService.tags();
                    if (!tags[0].equals("*") && !Arrays.asList(tags).contains(str2)) {
                    }
                    return next.getValue();
                }
                continue;
            }
        }
        return null;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            logger.info("接受到的消息为空，不处理，直接返回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        try {
            for (Map.Entry entry : ((Map) list.stream().collect(Collectors.groupingBy(new Function() { // from class: com.boke.khaos.sdk.rocketmq.-$$Lambda$MQConsumeMsgListenerProcessor$GRRrsk8feXYQpHgJ_fKubY3sPto
                @Override // java.util.function.Function
                public final Object apply(Object obj) {
                    String topic;
                    topic = ((MessageExt) obj).getTopic();
                    return topic;
                }
            }))).entrySet()) {
                String str = (String) entry.getKey();
                for (Map.Entry entry2 : ((Map) ((List) entry.getValue()).stream().collect(Collectors.groupingBy(new Function() { // from class: com.boke.khaos.sdk.rocketmq.-$$Lambda$MQConsumeMsgListenerProcessor$t1ZDldpfr9_dshmtAldc1zfGjMI
                    @Override // java.util.function.Function
                    public final Object apply(Object obj) {
                        String tags;
                        tags = ((MessageExt) obj).getTags();
                        return tags;
                    }
                }))).entrySet()) {
                    consumeMsgForTag(str, (String) entry2.getKey(), (List) entry2.getValue());
                }
            }
            return consumeConcurrentlyStatus;
        } catch (Exception e) {
            logger.error("参数异常导致 处理消息失败 message = {}", e.getMessage());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public void registerMqMsgProcessor(AbstractMQMsgProcessor abstractMQMsgProcessor) {
        this.mqMsgProcessorServiceMap.put(abstractMQMsgProcessor.getClass().getName(), abstractMQMsgProcessor);
    }
}
