package com.sankuai.sjst.lmq.consumer;

import com.sankuai.ng.common.log.l;
import com.sankuai.sjst.lmq.base.processor.TimingProcessor;
import com.sankuai.sjst.lmq.common.constant.RMSConstant;
import com.sankuai.sjst.lmq.consumer.channel.WebSocketChannel;
import com.sankuai.sjst.lmq.consumer.channel.pike.PikeChannel;
import com.sankuai.sjst.lmq.consumer.env.ConsumerEnvironment;
import com.sankuai.sjst.lmq.consumer.env.IConsumerEnvironment;
import com.sankuai.sjst.lmq.consumer.handler.LmqHandler;
import com.sankuai.sjst.lmq.consumer.manager.AckManager;
import com.sankuai.sjst.lmq.consumer.manager.ChannelManager;
import com.sankuai.sjst.lmq.consumer.manager.HandlerManager;
import com.sankuai.sjst.lmq.consumer.manager.MessageDispatcher;
import com.sankuai.sjst.lmq.consumer.manager.dup.MessageManager;
import com.sankuai.sjst.lmq.consumer.processor.ActivePullTask;

/* loaded from: classes4.dex */
/**
 * Singleton facade that wires together and drives the LMQ consumer components:
 * the duplicate-message manager, handler manager, message dispatcher, ack manager,
 * channel manager and timing processor.
 *
 * <p>Lifecycle: obtain the instance with {@link #getInstance()}, call
 * {@link #init(IConsumerEnvironment)} to build and start the components, and
 * {@link #close()} to tear them down. {@code init} and {@code close} are
 * {@code synchronized} on this instance, so they never interleave with each other.
 * Handler registration is delegated straight to the {@link HandlerManager} and is
 * not synchronized here.
 */
public class LmqConsumer {
    /** Created by {@link #init(IConsumerEnvironment)}; {@code null} before init and after close. */
    private AckManager ackManager;
    /** Created by {@link #init(IConsumerEnvironment)}; {@code null} before init and after close. */
    private ChannelManager channelManager;
    /** Environment wrapper built from the argument of the most recent {@code init} call. */
    private ConsumerEnvironment env;
    private final HandlerManager handlerManager;
    private final MessageDispatcher messageDispatcher;
    private final MessageManager messageManager;
    /** Created by {@link #init(IConsumerEnvironment)}; {@code null} before init and after close. */
    private TimingProcessor timingProcessor;

    /**
     * Holder for the single {@link LmqConsumer} instance. The instance is created when
     * this nested class is initialized, i.e. on the first {@link #getInstance()} call.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes4.dex */
    public static final class HOLDER {
        private static final LmqConsumer INSTANCE = new LmqConsumer();

        private HOLDER() {
        }
    }

    /**
     * Builds the components that live for the whole lifetime of the singleton.
     * The dispatcher is handed the message manager and the handler manager.
     */
    private LmqConsumer() {
        this.messageManager = new MessageManager();
        this.handlerManager = new HandlerManager();
        this.messageDispatcher = new MessageDispatcher(this.messageManager, this.handlerManager);
    }

    /** @return the process-wide consumer instance */
    public static LmqConsumer getInstance() {
        return HOLDER.INSTANCE;
    }

    /**
     * Initializes and starts the consumer.
     *
     * <p>Components are created in dependency order: timing processor, duplicate-message
     * manager, dispatcher, ack manager, channel manager (with its channels) and the
     * active-pull task; the ack manager and the timing processor are started last.
     *
     * <p>If a previous {@code init} was not followed by {@link #close()}, the ack manager,
     * channel manager and timing processor left over from it are closed before new ones
     * are created, so they are not silently dropped. If this call fails part-way,
     * everything built so far is released through {@link #close()} and the original
     * failure is propagated to the caller.
     *
     * @param iConsumerEnvironment environment supplied by the host application; must not be null
     * @throws NullPointerException if {@code iConsumerEnvironment} is null
     * @throws Exception if any component fails to initialize
     */
    public synchronized void init(IConsumerEnvironment iConsumerEnvironment) throws Exception {
        // Fail before touching any state; previously a null argument surfaced only
        // after this.env had already been replaced.
        if (iConsumerEnvironment == null) {
            throw new NullPointerException("iConsumerEnvironment must not be null");
        }

        // Release leftovers of an earlier init() that was never closed. These helpers
        // are null-guarded, so on a fresh (or properly closed) consumer they do nothing.
        closeTimingProcess();
        closeAckManager();
        closeChannelManager();

        this.env = new ConsumerEnvironment(iConsumerEnvironment);
        l.c(RMSConstant.LOG_TAG, "consumer is initializing, debug=" + iConsumerEnvironment.debug());

        boolean started = false;
        try {
            initTimingProcessor();
            initDupMessageManager();
            initMessageDispatcher();
            initAckManager();
            initChannelManager();
            initActivePullTask();
            startAckManager();
            startTimingProcess();
            started = true;
        } finally {
            // Reached with started == false only when one of the steps above threw.
            if (!started) {
                rollbackFailedInit();
            }
        }
        l.c(RMSConstant.LOG_TAG, "consumer start success");
    }

    /**
     * Shuts the consumer down: timing processor first, then ack manager, channel manager,
     * dispatcher and finally the duplicate-message manager. Components that were never
     * created (or were already closed) are skipped by the null checks in the helpers.
     */
    public synchronized void close() {
        l.c(RMSConstant.LOG_TAG, "consumer is closing...");
        closeTimingProcess();
        closeAckManager();
        closeChannelManager();
        closeMessageDispatcher();
        closeDupMessageManager();
    }

    /**
     * Registers a handler with the handler manager.
     *
     * @param str        key the handler is registered under, passed through unchanged
     *                   (presumably a topic or message type; confirm against HandlerManager)
     * @param lmqHandler handler to register
     */
    public void registerHandler(String str, LmqHandler lmqHandler) {
        this.handlerManager.registerHandler(str, lmqHandler);
    }

    /**
     * Removes a handler from the handler manager.
     *
     * @param str        key the handler was registered under
     * @param lmqHandler handler to remove
     */
    public void unregisterHandler(String str, LmqHandler lmqHandler) {
        this.handlerManager.unregisterHandler(str, lmqHandler);
    }

    /**
     * Releases whatever a failed {@code init} managed to build. A failure during this
     * cleanup is logged rather than thrown so it cannot replace the exception that made
     * {@code init} fail in the first place.
     */
    private void rollbackFailedInit() {
        try {
            close();
        } catch (RuntimeException cleanupFailure) {
            l.c(RMSConstant.LOG_TAG, "consumer cleanup after failed init did not complete: " + cleanupFailure);
        }
    }

    /** Creates the timing processor; it is started separately by {@link #startTimingProcess()}. */
    private void initTimingProcessor() {
        this.timingProcessor = new TimingProcessor();
    }

    /** Initializes the duplicate-message manager (always with {@code false}, as in the original wiring). */
    private void initDupMessageManager() {
        this.messageManager.init(false);
    }

    /** Initializes the dispatcher with the current environment. */
    private void initMessageDispatcher() {
        this.messageDispatcher.init(this.env);
    }

    /** Creates the ack manager for the current environment and hands it to the dispatcher. */
    private void initAckManager() {
        this.ackManager = new AckManager(this.env);
        this.messageDispatcher.setAckManager(this.ackManager);
    }

    /**
     * Creates the channel manager, links it to the ack manager and the dispatcher,
     * registers the enabled channels and the dispatcher's listener, then initializes it.
     * Requires {@link #initAckManager()} to have run.
     *
     * @throws Exception if the channel manager fails to initialize
     */
    private void initChannelManager() throws Exception {
        this.channelManager = new ChannelManager();
        this.ackManager.setChannelManager(this.channelManager);
        this.messageDispatcher.setChannelManager(this.channelManager);
        initWebSocketChannel();
        initPikeChannel();
        this.channelManager.registerMessageListener(this.messageDispatcher.channelListener());
        this.channelManager.init(this.env);
    }

    /** Registers the WebSocket channel unless the environment disables it. */
    private void initWebSocketChannel() {
        if (this.env.disableWebSocket()) {
            l.c(RMSConstant.LOG_TAG, "WebSocket channel has been closed");
            return;
        }
        this.channelManager.registerChannel(new WebSocketChannel());
        l.c(RMSConstant.LOG_TAG, "WebSocket channel is starting");
    }

    /** Registers the Pike channel unless the environment disables it. */
    private void initPikeChannel() {
        if (this.env.disablePike()) {
            l.c(RMSConstant.LOG_TAG, "Pike channel has been closed");
            return;
        }
        this.channelManager.registerChannel(new PikeChannel());
        l.c(RMSConstant.LOG_TAG, "Pike channel is starting");
    }

    /** Adds the active-pull task to the timing processor; needs the channel manager to exist. */
    private void initActivePullTask() {
        this.timingProcessor.addTask(new ActivePullTask(this.channelManager, this.env));
    }

    private void startAckManager() {
        this.ackManager.start();
    }

    private void startTimingProcess() {
        this.timingProcessor.start();
    }

    /** Closes and forgets the timing processor, if one exists. */
    private void closeTimingProcess() {
        if (this.timingProcessor != null) {
            this.timingProcessor.close();
            this.timingProcessor = null;
        }
    }

    /** Closes and forgets the ack manager, if one exists. */
    private void closeAckManager() {
        if (this.ackManager != null) {
            this.ackManager.close();
            this.ackManager = null;
        }
    }

    /** Closes and forgets the channel manager, if one exists. */
    private void closeChannelManager() {
        if (this.channelManager != null) {
            this.channelManager.close();
            this.channelManager = null;
        }
    }

    /** Closes the dispatcher; the instance itself is kept for a later {@code init}. */
    private void closeMessageDispatcher() {
        this.messageDispatcher.close();
    }

    /** Closes the duplicate-message manager; the instance itself is kept for a later {@code init}. */
    private void closeDupMessageManager() {
        this.messageManager.close();
    }
}
