package com.edu.classroom.message.repo.datasource;

import androidx.lifecycle.MutableLiveData;
import com.edu.classroom.base.settings.ClassroomSettingsManager;
import com.edu.classroom.message.MsgFetchException;
import com.edu.classroom.message.MsgParseException;
import com.edu.classroom.message.MsgPersistException;
import com.edu.classroom.message.NoStatusMsgException;
import com.edu.classroom.message.repo.d.d;
import com.edu.classroom.message.repo.fetcher.PlaybackBoardFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackChatFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageNetworkFetcher;
import com.squareup.wire.ProtoReader;
import edu.classroom.channel.ChannelMessage;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.n;
import java.io.InputStream;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;
import javax.inject.Named;
import kotlin.collections.b0;
import kotlin.collections.r;
import kotlin.jvm.b.l;
import kotlin.jvm.internal.Ref$BooleanRef;
import kotlin.jvm.internal.t;
import okio.Okio;
import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;

/* loaded from: classes2.dex */
public final class PlaybackMessageDataSourceImpl implements com.edu.classroom.message.repo.datasource.a {

    /**
     * Boolean flag exposed as LiveData. The constructor seeds it from the classroom playback
     * settings and {@code H(...)} posts {@code TRUE} into it.
     */
    @NotNull
    private final MutableLiveData<Boolean> a;

    /** Injected dependency of type {@code repo.e.b.d}; it is not referenced directly in the visible part of this class. */
    @Inject
    public com.edu.classroom.message.repo.e.b.d b;

    /** Injected dependency that {@code F(...)} reports as "playbackInfoDao" when it has not been set. */
    @Inject
    public com.edu.classroom.message.repo.e.b.b c;

    /** Injected dependency of type {@code repo.fetcher.a}; its use is outside the visible part of this class. */
    @Inject
    public com.edu.classroom.message.repo.fetcher.a d;

    /** Injected dependency of type {@code base.network.k}; its use is outside the visible part of this class. */
    @Inject
    public com.edu.classroom.base.network.k e;

    /**
     * Injected set of {@code com.edu.classroom.message.i} instances.
     * NOTE(review): presumably this is what {@code z()} returns to the dispatching consumers - confirm.
     */
    @Inject
    public Set<com.edu.classroom.message.i> f;

    /**
     * Holder produced by {@code kotlin.g.b(...)} in the constructor; its initializer builds a
     * {@code repo.d.d} from the room id, the user id, {@code x()} and a new LinkedBlockingQueue.
     */
    /* renamed from: g, reason: collision with root package name */
    private final kotlin.d f4710g;

    /** Instance of {@code repo.d.c}; it is assigned outside the visible part of this class. */
    /* renamed from: h, reason: collision with root package name */
    private com.edu.classroom.message.repo.d.c f4711h;

    /** Instance of {@code repo.d.b}; the static accessor {@code f(...)} reports it as "boardCache" when unset. */
    /* renamed from: i, reason: collision with root package name */
    private com.edu.classroom.message.repo.d.b f4712i;

    /** Room id received by the constructor ({@code @Named("room_id")}). */
    /* renamed from: j, reason: collision with root package name */
    private final String f4713j;

    /** User id received by the constructor ({@code @Named("user_id")}). */
    /* renamed from: k, reason: collision with root package name */
    private final String f4714k;

    /** Network fetcher received by the constructor. */
    /* renamed from: l, reason: collision with root package name */
    private final PlaybackMessageNetworkFetcher f4715l;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that hands a downloaded {@link InputStream} to {@code A(stream, info, true)},
     * i.e. the parse-and-persist pipeline taking its {@code true} branch.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class a<T, R> implements Function<InputStream, io.reactivex.e> {
        /** Playback info record passed through to {@code A}. */
        final /* synthetic */ com.edu.classroom.message.repo.e.c.b b;

        a(com.edu.classroom.message.repo.e.c.b bVar) {
            this.b = bVar;
        }

        /**
         * @param it the stream to parse; must not be null
         * @return the completable produced by {@code A(it, b, true)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final io.reactivex.e apply(@NotNull InputStream it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.A(it, this.b, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that turns an {@link InputStream} into the message stream produced by {@code B(stream)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class b<T, R> implements Function<InputStream, ObservableSource<? extends ChannelMessage>> {
        b() {
        }

        /**
         * @param it the stream to parse; must not be null
         * @return the observable returned by {@code B(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final ObservableSource<? extends ChannelMessage> apply(@NotNull InputStream it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.B(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Consumer that passes every received {@link ChannelMessage} to each element of the set
     * returned by {@code z()}.
     *
     * <p>Declared as an inner (non-static) class: {@link #accept} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     * The decompiled body also filled a throw-away list with Unit values (residue of a Kotlin
     * {@code map} whose result was discarded); that dead allocation is removed.
     */
    public final class c<T> implements Consumer<ChannelMessage> {
        c() {
        }

        /**
         * @param msg message to hand to every receiver
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(ChannelMessage msg) {
            Set<com.edu.classroom.message.i> receivers = PlaybackMessageDataSourceImpl.this.z();
            for (com.edu.classroom.message.i receiver : receivers) {
                // The null check stays inside the loop so it only fires when there is a receiver, as before.
                kotlin.jvm.internal.t.f(msg, "msg");
                receiver.a(msg);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Stateless predicate that rejects messages whose {@code msg_type} is
     * {@code "fsm_version"} or {@code "user_state_version"} and accepts everything else.
     */
    public static final class d<T> implements io.reactivex.functions.n<ChannelMessage> {
        /** Shared instance; the predicate holds no state. */
        public static final d a = new d();

        d() {
        }

        /**
         * @param it message to test; must not be null
         * @return {@code false} for the two version message types, {@code true} otherwise
         */
        @Override // io.reactivex.functions.n
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final boolean test(@NotNull ChannelMessage it) {
            kotlin.jvm.internal.t.g(it, "it");
            if (kotlin.jvm.internal.t.c(it.msg_type, "fsm_version")) {
                return false;
            }
            return !kotlin.jvm.internal.t.c(it.msg_type, "user_state_version");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that converts a {@link ChannelMessage} into a {@code repo.e.c.a} record via {@code G(message)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class e<T, R> implements Function<ChannelMessage, com.edu.classroom.message.repo.e.c.a> {
        e() {
        }

        /**
         * @param it message to convert; must not be null
         * @return the record built by {@code G(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final com.edu.classroom.message.repo.e.c.a apply(@NotNull ChannelMessage it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.G(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that hands a batch of records to {@code D(batch)}, which persists them as a completable.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class f<T, R> implements Function<List<com.edu.classroom.message.repo.e.c.a>, io.reactivex.e> {
        f() {
        }

        /**
         * @param it batch of records to persist; must not be null
         * @return the completable returned by {@code D(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final io.reactivex.e apply(@NotNull List<com.edu.classroom.message.repo.e.c.a> it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.D(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Action that calls {@code l(true)} on the captured playback info record and then passes
     * the record to {@code H(...)} and {@code F(...)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #run} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class g implements io.reactivex.functions.a {
        /** Playback info record updated by {@link #run}. */
        final /* synthetic */ com.edu.classroom.message.repo.e.c.b b;

        g(com.edu.classroom.message.repo.e.c.b bVar) {
            this.b = bVar;
        }

        @Override // io.reactivex.functions.a
        public final void run() {
            this.b.l(true);
            PlaybackMessageDataSourceImpl.this.H(this.b);
            PlaybackMessageDataSourceImpl.this.F(this.b);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that hands a downloaded {@link InputStream} to {@code A(stream, info, false)},
     * i.e. the parse-and-persist pipeline taking its {@code false} branch.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class h<T, R> implements Function<InputStream, io.reactivex.e> {
        /** Playback info record passed through to {@code A}. */
        final /* synthetic */ com.edu.classroom.message.repo.e.c.b b;

        h(com.edu.classroom.message.repo.e.c.b bVar) {
            this.b = bVar;
        }

        /**
         * @param it the stream to parse; must not be null
         * @return the completable produced by {@code A(it, b, false)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final io.reactivex.e apply(@NotNull InputStream it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.A(it, this.b, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that turns an {@link InputStream} into the message stream produced by {@code B(stream)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class i<T, R> implements Function<InputStream, ObservableSource<? extends ChannelMessage>> {
        i() {
        }

        /**
         * @param it the stream to parse; must not be null
         * @return the observable returned by {@code B(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final ObservableSource<? extends ChannelMessage> apply(@NotNull InputStream it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.B(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Consumer that passes every received {@link ChannelMessage} to each element of the set
     * returned by {@code z()}.
     *
     * <p>Declared as an inner (non-static) class: {@link #accept} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     * The decompiled body also filled a throw-away list with Unit values (residue of a Kotlin
     * {@code map} whose result was discarded); that dead allocation is removed.
     */
    public final class j<T> implements Consumer<ChannelMessage> {
        j() {
        }

        /**
         * @param msg message to hand to every receiver
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(ChannelMessage msg) {
            Set<com.edu.classroom.message.i> receivers = PlaybackMessageDataSourceImpl.this.z();
            for (com.edu.classroom.message.i receiver : receivers) {
                // The null check stays inside the loop so it only fires when there is a receiver, as before.
                kotlin.jvm.internal.t.f(msg, "msg");
                receiver.a(msg);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Stateless predicate that rejects messages whose {@code msg_type} is
     * {@code "fsm_version"} or {@code "user_state_version"} and accepts everything else.
     */
    public static final class k<T> implements io.reactivex.functions.n<ChannelMessage> {
        /** Shared instance; the predicate holds no state. */
        public static final k a = new k();

        k() {
        }

        /**
         * @param it message to test; must not be null
         * @return {@code false} for the two version message types, {@code true} otherwise
         */
        @Override // io.reactivex.functions.n
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final boolean test(@NotNull ChannelMessage it) {
            kotlin.jvm.internal.t.g(it, "it");
            if (kotlin.jvm.internal.t.c(it.msg_type, "fsm_version")) {
                return false;
            }
            return !kotlin.jvm.internal.t.c(it.msg_type, "user_state_version");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that converts a {@link ChannelMessage} into a {@code repo.e.c.a} record via {@code G(message)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class l<T, R> implements Function<ChannelMessage, com.edu.classroom.message.repo.e.c.a> {
        l() {
        }

        /**
         * @param it message to convert; must not be null
         * @return the record built by {@code G(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final com.edu.classroom.message.repo.e.c.a apply(@NotNull ChannelMessage it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.G(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Consumer that calls {@code g(...)} on each record with the user id this data source was
     * constructed with.
     *
     * <p>Declared as an inner (non-static) class: {@link #accept} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class m<T> implements Consumer<com.edu.classroom.message.repo.e.c.a> {
        m() {
        }

        /**
         * @param aVar record to stamp with the user id
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(com.edu.classroom.message.repo.e.c.a aVar) {
            aVar.g(PlaybackMessageDataSourceImpl.this.f4714k);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Rx mapper that hands a batch of records to {@code D(batch)}, which persists them as a completable.
     *
     * <p>Declared as an inner (non-static) class: {@link #apply} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class n<T, R> implements Function<List<com.edu.classroom.message.repo.e.c.a>, io.reactivex.e> {
        n() {
        }

        /**
         * @param it batch of records to persist; must not be null
         * @return the completable returned by {@code D(it)}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final io.reactivex.e apply(@NotNull List<com.edu.classroom.message.repo.e.c.a> it) {
            kotlin.jvm.internal.t.g(it, "it");
            return PlaybackMessageDataSourceImpl.this.D(it);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Action that calls {@code n(true)} on the captured playback info record and then passes
     * the record to {@code H(...)} and {@code F(...)}.
     *
     * <p>Declared as an inner (non-static) class: {@link #run} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class o implements io.reactivex.functions.a {
        /** Playback info record updated by {@link #run}. */
        final /* synthetic */ com.edu.classroom.message.repo.e.c.b b;

        o(com.edu.classroom.message.repo.e.c.b bVar) {
            this.b = bVar;
        }

        @Override // io.reactivex.functions.a
        public final void run() {
            this.b.n(true);
            PlaybackMessageDataSourceImpl.this.H(this.b);
            PlaybackMessageDataSourceImpl.this.F(this.b);
        }
    }

    /* loaded from: classes2.dex */
    /**
     * Stateless two-argument combiner that merges two message lists through {@code b0.S(t1, t2)}.
     * NOTE(review): {@code b0.S} is an obfuscated kotlin.collections helper, presumably list
     * concatenation - confirm.
     */
    static final class p<T1, T2, R> implements io.reactivex.functions.c<List<? extends com.edu.classroom.x.g.c.a>, List<? extends com.edu.classroom.x.g.c.a>, List<? extends com.edu.classroom.x.g.c.a>> {
        /** Shared instance; the combiner holds no state. */
        public static final p a = new p();

        p() {
        }

        /**
         * @param t1 first list; must not be null
         * @param t2 second list; must not be null
         * @return the result of {@code b0.S(t1, t2)}
         */
        @Override // io.reactivex.functions.c
        @NotNull
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.x.g.c.a> apply(@NotNull List<? extends com.edu.classroom.x.g.c.a> t1, @NotNull List<? extends com.edu.classroom.x.g.c.a> t2) {
            kotlin.jvm.internal.t.g(t1, "t1");
            kotlin.jvm.internal.t.g(t2, "t2");
            final List<com.edu.classroom.x.g.c.a> merged = b0.S(t1, t2);
            return merged;
        }
    }

    /* loaded from: classes2.dex */
    /**
     * Stateless two-argument combiner that merges two message lists through {@code b0.S(t1, t2)}.
     * NOTE(review): {@code b0.S} is an obfuscated kotlin.collections helper, presumably list
     * concatenation - confirm.
     */
    static final class q<T1, T2, R> implements io.reactivex.functions.c<List<? extends com.edu.classroom.x.g.c.a>, List<? extends com.edu.classroom.x.g.c.a>, List<? extends com.edu.classroom.x.g.c.a>> {
        /** Shared instance; the combiner holds no state. */
        public static final q a = new q();

        q() {
        }

        /**
         * @param t1 first list; must not be null
         * @param t2 second list; must not be null
         * @return the result of {@code b0.S(t1, t2)}
         */
        @Override // io.reactivex.functions.c
        @NotNull
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.x.g.c.a> apply(@NotNull List<? extends com.edu.classroom.x.g.c.a> t1, @NotNull List<? extends com.edu.classroom.x.g.c.a> t2) {
            kotlin.jvm.internal.t.g(t1, "t1");
            kotlin.jvm.internal.t.g(t2, "t2");
            final List<com.edu.classroom.x.g.c.a> merged = b0.S(t1, t2);
            return merged;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Observable source that emits entries from the captured {@code repo.d.a} object for as long
     * as the entry returned by {@code f()} is non-null and its {@code o()} value does not exceed
     * the captured limit, then calls {@code i(limit)} and completes.
     * NOTE(review): {@code f()} / {@code g()} look like peek / poll on a queue-like cache - confirm.
     */
    public static final class r<T> implements ObservableOnSubscribe<com.edu.classroom.x.g.c.a> {
        /** Source the entries are taken from. */
        final /* synthetic */ com.edu.classroom.message.repo.d.a a;
        /** Inclusive upper bound compared against each entry's {@code o()} value. */
        final /* synthetic */ long b;

        r(com.edu.classroom.message.repo.d.a aVar, long j2) {
            this.a = aVar;
            this.b = j2;
        }

        /**
         * @param emitter receiver of the entries; must not be null
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<com.edu.classroom.x.g.c.a> emitter) {
            kotlin.jvm.internal.t.g(emitter, "emitter");
            for (com.edu.classroom.x.g.c.a head = this.a.f();
                    head != null && head.o() <= this.b;
                    head = this.a.f()) {
                // f() only inspects; the entry actually emitted is the one g() hands back.
                com.edu.classroom.x.g.c.a taken = this.a.g();
                kotlin.jvm.internal.t.e(taken);
                emitter.onNext(taken);
            }
            this.a.i(this.b);
            emitter.onComplete();
        }
    }

    /* loaded from: classes2.dex */
    /**
     * Action that calls {@code complete()} on every element of the set returned by {@code z()}.
     *
     * <p>Declared as an inner (non-static) class: {@link #run} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     * The decompiled body also filled a throw-away list with Unit values (residue of a Kotlin
     * {@code map} whose result was discarded); that dead allocation is removed.
     */
    final class s implements io.reactivex.functions.a {
        s() {
        }

        @Override // io.reactivex.functions.a
        public final void run() {
            Set<com.edu.classroom.message.i> receivers = PlaybackMessageDataSourceImpl.this.z();
            for (com.edu.classroom.message.i receiver : receivers) {
                receiver.complete();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Observable source that decodes a protobuf-encoded {@link InputStream} into
     * {@link ChannelMessage}s: one message is emitted per tag read, followed by completion.
     *
     * <p>Any failure during decoding is rethrown as {@link MsgParseException} when it is a
     * {@link ProtocolException} and as {@link MsgFetchException} otherwise.
     *
     * <p>The stream is now closed once decoding finishes or fails; previously it was never
     * closed, so the underlying resource leaked.
     */
    public static final class t<T> implements ObservableOnSubscribe<ChannelMessage> {
        /** Stream to decode; consumed and closed by {@link #subscribe}. */
        final /* synthetic */ InputStream a;

        t(InputStream inputStream) {
            this.a = inputStream;
        }

        /**
         * @param emitter receiver of the decoded messages; must not be null
         * @throws MsgParseException when decoding fails with a {@link ProtocolException}
         * @throws MsgFetchException when decoding fails for any other reason
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<ChannelMessage> emitter) {
            kotlin.jvm.internal.t.g(emitter, "emitter");
            // Built outside the try, as before, so a failure here is not wrapped.
            ProtoReader protoReader = new ProtoReader(Okio.buffer(Okio.source(this.a)));
            try {
                long beginMessage = protoReader.beginMessage();
                while (protoReader.nextTag() != -1) {
                    emitter.onNext(ChannelMessage.ADAPTER.decode(protoReader));
                }
                protoReader.endMessageAndGetUnknownFields(beginMessage);
                emitter.onComplete();
            } catch (Throwable th) {
                if (th instanceof ProtocolException) {
                    throw new MsgParseException(th);
                }
                throw new MsgFetchException(th);
            } finally {
                closeQuietly(this.a);
            }
        }

        /** Closes the stream, ignoring close failures so they cannot mask the decoding outcome. */
        private static void closeQuietly(InputStream stream) {
            try {
                stream.close();
            } catch (java.io.IOException ignored) {
                // Nothing useful can be done: the stream has already been consumed or has failed.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Callable that persists the captured batch through {@code w().b(batch)} and returns the
     * same batch when the number of results equals the batch size.
     *
     * <p>Declared as an inner (non-static) class: {@link #call} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class u<V> implements Callable<List<? extends com.edu.classroom.message.repo.e.c.a>> {
        /** Batch of records to persist. */
        final /* synthetic */ List b;

        u(List list) {
            this.b = list;
        }

        /**
         * @return the captured batch, unchanged
         * @throws MsgPersistException when the result count differs from the batch size
         */
        @Override // java.util.concurrent.Callable
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.message.repo.e.c.a> call() {
            if (PlaybackMessageDataSourceImpl.this.w().b(this.b).length == this.b.size()) {
                return this.b;
            }
            throw new MsgPersistException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Action that persists the captured batch through {@code w().b(batch)} and fails when the
     * number of results differs from the batch size.
     *
     * <p>Declared as an inner (non-static) class: {@link #run} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class v implements io.reactivex.functions.a {
        /** Batch of records to persist. */
        final /* synthetic */ List b;

        v(List list) {
            this.b = list;
        }

        /**
         * @throws MsgPersistException when the result count differs from the batch size
         */
        @Override // io.reactivex.functions.a
        public final void run() {
            if (PlaybackMessageDataSourceImpl.this.w().b(this.b).length != this.b.size()) {
                throw new MsgPersistException();
            }
        }
    }

    /* loaded from: classes2.dex */
    /**
     * Consumer that takes the {@code d()} value of the first record in the list and passes it to
     * {@code h(...)} on three objects: the one returned by {@code v()}, and the two obtained
     * through the static accessors {@code h(...)} and {@code f(...)} of the enclosing class.
     *
     * <p>Declared as an inner (non-static) class: {@link #accept} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     *
     * <p>NOTE(review): {@code list.get(0)} throws on an empty list; the upstream is presumably
     * guaranteed to deliver at least one record - confirm against the caller.
     */
    final class w<T> implements Consumer<List<? extends com.edu.classroom.message.repo.e.c.a>> {
        w() {
        }

        /**
         * @param list records whose first element supplies the value propagated to the three targets
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(List<com.edu.classroom.message.repo.e.c.a> list) {
            PlaybackMessageDataSourceImpl.this.v().h(list.get(0).d());
            PlaybackMessageDataSourceImpl.h(PlaybackMessageDataSourceImpl.this).h(list.get(0).d());
            PlaybackMessageDataSourceImpl.f(PlaybackMessageDataSourceImpl.this).h(list.get(0).d());
        }
    }

    /* loaded from: classes2.dex */
    /**
     * Stateless mapper that converts every {@code repo.e.c.a} record of a list into a
     * {@code com.edu.classroom.x.g.c.a} through {@code repo.fetcher.b.a(record)}, preserving order.
     */
    static final class x<T, R> implements Function<List<? extends com.edu.classroom.message.repo.e.c.a>, List<? extends com.edu.classroom.x.g.c.a>> {
        /** Shared instance; the mapper holds no state. */
        public static final x a = new x();

        x() {
        }

        /**
         * @param list records to convert; must not be null
         * @return a new list holding one converted element per input record
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.x.g.c.a> apply(@NotNull List<com.edu.classroom.message.repo.e.c.a> list) {
            kotlin.jvm.internal.t.g(list, "list");
            ArrayList converted = new ArrayList(kotlin.collections.u.p(list, 10));
            for (com.edu.classroom.message.repo.e.c.a record : list) {
                converted.add(com.edu.classroom.message.repo.fetcher.b.a(record));
            }
            return converted;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes2.dex */
    /**
     * Observable source that looks up, through {@code w().e(roomId, type, timestamp)}, the
     * {@code "fsm"} record and then the {@code "user_state"} record for the captured timestamp.
     * The fsm record is mandatory; the user_state record is emitted only when present.
     *
     * <p>Declared as an inner (non-static) class: {@link #subscribe} dereferences
     * {@code PlaybackMessageDataSourceImpl.this}, which a static nested class cannot access.
     */
    public final class y<T> implements ObservableOnSubscribe<com.edu.classroom.message.repo.e.c.a> {
        /** Timestamp passed to both lookups. */
        final /* synthetic */ long b;

        y(long j2) {
            this.b = j2;
        }

        /**
         * @param emitter receiver of the records; must not be null
         * @throws NoStatusMsgException when no {@code "fsm"} record is found for the timestamp
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<com.edu.classroom.message.repo.e.c.a> emitter) {
            kotlin.jvm.internal.t.g(emitter, "emitter");
            com.edu.classroom.message.repo.e.c.a e = PlaybackMessageDataSourceImpl.this.w().e(PlaybackMessageDataSourceImpl.this.f4713j, "fsm", this.b);
            if (e == null) {
                throw new NoStatusMsgException(this.b);
            }
            emitter.onNext(e);
            com.edu.classroom.message.repo.e.c.a e2 = PlaybackMessageDataSourceImpl.this.w().e(PlaybackMessageDataSourceImpl.this.f4713j, "user_state", this.b);
            if (e2 != null) {
                emitter.onNext(e2);
            }
            emitter.onComplete();
        }
    }

    /**
     * Creates the data source for one room / user pair.
     *
     * <p>Stores the three arguments, seeds the LiveData flag {@code a} with the negation of
     * {@code getClassroomPlaybackSettings().a()}, and prepares the {@code kotlin.d} holder whose
     * initializer builds the {@code repo.d.d} cache.
     *
     * <p>NOTE(review): {@code MutableLiveData.setValue} must be called on the main thread -
     * confirm that injection always constructs this object there.
     *
     * @param roomId room id; must not be null
     * @param userId user id; must not be null
     * @param messageNetworkFetcher network fetcher; must not be null
     */
    @Inject
    public PlaybackMessageDataSourceImpl(@Named("room_id") @NotNull String roomId, @Named("user_id") @NotNull String userId, @NotNull PlaybackMessageNetworkFetcher messageNetworkFetcher) {
        kotlin.d b2;
        kotlin.jvm.internal.t.g(roomId, "roomId");
        kotlin.jvm.internal.t.g(userId, "userId");
        kotlin.jvm.internal.t.g(messageNetworkFetcher, "messageNetworkFetcher");
        this.f4713j = roomId;
        this.f4714k = userId;
        this.f4715l = messageNetworkFetcher;
        MutableLiveData<Boolean> mutableLiveData = new MutableLiveData<>();
        // Initial flag value is the inverse of the playback setting read here.
        mutableLiveData.setValue(Boolean.valueOf(!ClassroomSettingsManager.d.b().getClassroomPlaybackSettings().a()));
        // Decompiler residue: the Unit value is read but never used.
        kotlin.t tVar = kotlin.t.a;
        this.a = mutableLiveData;
        // The cache is not built here; the holder receives an initializer that builds it on invoke().
        b2 = kotlin.g.b(new kotlin.jvm.b.a<com.edu.classroom.message.repo.d.d>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$cache$2
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(0);
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // kotlin.jvm.b.a
            @NotNull
            public final d invoke() {
                // Built from room id, user id, the result of x() and a fresh unbounded queue.
                return new d(PlaybackMessageDataSourceImpl.this.f4713j, PlaybackMessageDataSourceImpl.this.f4714k, PlaybackMessageDataSourceImpl.this.x(), new LinkedBlockingQueue());
            }
        });
        this.f4710g = b2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a completable that parses {@code inputStream} into messages, drops the two version
     * message types, converts the rest with {@code G}, and persists them in batches of 50
     * through {@code C}.
     *
     * <p>The returned completable may complete before the whole stream is persisted: it
     * completes as soon as a persisted batch carries a {@code d()} value more than 60000 above
     * {@code bVar.h()}. The inner subscription keeps running after that early completion.
     * When the callback passed to {@code base.f.b.b(...)} fires, the record is flagged
     * ({@code l(true)} when {@code z} is true, {@code n(true)} otherwise), a duration metric is
     * reported, and {@code H} / {@code F} are invoked.
     *
     * <p>NOTE(review): this is decompiler output; the bare {@code t}, {@code r} and {@code n}
     * names inside the anonymous class refer to the imported kotlin / RxJava helpers, and
     * {@code bVar} inside the local class {@code d} is the outer method parameter.
     *
     * @param inputStream stream of encoded messages handed to {@code B}
     * @param bVar playback info record that is read for the threshold and flagged at the end
     * @param z selects which flag is set and which metric key is reported
     * @return the completable described above
     */
    public final io.reactivex.a A(final InputStream inputStream, final com.edu.classroom.message.repo.e.c.b bVar, final boolean z) {
        io.reactivex.a g2 = io.reactivex.a.g(new io.reactivex.d() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1

            /** Predicate rejecting the "fsm_version" and "user_state_version" message types. */
            /* loaded from: classes2.dex */
            static final class a<T> implements n<ChannelMessage> {
                public static final a a = new a();

                a() {
                }

                @Override // io.reactivex.functions.n
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final boolean test(@NotNull ChannelMessage it) {
                    t.g(it, "it");
                    return (t.c(it.msg_type, "fsm_version") ^ true) && (t.c(it.msg_type, "user_state_version") ^ true);
                }
            }

            /** Converts each message into a record via {@code G}. */
            /* loaded from: classes2.dex */
            static final class b<T, R> implements Function<ChannelMessage, com.edu.classroom.message.repo.e.c.a> {
                b() {
                }

                @Override // io.reactivex.functions.Function
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final com.edu.classroom.message.repo.e.c.a apply(@NotNull ChannelMessage it) {
                    t.g(it, "it");
                    return PlaybackMessageDataSourceImpl.this.G(it);
                }
            }

            /** Persists one batch via {@code C} and re-emits it. */
            /* loaded from: classes2.dex */
            static final class c<T, R> implements Function<List<com.edu.classroom.message.repo.e.c.a>, ObservableSource<? extends List<? extends com.edu.classroom.message.repo.e.c.a>>> {
                c() {
                }

                @Override // io.reactivex.functions.Function
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final ObservableSource<? extends List<com.edu.classroom.message.repo.e.c.a>> apply(@NotNull List<com.edu.classroom.message.repo.e.c.a> it) {
                    Observable C;
                    t.g(it, "it");
                    C = PlaybackMessageDataSourceImpl.this.C(it);
                    return C;
                }
            }

            /**
             * Completes the outer completable early, at most once, when a persisted batch is
             * far enough ahead of {@code bVar.h()}.
             */
            /* loaded from: classes2.dex */
            static final class d<T> implements Consumer<List<? extends com.edu.classroom.message.repo.e.c.a>> {
                final /* synthetic */ Ref$BooleanRef b;
                final /* synthetic */ io.reactivex.b c;

                d(Ref$BooleanRef ref$BooleanRef, io.reactivex.b bVar) {
                    this.b = ref$BooleanRef;
                    this.c = bVar;
                }

                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(List<com.edu.classroom.message.repo.e.c.a> it) {
                    // Already completed early: nothing more to signal.
                    if (this.b.element) {
                        return;
                    }
                    t.f(it, "it");
                    // NOTE(review): r.G picks one element of the batch (presumably first or last) - confirm.
                    if (((com.edu.classroom.message.repo.e.c.a) r.G(it)).d() > bVar.h() + 60000) {
                        this.c.onComplete();
                        this.b.element = true;
                    }
                }
            }

            /** No-op onNext handler; batches are only observed for their side effects upstream. */
            /* loaded from: classes2.dex */
            static final class e<T> implements Consumer<List<? extends com.edu.classroom.message.repo.e.c.a>> {
                public static final e a = new e();

                e() {
                }

                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(List<com.edu.classroom.message.repo.e.c.a> list) {
                }
            }

            /** Logs a pipeline failure and forwards it unless the completable already completed early. */
            /* loaded from: classes2.dex */
            static final class f<T> implements Consumer<Throwable> {
                final /* synthetic */ Ref$BooleanRef a;
                final /* synthetic */ io.reactivex.b b;

                f(Ref$BooleanRef ref$BooleanRef, io.reactivex.b bVar) {
                    this.a = ref$BooleanRef;
                    this.b = bVar;
                }

                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(Throwable th) {
                    com.edu.classroom.base.log.c.e$default(com.edu.classroom.playback.p.a.a, "playback messages download failed", th, null, 4, null);
                    if (this.a.element) {
                        return;
                    }
                    this.b.onError(th);
                }
            }

            @Override // io.reactivex.d
            public final void a(@NotNull final io.reactivex.b emitter) {
                t.g(emitter, "emitter");
                // Tracks whether the emitter was already completed by the early-completion consumer.
                final Ref$BooleanRef ref$BooleanRef = new Ref$BooleanRef();
                ref$BooleanRef.element = false;
                // parse -> filter version messages -> convert -> batch by 50 -> persist -> early-completion check
                Observable doOnNext = PlaybackMessageDataSourceImpl.this.B(inputStream).filter(a.a).map(new b()).buffer(50).concatMap(new c()).doOnNext(new d(ref$BooleanRef, emitter));
                t.f(doOnNext, "parseMessages(r).filter{…      }\n                }");
                // NOTE(review): base.f.b.b presumably invokes the callback with the elapsed time once the
                // stream finishes (the value is reported under "..._duration" keys) - confirm.
                com.edu.classroom.base.f.b.b(doOnNext, new l<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.5
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(1);
                    }

                    @Override // kotlin.jvm.b.l
                    public /* bridge */ /* synthetic */ kotlin.t invoke(Long l2) {
                        invoke(l2.longValue());
                        return kotlin.t.a;
                    }

                    public final void invoke(long j2) {
                        // Complete now if the early-completion path never fired.
                        if (!ref$BooleanRef.element) {
                            emitter.onComplete();
                        }
                        PlaybackMessageDataSourceImpl$parseAndPersistMessages$1 playbackMessageDataSourceImpl$parseAndPersistMessages$1 = PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.this;
                        // z selects which flag is set on the record and which metric key is reported.
                        if (z) {
                            bVar.l(true);
                            com.edu.classroom.base.sdkmonitor.b.e(com.edu.classroom.base.sdkmonitor.b.a, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j2), null, 8, null);
                        } else {
                            bVar.n(true);
                            com.edu.classroom.base.sdkmonitor.b.e(com.edu.classroom.base.sdkmonitor.b.a, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j2), null, 8, null);
                        }
                        PlaybackMessageDataSourceImpl$parseAndPersistMessages$1 playbackMessageDataSourceImpl$parseAndPersistMessages$12 = PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.this;
                        PlaybackMessageDataSourceImpl.this.H(bVar);
                        PlaybackMessageDataSourceImpl$parseAndPersistMessages$1 playbackMessageDataSourceImpl$parseAndPersistMessages$13 = PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.this;
                        PlaybackMessageDataSourceImpl.this.F(bVar);
                        com.edu.classroom.base.log.c.i$default(com.edu.classroom.playback.p.a.a, "playback messages download finish completely: " + j2, null, 2, null);
                    }
                }).subscribe(e.a, new f(ref$BooleanRef, emitter));
            }
        });
        kotlin.jvm.internal.t.f(g2, "Completable.create { emi…)\n                }\n    }");
        return g2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps {@code inputStream} in an observable whose source is the nested class {@code t}.
     *
     * @param inputStream stream handed to the {@code t} source
     * @return an observable of the messages the {@code t} source emits
     */
    public final Observable<ChannelMessage> B(InputStream inputStream) {
        final t parser = new t(inputStream);
        Observable<ChannelMessage> messages = Observable.create(parser);
        kotlin.jvm.internal.t.f(messages, "Observable.create<Channe…mitter.onComplete()\n    }");
        return messages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps the persistence of one batch (nested callable {@code u}) in an observable.
     *
     * @param list batch of records handed to the {@code u} callable
     * @return an observable built from that callable
     */
    public final Observable<List<com.edu.classroom.message.repo.e.c.a>> C(List<com.edu.classroom.message.repo.e.c.a> list) {
        final u persistBatch = new u(list);
        Observable<List<com.edu.classroom.message.repo.e.c.a>> persisted = Observable.fromCallable(persistBatch);
        kotlin.jvm.internal.t.f(persisted, "Observable.fromCallable …()\n        messages\n    }");
        return persisted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps the persistence of one batch (nested action {@code v}) in a completable.
     *
     * @param list batch of records handed to the {@code v} action
     * @return a completable built from that action
     */
    public final io.reactivex.a D(List<com.edu.classroom.message.repo.e.c.a> list) {
        final v persistBatch = new v(list);
        io.reactivex.a persisted = io.reactivex.a.n(persistBatch);
        kotlin.jvm.internal.t.f(persisted, "Completable.fromAction {…gPersistException()\n    }");
        return persisted;
    }

    /**
     * Logs the requested timestamp and returns, as a single list, the records emitted by the
     * nested source {@code y} for that timestamp.
     *
     * @param j2 timestamp passed to the {@code y} source
     * @return a single carrying the emitted records as a list
     */
    private final Single<List<com.edu.classroom.message.repo.e.c.a>> E(long j2) {
        final String logLine = "PlaybackMessageDataSourceImpl.queryLatestMessage ts=" + j2;
        com.edu.classroom.x.g.a.a.d(logLine);
        Single<List<com.edu.classroom.message.repo.e.c.a>> latest = Observable.create(new y(j2)).toList();
        kotlin.jvm.internal.t.f(latest, "Observable.create<Messag…lete()\n        }.toList()");
        return latest;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Passes {@code bVar} to the injected {@code c} dependency via {@code d(bVar).w()} when
     * either {@code bVar.f()} or {@code bVar.d()} is true; otherwise does nothing.
     * NOTE(review): {@code w()} is obfuscated - presumably a fire-and-forget subscribe; confirm
     * how errors from it are handled.
     *
     * @param bVar playback info record to store
     */
    public final void F(com.edu.classroom.message.repo.e.c.b bVar) {
        if (!bVar.f() && !bVar.d()) {
            return;
        }
        com.edu.classroom.message.repo.e.b.b infoDao = this.c;
        if (infoDao == null) {
            // Kotlin lateinit check: reports the uninitialised property by name.
            kotlin.jvm.internal.t.w("playbackInfoDao");
            throw null;
        }
        infoDao.d(bVar).w();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts a wire-level {@link ChannelMessage} into a message entity.
     *
     * <p>The numeric id is parsed from {@code msg_id}; any failure while reading or parsing it
     * falls back to {@code 0}. The remaining fields are copied across after a non-null check.
     *
     * @param channelMessage message to convert
     * @return a new entity carrying the id, type, send timestamp, room id and payload bytes
     */
    public final com.edu.classroom.message.repo.e.c.a G(ChannelMessage channelMessage) {
        long messageId;
        try {
            String rawId = channelMessage.msg_id;
            kotlin.jvm.internal.t.f(rawId, "msg.msg_id");
            messageId = Long.parseLong(rawId);
        } catch (Throwable ignored) {
            // NOTE(review): deliberately broad best-effort; every Throwable maps to id 0.
            messageId = 0;
        }
        String type = channelMessage.msg_type;
        kotlin.jvm.internal.t.f(type, "msg_type");
        Long boxedSendTime = channelMessage.send_timestamp;
        kotlin.jvm.internal.t.f(boxedSendTime, "send_timestamp");
        long sendTime = boxedSendTime.longValue();
        String roomId = channelMessage.room_id;
        kotlin.jvm.internal.t.f(roomId, "room_id");
        byte[] payloadBytes = channelMessage.payload.toByteArray();
        // The trailing (null, 32, null) arguments look like a Kotlin default-argument
        // constructor call (mask + marker); confirm against the entity class.
        return new com.edu.classroom.message.repo.e.c.a(messageId, type, sendTime, roomId, payloadBytes, null, 32, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Posts {@code TRUE} to the {@code a} live data when {@code bVar.d()} is set and, in
     * addition, either {@code bVar.f()} is set or {@code bVar.g()} is empty.
     *
     * @param bVar playback info entity whose flags are inspected
     */
    public final void H(com.edu.classroom.message.repo.e.c.b bVar) {
        // Short-circuit order matches the nested checks: d(), then f(), then g().
        boolean ready = bVar.d() && (bVar.f() || bVar.g().length() == 0);
        if (ready) {
            this.a.postValue(Boolean.TRUE);
        }
    }

    /**
     * Synthetic accessor that exposes the {@code boardCache} field ({@code f4712i}).
     *
     * @param playbackMessageDataSourceImpl instance whose field is read
     * @return the board cache; fails instead of returning if the field is still {@code null}
     */
    public static final /* synthetic */ com.edu.classroom.message.repo.d.b f(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        com.edu.classroom.message.repo.d.b boardCache = playbackMessageDataSourceImpl.f4712i;
        if (boardCache == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("boardCache");
            throw null;
        }
        return boardCache;
    }

    /**
     * Synthetic accessor that exposes the {@code chatCache} field ({@code f4711h}).
     *
     * @param playbackMessageDataSourceImpl instance whose field is read
     * @return the chat cache; fails instead of returning if the field is still {@code null}
     */
    public static final /* synthetic */ com.edu.classroom.message.repo.d.c h(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        com.edu.classroom.message.repo.d.c chatCache = playbackMessageDataSourceImpl.f4711h;
        if (chatCache == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("chatCache");
            throw null;
        }
        return chatCache;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the value held by the {@code f4710g} holder, cast to the cache type.
     *
     * <p>NOTE(review): {@code f4710g} looks like a Kotlin {@code Lazy} delegate; confirm at the
     * field declaration.
     */
    public final com.edu.classroom.message.repo.d.d v() {
        Object held = this.f4710g.getValue();
        return (com.edu.classroom.message.repo.d.d) held;
    }

    /**
     * Collects everything the nested {@code r} subscriber (declared elsewhere in this file)
     * emits for the given cache into a single list.
     *
     * @param j2 long value forwarded to the {@code r} subscriber
     * @param aVar cache forwarded to the {@code r} subscriber
     * @return a single that yields the collected messages
     */
    private final Single<List<com.edu.classroom.x.g.c.a>> y(long j2, com.edu.classroom.message.repo.d.a aVar) {
        Single<List<com.edu.classroom.x.g.c.a>> cached = Observable.create(new r(aVar, j2)).toList();
        // Presumably Kotlin's obfuscated non-null expression check.
        kotlin.jvm.internal.t.f(cached, "Observable.create<Classr…lete()\n        }.toList()");
        return cached;
    }

    /**
     * Starts a fetch for {@code url} unless it is empty or the entity is already flagged
     * ({@code f()}) with the same URL stored in {@code g()}.
     *
     * <p>When a fetch is needed, the URL is stored on the entity via {@code o(url)} and the work
     * is delegated to {@link #t} or {@link #u}, depending on the playback settings flag. In all
     * other cases {@link #H} is invoked and an already-completed completable is returned.
     *
     * @param entity playback info entity to update and fetch for
     * @param url source URL of the messages
     * @return the fetch completable, or a completed one when nothing has to be fetched
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    @NotNull
    public io.reactivex.a a(@NotNull com.edu.classroom.message.repo.e.c.b entity, @NotNull String url) {
        kotlin.jvm.internal.t.g(entity, "entity");
        kotlin.jvm.internal.t.g(url, "url");
        boolean sameUrlAlreadyDone = entity.f() && kotlin.jvm.internal.t.c(entity.g(), url);
        if (!sameUrlAlreadyDone && url.length() != 0) {
            entity.o(url);
            if (ClassroomSettingsManager.d.b().getClassroomPlaybackSettings().a()) {
                return t(entity);
            }
            return u(entity);
        }
        H(entity);
        io.reactivex.a done = io.reactivex.a.e();
        kotlin.jvm.internal.t.f(done, "Completable.complete()");
        return done;
    }

    /**
     * Logs the prefetch request, runs {@link #E} for the given timestamp, and post-processes the
     * result with the nested {@code w} success callback and the {@code x.a} mapper (both declared
     * elsewhere in this file).
     *
     * @param j2 timestamp value forwarded to {@link #E} and written to the log
     * @return a single yielding the mapped message list
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    @NotNull
    public Single<List<com.edu.classroom.x.g.c.a>> b(long j2) {
        com.edu.classroom.base.log.c.i$default(com.edu.classroom.x.g.a.a, "prefetch message to cache: " + j2, null, 2, null);
        Single<List<com.edu.classroom.x.g.c.a>> prefetched = com.edu.classroom.base.f.b.h(E(j2))
                .doOnSuccess(new w())
                .map(x.a);
        // Presumably Kotlin's obfuscated non-null expression check.
        kotlin.jvm.internal.t.f(prefetched, "queryLatestMessage(times…)\n            }\n        }");
        return prefetched;
    }

    /**
     * Starts a fetch for {@code url} unless the entity is already flagged ({@code d()}) with the
     * same URL stored in {@code e()}.
     *
     * <p>When a fetch is needed, the URL is stored on the entity via {@code m(url)} and the work
     * is delegated to {@link #r} or {@link #s}, depending on the playback settings flag.
     * Otherwise {@link #H} is invoked and an already-completed completable is returned.
     *
     * @param entity playback info entity to update and fetch for
     * @param url source URL of the messages
     * @return the fetch completable, or a completed one when nothing has to be fetched
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    @NotNull
    public io.reactivex.a c(@NotNull com.edu.classroom.message.repo.e.c.b entity, @NotNull String url) {
        kotlin.jvm.internal.t.g(entity, "entity");
        kotlin.jvm.internal.t.g(url, "url");
        boolean sameUrlAlreadyDone = entity.d() && kotlin.jvm.internal.t.c(entity.e(), url);
        if (sameUrlAlreadyDone) {
            H(entity);
            io.reactivex.a done = io.reactivex.a.e();
            kotlin.jvm.internal.t.f(done, "Completable.complete()");
            return done;
        }
        entity.m(url);
        if (ClassroomSettingsManager.d.b().getClassroomPlaybackSettings().a()) {
            return r(entity);
        }
        return s(entity);
    }

    /**
     * Combines the messages read from three caches: the one returned by {@link #v}, the chat
     * cache and the board cache. The results are zipped pairwise with the {@code p.a} and
     * {@code q.a} combiners (declared elsewhere in this file).
     *
     * <p>Only {@code j3} is forwarded to the cache lookups; {@code j2} is used solely in the log
     * line. Fails if the chat or board cache has not been set up yet.
     *
     * @param j2 start value (logged only)
     * @param j3 end value passed to every cache lookup
     * @return a single yielding the combined message list
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    @NotNull
    public Single<List<com.edu.classroom.x.g.c.a>> d(long j2, long j3) {
        com.edu.classroom.x.g.a.a.d("getMessages start:" + j2 + " end:" + j3);
        Single<List<com.edu.classroom.x.g.c.a>> fromFirstCache = y(j3, v());
        com.edu.classroom.message.repo.d.c chatCache = this.f4711h;
        if (chatCache == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("chatCache");
            throw null;
        }
        Single<R> withChat = fromFirstCache.zipWith(y(j3, chatCache), p.a);
        com.edu.classroom.message.repo.d.b boardCache = this.f4712i;
        if (boardCache == null) {
            kotlin.jvm.internal.t.w("boardCache");
            throw null;
        }
        Single<List<com.edu.classroom.x.g.c.a>> combined = withChat.zipWith(y(j3, boardCache), q.a);
        kotlin.jvm.internal.t.f(combined, "getMessagesFromCache(end…on { t1, t2 -> t1 + t2 })");
        return combined;
    }

    /**
     * Rebuilds the chat and board caches from the given info blocks.
     *
     * <p>Each cache gets a new fetcher bound to the injected {@code retrofit} field ({@code e})
     * and a new, empty {@link LinkedBlockingQueue}. Fails if {@code retrofit} is not injected.
     *
     * @param chatInfoBlocks blocks handed to the new {@link PlaybackChatFetcher}
     * @param boardInfoBlocks blocks handed to the new {@link PlaybackBoardFetcher}
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public void e(@NotNull List<com.edu.classroom.message.repo.f.b> chatInfoBlocks, @NotNull List<com.edu.classroom.message.repo.f.a> boardInfoBlocks) {
        kotlin.jvm.internal.t.g(chatInfoBlocks, "chatInfoBlocks");
        kotlin.jvm.internal.t.g(boardInfoBlocks, "boardInfoBlocks");
        com.edu.classroom.base.network.k retrofit = this.e;
        if (retrofit == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("retrofit");
            throw null;
        }
        PlaybackChatFetcher chatFetcher = new PlaybackChatFetcher(retrofit, chatInfoBlocks);
        PlaybackBoardFetcher boardFetcher = new PlaybackBoardFetcher(retrofit, boardInfoBlocks);
        this.f4711h = new com.edu.classroom.message.repo.d.c(chatFetcher, new LinkedBlockingQueue());
        this.f4712i = new com.edu.classroom.message.repo.d.b(boardFetcher, new LinkedBlockingQueue());
    }

    /**
     * Wraps the nested {@code s} action (declared elsewhere in this file) in a completable.
     *
     * @return the completable created from that action
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    @NotNull
    public io.reactivex.a onComplete() {
        io.reactivex.a completion = io.reactivex.a.n(new s());
        // Presumably Kotlin's obfuscated non-null expression check.
        kotlin.jvm.internal.t.f(completion, "Completable.fromAction {…p { it.complete() }\n    }");
        return completion;
    }

    /**
     * Fetches the resource at {@code entity.e()} through the {@code f4715l} network fetcher and
     * hands the result to the nested {@code a} function (declared elsewhere in this file).
     *
     * <p>The trailing {@code z(...)} call is presumably {@code subscribeOn(Schedulers.io())},
     * judging by the expression hint in the null-check string.
     *
     * @param entity playback info entity whose {@code e()} URL is fetched
     * @return the resulting completable
     */
    @NotNull
    public final io.reactivex.a r(@NotNull com.edu.classroom.message.repo.e.c.b entity) {
        kotlin.jvm.internal.t.g(entity, "entity");
        io.reactivex.a fetch = this.f4715l
                .b(entity.e())
                .flatMapCompletable(new a(entity))
                .z(io.reactivex.schedulers.a.c());
        kotlin.jvm.internal.t.f(fetch, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return fetch;
    }

    /**
     * Older fetch path for {@code entity.e()} (the synthetic callback below is named
     * {@code fetchRoomMessagesOld}).
     *
     * <p>The fetched data flows through the nested {@code b}..{@code g} stages (declared
     * elsewhere in this file) and is buffered in groups of 50 before the completable stage. The
     * callback passed to {@code com.edu.classroom.base.f.b.a} reports the {@code long} it
     * receives under the key {@code playback_message_database_duration}.
     *
     * @param entity playback info entity whose {@code e()} URL is fetched
     * @return the resulting completable
     */
    @NotNull
    public final io.reactivex.a s(@NotNull com.edu.classroom.message.repo.e.c.b entity) {
        kotlin.jvm.internal.t.g(entity, "entity");
        io.reactivex.a pipeline = this.f4715l
                .b(entity.e())
                .flatMapObservable(new b())
                .doOnNext(new c())
                .filter(d.a)
                .map(new e())
                .buffer(50)
                .flatMapCompletable(new f())
                .i(new g(entity));
        kotlin.jvm.internal.t.f(pipeline, "messageNetworkFetcher.fe…ty)\n                    }");
        io.reactivex.a scheduled = com.edu.classroom.base.f.b.a(pipeline, new kotlin.jvm.b.l<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$7
            @Override // kotlin.jvm.b.l
            public /* bridge */ /* synthetic */ kotlin.t invoke(Long boxedValue) {
                invoke(boxedValue.longValue());
                return kotlin.t.a;
            }

            public final void invoke(long value) {
                com.edu.classroom.base.sdkmonitor.b.e(com.edu.classroom.base.sdkmonitor.b.a, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", value), null, 8, null);
            }
        }).z(io.reactivex.schedulers.a.c());
        kotlin.jvm.internal.t.f(scheduled, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return scheduled;
    }

    /**
     * Fetches the resource at {@code entity.g()} through the {@code f4715l} network fetcher and
     * hands the result to the nested {@code h} function (declared elsewhere in this file).
     *
     * <p>The trailing {@code z(...)} call is presumably {@code subscribeOn(Schedulers.io())},
     * judging by the expression hint in the null-check string.
     *
     * @param entity playback info entity whose {@code g()} URL is fetched
     * @return the resulting completable
     */
    @NotNull
    public final io.reactivex.a t(@NotNull com.edu.classroom.message.repo.e.c.b entity) {
        kotlin.jvm.internal.t.g(entity, "entity");
        io.reactivex.a fetch = this.f4715l
                .b(entity.g())
                .flatMapCompletable(new h(entity))
                .z(io.reactivex.schedulers.a.c());
        kotlin.jvm.internal.t.f(fetch, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return fetch;
    }

    /**
     * Older fetch path for {@code entity.g()} (the synthetic callback below is named
     * {@code fetchSelfMessagesOld}).
     *
     * <p>The fetched data flows through the nested {@code i}..{@code o} stages (declared
     * elsewhere in this file) and is buffered in groups of 50 before the completable stage. The
     * callback passed to {@code com.edu.classroom.base.f.b.a} reports the {@code long} it
     * receives under the key {@code playback_self_message_database_duration}.
     *
     * @param entity playback info entity whose {@code g()} URL is fetched
     * @return the resulting completable
     */
    @NotNull
    public final io.reactivex.a u(@NotNull com.edu.classroom.message.repo.e.c.b entity) {
        kotlin.jvm.internal.t.g(entity, "entity");
        io.reactivex.a pipeline = this.f4715l
                .b(entity.g())
                .flatMapObservable(new i())
                .doOnNext(new j())
                .filter(k.a)
                .map(new l())
                .doOnNext(new m())
                .buffer(50)
                .flatMapCompletable(new n())
                .i(new o(entity));
        kotlin.jvm.internal.t.f(pipeline, "messageNetworkFetcher.fe…ty)\n                    }");
        io.reactivex.a scheduled = com.edu.classroom.base.f.b.a(pipeline, new kotlin.jvm.b.l<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$8
            @Override // kotlin.jvm.b.l
            public /* bridge */ /* synthetic */ kotlin.t invoke(Long boxedValue) {
                invoke(boxedValue.longValue());
                return kotlin.t.a;
            }

            public final void invoke(long value) {
                com.edu.classroom.base.sdkmonitor.b.e(com.edu.classroom.base.sdkmonitor.b.a, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", value), null, 8, null);
            }
        }).z(io.reactivex.schedulers.a.c());
        kotlin.jvm.internal.t.f(scheduled, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return scheduled;
    }

    /**
     * Returns the injected {@code messageDao} field ({@code b}).
     *
     * @return the DAO; fails instead of returning if the field is still {@code null}
     */
    @NotNull
    public final com.edu.classroom.message.repo.e.b.d w() {
        com.edu.classroom.message.repo.e.b.d messageDao = this.b;
        if (messageDao == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("messageDao");
            throw null;
        }
        return messageDao;
    }

    /**
     * Returns the injected {@code messageDbFetcher} field ({@code d}).
     *
     * @return the fetcher; fails instead of returning if the field is still {@code null}
     */
    @NotNull
    public final com.edu.classroom.message.repo.fetcher.a x() {
        com.edu.classroom.message.repo.fetcher.a dbFetcher = this.d;
        if (dbFetcher == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("messageDbFetcher");
            throw null;
        }
        return dbFetcher;
    }

    /**
     * Returns the {@code processors} field ({@code f}).
     *
     * @return the processor set; fails instead of returning if the field is still {@code null}
     */
    @NotNull
    public final Set<com.edu.classroom.message.i> z() {
        Set<com.edu.classroom.message.i> processors = this.f;
        if (processors == null) {
            // Presumably Kotlin's "lateinit property not initialized" intrinsic.
            kotlin.jvm.internal.t.w("processors");
            throw null;
        }
        return processors;
    }
}
