package org.kaaproject.kaa.client.logging;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.LogTransport;
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.kaaproject.kaa.client.logging.future.RecordFuture;
import org.kaaproject.kaa.client.logging.memory.MemLogStorage;
import org.kaaproject.kaa.common.TransportType;
import org.kaaproject.kaa.common.endpoint.gen.LogDeliveryErrorCode;
import org.kaaproject.kaa.common.endpoint.gen.LogDeliveryStatus;
import org.kaaproject.kaa.common.endpoint.gen.LogEntry;
import org.kaaproject.kaa.common.endpoint.gen.LogSyncRequest;
import org.kaaproject.kaa.common.endpoint.gen.LogSyncResponse;
import org.kaaproject.kaa.common.endpoint.gen.SyncResponseResultType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes2.dex */
public abstract class AbstractLogCollector implements LogCollector, LogProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractLogCollector.class);
    private final KaaChannelManager channelManager;
    protected final ExecutorContext executorContext;
    private final FailoverManager failoverManager;
    private LogDeliveryListener logDeliveryListener;
    private final LogTransport transport;
    protected final Map<Integer, List<RecordFuture>> deliveryFuturesMap = new HashMap();
    protected final Map<Integer, BucketInfo> bucketInfoMap = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, Future<?>> timeouts = new ConcurrentHashMap<>();
    private final Object uploadCheckLock = new Object();
    private boolean uploadCheckInProgress = false;
    private LogUploadStrategy strategy = new DefaultLogUploadStrategy();
    protected LogStorage storage = new MemLogStorage();
    private final LogFailoverCommand controller = new DefaultLogUploadController();

    /* loaded from: classes2.dex */
    private class DefaultLogUploadController implements LogFailoverCommand {
        private DefaultLogUploadController() {
        }

        @Override // org.kaaproject.kaa.client.logging.LogFailoverCommand
        public void retryLogUpload() {
            AbstractLogCollector.this.uploadIfNeeded();
        }

        @Override // org.kaaproject.kaa.client.logging.LogFailoverCommand
        public void retryLogUpload(int i) {
            AbstractLogCollector.this.executorContext.getScheduledExecutor().schedule(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.DefaultLogUploadController.1
                @Override // java.lang.Runnable
                public void run() {
                    AbstractLogCollector.this.uploadIfNeeded();
                }
            }, i, TimeUnit.SECONDS);
        }

        @Override // org.kaaproject.kaa.client.logging.AccessPointCommand
        public void switchAccessPoint() {
            TransportConnectionInfo activeServer = AbstractLogCollector.this.channelManager.getActiveServer(TransportType.LOGGING);
            if (activeServer != null) {
                AbstractLogCollector.this.failoverManager.onServerFailed(activeServer, FailoverStatus.OPERATION_SERVERS_NA);
            } else {
                AbstractLogCollector.LOG.warn("Failed to switch Operation server. No channel is used for logging transport");
            }
        }
    }

    public AbstractLogCollector(LogTransport logTransport, ExecutorContext executorContext, KaaChannelManager kaaChannelManager, FailoverManager failoverManager) {
        this.channelManager = kaaChannelManager;
        this.transport = logTransport;
        this.executorContext = executorContext;
        this.failoverManager = failoverManager;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkDeliveryTimeout(final int i) {
        LOG.debug("Checking for a delivery timeout of the bucket with id: [{}] ", Integer.valueOf(i));
        Future<?> remove = this.timeouts.remove(Integer.valueOf(i));
        if (remove == null) {
            LOG.trace("No log delivery timeout for the bucket with id [{}] was detected", Integer.valueOf(i));
            return;
        }
        LOG.info("Log delivery timeout detected for the bucket with id: [{}]", Integer.valueOf(i));
        this.storage.rollbackBucket(i);
        final LogFailoverCommand logFailoverCommand = this.controller;
        this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.7
            @Override // java.lang.Runnable
            public void run() {
                AbstractLogCollector.this.strategy.onTimeout(logFailoverCommand);
            }
        });
        if (this.logDeliveryListener != null) {
            this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.8
                @Override // java.lang.Runnable
                public void run() {
                    AbstractLogCollector.this.logDeliveryListener.onLogDeliveryTimeout(AbstractLogCollector.this.bucketInfoMap.get(Integer.valueOf(i)));
                }
            });
        }
        remove.cancel(true);
    }

    private boolean isUploadAllowed() {
        if (this.timeouts.size() < this.strategy.getMaxParallelUploads()) {
            return true;
        }
        LOG.debug("Ignore log upload: too much pending requests {}, max allowed {}", Integer.valueOf(this.timeouts.size()), Integer.valueOf(this.strategy.getMaxParallelUploads()));
        return false;
    }

    private void processUploadDecision(LogUploadStrategyDecision logUploadStrategyDecision) {
        switch (logUploadStrategyDecision) {
            case UPLOAD:
                if (isUploadAllowed()) {
                    LOG.debug("Going to upload logs");
                    this.transport.sync();
                    return;
                }
                return;
            case NOOP:
                if (this.strategy.getUploadCheckPeriod() <= 0 || this.storage.getStatus().getRecordCount() <= 0) {
                    return;
                }
                scheduleUploadCheck();
                return;
            default:
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addDeliveryFuture(BucketInfo bucketInfo, RecordFuture recordFuture) {
        synchronized (this.deliveryFuturesMap) {
            List<RecordFuture> list = this.deliveryFuturesMap.get(Integer.valueOf(bucketInfo.getBucketId()));
            if (list == null) {
                list = new LinkedList<>();
                this.deliveryFuturesMap.put(Integer.valueOf(bucketInfo.getBucketId()), list);
            }
            list.add(recordFuture);
        }
    }

    @Override // org.kaaproject.kaa.client.logging.LogProcessor
    public void fillSyncRequest(LogSyncRequest logSyncRequest) {
        if (isUploadAllowed()) {
            LogBucket nextBucket = this.storage.getNextBucket();
            if (nextBucket == null || nextBucket.getRecords().isEmpty()) {
                LOG.trace("No logs to send");
                return;
            }
            List<LogRecord> records = nextBucket.getRecords();
            LOG.trace("Sending {} log records", Integer.valueOf(records.size()));
            LinkedList linkedList = new LinkedList();
            Iterator<LogRecord> it = records.iterator();
            while (it.hasNext()) {
                linkedList.add(new LogEntry(ByteBuffer.wrap(it.next().getData())));
            }
            logSyncRequest.setRequestId(Integer.valueOf(nextBucket.getBucketId()));
            logSyncRequest.setLogEntries(linkedList);
            final int bucketId = nextBucket.getBucketId();
            ScheduledFuture<?> schedule = this.executorContext.getScheduledExecutor().schedule(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.1
                @Override // java.lang.Runnable
                public void run() {
                    if (Thread.currentThread().isInterrupted()) {
                        AbstractLogCollector.LOG.debug("Timeout check worker for log bucket: {} was interrupted", Integer.valueOf(bucketId));
                    } else {
                        AbstractLogCollector.this.checkDeliveryTimeout(bucketId);
                    }
                }
            }, this.strategy.getTimeout(), TimeUnit.SECONDS);
            LOG.info("Adding following bucket id [{}] for timeout tracking", Integer.valueOf(nextBucket.getBucketId()));
            this.timeouts.put(Integer.valueOf(nextBucket.getBucketId()), schedule);
        }
    }

    protected void notifyDeliveryFuturesOnSuccess(BucketInfo bucketInfo, Long l) {
        synchronized (this.deliveryFuturesMap) {
            List<RecordFuture> list = this.deliveryFuturesMap.get(Integer.valueOf(bucketInfo.getBucketId()));
            if (list != null) {
                Iterator<RecordFuture> it = list.iterator();
                while (it.hasNext()) {
                    it.next().setValue(new RecordInfo(bucketInfo), l);
                }
                this.deliveryFuturesMap.remove(Integer.valueOf(bucketInfo.getBucketId()));
            }
        }
    }

    @Override // org.kaaproject.kaa.client.logging.LogProcessor
    public void onLogResponse(LogSyncResponse logSyncResponse) throws IOException {
        if (logSyncResponse.getDeliveryStatuses() != null) {
            boolean z = false;
            for (LogDeliveryStatus logDeliveryStatus : logSyncResponse.getDeliveryStatuses()) {
                int intValue = logDeliveryStatus.getRequestId().intValue();
                final BucketInfo bucketInfo = this.bucketInfoMap.get(Integer.valueOf(intValue));
                final long currentTimeMillis = System.currentTimeMillis();
                if (bucketInfo != null) {
                    this.bucketInfoMap.remove(Integer.valueOf(intValue));
                    if (logDeliveryStatus.getResult() == SyncResponseResultType.SUCCESS) {
                        this.storage.removeBucket(logDeliveryStatus.getRequestId().intValue());
                        if (this.logDeliveryListener != null) {
                            this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.2
                                @Override // java.lang.Runnable
                                public void run() {
                                    AbstractLogCollector.this.logDeliveryListener.onLogDeliverySuccess(bucketInfo);
                                }
                            });
                        }
                        this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.3
                            @Override // java.lang.Runnable
                            public void run() {
                                AbstractLogCollector.this.notifyDeliveryFuturesOnSuccess(bucketInfo, Long.valueOf(currentTimeMillis));
                            }
                        });
                    } else {
                        this.storage.rollbackBucket(logDeliveryStatus.getRequestId().intValue());
                        final LogDeliveryErrorCode errorCode = logDeliveryStatus.getErrorCode();
                        final LogFailoverCommand logFailoverCommand = this.controller;
                        this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.4
                            @Override // java.lang.Runnable
                            public void run() {
                                AbstractLogCollector.this.strategy.onFailure(logFailoverCommand, errorCode);
                            }
                        });
                        if (this.logDeliveryListener != null) {
                            this.executorContext.getCallbackExecutor().execute(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.5
                                @Override // java.lang.Runnable
                                public void run() {
                                    AbstractLogCollector.this.logDeliveryListener.onLogDeliveryFailure(bucketInfo);
                                }
                            });
                        }
                        z = true;
                    }
                } else {
                    LOG.warn("BucketInfo is null");
                }
                LOG.info("Removing bucket id from timeouts: {}", logDeliveryStatus.getRequestId());
                Future<?> remove = this.timeouts.remove(logDeliveryStatus.getRequestId());
                if (remove != null) {
                    remove.cancel(true);
                } else {
                    LOG.warn("TimeoutFuture is null and cannot be canceled");
                }
            }
            if (z) {
                return;
            }
            processUploadDecision(this.strategy.isUploadNeeded(this.storage.getStatus()));
        }
    }

    protected void scheduleUploadCheck() {
        LOG.trace("Attempt to execute upload check: {}", Boolean.valueOf(this.uploadCheckInProgress));
        synchronized (this.uploadCheckLock) {
            if (this.uploadCheckInProgress) {
                LOG.trace("Upload check is already scheduled!");
            } else {
                LOG.trace("Scheduling upload check with timeout: {}", Integer.valueOf(this.strategy.getUploadCheckPeriod()));
                this.uploadCheckInProgress = true;
                this.executorContext.getScheduledExecutor().schedule(new Runnable() { // from class: org.kaaproject.kaa.client.logging.AbstractLogCollector.6
                    @Override // java.lang.Runnable
                    public void run() {
                        synchronized (AbstractLogCollector.this.uploadCheckLock) {
                            AbstractLogCollector.this.uploadCheckInProgress = false;
                        }
                        AbstractLogCollector.this.uploadIfNeeded();
                    }
                }, this.strategy.getUploadCheckPeriod(), TimeUnit.SECONDS);
            }
        }
    }

    @Override // org.kaaproject.kaa.client.logging.GenericLogCollector
    public void setLogDeliveryListener(LogDeliveryListener logDeliveryListener) {
        this.logDeliveryListener = logDeliveryListener;
    }

    @Override // org.kaaproject.kaa.client.logging.GenericLogCollector
    public void setStorage(LogStorage logStorage) {
        if (logStorage == null) {
            throw new IllegalArgumentException("Storage is null!");
        }
        this.storage = logStorage;
        LOG.info("New log storage was set {}", logStorage);
    }

    @Override // org.kaaproject.kaa.client.logging.GenericLogCollector
    public void setStrategy(LogUploadStrategy logUploadStrategy) {
        if (logUploadStrategy == null) {
            throw new IllegalArgumentException("Strategy is null!");
        }
        this.strategy = logUploadStrategy;
        LOG.info("New log upload strategy was set: {}", logUploadStrategy);
    }

    @Override // org.kaaproject.kaa.client.logging.GenericLogCollector
    public void stop() {
        LOG.debug("Closing storage");
        this.storage.close();
        LOG.debug("Clearing timeouts map");
        Iterator<Future<?>> it = this.timeouts.values().iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
        this.timeouts.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void uploadIfNeeded() {
        processUploadDecision(this.strategy.isUploadNeeded(this.storage.getStatus()));
    }
}
