package com.sankuai.sjst.lmq.consumer.manager;

import com.sankuai.ng.common.log.l;
import com.sankuai.sjst.lmq.base.utils.device.AsyncUtils;
import com.sankuai.sjst.lmq.common.bean.ConnectContext;
import com.sankuai.sjst.lmq.common.bean.PackMessage;
import com.sankuai.sjst.lmq.common.bean.TaskMessage;
import com.sankuai.sjst.lmq.common.bean.monitor.Metric;
import com.sankuai.sjst.lmq.common.constant.CommonConstant;
import com.sankuai.sjst.lmq.common.constant.RMSConstant;
import com.sankuai.sjst.lmq.consumer.env.ConsumerEnvironment;
import com.sankuai.sjst.lmq.consumer.handler.LmqHandler;
import com.sankuai.sjst.lmq.consumer.listener.ChannelListener;
import com.sankuai.sjst.lmq.consumer.manager.dup.MessageDealPolicy;
import com.sankuai.sjst.lmq.consumer.manager.dup.MessageManager;
import com.sankuai.sjst.lmq.consumer.monitor.MonitorManager;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/* loaded from: classes4.dex */
public class MessageDispatcher {
    private static final int CAPACITY = 1000;
    private static final int CORE_POOL_SIZE = 5;
    private static final String DEFAULT_MESSAGE_TYPE = "ALL";
    private static final int MAX_WORKS = 10;
    private AckManager ackManager;
    private ChannelManager channelManager;
    private ConsumerEnvironment env;
    private ExecutorService executorService;
    private final HandlerManager handlerManager;
    private final MessageManager messageManager;

    /* loaded from: classes4.dex */
    private class ChannelListenerImpl implements ChannelListener {
        private ChannelListenerImpl() {
        }

        @Override // com.sankuai.sjst.lmq.consumer.listener.ChannelListener
        public void onMessages(PackMessage packMessage) {
            MessageDispatcher.this.refreshConnectContext(packMessage.getCtx());
            List filter = MessageDispatcher.this.filter(packMessage);
            if (filter.isEmpty()) {
                return;
            }
            List routerMessage = MessageDispatcher.this.routerMessage(filter);
            if (routerMessage.isEmpty()) {
                return;
            }
            MessageDispatcher.this.ack(routerMessage);
            MessageDispatcher.this.process((List<TaskMessage>) routerMessage);
        }
    }

    public MessageDispatcher(MessageManager messageManager, HandlerManager handlerManager) {
        this.messageManager = messageManager;
        this.handlerManager = handlerManager;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void ack(List<TaskMessage> list) {
        Iterator<TaskMessage> it = list.iterator();
        while (it.hasNext()) {
            this.ackManager.ack(it.next(), CommonConstant.AckStatus.RECEIVED, "");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<TaskMessage> filter(PackMessage packMessage) {
        ArrayList arrayList = new ArrayList();
        if (packMessage.getMessageList() == null) {
            return arrayList;
        }
        for (TaskMessage taskMessage : packMessage.getMessageList()) {
            if (this.env.getGroupAccountId().equals(taskMessage.getMessage().getAccountId())) {
                arrayList.add(taskMessage);
            } else {
                l.c(RMSConstant.LOG_TAG, "account id doesn't match, accountId: " + taskMessage.getMessage().getAccountId() + " env: " + this.env.getGroupAccountId());
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$process$0(List list) {
        Iterator it = list.iterator();
        while (it.hasNext()) {
            process((TaskMessage) it.next());
        }
    }

    private void logConsume(String str, long j, int i) {
        MonitorManager.log(Metric.builder().action("LmqConsume").param("topic", str).param("handler", String.valueOf(i)).param("elapsed", String.valueOf(j)).build());
    }

    private void process(TaskMessage taskMessage) {
        int i;
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Set<LmqHandler> set = this.handlerManager.get(taskMessage.getMessage().getTopic());
            if (set != null) {
                i = set.size();
                process(taskMessage, set);
            } else {
                i = 0;
            }
            Set<LmqHandler> set2 = this.handlerManager.get("ALL");
            if (set2 != null) {
                process(taskMessage, set2);
            }
        } catch (Exception e) {
            i = -1;
            l.d(RMSConstant.LOG_TAG, "handle message error: " + taskMessage, e);
        }
        logConsume(taskMessage.getMessage().getTopic(), System.currentTimeMillis() - currentTimeMillis, i);
    }

    private void process(TaskMessage taskMessage, Set<LmqHandler> set) throws InterruptedException {
        Iterator<LmqHandler> it = set.iterator();
        while (it.hasNext()) {
            try {
                it.next().handle(taskMessage.getMessage());
            } catch (Exception e) {
                l.d(RMSConstant.LOG_TAG, "handle message error: " + taskMessage, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void process(final List<TaskMessage> list) {
        this.executorService.submit(new Runnable(this, list) { // from class: com.sankuai.sjst.lmq.consumer.manager.MessageDispatcher$$Lambda$0
            private final MessageDispatcher arg$0;
            private final List arg$1;

            {
                this.arg$0 = this;
                this.arg$1 = list;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.arg$0.lambda$process$0(this.arg$1);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refreshConnectContext(ConnectContext connectContext) {
        this.channelManager.refreshConnectContext(connectContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<TaskMessage> routerMessage(List<TaskMessage> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        synchronized (this.messageManager) {
            for (TaskMessage taskMessage : list) {
                MessageDealPolicy checkAndMark = this.messageManager.checkAndMark(taskMessage.getMessage().getMessageId(), taskMessage.getTaskId());
                if (MessageDealPolicy.RECEIVED.equals(checkAndMark)) {
                    arrayList2.add(taskMessage);
                } else if (MessageDealPolicy.CONTINUE.equals(checkAndMark)) {
                    arrayList.add(taskMessage);
                } else {
                    l.b(RMSConstant.LOG_TAG, "message is being processed, messageId=" + taskMessage.getMessage().getMessageId() + ", taskId=" + taskMessage.getTaskId());
                }
            }
        }
        ack(arrayList2);
        return arrayList;
    }

    public ChannelListener channelListener() {
        return new ChannelListenerImpl();
    }

    public void close() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    public void init(ConsumerEnvironment consumerEnvironment) {
        this.env = consumerEnvironment;
        this.executorService = AsyncUtils.getThreadPool4Device(5, 10, 1000);
    }

    public void setAckManager(AckManager ackManager) {
        this.ackManager = ackManager;
    }

    public void setChannelManager(ChannelManager channelManager) {
        this.channelManager = channelManager;
    }
}
