package com.sankuai.sjst.lmq.consumer.manager;

import com.sankuai.ng.common.log.l;
import com.sankuai.sjst.lmq.base.processor.BaseProcessor;
import com.sankuai.sjst.lmq.base.queue.MessageQueue;
import com.sankuai.sjst.lmq.common.bean.AckMessage;
import com.sankuai.sjst.lmq.common.bean.PackAck;
import com.sankuai.sjst.lmq.common.bean.TaskMessage;
import com.sankuai.sjst.lmq.common.constant.CommonConstant;
import com.sankuai.sjst.lmq.common.constant.RMSConstant;
import com.sankuai.sjst.lmq.consumer.channel.BaseChannel;
import com.sankuai.sjst.lmq.consumer.env.ConsumerEnvironment;
import com.sankuai.sjst.lmq.consumer.monitor.MonitorManager;
import java.util.List;

/* loaded from: classes4.dex */
/**
 * Collects acknowledgements for consumed task messages in an in-memory
 * {@link MessageQueue} and sends them in batches, wrapped in a {@link PackAck},
 * over a channel obtained from the configured {@link ChannelManager}.
 *
 * <p>Batches that cannot be sent (no channel manager set, no channel returned,
 * or the channel reports itself unavailable) are logged and dropped; they are
 * not re-queued.
 */
public class AckManager {
    /** First argument passed to the ack queue constructor (presumably the batch size; confirm against MessageQueue). */
    private static final int BATCH_SIZE = 100;
    /** Second argument passed to the ack queue constructor (presumably the queue capacity; confirm against MessageQueue). */
    private static final int CAPACITY = 1000;
    /** Argument passed to {@code batchMessage}; the time unit is defined by MessageQueue and is not visible here. */
    private static final long DEFAULT_TIMEOUT = 10;
    /** Multiplier applied to the batch size when asking the monitor for metrics. */
    private static final int MAX_METRIC_COLLECT_SIZE = 3;

    /** Set through {@link #setChannelManager(ChannelManager)}; stays {@code null} until then. */
    private ChannelManager channelManager;
    private final ConsumerEnvironment env;
    private final MessageQueue<AckMessage> ackQueue = new MessageQueue<>(BATCH_SIZE, CAPACITY);
    private final AckProcessor ackProcessor = new AckProcessor();

    /**
     * Processor that pulls queued acks and forwards them through
     * {@link AckManager#batchAck(List)}. The lifecycle (when {@code onRunning} and
     * {@code postRunning} are invoked) is defined by {@link BaseProcessor}.
     */
    private class AckProcessor extends BaseProcessor {
        private AckProcessor() {
        }

        /**
         * Fetches one batch from the queue and sends it; does nothing when the
         * batch comes back empty.
         */
        @Override // com.sankuai.sjst.lmq.base.processor.BaseProcessor
        protected void onRunning() throws Exception {
            List<AckMessage> batch = AckManager.this.ackQueue.batchMessage(DEFAULT_TIMEOUT);
            if (!batch.isEmpty()) {
                AckManager.this.batchAck(batch);
            }
        }

        /**
         * Keeps draining the queue and sending what was drained until a drain
         * returns an empty list.
         */
        @Override // com.sankuai.sjst.lmq.base.processor.BaseProcessor
        protected void postRunning() {
            List<AckMessage> remaining = AckManager.this.ackQueue.drainMessage();
            while (!remaining.isEmpty()) {
                AckManager.this.batchAck(remaining);
                remaining = AckManager.this.ackQueue.drainMessage();
            }
        }
    }

    /**
     * Creates a manager bound to the given environment. A channel manager must
     * be supplied separately via {@link #setChannelManager(ChannelManager)}.
     *
     * @param consumerEnvironment environment consulted by {@link #ack} to decide whether acks are suppressed
     */
    public AckManager(ConsumerEnvironment consumerEnvironment) {
        this.env = consumerEnvironment;
    }

    /**
     * Sends the given acks as a single {@link PackAck} over the currently
     * available channel. If no channel manager has been set, no channel is
     * returned, or the channel is not available, the batch is logged and dropped.
     *
     * <p>Kept {@code public} to match the existing signature; within this class it
     * is only called by the inner processor.
     *
     * @param list acks to send; must not be {@code null}
     */
    public void batchAck(List<AckMessage> list) {
        // Read the field once so the null check and the call see the same reference.
        ChannelManager manager = this.channelManager;
        // A missing manager is handled like a missing channel instead of throwing a NullPointerException.
        BaseChannel availableChannel = manager == null ? null : manager.getAvailableChannel();
        if (availableChannel == null || !availableChannel.available()) {
            l.d(RMSConstant.LOG_TAG, "ackMsg is unable to send, size: " + list.size());
            return;
        }
        PackAck packAck = new PackAck();
        packAck.setMessageList(list);
        fillMetrics(packAck);
        availableChannel.ack(packAck);
    }

    /**
     * Attaches metrics from {@link MonitorManager} to the pack, requesting
     * {@code MAX_METRIC_COLLECT_SIZE} times the number of messages in the pack.
     */
    private void fillMetrics(PackAck packAck) {
        int requested = packAck.getMessageList().size() * MAX_METRIC_COLLECT_SIZE;
        packAck.setMetrics(MonitorManager.collect(requested));
    }

    /**
     * Queues an ack for the given task message. Nothing is queued when the
     * message's ack policy is {@code NO_ACK} or when the environment reports
     * {@code noAck()}. If the queue rejects the offer, the ack is logged and
     * discarded.
     *
     * @param taskMessage the consumed message being acknowledged
     * @param str  forwarded unchanged to the {@link AckMessage} constructor (meaning not visible here)
     * @param str2 forwarded unchanged to the {@link AckMessage} constructor (meaning not visible here)
     */
    public void ack(TaskMessage taskMessage, String str, String str2) {
        if (CommonConstant.AckPolicy.NO_ACK.equals(taskMessage.getAckPolicy())) {
            l.c(RMSConstant.LOG_TAG, "no need to ack. id=" + taskMessage.getMessage().getMessageId());
            return;
        }
        if (this.env.noAck()) {
            l.c(RMSConstant.LOG_TAG, "debug enable, don't reply ack.");
            return;
        }
        AckMessage ackMessage = new AckMessage(
                taskMessage.getMessage().getTopic(),
                taskMessage.getTaskId(),
                taskMessage.getMessage().getMessageId(),
                taskMessage.getMessage().getBizId(),
                taskMessage.getMessage().getVersion(),
                str,
                str2,
                taskMessage.getAckPolicy());
        if (!this.ackQueue.offer(ackMessage)) {
            l.d(RMSConstant.LOG_TAG, "ack queue is full, discard");
        }
    }

    /** Closes the ack processor, logging before and after. */
    public void close() {
        l.c(RMSConstant.LOG_TAG, "ackManager closing");
        this.ackProcessor.close();
        l.c(RMSConstant.LOG_TAG, "ackManager closed");
    }

    /**
     * Sets the channel manager used to look up a channel for sending acks.
     *
     * @param channelManager source of the channel used by {@link #batchAck(List)}
     */
    public void setChannelManager(ChannelManager channelManager) {
        this.channelManager = channelManager;
    }

    /** Starts the ack processor. */
    public void start() {
        this.ackProcessor.start();
    }
}
