package org.quincy.rock.comm.process;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.quincy.rock.comm.CommunicateException;
import org.quincy.rock.core.exception.BlcokingException;

/* loaded from: classes3.dex */
public class QueueMessageProcessService<K> extends MessageProcessService<K> {
    private int capacity;
    private QueueMessageProcessService<K>.Executor<QueueMessage<K>>[] executors;
    private int maxThreadCount;
    private ThreadPoolExecutor threadPool;
    private int timeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public class Executor<T extends QueueMessage<K>> implements Runnable {
        private BlockingQueue<T> queue;
        private boolean stopped;
        private final int threadIndex;

        public Executor(int i, BlockingQueue<T> blockingQueue) {
            this.threadIndex = i;
            this.queue = blockingQueue;
        }

        private synchronized BlockingQueue<T> queue() {
            return this.queue;
        }

        public int count() {
            return this.queue.size();
        }

        public void put(T t) {
            try {
                if (!stopped() && queue().offer(t, QueueMessageProcessService.this.timeout, TimeUnit.SECONDS)) {
                    if (QueueMessageProcessService.this.recorder.canWrite()) {
                        QueueMessageProcessService.this.recorder.write("The message has been put on the queue:{0},term:{1}.", Integer.valueOf(this.threadIndex), t.terminalId());
                    }
                } else {
                    BlcokingException blcokingException = new BlcokingException("terminalId:" + t.terminalId());
                    QueueMessageProcessService.this.recorder.write(blcokingException, blcokingException.getMessage(), new Object[0]);
                    throw blcokingException;
                }
            } catch (InterruptedException e) {
                throw new CommunicateException(e.getMessage(), e);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    T poll = queue().poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        QueueMessageProcessService.this.recorder.write("Retrieve the message from the queue:{0},term:{1}.", Integer.valueOf(this.threadIndex), poll.terminalId());
                        QueueMessageProcessService.this.processQueueMessage(poll);
                    }
                    if (stopped() && this.queue.isEmpty()) {
                        QueueMessageProcessService.this.recorder.write("Quit the queue:{0}.", Integer.valueOf(this.threadIndex));
                        QueueMessageProcessService.this.executors[this.threadIndex] = null;
                        return;
                    }
                } catch (Exception e) {
                    QueueMessageProcessService.this.recorder.write(e, e.getMessage(), new Object[0]);
                }
            }
        }

        public void stop() {
            this.stopped = true;
        }

        public boolean stopped() {
            return this.stopped;
        }

        public int waitSeconds() {
            T peek = queue().peek();
            if (peek == null) {
                return 0;
            }
            return (int) ((System.currentTimeMillis() - peek.timestamp()) / 1000);
        }
    }

    public QueueMessageProcessService() {
        this(2, 1024);
    }

    public QueueMessageProcessService(int i, int i2) {
        this.timeout = 10;
        this.executors = new Executor[512];
        init(i, i2);
    }

    private synchronized QueueMessageProcessService<K>.Executor<QueueMessage<K>> getValidExecutor(int i) {
        if (this.executors[i] == null) {
            this.executors[i] = new Executor<>(i, createBlockingQueue(this.capacity));
            this.threadPool.submit(this.executors[i]);
        }
        return this.executors[i];
    }

    private void init(int i, int i2) {
        QueueMessageProcessService<K>.Executor<QueueMessage<K>>[] executorArr = this.executors;
        this.threadPool = new ThreadPoolExecutor(executorArr.length, executorArr.length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        this.maxThreadCount = Math.min(i, this.executors.length);
        this.capacity = i2;
    }

    public final int activeThreadCount() {
        return this.threadPool.getActiveCount();
    }

    protected int assignThreadExecutorIndex(Object obj, int i) {
        return Math.abs(obj instanceof Number ? ((Number) obj).intValue() : obj.toString().hashCode()) % i;
    }

    @Override // org.quincy.rock.comm.process.MessageProcessService
    protected void awaitTermination() throws Exception {
        int i = 0;
        while (true) {
            QueueMessageProcessService<K>.Executor<QueueMessage<K>>[] executorArr = this.executors;
            if (i >= executorArr.length) {
                this.threadPool.shutdown();
                this.threadPool.awaitTermination(30L, TimeUnit.MINUTES);
                return;
            } else {
                if (executorArr[i] != null) {
                    executorArr[i].stop();
                }
                i++;
            }
        }
    }

    public final int count(int i) {
        QueueMessageProcessService<K>.Executor<QueueMessage<K>> executor = this.executors[i];
        if (executor == null) {
            return 0;
        }
        return executor.count();
    }

    protected BlockingQueue<QueueMessage<K>> createBlockingQueue(int i) {
        return new ArrayBlockingQueue(i);
    }

    public final int getCapacity() {
        return this.capacity;
    }

    public final int getMaxThreadCount() {
        return this.maxThreadCount;
    }

    public final int getTimeout() {
        return this.timeout;
    }

    @Override // org.quincy.rock.comm.process.MessageProcessService
    protected final void handleArrivedMessage(QueueMessage<K> queueMessage) {
        getValidExecutor(assignThreadExecutorIndex(queueMessage.terminalId(), getMaxThreadCount())).put(queueMessage);
    }

    @Override // org.quincy.rock.comm.process.MessageProcessService
    public void reset() {
        destroy();
        init(this.maxThreadCount, this.capacity);
        start();
    }

    public final void setTimeout(int i) {
        this.timeout = i;
    }

    public final int waitSeconds(int i) {
        QueueMessageProcessService<K>.Executor<QueueMessage<K>> executor = this.executors[i];
        if (executor == null) {
            return 0;
        }
        return executor.waitSeconds();
    }
}
