package org.jetlang.channels;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jetlang.core.Callback;
import org.jetlang.core.Filter;
import org.jetlang.core.MessageBuffer;
import org.jetlang.core.MessageReader;
import org.jetlang.fibers.Fiber;

/* loaded from: classes2.dex */
/**
 * Subscription that collects published messages into a buffer and hands them
 * to a callback in batches.
 *
 * <p>Two {@link MessageBuffer} instances are kept and swapped on every flush,
 * so no new buffer is allocated per batch. The first message added to an
 * empty pending buffer schedules a single flush on the fiber after the
 * configured interval; messages arriving before that flush join the same
 * batch.
 *
 * <p>The reader passed to the callback is cleared as soon as the callback
 * returns, so the callback must not keep a reference to it.
 *
 * @param <T> type of the messages being batched
 */
public class RecyclingBatchSubscriber<T> extends BaseSubscription<T> {
    /** Guards {@link #_pending} and the swap of the two buffers. */
    private final Lock _lock = new ReentrantLock();
    /** Buffer that producers append to; only touched while holding {@link #_lock}. */
    private MessageBuffer<T> _pending = new MessageBuffer<>();
    /** Buffer most recently handed to the callback; swapped with {@link #_pending} on flush. */
    private MessageBuffer<T> _active = new MessageBuffer<>();

    private final Fiber _queue;
    private final Callback<MessageReader<T>> _receive;
    private final int _interval;
    private final TimeUnit _timeUnit;
    private final Runnable _flushRunnable;

    /**
     * Creates a batching subscription without a filter.
     *
     * @param fiber    fiber on which flushes are scheduled
     * @param callback receiver of each batch
     * @param i        delay between the first buffered message and the flush
     * @param timeUnit unit of {@code i}
     */
    public RecyclingBatchSubscriber(Fiber fiber, Callback<MessageReader<T>> callback, int i, TimeUnit timeUnit) {
        this(fiber, callback, null, i, timeUnit);
    }

    /**
     * Creates a batching subscription.
     *
     * @param fiber    fiber on which flushes are scheduled
     * @param callback receiver of each batch
     * @param filter   filter forwarded to {@link BaseSubscription}; may be {@code null}
     *                 (the four-argument constructor passes {@code null})
     * @param i        delay between the first buffered message and the flush
     * @param timeUnit unit of {@code i}
     */
    public RecyclingBatchSubscriber(Fiber fiber, Callback<MessageReader<T>> callback, Filter<T> filter, int i, TimeUnit timeUnit) {
        super(fiber, filter);
        this._queue = fiber;
        this._receive = callback;
        this._interval = i;
        this._timeUnit = timeUnit;
        // Anonymous class rather than a lambda so that toString() can describe the task.
        this._flushRunnable = new Runnable() {
            @Override
            public void run() {
                flush();
            }

            @Override
            public String toString() {
                return "Flushing " + RecyclingBatchSubscriber.this + " via " + _receive;
            }
        };
    }

    /**
     * Swaps the pending and active buffers, delivers the batch to the callback
     * and then clears the delivered buffer so it can be reused.
     *
     * <p>The lock is held only for the swap. The callback runs without the
     * lock, so producers are never blocked by (possibly slow) receiver code
     * and the receiver cannot deadlock against a producer waiting on the lock.
     * After the swap the delivered buffer is no longer reachable through
     * {@code _pending}, so producers do not touch it.
     *
     * <p>NOTE(review): this assumes the fiber runs scheduled tasks one at a
     * time, so two flushes never overlap - confirm against the Fiber contract.
     * Within this file the method is only invoked from the flush runnable.
     */
    public void flush() {
        final MessageBuffer<T> batch;
        this._lock.lock();
        try {
            batch = this._pending;
            this._pending = this._active;
            this._active = batch;
        } finally {
            this._lock.unlock();
        }
        try {
            this._receive.onMessage(batch);
        } finally {
            // Always recycle the buffer, even when the callback throws.
            batch.clear();
        }
    }

    /**
     * Buffers a message and, when it is the first one of a new batch,
     * schedules the flush on the fiber after the configured interval.
     *
     * @param t message to add to the pending batch
     */
    @Override // org.jetlang.channels.BaseSubscription
    protected void onMessageOnProducerThread(T t) {
        this._lock.lock();
        try {
            // An empty buffer means no flush is outstanding for this batch yet.
            if (this._pending.isEmpty()) {
                this._queue.schedule(this._flushRunnable, this._interval, this._timeUnit);
            }
            this._pending.add(t);
        } finally {
            this._lock.unlock();
        }
    }
}
