package com.taobao.appfrm.command;

import com.taobao.appfrm.command.ICommand;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

/* loaded from: classes25.dex */
public abstract class Command<Param> implements ICommand<Param> {
    private PublishSubject<Param> completedQueue;
    private PublishSubject<Error<Param>> errorQueue;
    private RxQueue<Param> executeQueue;
    private RxQueue<Param> inQueue;
    private BehaviorSubject<ICommand.State> stateSender;

    public Command() {
        this(10, 1);
    }

    public Command(int i, int i2) {
        this.completedQueue = PublishSubject.create();
        this.errorQueue = PublishSubject.create();
        this.stateSender = BehaviorSubject.create(ICommand.State.COMPLETED);
        this.inQueue = new RxQueue<>(i);
        this.executeQueue = new RxQueue<>(i2);
        this.inQueue.asRxObservalble().observeOn(Schedulers.computation()).subscribe((Action1<? super Param>) new Action1<Param>() { // from class: com.taobao.appfrm.command.Command.1
            @Override // rx.functions.Action1
            public void call(Param param) {
                Command.this.executeQueue.offer(param);
                Command.this.stateSender.onNext(Command.this.State());
                Command.this.innerExec(param);
            }
        });
        this.executeQueue.asRxObservalble().observeOn(Schedulers.computation()).subscribe((Action1<? super Param>) new Action1<Param>() { // from class: com.taobao.appfrm.command.Command.2
            @Override // rx.functions.Action1
            public void call(Param param) {
                Command.this.inQueue.remove(param);
            }
        });
        Observable.merge(this.completedQueue.asObservable(), this.errorQueue.asObservable().map(new Func1<Error<Param>, Param>() { // from class: com.taobao.appfrm.command.Command.4
            @Override // rx.functions.Func1
            public Param call(Error<Param> error) {
                return error.getParam();
            }
        })).onBackpressureBuffer().observeOn(Schedulers.io()).subscribe(new Action1<Param>() { // from class: com.taobao.appfrm.command.Command.3
            @Override // rx.functions.Action1
            public void call(Param param) {
                Command.this.executeQueue.remove(param);
                Command.this.stateSender.onNext(Command.this.State());
            }
        });
    }

    public static void main(String[] strArr) {
        Command<String> command = new Command<String>() { // from class: com.taobao.appfrm.command.Command.5
            @Override // com.taobao.appfrm.command.Command
            public void innerExec(String str) {
                signalError(Error.create(new RuntimeException("TTT"), str));
            }
        };
        Observable.merge(command.completedObservable(), command.errorObservable().map(new Func1<Error<String>, String>() { // from class: com.taobao.appfrm.command.Command.7
            @Override // rx.functions.Func1
            public String call(Error<String> error) {
                return error.getParam();
            }
        })).subscribe(new Action1<String>() { // from class: com.taobao.appfrm.command.Command.6
            @Override // rx.functions.Action1
            public void call(String str) {
                System.out.println(str + "...");
            }
        });
        command.stateObservable().subscribe(new Action1<ICommand.State>() { // from class: com.taobao.appfrm.command.Command.8
            @Override // rx.functions.Action1
            public void call(ICommand.State state) {
                System.out.println(state);
            }
        });
        command.exec("11");
        command.exec(null);
        command.exec("22");
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("wait");
    }

    public ICommand.State State() {
        return this.executeQueue.isEmpty() ? ICommand.State.COMPLETED : ICommand.State.EXECUTING;
    }

    public Observable<Param> completedObservable() {
        return this.completedQueue.asObservable();
    }

    public Observable<Error<Param>> errorObservable() {
        return this.errorQueue.asObservable();
    }

    @Override // com.taobao.appfrm.command.ICommand
    public final void exec(Param param) {
        if (this.inQueue.contains(param) || this.executeQueue.contains(param)) {
            return;
        }
        this.inQueue.offer(param);
    }

    public Observable<Param> executedObservable() {
        return this.executeQueue.asRxObservalble();
    }

    public abstract void innerExec(Param param);

    public void signalComplete(Param param) {
        this.completedQueue.onNext(param);
    }

    public void signalError(Error<Param> error) {
        this.errorQueue.onNext(error);
    }

    public Observable<ICommand.State> stateObservable() {
        return this.stateSender.distinctUntilChanged().asObservable();
    }
}
