package com.zktec.app.store.data.websocket.business;

import android.support.annotation.NonNull;
import com.zktec.app.store.data.websocket.RxMoreObservables;
import com.zktec.app.store.data.websocket.business.model.in.PingMessage;
import com.zktec.app.store.data.websocket.business.model.in.RegisterMessage;
import com.zktec.app.store.data.websocket.business.model.in.SocketRequestDataMessage;
import com.zktec.app.store.data.websocket.business.model.out.RegisteredMessage;
import com.zktec.app.store.data.websocket.business.model.out.SocketResponseMessage;
import com.zktec.app.store.data.websocket.log.LoggingObservables;
import com.zktec.app.store.data.websocket.object.event.RxObjectEvent;
import com.zktec.app.store.data.websocket.object.event.RxObjectEventConn;
import com.zktec.app.store.data.websocket.object.event.RxObjectEventConnected;
import com.zktec.app.store.data.websocket.object.event.RxObjectEventDisconnected;
import com.zktec.app.store.data.websocket.object.event.RxObjectEventMessage;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

/* loaded from: classes2.dex */
public class Socket {
    public static final Logger LOGGER = Logger.getLogger("Rx");
    private final BehaviorSubject<RxObjectEvent> connectedAndRegistered;
    private final Observable<Object> connection;
    private final Observable<RxObjectEvent> events;

    @NonNull
    private final Scheduler scheduler;
    private final Object lock = new Object();
    private int counter = 0;

    /* loaded from: classes2.dex */
    private static class FilterRegisterMessage implements Func1<RxObjectEventMessage, Boolean> {
        private FilterRegisterMessage() {
        }

        @Override // rx.functions.Func1
        public Boolean call(RxObjectEventMessage rxObjectEventMessage) {
            return Boolean.valueOf(rxObjectEventMessage.message() instanceof RegisteredMessage);
        }
    }

    /* loaded from: classes2.dex */
    private class FlatMapToRegisterMessage implements Func1<RxObjectEventConnected, Observable<Object>> {
        private FlatMapToRegisterMessage() {
        }

        @Override // rx.functions.Func1
        public Observable<Object> call(RxObjectEventConnected rxObjectEventConnected) {
            if (Socket.this.needToRegisterAfterConnection()) {
                System.out.println("send message for registering....");
                return Observable.just(Socket.this.createRegisterAuthMessage()).compose(RxMoreObservables.sendMessage(rxObjectEventConnected));
            }
            System.out.println("simulator send message for registering....");
            ((PublishSubject) Socket.this.events).onNext(new RxObjectEventMessage(rxObjectEventConnected.sender(), new RegisteredMessage()));
            return Observable.just(new Object());
        }
    }

    public Socket(@NonNull SocketConnection socketConnection, @NonNull Scheduler scheduler) {
        this.scheduler = scheduler;
        PublishSubject create = PublishSubject.create();
        this.connection = socketConnection.createConnection().lift(new OperatorDoOnNext(create)).lift(MoreObservables.ignoreNext()).compose(MoreObservables.behaviorRefCount());
        this.events = create;
        Observable map = create.compose(MoreObservables.filterAndMap(RxObjectEventMessage.class)).filter(new FilterRegisterMessage()).map(new Func1<RxObjectEventMessage, RxObjectEvent>() { // from class: com.zktec.app.store.data.websocket.business.Socket.1
            @Override // rx.functions.Func1
            public RxObjectEvent call(RxObjectEventMessage rxObjectEventMessage) {
                return rxObjectEventMessage;
            }
        });
        Observable map2 = create.compose(MoreObservables.filterAndMap(RxObjectEventDisconnected.class)).map(new Func1<RxObjectEventDisconnected, RxObjectEvent>() { // from class: com.zktec.app.store.data.websocket.business.Socket.2
            @Override // rx.functions.Func1
            public RxObjectEvent call(RxObjectEventDisconnected rxObjectEventDisconnected) {
                return rxObjectEventDisconnected;
            }
        });
        this.connectedAndRegistered = BehaviorSubject.create((RxObjectEvent) null);
        map2.mergeWith(map).subscribe(this.connectedAndRegistered);
        create.compose(MoreObservables.filterAndMap(RxObjectEventConnected.class)).lift(LoggingObservables.loggingLift(LOGGER, "ConnectedEvent")).flatMap(new FlatMapToRegisterMessage()).lift(LoggingObservables.loggingOnlyErrorLift(LOGGER, "SendRegisterEvent")).onErrorReturn(MoreObservables.throwableToIgnoreError()).subscribe();
        LOGGER.setLevel(Level.ALL);
        RxMoreObservables.logger.setLevel(Level.ALL);
        create.subscribe(LoggingObservables.logging(LOGGER, "Events"));
        this.connectedAndRegistered.subscribe(LoggingObservables.logging(LOGGER, "ConnectedAndRegistered"));
    }

    static /* synthetic */ Observable.Transformer access$300() {
        return isConnected();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RegisterMessage createRegisterAuthMessage() {
        return new RegisterMessage("asdf");
    }

    @NonNull
    private static Observable.Transformer<RxObjectEvent, RxObjectEventConn> isConnected() {
        return new Observable.Transformer<RxObjectEvent, RxObjectEventConn>() { // from class: com.zktec.app.store.data.websocket.business.Socket.12
            @Override // rx.functions.Func1
            public Observable<RxObjectEventConn> call(Observable<RxObjectEvent> observable) {
                return observable.filter(new Func1<RxObjectEvent, Boolean>() { // from class: com.zktec.app.store.data.websocket.business.Socket.12.2
                    @Override // rx.functions.Func1
                    public Boolean call(RxObjectEvent rxObjectEvent) {
                        return rxObjectEvent instanceof RxObjectEventConn;
                    }
                }).map(new Func1<RxObjectEvent, RxObjectEventConn>() { // from class: com.zktec.app.store.data.websocket.business.Socket.12.1
                    @Override // rx.functions.Func1
                    public RxObjectEventConn call(RxObjectEvent rxObjectEvent) {
                        return (RxObjectEventConn) rxObjectEvent;
                    }
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean needToRegisterAfterConnection() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NonNull
    public Observable<Object> requestData(final RxObjectEventConn rxObjectEventConn, final Func1<String, Observable<Object>> func1) {
        return nextId().flatMap(new Func1<String, Observable<Object>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.11
            @Override // rx.functions.Func1
            public Observable<Object> call(String str) {
                return ((Observable) func1.call(str)).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NonNull
    public Observable<Object> requestData(final boolean z, final int i, final RxObjectEventConn rxObjectEventConn, final Func1<String, Object> func1) {
        return nextId().flatMap(new Func1<String, Observable<Object>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.10
            @Override // rx.functions.Func1
            public Observable<Object> call(String str) {
                String str2 = null;
                final Object call = func1.call(str);
                if (call instanceof SocketRequestDataMessage) {
                    str2 = ((SocketRequestDataMessage) call).getRequestId();
                    if (str2 == null) {
                        str2 = ((SocketRequestDataMessage) call).getId();
                    }
                    if (str2 == null) {
                        str2 = str;
                    }
                }
                final String str3 = str2;
                Observable<Object> compose = Observable.defer(new Func0<Observable<Object>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.10.1
                    @Override // rx.functions.Func0, java.util.concurrent.Callable
                    public Observable<Object> call() {
                        return Observable.just(call);
                    }
                }).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
                return !z ? compose : Observable.combineLatest(Socket.this.events.compose(MoreObservables.filterAndMap(RxObjectEventMessage.class)).compose(RxObjectEventMessage.filterAndMap(SocketResponseMessage.class)).filter(new Func1<SocketResponseMessage, Boolean>() { // from class: com.zktec.app.store.data.websocket.business.Socket.10.2
                    @Override // rx.functions.Func1
                    public Boolean call(SocketResponseMessage socketResponseMessage) {
                        String str4 = socketResponseMessage.requestId;
                        if (str4 == null) {
                            str4 = socketResponseMessage.typeId;
                        }
                        if (str4 == null) {
                            return false;
                        }
                        return Boolean.valueOf(str4.equals(str3));
                    }
                }).first().timeout(i, TimeUnit.SECONDS, Socket.this.scheduler), compose, new Func2<SocketResponseMessage, Object, Object>() { // from class: com.zktec.app.store.data.websocket.business.Socket.10.3
                    @Override // rx.functions.Func2
                    public SocketResponseMessage call(SocketResponseMessage socketResponseMessage, Object obj) {
                        return socketResponseMessage;
                    }
                });
            }
        });
    }

    @NonNull
    private Observable<Object> requestData(boolean z, RxObjectEventConn rxObjectEventConn, Func1<String, Object> func1) {
        return requestData(z, 10, rxObjectEventConn, func1);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean sendMessageAsPing() {
        return false;
    }

    public Observable<RxObjectEvent> connectedAndRegistered() {
        return this.connectedAndRegistered;
    }

    public Observable<Object> connection() {
        return this.connection;
    }

    public Observable<RxObjectEvent> events() {
        return this.events;
    }

    @NonNull
    public Observable<String> nextId() {
        return Observable.create(new Observable.OnSubscribe<String>() { // from class: com.zktec.app.store.data.websocket.business.Socket.7
            @Override // rx.functions.Action1
            public void call(Subscriber<? super String> subscriber) {
                int i;
                synchronized (Socket.this.lock) {
                    i = Socket.this.counter;
                    Socket.this.counter++;
                }
                subscriber.onNext(String.valueOf(i));
                subscriber.onCompleted();
            }
        });
    }

    @NonNull
    public Observable<Object> sendMessageOnceWhenConnected(final Func1<String, Observable<Object>> func1) {
        return this.connectedAndRegistered.compose(isConnected()).first().flatMap(new Func1<RxObjectEventConn, Observable<Object>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.9
            @Override // rx.functions.Func1
            public Observable<Object> call(RxObjectEventConn rxObjectEventConn) {
                return Socket.this.requestData(rxObjectEventConn, func1);
            }
        });
    }

    @NonNull
    public Observable<Object> sendMessageOnceWhenConnectedV2(final boolean z, final Func1<String, Object> func1) {
        return this.connectedAndRegistered.compose(isConnected()).first().flatMap(new Func1<RxObjectEventConn, Observable<Object>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.8
            @Override // rx.functions.Func1
            public Observable<Object> call(RxObjectEventConn rxObjectEventConn) {
                return Socket.this.requestData(z, 40, rxObjectEventConn, func1);
            }
        });
    }

    public Subscription sendPingInterval(int i) {
        return Observable.interval(i, TimeUnit.SECONDS, this.scheduler).flatMap(new Func1<Long, Observable<?>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.6
            @Override // rx.functions.Func1
            public Observable<?> call(Long l) {
                return Socket.this.connectedAndRegistered.compose(Socket.access$300()).first().flatMap(new Func1<RxObjectEventConn, Observable<?>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.6.1
                    @Override // rx.functions.Func1
                    public Observable<?> call(RxObjectEventConn rxObjectEventConn) {
                        return !Socket.this.sendMessageAsPing() ? RxMoreObservables.sendPing(rxObjectEventConn) : Observable.just(new PingMessage("be_sure_to_send")).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
                    }
                });
            }
        }).subscribe((Subscriber<? super R>) new Subscriber<Object>() { // from class: com.zktec.app.store.data.websocket.business.Socket.5
            @Override // rx.Observer
            public void onCompleted() {
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
            }

            @Override // rx.Observer
            public void onNext(Object obj) {
            }
        });
    }

    public Subscription sendPingWhenConnected(int i) {
        return Observable.combineLatest(Observable.interval(i, TimeUnit.SECONDS, this.scheduler), this.connectedAndRegistered, new Func2<Long, RxObjectEvent, RxObjectEventConn>() { // from class: com.zktec.app.store.data.websocket.business.Socket.4
            @Override // rx.functions.Func2
            public RxObjectEventConn call(Long l, RxObjectEvent rxObjectEvent) {
                if (rxObjectEvent instanceof RxObjectEventConn) {
                    return (RxObjectEventConn) rxObjectEvent;
                }
                return null;
            }
        }).compose(isConnected()).flatMap(new Func1<RxObjectEventConn, Observable<?>>() { // from class: com.zktec.app.store.data.websocket.business.Socket.3
            @Override // rx.functions.Func1
            public Observable<?> call(RxObjectEventConn rxObjectEventConn) {
                return Observable.just(new PingMessage("send_only_when_connected")).compose(RxMoreObservables.sendMessage(rxObjectEventConn));
            }
        }).subscribe();
    }
}
