package dev.droidx.kit.bundle.rxjava;

import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes2.dex */
/**
 * Demonstrates a bounded producer/consumer pipeline driven by an RxJava chain.
 *
 * <p>{@link EmitterObserable} puts string items into a bounded
 * {@link ArrayBlockingQueue}; {@link ProcessObserable} instances take items from
 * that queue and simulate work on each one. {@link #startMultiAsyncTask()} wires
 * both together with {@code flatMap} and logs the terminal events.
 *
 * <p>Shared state: {@link #taskComplete} is {@code volatile} because it is written
 * by the producer and read by the consumers; the queue is a thread-safe JDK
 * collection. {@link #nextPidx} is a plain {@code int} and is only incremented
 * from the {@link ProcessObserable} constructor.
 */
public class MultiAsyncObservable {
    private static final String TAG = "devkit";

    /** Capacity of the hand-off queue; the producer blocks in {@code put} once it is full. */
    private static final int QUEUE_CAPACITY = 5;
    /** Number of items the producer enqueues. */
    private static final int ITEM_COUNT = 10;
    /** Number of values emitted by the middle stage of the chain. */
    private static final int FAN_OUT = 3;
    /** How long a consumer waits for an item before re-checking the exit condition. */
    private static final long POLL_TIMEOUT_SECONDS = 3L;
    /** Pause between two consumer iterations. */
    private static final long LOOP_PAUSE_MILLIS = 500L;

    /** Shared generator for {@link #sleepRandom()}; {@link Random} is safe for concurrent use. */
    private final Random random = new Random();

    /** Handle of the subscription created by {@link #startMultiAsyncTask()}; not disposed anywhere in this class. */
    Disposable disposable;
    /** Next id handed to a {@link ProcessObserable}; used only in log output. */
    int nextPidx = 0;
    /** Set once the producer has stopped (normally or with an error). */
    volatile boolean taskComplete = false;
    /** Bounded producer/consumer queue (a queue despite the field name). */
    final ArrayBlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /**
     * Producer: enqueues {@code "item-0"} .. {@code "item-9"} into the shared queue.
     *
     * <p>NOTE(review): {@code notifyObservNext}/{@code notifyObservComplete}/
     * {@code notifyObservFailure} are inherited from {@code RxAsyncObservable}, which is
     * not visible here; presumably they forward to the subscribed observer - confirm.
     */
    public class EmitterObserable extends RxAsyncObservable<String> {
        EmitterObserable() {
        }

        /**
         * Signals {@code "emit-next"} first, then fills the queue and signals completion.
         *
         * @param observer the downstream observer, handed to the superclass
         */
        @Override
        public void subscribeActual(Observer<? super String> observer) {
            super.subscribeActual(observer);
            try {
                notifyObservNext("emit-next");
                for (int i = 0; i < ITEM_COUNT; i++) {
                    Log.i(MultiAsyncObservable.TAG, "Emitter-" + i + ": start");
                    // put() blocks while the queue is full, throttling the producer.
                    MultiAsyncObservable.this.blockingDeque.put("item-" + i);
                    Log.i(MultiAsyncObservable.TAG, "Emitter-" + i + ": end");
                }
                MultiAsyncObservable.this.taskComplete = true;
                notifyObservComplete();
            } catch (Exception e) {
                // Production is over on failure too; without this flag the consumers
                // could never satisfy their exit condition and would poll forever.
                MultiAsyncObservable.this.taskComplete = true;
                notifyObservFailure(e);
            }
        }
    }

    /**
     * Adapter contract for a batch of tasks. It is not referenced elsewhere in this
     * file; the method descriptions below are inferred from the names only -
     * TODO confirm against the implementations.
     *
     * @param <T> task item type
     */
    interface MultiAsyncTaskAdapter<T> {
        /** Presumably runs the given item. */
        void execute(T t);

        /** Presumably the number of items available. */
        int getCount();

        /** Presumably the item at position {@code i}. */
        T getItem(int i);

        /** Presumably invoked before execution starts. */
        void onPreExecute();
    }

    /**
     * Consumer: repeatedly takes items from the shared queue and simulates work on
     * each one until the producer is done and the queue is empty.
     */
    public class ProcessObserable extends RxAsyncObservable<String> {
        /** Id of this consumer, used only in log output. */
        int pidx;

        public ProcessObserable() {
            this.pidx = MultiAsyncObservable.this.nextPidx++;
        }

        /**
         * Runs the consume loop on the calling thread.
         *
         * @param observer the downstream observer, handed to the superclass
         */
        @Override
        public void subscribeActual(Observer<? super String> observer) {
            super.subscribeActual(observer);
            while (true) {
                try {
                    // Exit only when no more items will arrive AND none are left.
                    if (MultiAsyncObservable.this.taskComplete && MultiAsyncObservable.this.blockingDeque.isEmpty()) {
                        Log.i(MultiAsyncObservable.TAG, "Process(" + this.pidx + "): queue empty");
                        notifyObservComplete();
                        return;
                    }
                    // Timed poll so the exit condition is re-evaluated even when idle.
                    String item = MultiAsyncObservable.this.blockingDeque.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (item != null) {
                        Log.i(MultiAsyncObservable.TAG, "Process(" + this.pidx + "): " + item);
                        MultiAsyncObservable.this.sleepRandom();
                        Log.i(MultiAsyncObservable.TAG, "Process(" + this.pidx + "): " + item + "end");
                    }
                    // If sleepRandom() was interrupted it re-set the interrupt flag, so
                    // this sleep throws immediately and the loop ends via the catch below.
                    Thread.sleep(LOOP_PAUSE_MILLIS);
                } catch (Exception e) {
                    notifyObservFailure(e);
                    return;
                }
            }
        }
    }

    MultiAsyncObservable() {
    }

    /**
     * Sleeps for a random whole number of seconds between 1 and 4 to simulate work.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored
     * before returning so the caller's next blocking call notices the interruption
     * instead of silently continuing.
     */
    protected void sleepRandom() {
        try {
            Thread.sleep((random.nextInt(4) + 1) * 1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds and subscribes the demo chain:
     * <ol>
     *   <li>a single empty string is emitted on the computation scheduler;</li>
     *   <li>it is mapped to an {@link EmitterObserable} subscribed on a new thread;</li>
     *   <li>each value from that stage is mapped to an observable emitting
     *       {@code "0"}, {@code "1"}, {@code "2"};</li>
     *   <li>each of those values is mapped to a {@link ProcessObserable} subscribed on
     *       its own new thread.</li>
     * </ol>
     * The final observer only logs events and stores the {@link Disposable} in
     * {@link #disposable}.
     */
    public void startMultiAsyncTask() {
        // Stage 2: start the producer on a dedicated thread.
        Function<String, ObservableSource<String>> toEmitter = new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String str) throws Exception {
                return new EmitterObserable().subscribeOn(Schedulers.newThread());
            }
        };

        // Stage 3: fan every upstream value out into FAN_OUT values.
        Function<String, ObservableSource<String>> toFanOut = new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String str) throws Exception {
                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                        for (int i = 0; i < FAN_OUT; i++) {
                            observableEmitter.onNext("" + i);
                        }
                        observableEmitter.onComplete();
                    }
                }).subscribeOn(Schedulers.computation());
            }
        };

        // Stage 4: one consumer, on its own thread, per fanned-out value.
        Function<String, ObservableSource<String>> toProcessor = new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String str) throws Exception {
                return new ProcessObserable().subscribeOn(Schedulers.newThread());
            }
        };

        Observer<String> loggingObserver = new Observer<String>() {
            @Override
            public void onComplete() {
                Log.i(MultiAsyncObservable.TAG, "testSchedulers.onComplete");
            }

            @Override
            public void onError(Throwable th) {
                // Attach the throwable so the failure cause is visible in the log.
                Log.e(MultiAsyncObservable.TAG, "testSchedulers.onError", th);
            }

            @Override
            public void onNext(String str) {
                Log.i(MultiAsyncObservable.TAG, "testSchedulers.onNext");
            }

            @Override
            public void onSubscribe(Disposable disposable) {
                MultiAsyncObservable.this.disposable = disposable;
                Log.i(MultiAsyncObservable.TAG, "testSchedulers.onSubscribe");
            }
        };

        Observable.just("")
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.computation())
                .flatMap(toEmitter)
                .observeOn(Schedulers.computation())
                .flatMap(toFanOut)
                .observeOn(Schedulers.computation())
                .flatMap(toProcessor)
                .subscribe(loggingObserver);
    }
}
