package com.volcengine.service.tls;

import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.producer.CallBack;
import com.volcengine.model.tls.producer.ProducerConfig;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: classes3.dex */
public class ProducerImpl implements Producer {
    private final AtomicInteger batchCount;
    private final LogDispatcher dispatcher;
    private final BatchHandler failHandler;
    private final Semaphore memoryLock;
    private final Mover mover;
    private final String name;
    private ProducerConfig producerConfig;
    private final RetryManager retryManager;
    private final BatchHandler successHandler;
    private static final Log LOG = LogFactory.getLog(ProducerImpl.class);
    private static final AtomicInteger INSTANCE_ID = new AtomicInteger(0);

    public ProducerImpl(ProducerConfig producerConfig) throws LogException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.batchCount = atomicInteger;
        this.producerConfig = producerConfig;
        String str = "TLS-" + INSTANCE_ID.incrementAndGet();
        this.name = str;
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        Semaphore semaphore = new Semaphore(producerConfig.getTotalSizeInBytes());
        this.memoryLock = semaphore;
        RetryManager retryManager = new RetryManager();
        this.retryManager = retryManager;
        LogDispatcher logDispatcher = new LogDispatcher(producerConfig, str, linkedBlockingQueue, linkedBlockingQueue2, semaphore, atomicInteger, retryManager);
        this.dispatcher = logDispatcher;
        this.successHandler = new BatchHandler("success batch handler-" + str, semaphore, linkedBlockingQueue, atomicInteger);
        this.failHandler = new BatchHandler("fail batch handler-" + str, semaphore, linkedBlockingQueue2, atomicInteger);
        this.mover = new Mover(str + "-mover", producerConfig, logDispatcher, retryManager, linkedBlockingQueue, linkedBlockingQueue2);
    }

    public static Producer defaultProducer(String str, String str2, String str3, String str4, String str5) throws LogException {
        return new ProducerImpl(new ProducerConfig(str, str2, str3, str4, str5));
    }

    @Override // com.volcengine.service.tls.Producer
    public void close() throws InterruptedException, LogException {
        this.dispatcher.close();
        this.successHandler.interrupt();
        this.failHandler.interrupt();
        this.mover.close();
        LOG.info(String.format("producer %s closed", this.name));
    }

    @Override // com.volcengine.service.tls.Producer
    public void closeNow() throws InterruptedException, LogException {
        this.dispatcher.closeNow();
        this.successHandler.close();
        this.failHandler.close();
        this.mover.close();
        LOG.info(String.format("producer %s closed now", this.name));
    }

    @Override // com.volcengine.service.tls.Producer
    public void config(ProducerConfig producerConfig) throws LogException {
        if (producerConfig != null) {
            this.producerConfig = producerConfig;
            producerConfig.validConfig();
            LOG.info(String.format("producer %s configured,config: %s", this.name, producerConfig));
        }
    }

    @Override // com.volcengine.service.tls.Producer
    public void resetAccessKeyToken(String str, String str2, String str3) throws LogException {
        if (StringUtils.isEmpty(str) || StringUtils.isEmpty(str2)) {
            throw new LogException("InvalidArgument", String.format("reset producer %s access key failed,accessKey is %s,secretKey is %s, token is %s", this.name, str, str2, str3), null);
        }
        this.dispatcher.resetAccessKeyToken(str, str2, str3);
    }

    @Override // com.volcengine.service.tls.Producer
    public void sendLog(String str, String str2, String str3, String str4, PutLogRequest.Log log, CallBack callBack) throws InterruptedException, LogException {
        if (str2 == null || log == null) {
            throw new LogException("InvalidArgument", String.format("topic id:%s,log:%s", str2, log), null);
        }
        sendLogGroup(str, str2, str3, str4, PutLogRequest.LogGroup.newBuilder().setFileName(str4).setSource(str3).addLogs(log).build(), callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    public void sendLogGroup(String str, String str2, String str3, String str4, PutLogRequest.LogGroup logGroup, CallBack callBack) throws InterruptedException, LogException {
        if (str2 == null || logGroup == null || logGroup.getLogsList() == null || logGroup.getLogsList().size() == 0) {
            throw new LogException("InvalidArgument", String.format("topic id:%s,log group:%s", str2, logGroup), null);
        }
        if (logGroup.getLogsList().size() > this.producerConfig.getMaxBatchCount()) {
            throw new LogException("InvalidArgument", String.format("log list size %d is  greater than threshold %d", Integer.valueOf(logGroup.getLogsList().size()), Integer.valueOf(this.producerConfig.getMaxBatchCount())), null);
        }
        this.dispatcher.addBatch(str, str2, str3, str4, logGroup, callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    public void start() throws LogException {
        this.producerConfig.validConfig();
        this.dispatcher.start();
        this.successHandler.start();
        this.failHandler.start();
        this.mover.start();
        LOG.info(String.format("producer %s started", this.name));
    }
}
