package com.edu.classroom.message.repo.datasource;

import androidx.core.app.NotificationCompat;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
import com.edu.classroom.base.log.CommonLog;
import com.edu.classroom.base.network.IRetrofit;
import com.edu.classroom.base.sdkmonitor.ESDKMonitor;
import com.edu.classroom.base.settings.ClassroomSettingsManager;
import com.edu.classroom.channel.api.MessageLog;
import com.edu.classroom.message.MessageProcessor;
import com.edu.classroom.message.MsgFetchException;
import com.edu.classroom.message.MsgParseException;
import com.edu.classroom.message.MsgPersistException;
import com.edu.classroom.message.NoStatusMsgException;
import com.edu.classroom.message.repo.cache.MessageCache;
import com.edu.classroom.message.repo.cache.PlaybackBoardCache;
import com.edu.classroom.message.repo.cache.PlaybackChatCache;
import com.edu.classroom.message.repo.cache.PlaybackMessageCache;
import com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl;
import com.edu.classroom.message.repo.db.dao.PlaybackInfoDao;
import com.edu.classroom.message.repo.db.dao.PlaybackMessageDao;
import com.edu.classroom.message.repo.db.entity.MessageEntity;
import com.edu.classroom.message.repo.db.entity.PlaybackInfoEntity;
import com.edu.classroom.message.repo.fetcher.PlaybackBoardFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackChatFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageDbFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageNetworkFetcher;
import com.edu.classroom.message.repo.model.BoardInfoBlock;
import com.edu.classroom.message.repo.model.ChatInfoBlock;
import com.edu.classroom.playback.utils.PlaybackLog;
import com.meituan.robust.ChangeQuickRedirect;
import com.meituan.robust.PatchProxy;
import com.meituan.robust.PatchProxyResult;
import com.meizu.cloud.pushsdk.notification.model.AdvanceSetting;
import com.squareup.wire.ProtoReader;
import com.tt.xs.miniapphost.AppbrandHostConstants;
import edu.classroom.channel.ChannelMessage;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.CompletableSource;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.functions.Action;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import java.io.InputStream;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;
import javax.inject.Named;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Ref;
import okio.Okio;
import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;

@Metadata(bv = {1, 0, 3}, d1 = {"\u0000â\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\"\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\f\u0018\u00002\u00020\u0001B#\b\u0007\u0012\b\b\u0001\u0010\u0002\u001a\u00020\u0003\u0012\b\b\u0001\u0010\u0004\u001a\u00020\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0018\u0010K\u001a\u00020L2\u0006\u0010M\u001a\u00020N2\u0006\u0010O\u001a\u00020\u0003H\u0016J\u0018\u0010P\u001a\u00020L2\u0006\u0010M\u001a\u00020N2\u0006\u0010O\u001a\u00020\u0003H\u0016J\u0010\u0010Q\u001a\u00020L2\u0006\u0010M\u001a\u00020NH\u0002J\u000e\u0010R\u001a\u00020L2\u0006\u0010M\u001a\u00020NJ\u0010\u0010S\u001a\u00020L2\u0006\u0010M\u001a\u00020NH\u0002J\u000e\u0010T\u001a\u00020L2\u0006\u0010M\u001a\u00020NJ$\u0010U\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020X0W0V2\u0006\u0010Y\u001a\u00020Z2\u0006\u0010[\u001a\u00020ZH\u0016J$\u0010\\\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020X0W0V2\u0006\u0010[\u001a\u00020Z2\u0006\u0010]\u001a\u00020^H\u0002J$\u0010_\u001a\u00020`2\f\u0010a\u001a\b\
u0012\u0004\u0012\u00020b0W2\f\u0010c\u001a\b\u0012\u0004\u0012\u00020d0WH\u0016J\b\u0010e\u001a\u00020LH\u0016J \u0010f\u001a\u00020L2\u0006\u0010g\u001a\u00020h2\u0006\u0010M\u001a\u00020N2\u0006\u0010i\u001a\u00020\nH\u0002J\u001e\u0010j\u001a\u0010\u0012\f\u0012\n m*\u0004\u0018\u00010l0l0k2\u0006\u0010n\u001a\u00020hH\u0002J0\u0010o\u001a\u001c\u0012\u0018\u0012\u0016\u0012\u0004\u0012\u00020p m*\n\u0012\u0004\u0012\u00020p\u0018\u00010W0W0k2\f\u0010q\u001a\b\u0012\u0004\u0012\u00020p0WH\u0002J\u0016\u0010r\u001a\u00020L2\f\u0010q\u001a\b\u0012\u0004\u0012\u00020p0WH\u0002J\u001c\u0010s\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020X0W0V2\u0006\u0010t\u001a\u00020ZH\u0016J\u001c\u0010u\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020p0W0V2\u0006\u0010v\u001a\u00020ZH\u0002J\b\u0010w\u001a\u00020`H\u0016J\u0010\u0010x\u001a\u00020`2\u0006\u0010M\u001a\u00020NH\u0002J\u0010\u0010y\u001a\u00020p2\u0006\u0010z\u001a\u00020lH\u0002J\u0010\u0010{\u001a\u00020`2\u0006\u0010M\u001a\u00020NH\u0002R\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n\u0000R\u000e\u0010\u000b\u001a\u00020\fX\u0082.¢\u0006\u0002\n\u0000R\u001e\u0010\r\u001a\u00020\u000e8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u000f\u0010\u0010\"\u0004\b\u0011\u0010\u0012R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082.¢\u0006\u0002\n\u0000R\u001e\u0010\u0015\u001a\u00020\u00168\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u0017\u0010\u0018\"\u0004\b\u0019\u0010\u001aR\u001b\u0010\u001b\u001a\u00020\u001c8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u001f\u0010 
\u001a\u0004\b\u001d\u0010\u001eR\u001e\u0010!\u001a\u00020\"8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b#\u0010$\"\u0004\b%\u0010&R\u001e\u0010'\u001a\u00020(8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b)\u0010*\"\u0004\b+\u0010,R\u001e\u0010-\u001a\u00020.8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b/\u00100\"\u0004\b1\u00102R\u001a\u00103\u001a\b\u0012\u0004\u0012\u00020\n04X\u0096\u0004¢\u0006\b\n\u0000\u001a\u0004\b5\u00106R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n\u0000R\u001e\u00107\u001a\u0002088\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b9\u0010:\"\u0004\b;\u0010<R)\u0010=\u001a\r\u0012\t\u0012\u00070?¢\u0006\u0002\b@0>8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\bA\u0010B\"\u0004\bC\u0010DR\u001e\u0010E\u001a\u00020F8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\bG\u0010H\"\u0004\bI\u0010JR\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000¨\u0006|"}, d2 = {"Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSourceImpl;", "Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSource;", AppbrandHostConstants.SCHEMA_INSPECT.roomId, "", "userId", "messageNetworkFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;", "(Ljava/lang/String;Ljava/lang/String;Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;)V", "_messageLoadState", "Landroidx/lifecycle/MutableLiveData;", "", "boardCache", "Lcom/edu/classroom/message/repo/cache/PlaybackBoardCache;", "boardFetcherFactory", "Lcom/edu/classroom/message/repo/fetcher/PlaybackBoardFetcher$PlaybackBoardFetcherFactory;", "getBoardFetcherFactory", "()Lcom/edu/classroom/message/repo/fetcher/PlaybackBoardFetcher$PlaybackBoardFetcherFactory;", "setBoardFetcherFactory", "(Lcom/edu/classroom/message/repo/fetcher/PlaybackBoardFetcher$PlaybackBoardFetcherFactory;)V", "chatCache", 
"Lcom/edu/classroom/message/repo/cache/PlaybackChatCache;", "chatFetcherFactory", "Lcom/edu/classroom/message/repo/fetcher/PlaybackChatFetcher$PlaybackChatFetcherFactory;", "getChatFetcherFactory", "()Lcom/edu/classroom/message/repo/fetcher/PlaybackChatFetcher$PlaybackChatFetcherFactory;", "setChatFetcherFactory", "(Lcom/edu/classroom/message/repo/fetcher/PlaybackChatFetcher$PlaybackChatFetcherFactory;)V", "messageCache", "Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "getMessageCache", "()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "messageCache$delegate", "Lkotlin/Lazy;", "messageCacheFactory", "Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache$PlaybackMessageCacheFactory;", "getMessageCacheFactory", "()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache$PlaybackMessageCacheFactory;", "setMessageCacheFactory", "(Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache$PlaybackMessageCacheFactory;)V", "messageDao", "Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "getMessageDao", "()Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "setMessageDao", "(Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;)V", "messageDbFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "getMessageDbFetcher", "()Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "setMessageDbFetcher", "(Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;)V", "messageLoadState", "Landroidx/lifecycle/LiveData;", "getMessageLoadState", "()Landroidx/lifecycle/LiveData;", "playbackInfoDao", "Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;", "getPlaybackInfoDao", "()Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;", "setPlaybackInfoDao", "(Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;)V", "processors", "", "Lcom/edu/classroom/message/MessageProcessor;", "Lkotlin/jvm/JvmSuppressWildcards;", "getProcessors", "()Ljava/util/Set;", "setProcessors", 
"(Ljava/util/Set;)V", "retrofit", "Lcom/edu/classroom/base/network/IRetrofit;", "getRetrofit", "()Lcom/edu/classroom/base/network/IRetrofit;", "setRetrofit", "(Lcom/edu/classroom/base/network/IRetrofit;)V", "downloadRoomMessage", "Lio/reactivex/Completable;", "entity", "Lcom/edu/classroom/message/repo/db/entity/PlaybackInfoEntity;", "url", "downloadSelfMessage", "fetchRoomMessages", "fetchRoomMessagesOld", "fetchSelfMessages", "fetchSelfMessagesOld", "getMessages", "Lio/reactivex/Single;", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "start", "", "end", "getMessagesFromCache", "cache", "Lcom/edu/classroom/message/repo/cache/MessageCache;", "init", "", "chatInfoBlocks", "Lcom/edu/classroom/message/repo/model/ChatInfoBlock;", "boardInfoBlocks", "Lcom/edu/classroom/message/repo/model/BoardInfoBlock;", "onComplete", "parseAndPersistMessages", "r", "Ljava/io/InputStream;", "isFetchRoomMsg", "parseMessages", "Lio/reactivex/Observable;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "inputStream", "persistMessages", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "messages", "persistMessagesOld", "prefetchRecentMessageToCache", "timestamp", "queryRecentMessage", "ts", "release", "savePlaybackInfoEntity", "transformToMessageEntity", "msg", "updateMessageState", "message_release"}, k = 1, mv = {1, 4, 2})
/* renamed from: com.edu.classroom.message.repo.datasource.b, reason: from Kotlin metadata */
/* loaded from: classes6.dex */
public final class PlaybackMessageDataSourceImpl implements PlaybackMessageDataSource {

    /* renamed from: a, reason: collision with root package name */
    // Robust (com.meituan.robust) patch redirect slot for the outer class.
    public static ChangeQuickRedirect f12334a;

    // NOTE(review): the field names below are obfuscated. The Kotlin names given in the
    // comments are matched by declared type against the property list in the class-level
    // @Metadata annotation; confirm against the original Kotlin source where available.

    // Injected DAO for playback messages (Kotlin property: messageDao).
    @Inject
    public PlaybackMessageDao b;

    // Injected DAO for playback info records (Kotlin property: playbackInfoDao).
    @Inject
    public PlaybackInfoDao c;

    // Injected database-backed message fetcher (Kotlin property: messageDbFetcher).
    @Inject
    public PlaybackMessageDbFetcher d;

    // Injected retrofit wrapper (Kotlin property: retrofit).
    @Inject
    public IRetrofit e;

    // Injected set of message processors (Kotlin property: processors).
    @Inject
    public Set<MessageProcessor> f;

    // Injected factory for chat fetchers (Kotlin property: chatFetcherFactory).
    @Inject
    public PlaybackChatFetcher.a g;

    // Injected factory for board fetchers (Kotlin property: boardFetcherFactory).
    @Inject
    public PlaybackBoardFetcher.a h;

    // Injected factory for the message cache (Kotlin property: messageCacheFactory).
    @Inject
    public PlaybackMessageCache.a i;
    // Mutable backing state (Kotlin property: _messageLoadState).
    private final MutableLiveData<Boolean> j;

    // Read-only LiveData exposed by the data source (Kotlin property: messageLoadState).
    @NotNull
    private final LiveData<Boolean> k;
    // Chat cache; not final, so assigned after construction (Kotlin property: chatCache).
    private PlaybackChatCache l;
    // Board cache; not final, so assigned after construction (Kotlin property: boardCache).
    private PlaybackBoardCache m;
    // Lazy delegate (Kotlin: messageCache$delegate).
    private final Lazy n;
    // NOTE(review): presumably roomId (first String constructor argument in the metadata) - confirm.
    private final String o;
    // NOTE(review): presumably userId (second String constructor argument in the metadata) - confirm.
    private final String p;
    // Network fetcher supplied through the constructor (Kotlin property: messageNetworkFetcher).
    private final PlaybackMessageNetworkFetcher q;

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012\u0006\u0010\u0003\u001a\u00020\u0004H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "Lio/reactivex/CompletableSource;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ljava/io/InputStream;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$a */
    /* loaded from: classes6.dex */
    public static final class a<T, R> implements Function<InputStream, CompletableSource> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12335a;

        /* Captured playback info entity handed to the outer helper on every call. */
        final /* synthetic */ PlaybackInfoEntity c;

        a(PlaybackInfoEntity capturedEntity) {
            this.c = capturedEntity;
        }

        /**
         * Maps an input stream to a {@link CompletableSource} by delegating to the outer
         * class's static helper with the captured entity and the flag {@code true}.
         * NOTE(review): presumably the accessor for parseAndPersistMessages with
         * isFetchRoomMsg = true - confirm against the outer class.
         *
         * @param it the stream to process; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final CompletableSource apply(@NotNull InputStream it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12335a, false, 34313);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it, this.c, true);
            }
            return (CompletableSource) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a*\u0012\u000e\b\u0001\u0012\n \u0003*\u0004\u0018\u00010\u00020\u0002 \u0003*\u0014\u0012\u000e\b\u0001\u0012\n \u0003*\u0004\u0018\u00010\u00020\u0002\u0018\u00010\u00010\u00012\u0006\u0010\u0004\u001a\u00020\u0005H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "Lio/reactivex/ObservableSource;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ljava/io/InputStream;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$b */
    /* loaded from: classes6.dex */
    public static final class b<T, R> implements Function<InputStream, ObservableSource<? extends ChannelMessage>> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12336a;

        b() {
        }

        /**
         * Maps an input stream to an observable source of {@link ChannelMessage} by
         * delegating to the outer class's static helper.
         * NOTE(review): presumably the accessor for parseMessages - confirm.
         *
         * @param it the stream to read; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final ObservableSource<? extends ChannelMessage> apply(@NotNull InputStream it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12336a, false, 34314);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (ObservableSource) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u000e\u0010\u0002\u001a\n \u0004*\u0004\u0018\u00010\u00030\u0003H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "", "msg", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "accept"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$c */
    /* loaded from: classes6.dex */
    public static final class c<T> implements Consumer<ChannelMessage> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12337a;

        c() {
        }

        /**
         * Passes the received message to every {@link MessageProcessor} returned by the
         * outer instance's {@code d()} getter, in the set's iteration order.
         *
         * <p>The decompiled original also collected one {@code Unit.INSTANCE} per processor
         * into a throw-away {@code ArrayList} (residue of a Kotlin {@code map} call used
         * for its side effect). That list was never read, so it is no longer allocated.
         *
         * @param msg the channel message to dispatch; null-checked before each dispatch
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(ChannelMessage msg) {
            // Robust hook: when a patch handles this call there is nothing left to do.
            if (PatchProxy.proxy(new Object[]{msg}, this, f12337a, false, 34315).isSupported) {
                return;
            }
            for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.d()) {
                // Kept inside the loop so an empty processor set still skips the null check,
                // exactly as before.
                Intrinsics.checkNotNullExpressionValue(msg, "msg");
                messageProcessor.a(msg);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\u00020\u00012\u0006\u0010\u0002\u001a\u00020\u0003H\n¢\u0006\u0002\b\u0004"}, d2 = {"<anonymous>", "", AdvanceSetting.NETWORK_TYPE, "Ledu/classroom/channel/ChannelMessage;", "test"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$d */
    /* loaded from: classes6.dex */
    public static final class d<T> implements Predicate<ChannelMessage> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12338a;

        /* Shared stateless instance. */
        public static final d b = new d();

        d() {
        }

        /**
         * Accepts every message whose {@code msg_type} is neither {@code "fsm_version"}
         * nor {@code "user_state_version"}.
         *
         * @param it the message to test; must not be null
         * @return the patched result when a Robust patch handles the call; otherwise
         *         {@code false} for the two excluded types and {@code true} for the rest
         */
        @Override // io.reactivex.functions.Predicate
        public final boolean test(@NotNull ChannelMessage it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12338a, false, 34316);
            if (patchResult.isSupported) {
                return ((Boolean) patchResult.result).booleanValue();
            }
            Intrinsics.checkNotNullParameter(it, "it");
            if (Intrinsics.areEqual(it.msg_type, "fsm_version")) {
                return false;
            }
            return !Intrinsics.areEqual(it.msg_type, "user_state_version");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012\u0006\u0010\u0003\u001a\u00020\u0004H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ledu/classroom/channel/ChannelMessage;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$e */
    /* loaded from: classes6.dex */
    public static final class e<T, R> implements Function<ChannelMessage, MessageEntity> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12339a;

        e() {
        }

        /**
         * Converts a {@link ChannelMessage} into a {@link MessageEntity} by delegating to
         * the outer class's static helper.
         * NOTE(review): presumably the accessor for transformToMessageEntity - confirm.
         *
         * @param it the message to convert; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final MessageEntity apply(@NotNull ChannelMessage it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12339a, false, 34317);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (MessageEntity) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0018\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012(\u0010\u0003\u001a$\u0012\f\u0012\n \u0002*\u0004\u0018\u00010\u00050\u0005 \u0002*\u0010\u0012\f\u0012\n \u0002*\u0004\u0018\u00010\u00050\u00050\u00060\u0004H\n¢\u0006\u0002\b\u0007"}, d2 = {"<anonymous>", "Lio/reactivex/CompletableSource;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$f */
    /* loaded from: classes6.dex */
    public static final class f<T, R> implements Function<List<MessageEntity>, CompletableSource> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12340a;

        f() {
        }

        /**
         * Maps a list of message entities to a {@link CompletableSource} by delegating to
         * the outer class's static helper.
         * NOTE(review): presumably a persist step (persistMessages / persistMessagesOld) -
         * confirm which accessor this resolves to.
         *
         * @param it the entities to hand over; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final CompletableSource apply(@NotNull List<MessageEntity> it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12340a, false, 34318);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (CompletableSource) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u0010\u0000\u001a\u00020\u0001H\n¢\u0006\u0002\b\u0002"}, d2 = {"<anonymous>", "", "run"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$g */
    /* loaded from: classes6.dex */
    public static final class g implements Action {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12341a;

        /* Captured playback info entity that this action updates and forwards. */
        final /* synthetic */ PlaybackInfoEntity c;

        g(PlaybackInfoEntity capturedEntity) {
            this.c = capturedEntity;
        }

        /**
         * Unless a Robust patch handles the call, invokes {@code a(true)} on the captured
         * entity and then passes it to the outer class's static helpers {@code a} and
         * {@code b}, in that order.
         * NOTE(review): the two helpers are presumably savePlaybackInfoEntity and
         * updateMessageState - confirm which is which.
         */
        @Override // io.reactivex.functions.Action
        public final void run() {
            boolean handledByPatch = PatchProxy.proxy(new Object[0], this, f12341a, false, 34319).isSupported;
            if (!handledByPatch) {
                this.c.a(true);
                PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, this.c);
                PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, this.c);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012\u0006\u0010\u0003\u001a\u00020\u0004H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "Lio/reactivex/CompletableSource;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ljava/io/InputStream;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$h */
    /* loaded from: classes6.dex */
    public static final class h<T, R> implements Function<InputStream, CompletableSource> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12342a;

        /* Captured playback info entity handed to the outer helper on every call. */
        final /* synthetic */ PlaybackInfoEntity c;

        h(PlaybackInfoEntity capturedEntity) {
            this.c = capturedEntity;
        }

        /**
         * Maps an input stream to a {@link CompletableSource} by delegating to the outer
         * class's static helper with the captured entity and the flag {@code false}.
         * NOTE(review): presumably the accessor for parseAndPersistMessages with
         * isFetchRoomMsg = false - confirm against the outer class.
         *
         * @param it the stream to process; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final CompletableSource apply(@NotNull InputStream it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12342a, false, 34321);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it, this.c, false);
            }
            return (CompletableSource) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a*\u0012\u000e\b\u0001\u0012\n \u0003*\u0004\u0018\u00010\u00020\u0002 \u0003*\u0014\u0012\u000e\b\u0001\u0012\n \u0003*\u0004\u0018\u00010\u00020\u0002\u0018\u00010\u00010\u00012\u0006\u0010\u0004\u001a\u00020\u0005H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "Lio/reactivex/ObservableSource;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ljava/io/InputStream;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$i */
    /* loaded from: classes6.dex */
    public static final class i<T, R> implements Function<InputStream, ObservableSource<? extends ChannelMessage>> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12343a;

        i() {
        }

        /**
         * Maps an input stream to an observable source of {@link ChannelMessage} by
         * delegating to the outer class's static helper.
         * NOTE(review): presumably the accessor for parseMessages - confirm.
         *
         * @param it the stream to read; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final ObservableSource<? extends ChannelMessage> apply(@NotNull InputStream it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12343a, false, 34322);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (ObservableSource) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u000e\u0010\u0002\u001a\n \u0004*\u0004\u0018\u00010\u00030\u0003H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "", "msg", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "accept"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$j */
    /* loaded from: classes6.dex */
    public static final class j<T> implements Consumer<ChannelMessage> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12344a;

        j() {
        }

        /**
         * Passes the received message to every {@link MessageProcessor} returned by the
         * outer instance's {@code d()} getter, in the set's iteration order.
         *
         * <p>The decompiled original also collected one {@code Unit.INSTANCE} per processor
         * into a throw-away {@code ArrayList} (residue of a Kotlin {@code map} call used
         * for its side effect). That list was never read, so it is no longer allocated.
         *
         * @param msg the channel message to dispatch; null-checked before each dispatch
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(ChannelMessage msg) {
            // Robust hook: when a patch handles this call there is nothing left to do.
            if (PatchProxy.proxy(new Object[]{msg}, this, f12344a, false, 34323).isSupported) {
                return;
            }
            for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.d()) {
                // Kept inside the loop so an empty processor set still skips the null check,
                // exactly as before.
                Intrinsics.checkNotNullExpressionValue(msg, "msg");
                messageProcessor.a(msg);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\u00020\u00012\u0006\u0010\u0002\u001a\u00020\u0003H\n¢\u0006\u0002\b\u0004"}, d2 = {"<anonymous>", "", AdvanceSetting.NETWORK_TYPE, "Ledu/classroom/channel/ChannelMessage;", "test"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$k */
    /* loaded from: classes6.dex */
    public static final class k<T> implements Predicate<ChannelMessage> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12345a;

        /* Shared stateless instance. */
        public static final k b = new k();

        k() {
        }

        /**
         * Accepts every message whose {@code msg_type} is neither {@code "fsm_version"}
         * nor {@code "user_state_version"}.
         *
         * @param it the message to test; must not be null
         * @return the patched result when a Robust patch handles the call; otherwise
         *         {@code false} for the two excluded types and {@code true} for the rest
         */
        @Override // io.reactivex.functions.Predicate
        public final boolean test(@NotNull ChannelMessage it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12345a, false, 34324);
            if (patchResult.isSupported) {
                return ((Boolean) patchResult.result).booleanValue();
            }
            Intrinsics.checkNotNullParameter(it, "it");
            if (Intrinsics.areEqual(it.msg_type, "fsm_version")) {
                return false;
            }
            return !Intrinsics.areEqual(it.msg_type, "user_state_version");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012\u0006\u0010\u0003\u001a\u00020\u0004H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "Ledu/classroom/channel/ChannelMessage;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$l */
    /* loaded from: classes6.dex */
    public static final class l<T, R> implements Function<ChannelMessage, MessageEntity> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12346a;

        l() {
        }

        /**
         * Converts a {@link ChannelMessage} into a {@link MessageEntity} by delegating to
         * the outer class's static helper.
         * NOTE(review): presumably the accessor for transformToMessageEntity - confirm.
         *
         * @param it the message to convert; must not be null
         * @return the patched result when a Robust patch handles the call, otherwise the
         *         outer helper's result
         */
        @Override // io.reactivex.functions.Function
        public final MessageEntity apply(@NotNull ChannelMessage it) {
            PatchProxyResult patchResult = PatchProxy.proxy(new Object[]{it}, this, f12346a, false, 34325);
            if (!patchResult.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (MessageEntity) patchResult.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0010\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u000e\u0010\u0002\u001a\n \u0004*\u0004\u0018\u00010\u00030\u0003H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "", AdvanceSetting.NETWORK_TYPE, "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", "accept"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$m */
    /* loaded from: classes6.dex */
    public static final class m<T> implements Consumer<MessageEntity> {

        /* Robust patch redirect slot for this lambda class (renamed by the decompiler). */
        public static ChangeQuickRedirect f12347a;

        m() {
        }

        /**
         * Unless a Robust patch handles the call, invokes {@code a(...)} on the entity
         * with the outer instance's {@code p} string field.
         * NOTE(review): {@code p} is presumably the userId, so this likely stamps the
         * entity with the current user - confirm.
         *
         * @param messageEntity the entity to update
         */
        @Override // io.reactivex.functions.Consumer
        public final void accept(MessageEntity messageEntity) {
            boolean handledByPatch = PatchProxy.proxy(new Object[]{messageEntity}, this, f12347a, false, 34326).isSupported;
            if (!handledByPatch) {
                messageEntity.a(PlaybackMessageDataSourceImpl.this.p);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Rx mapping function that turns a batch of {@link MessageEntity} into a
     * {@link CompletableSource} by delegating to
     * {@code PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl, List)}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0018\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0000\u0010\u0000\u001a\n \u0002*\u0004\u0018\u00010\u00010\u00012(\u0010\u0003\u001a$\u0012\f\u0012\n \u0002*\u0004\u0018\u00010\u00050\u0005 \u0002*\u0010\u0012\f\u0012\n \u0002*\u0004\u0018\u00010\u00050\u00050\u00060\u0004H\n¢\u0006\u0002\b\u0007"}, d2 = {"<anonymous>", "Lio/reactivex/CompletableSource;", "kotlin.jvm.PlatformType", AdvanceSetting.NETWORK_TYPE, "", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$n */
    /* loaded from: classes6.dex */
    public static final class n<T, R> implements Function<List<MessageEntity>, CompletableSource> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12348a;

        n() {
        }

        /**
         * Hands the batch to the enclosing data source.
         *
         * @param it the batch of entities; must not be null
         * @return the completable source returned by the data source, or the patch result when
         *         the {@link PatchProxy} hook reports {@code isSupported}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final CompletableSource apply(@NotNull List<MessageEntity> it) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{it}, this, f12348a, false, 34327);
            if (!patched.isSupported) {
                Intrinsics.checkNotNullParameter(it, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
            }
            return (CompletableSource) patched.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Rx action that flags the captured {@link PlaybackInfoEntity} via {@code b(true)} and then
     * passes it to the enclosing data source's static {@code a(...)} and {@code b(...)} helpers.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u0010\u0000\u001a\u00020\u0001H\n¢\u0006\u0002\b\u0002"}, d2 = {"<anonymous>", "", "run"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$o */
    /* loaded from: classes6.dex */
    public static final class o implements Action {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12349a;
        final /* synthetic */ PlaybackInfoEntity c;

        o(PlaybackInfoEntity playbackInfoEntity) {
            this.c = playbackInfoEntity;
        }

        /**
         * Runs the three steps in order unless the {@link PatchProxy} hook handled the call.
         */
        @Override // io.reactivex.functions.Action
        public final void run() {
            PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f12349a, false, 34328);
            if (!patched.isSupported) {
                this.c.b(true);
                PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, this.c);
                PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, this.c);
            }
        }
    }

    /**
     * Stateless Rx combiner that concatenates two message lists with
     * {@code CollectionsKt.plus}; shared through the singleton {@link #b}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0003\u0010\u0000\u001a\b\u0012\u0004\u0012\u00020\u00020\u00012\f\u0010\u0003\u001a\b\u0012\u0004\u0012\u00020\u00020\u00012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00020\u0001H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "t1", "t2", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$p */
    /* loaded from: classes6.dex */
    static final class p<T1, T2, R> implements BiFunction<List<? extends com.edu.classroom.channel.api.b.a>, List<? extends com.edu.classroom.channel.api.b.a>, List<? extends com.edu.classroom.channel.api.b.a>> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12350a;
        public static final p b = new p();

        p() {
        }

        /**
         * Returns the elements of {@code t1} followed by the elements of {@code t2}.
         *
         * @param t1 first list; must not be null
         * @param t2 second list; must not be null
         * @return the combined list, or the patch result when the {@link PatchProxy} hook
         *         reports {@code isSupported}
         */
        @Override // io.reactivex.functions.BiFunction
        @NotNull
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.channel.api.b.a> apply(@NotNull List<? extends com.edu.classroom.channel.api.b.a> t1, @NotNull List<? extends com.edu.classroom.channel.api.b.a> t2) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{t1, t2}, this, f12350a, false, 34330);
            if (!patched.isSupported) {
                Intrinsics.checkNotNullParameter(t1, "t1");
                Intrinsics.checkNotNullParameter(t2, "t2");
                return CollectionsKt.plus((Collection) t1, (Iterable) t2);
            }
            return (List) patched.result;
        }
    }

    /**
     * Stateless Rx combiner that concatenates two message lists with
     * {@code CollectionsKt.plus}; shared through the singleton {@link #b}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0003\u0010\u0000\u001a\b\u0012\u0004\u0012\u00020\u00020\u00012\f\u0010\u0003\u001a\b\u0012\u0004\u0012\u00020\u00020\u00012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00020\u0001H\n¢\u0006\u0002\b\u0005"}, d2 = {"<anonymous>", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "t1", "t2", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$q */
    /* loaded from: classes6.dex */
    static final class q<T1, T2, R> implements BiFunction<List<? extends com.edu.classroom.channel.api.b.a>, List<? extends com.edu.classroom.channel.api.b.a>, List<? extends com.edu.classroom.channel.api.b.a>> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12351a;
        public static final q b = new q();

        q() {
        }

        /**
         * Returns the elements of {@code t1} followed by the elements of {@code t2}.
         *
         * @param t1 first list; must not be null
         * @param t2 second list; must not be null
         * @return the combined list, or the patch result when the {@link PatchProxy} hook
         *         reports {@code isSupported}
         */
        @Override // io.reactivex.functions.BiFunction
        @NotNull
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.channel.api.b.a> apply(@NotNull List<? extends com.edu.classroom.channel.api.b.a> t1, @NotNull List<? extends com.edu.classroom.channel.api.b.a> t2) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{t1, t2}, this, f12351a, false, 34331);
            if (!patched.isSupported) {
                Intrinsics.checkNotNullParameter(t1, "t1");
                Intrinsics.checkNotNullParameter(t2, "t2");
                return CollectionsKt.plus((Collection) t1, (Iterable) t2);
            }
            return (List) patched.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Observable source that drains a {@link MessageCache} up to a bound.
     *
     * <p>Field {@code b} is the cache and field {@code c} is the bound compared against each
     * message's {@code m()} value.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u0014\u0010\u0002\u001a\u0010\u0012\f\u0012\n \u0005*\u0004\u0018\u00010\u00040\u00040\u0003H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "", "emitter", "Lio/reactivex/ObservableEmitter;", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "kotlin.jvm.PlatformType", "subscribe"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$r */
    /* loaded from: classes6.dex */
    public static final class r<T> implements ObservableOnSubscribe<com.edu.classroom.channel.api.b.a> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12352a;
        final /* synthetic */ MessageCache b;
        final /* synthetic */ long c;

        r(MessageCache messageCache, long j) {
            this.b = messageCache;
            this.c = j;
        }

        /**
         * Emits cached messages while the value returned by the cache's {@code c()} is non-null
         * and its {@code m()} does not exceed the bound, then calls {@code b(bound)} on the
         * cache and completes.
         *
         * @param emitter receiver of the drained messages; must not be null
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<com.edu.classroom.channel.api.b.a> emitter) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{emitter}, this, f12352a, false, 34332);
            if (patched.isSupported) {
                return;
            }
            Intrinsics.checkNotNullParameter(emitter, "emitter");
            // NOTE(review): c() looks like a peek and b() like a remove-head — confirm in MessageCache.
            for (com.edu.classroom.channel.api.b.a head = this.b.c(); head != null && head.m() <= this.c; head = this.b.c()) {
                com.edu.classroom.channel.api.b.a taken = this.b.b();
                Intrinsics.checkNotNull(taken);
                emitter.onNext(taken);
            }
            this.b.b(this.c);
            emitter.onComplete();
        }
    }

    /**
     * Rx action that calls {@code a()} on every {@link MessageProcessor} returned by the
     * enclosing data source's {@code d()}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u0010\u0000\u001a\u00020\u0001H\n¢\u0006\u0002\b\u0002"}, d2 = {"<anonymous>", "", "run"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$s */
    /* loaded from: classes6.dex */
    static final class s implements Action {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12353a;

        s() {
        }

        /**
         * Invokes each processor once, in the iteration order of the set, unless the
         * {@link PatchProxy} hook handled the call.
         */
        @Override // io.reactivex.functions.Action
        public final void run() {
            if (PatchProxy.proxy(new Object[0], this, f12353a, false, 34334).isSupported) {
                return;
            }
            // The processors are called only for their side effects, so no result list is
            // collected (the decompiled code filled an ArrayList with Unit and discarded it).
            for (MessageProcessor processor : PlaybackMessageDataSourceImpl.this.d()) {
                processor.a();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * {@link CompletableOnSubscribe} that runs the message pipeline for one input stream and
     * reports its outcome through the supplied {@link CompletableEmitter}. The generated name of
     * the timing callback below is
     * {@code PlaybackMessageDataSourceImpl$parseAndPersistMessages$1$6}.
     *
     * <p>Captured values: {@code c} is the input stream passed to the data source's static
     * {@code a(impl, InputStream)} helper; {@code d} selects which flag and which monitor key
     * are used when the pipeline finishes, and when true also skips the per-entity
     * {@code a(p)} call; {@code e} is the {@link PlaybackInfoEntity} that is flagged and passed
     * on at the end.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\u00020\u00012\u0006\u0010\u0002\u001a\u00020\u0003H\n¢\u0006\u0002\b\u0004"}, d2 = {"<anonymous>", "", "emitter", "Lio/reactivex/CompletableEmitter;", "subscribe"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$t */
    /* loaded from: classes6.dex */
    public static final class t implements CompletableOnSubscribe {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12354a;
        final /* synthetic */ InputStream c;
        final /* synthetic */ boolean d;
        final /* synthetic */ PlaybackInfoEntity e;

        t(InputStream inputStream, boolean z, PlaybackInfoEntity playbackInfoEntity) {
            this.c = inputStream;
            this.d = z;
            this.e = playbackInfoEntity;
        }

        /**
         * Builds and subscribes to the pipeline. The emitter is completed either early (see the
         * batch consumer) or from the timing callback, and receives an error only if it has not
         * been completed early.
         *
         * @param emitter completion/error sink for the whole pipeline; must not be null
         */
        @Override // io.reactivex.CompletableOnSubscribe
        public final void subscribe(@NotNull final CompletableEmitter emitter) {
            if (PatchProxy.proxy(new Object[]{emitter}, this, f12354a, false, 34335).isSupported) {
                return;
            }
            Intrinsics.checkNotNullParameter(emitter, "emitter");
            // Set to true once the emitter has been completed early by the batch consumer.
            final Ref.BooleanRef booleanRef = new Ref.BooleanRef();
            booleanRef.element = false;
            // NOTE(review): the Rx operator names are obfuscated. `.b(Schedulers.b())` presumably
            // switches scheduler and `.a(50)` presumably groups entities into lists of up to 50
            // (the following stages receive List<MessageEntity>) — confirm against the mapping.
            Observable b = PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, this.c).a((Predicate) new Predicate<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.b.t.1

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12355a;

                // Keeps a message only when its msg_type is neither "fsm_version" nor
                // "user_state_version".
                @Override // io.reactivex.functions.Predicate
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final boolean test(@NotNull ChannelMessage it) {
                    PatchProxyResult proxy = PatchProxy.proxy(new Object[]{it}, this, f12355a, false, 34336);
                    if (proxy.isSupported) {
                        return ((Boolean) proxy.result).booleanValue();
                    }
                    Intrinsics.checkNotNullParameter(it, "it");
                    return (Intrinsics.areEqual(it.msg_type, "fsm_version") ^ true) && (Intrinsics.areEqual(it.msg_type, "user_state_version") ^ true);
                }
            }).b(Schedulers.b()).k(new Function<ChannelMessage, MessageEntity>() { // from class: com.edu.classroom.message.repo.datasource.b.t.2

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12356a;

                // Converts each remaining message into a MessageEntity via the data source.
                @Override // io.reactivex.functions.Function
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final MessageEntity apply(@NotNull ChannelMessage it) {
                    PatchProxyResult proxy = PatchProxy.proxy(new Object[]{it}, this, f12356a, false, 34337);
                    if (proxy.isSupported) {
                        return (MessageEntity) proxy.result;
                    }
                    Intrinsics.checkNotNullParameter(it, "it");
                    return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, it);
                }
            }).b((Consumer) new Consumer<MessageEntity>() { // from class: com.edu.classroom.message.repo.datasource.b.t.3

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12357a;

                // When d is false, passes the data source's p field (the constructor's userId)
                // to the entity's a(...); when d is true the entity is left untouched.
                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(MessageEntity messageEntity) {
                    if (PatchProxy.proxy(new Object[]{messageEntity}, this, f12357a, false, 34338).isSupported || t.this.d) {
                        return;
                    }
                    messageEntity.a(PlaybackMessageDataSourceImpl.this.p);
                }
            }).a(50).a((Function) new Function<List<MessageEntity>, ObservableSource<? extends List<? extends MessageEntity>>>() { // from class: com.edu.classroom.message.repo.datasource.b.t.4

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12358a;

                // Hands each batch to the data source's static b(impl, list), which returns an
                // ObservableSource of the batch. NOTE(review): presumably this is the persist
                // step — confirm in b(...).
                @Override // io.reactivex.functions.Function
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final ObservableSource<? extends List<MessageEntity>> apply(@NotNull List<MessageEntity> it) {
                    PatchProxyResult proxy = PatchProxy.proxy(new Object[]{it}, this, f12358a, false, 34339);
                    if (proxy.isSupported) {
                        return (ObservableSource) proxy.result;
                    }
                    Intrinsics.checkNotNullParameter(it, "it");
                    return PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, it);
                }
            }).b((Consumer) new Consumer<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.b.t.5

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12359a;

                // Early completion: the first time a batch's first entity has getD() greater
                // than e.getF() + 60000, the emitter is completed and the flag is set so that
                // later callbacks neither complete nor fail it again.
                // NOTE(review): 60000 is presumably milliseconds — confirm the unit of getD()/getF().
                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(List<MessageEntity> it) {
                    if (PatchProxy.proxy(new Object[]{it}, this, f12359a, false, 34340).isSupported || booleanRef.element) {
                        return;
                    }
                    Intrinsics.checkNotNullExpressionValue(it, "it");
                    if (((MessageEntity) CollectionsKt.first((List) it)).getD() > t.this.e.getF() + 60000) {
                        emitter.onComplete();
                        booleanRef.element = true;
                    }
                }
            });
            Intrinsics.checkNotNullExpressionValue(b, "parseMessages(r).filter{…      }\n                }");
            // NOTE(review): com.edu.classroom.base.e.a.a(...) is an opaque helper; judging by the
            // monitor keys and the log text below, the callback presumably receives the elapsed
            // duration when the pipeline finishes — confirm.
            com.edu.classroom.base.e.a.a(b, new Function1<Long, Unit>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1$6
                public static ChangeQuickRedirect changeQuickRedirect;

                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(1);
                }

                // Boxed bridge: unboxes the Long and forwards to invoke(long).
                @Override // kotlin.jvm.functions.Function1
                public /* synthetic */ Unit invoke(Long l) {
                    invoke(l.longValue());
                    return Unit.INSTANCE;
                }

                // Finishing step: completes the emitter if it was not completed early, sets
                // e.a(true) or e.b(true) depending on d, reports j under the matching monitor
                // key, passes e to the data source's static a(...) and b(...), then logs.
                public final void invoke(long j) {
                    if (PatchProxy.proxy(new Object[]{new Long(j)}, this, changeQuickRedirect, false, 34341).isSupported) {
                        return;
                    }
                    if (!booleanRef.element) {
                        emitter.onComplete();
                    }
                    if (PlaybackMessageDataSourceImpl.t.this.d) {
                        PlaybackMessageDataSourceImpl.t.this.e.a(true);
                        ESDKMonitor.a(ESDKMonitor.b, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j), null, 8, null);
                    } else {
                        PlaybackMessageDataSourceImpl.t.this.e.b(true);
                        ESDKMonitor.a(ESDKMonitor.b, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j), null, 8, null);
                    }
                    PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, PlaybackMessageDataSourceImpl.t.this.e);
                    PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, PlaybackMessageDataSourceImpl.t.this.e);
                    CommonLog.i$default(PlaybackLog.f12790a, "playback messages download finish completely: " + j, null, 2, null);
                }
            }).a(new Consumer<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.b.t.6
                // Batches need no further handling here; this consumer is intentionally empty.
                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(List<MessageEntity> list) {
                }
            }, new Consumer<Throwable>() { // from class: com.edu.classroom.message.repo.datasource.b.t.7

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f12361a;

                // Logs the failure, then forwards it to the emitter unless the emitter was
                // already completed early (flag set by the batch consumer).
                @Override // io.reactivex.functions.Consumer
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public final void accept(Throwable th) {
                    if (PatchProxy.proxy(new Object[]{th}, this, f12361a, false, 34342).isSupported) {
                        return;
                    }
                    CommonLog.e$default(PlaybackLog.f12790a, "playback messages download failed", th, null, 4, null);
                    if (Ref.BooleanRef.this.element) {
                        return;
                    }
                    emitter.onError(th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Observable source that decodes {@link ChannelMessage} records from the captured input
     * stream using a Wire {@link ProtoReader}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u0014\u0010\u0002\u001a\u0010\u0012\f\u0012\n \u0005*\u0004\u0018\u00010\u00040\u00040\u0003H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "", "emitter", "Lio/reactivex/ObservableEmitter;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "subscribe"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$u */
    /* loaded from: classes6.dex */
    public static final class u<T> implements ObservableOnSubscribe<ChannelMessage> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12362a;
        final /* synthetic */ InputStream b;

        u(InputStream inputStream) {
            this.b = inputStream;
        }

        /**
         * Reads one enclosing message from the stream and emits a decoded
         * {@link ChannelMessage} for every tag until {@code nextTag()} returns {@code -1},
         * then completes.
         *
         * @param emitter receiver of the decoded messages; must not be null
         * @throws MsgParseException when the failure is a {@link ProtocolException}
         * @throws MsgFetchException for any other failure
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<ChannelMessage> emitter) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{emitter}, this, f12362a, false, 34343);
            if (patched.isSupported) {
                return;
            }
            Intrinsics.checkNotNullParameter(emitter, "emitter");
            ProtoReader reader = new ProtoReader(Okio.buffer(Okio.source(this.b)));
            try {
                long token = reader.beginMessage();
                for (int tag = reader.nextTag(); tag != -1; tag = reader.nextTag()) {
                    emitter.onNext(ChannelMessage.ADAPTER.decode(reader));
                }
                reader.endMessageAndGetUnknownFields(token);
                emitter.onComplete();
            } catch (Throwable th) {
                // Protocol-level failures are reported as parse errors, everything else as fetch errors.
                if (th instanceof ProtocolException) {
                    throw new MsgParseException(th);
                }
                throw new MsgFetchException(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Callable that passes the captured list to {@code a(...)} on the object returned by the
     * enclosing data source's {@code c()} and returns the same list on success.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u0016\u0012\u0004\u0012\u00020\u0002 \u0003*\n\u0012\u0004\u0012\u00020\u0002\u0018\u00010\u00010\u0001H\n¢\u0006\u0002\b\u0004"}, d2 = {"<anonymous>", "", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", NotificationCompat.CATEGORY_CALL}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$v */
    /* loaded from: classes6.dex */
    public static final class v<V> implements Callable<List<? extends MessageEntity>> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12363a;
        final /* synthetic */ List c;

        v(List list) {
            this.c = list;
        }

        /**
         * Runs the {@code a(list)} call and checks its result.
         *
         * @return the captured list, or the patch result when the {@link PatchProxy} hook
         *         reports {@code isSupported}
         * @throws MsgPersistException when the length of the array returned by {@code a(list)}
         *         differs from the list size
         */
        @Override // java.util.concurrent.Callable
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<MessageEntity> call() {
            PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f12363a, false, 34344);
            if (patched.isSupported) {
                return (List) patched.result;
            }
            if (PlaybackMessageDataSourceImpl.this.c().a(this.c).length != this.c.size()) {
                throw new MsgPersistException();
            }
            return this.c;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Rx action that passes the captured list to {@code a(...)} on the object returned by the
     * enclosing data source's {@code c()} and fails when the result size does not match.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u0010\u0000\u001a\u00020\u0001H\n¢\u0006\u0002\b\u0002"}, d2 = {"<anonymous>", "", "run"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$w */
    /* loaded from: classes6.dex */
    public static final class w implements Action {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12364a;
        final /* synthetic */ List c;

        w(List list) {
            this.c = list;
        }

        /**
         * Runs the {@code a(list)} call unless the {@link PatchProxy} hook handled it.
         *
         * @throws MsgPersistException when the length of the array returned by {@code a(list)}
         *         differs from the list size
         */
        @Override // io.reactivex.functions.Action
        public final void run() {
            PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f12364a, false, 34345);
            if (patched.isSupported) {
                return;
            }
            if (PlaybackMessageDataSourceImpl.this.c().a(this.c).length != this.c.size()) {
                throw new MsgPersistException();
            }
        }
    }

    /**
     * Rx consumer that forwards the {@code getD()} value of the first entity in a batch to
     * {@code c(...)} on the three objects returned by the data source's static {@code b},
     * {@code c} and {@code d} accessors.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u001a\u0010\u0002\u001a\u0016\u0012\u0004\u0012\u00020\u0004 \u0005*\n\u0012\u0004\u0012\u00020\u0004\u0018\u00010\u00030\u0003H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "", AdvanceSetting.NETWORK_TYPE, "", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", "accept"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$x */
    /* loaded from: classes6.dex */
    static final class x<T> implements Consumer<List<? extends MessageEntity>> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12365a;

        x() {
        }

        /**
         * Notifies the three targets in order unless the {@link PatchProxy} hook handled the
         * call. The first element is read with {@code list.get(0)}, so an empty list fails
         * with the exception thrown by {@code List.get}.
         *
         * @param list the batch whose first entity is inspected
         */
        @Override // io.reactivex.functions.Consumer
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final void accept(List<MessageEntity> list) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{list}, this, f12365a, false, 34346);
            if (!patched.isSupported) {
                PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this).c(list.get(0).getD());
                PlaybackMessageDataSourceImpl.c(PlaybackMessageDataSourceImpl.this).c(list.get(0).getD());
                PlaybackMessageDataSourceImpl.d(PlaybackMessageDataSourceImpl.this).c(list.get(0).getD());
            }
        }
    }

    /**
     * Stateless Rx mapping function that converts every {@link MessageEntity} in a list with
     * {@code com.edu.classroom.message.repo.fetcher.h.a(...)}; shared through the singleton
     * {@link #b}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\u0010\u0000\u001a\u0016\u0012\u0004\u0012\u00020\u0002 \u0003*\n\u0012\u0004\u0012\u00020\u0002\u0018\u00010\u00010\u00012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00050\u0001H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "kotlin.jvm.PlatformType", "list", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "apply"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$y */
    /* loaded from: classes6.dex */
    static final class y<T, R> implements Function<List<? extends MessageEntity>, List<? extends com.edu.classroom.channel.api.b.a>> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12366a;
        public static final y b = new y();

        y() {
        }

        /**
         * Converts the entities one by one, preserving their order.
         *
         * @param list the entities to convert; must not be null
         * @return a new list holding the converted messages, or the patch result when the
         *         {@link PatchProxy} hook reports {@code isSupported}
         */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final List<com.edu.classroom.channel.api.b.a> apply(@NotNull List<MessageEntity> list) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{list}, this, f12366a, false, 34347);
            if (patched.isSupported) {
                return (List) patched.result;
            }
            Intrinsics.checkNotNullParameter(list, "list");
            List<com.edu.classroom.channel.api.b.a> converted = new ArrayList<>(CollectionsKt.collectionSizeOrDefault(list, 10));
            for (MessageEntity entity : list) {
                converted.add(com.edu.classroom.message.repo.fetcher.h.a(entity));
            }
            return converted;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Observable source that looks up status entities for the room held in the data source's
     * {@code o} field (assigned from the {@code roomId} constructor argument) at the captured
     * value {@code c}.
     */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0014\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u00012\u0014\u0010\u0002\u001a\u0010\u0012\f\u0012\n \u0005*\u0004\u0018\u00010\u00040\u00040\u0003H\n¢\u0006\u0002\b\u0006"}, d2 = {"<anonymous>", "", "emitter", "Lio/reactivex/ObservableEmitter;", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "kotlin.jvm.PlatformType", "subscribe"}, k = 3, mv = {1, 4, 2})
    /* renamed from: com.edu.classroom.message.repo.datasource.b$z */
    /* loaded from: classes6.dex */
    public static final class z<T> implements ObservableOnSubscribe<MessageEntity> {

        /* renamed from: a, reason: collision with root package name */
        public static ChangeQuickRedirect f12367a;
        final /* synthetic */ long c;

        z(long j) {
            this.c = j;
        }

        /**
         * Emits the "fsm" entity (looked up with {@code a(...)}, falling back to
         * {@code b(...)}), then the "user_state" and "on_wall_photos" entities when present,
         * and completes.
         *
         * @param emitter receiver of the entities; must not be null
         * @throws NoStatusMsgException when neither "fsm" lookup returns an entity
         */
        @Override // io.reactivex.ObservableOnSubscribe
        public final void subscribe(@NotNull ObservableEmitter<MessageEntity> emitter) {
            PatchProxyResult patched = PatchProxy.proxy(new Object[]{emitter}, this, f12367a, false, 34348);
            if (patched.isSupported) {
                return;
            }
            Intrinsics.checkNotNullParameter(emitter, "emitter");
            // The "fsm" entity is mandatory: try a(...) first, then b(...) as the fallback.
            MessageEntity fsmEntity = PlaybackMessageDataSourceImpl.this.c().a(PlaybackMessageDataSourceImpl.this.o, "fsm", this.c);
            if (fsmEntity == null) {
                fsmEntity = PlaybackMessageDataSourceImpl.this.c().b(PlaybackMessageDataSourceImpl.this.o, "fsm", this.c);
                if (fsmEntity == null) {
                    throw new NoStatusMsgException(this.c);
                }
            }
            emitter.onNext(fsmEntity);
            // The remaining types are optional and emitted only when a lookup finds them.
            for (String optionalType : new String[]{"user_state", "on_wall_photos"}) {
                MessageEntity optionalEntity = PlaybackMessageDataSourceImpl.this.c().a(PlaybackMessageDataSourceImpl.this.o, optionalType, this.c);
                if (optionalEntity != null) {
                    emitter.onNext(optionalEntity);
                }
            }
            emitter.onComplete();
        }
    }

    /**
     * Creates the data source and initialises its live-data and lazy cache fields.
     *
     * <p>Field {@code j} (also assigned to {@code k}) starts with the negation of the playback
     * setting read through {@code getF10201a()}. Field {@code n} is a lazy holder whose value
     * is built from {@code e().a(new LinkedBlockingQueue())} on first use.
     *
     * @param roomId stored in field {@code o}; must not be null
     * @param userId stored in field {@code p}; must not be null
     * @param messageNetworkFetcher stored in field {@code q}; must not be null
     */
    @Inject
    public PlaybackMessageDataSourceImpl(@Named @NotNull String roomId, @Named @NotNull String userId, @NotNull PlaybackMessageNetworkFetcher messageNetworkFetcher) {
        Intrinsics.checkNotNullParameter(roomId, "roomId");
        Intrinsics.checkNotNullParameter(userId, "userId");
        Intrinsics.checkNotNullParameter(messageNetworkFetcher, "messageNetworkFetcher");
        this.o = roomId;
        this.p = userId;
        this.q = messageNetworkFetcher;

        MutableLiveData<Boolean> initialState = new MutableLiveData<>();
        boolean playbackSetting = ClassroomSettingsManager.b.b().getClassroomPlaybackSettings().getF10201a();
        initialState.setValue(Boolean.valueOf(!playbackSetting));
        this.j = initialState;
        this.k = this.j;

        this.n = LazyKt.lazy(new Function0<PlaybackMessageCache>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$messageCache$2
            public static ChangeQuickRedirect changeQuickRedirect;

            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(0);
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // kotlin.jvm.functions.Function0
            @NotNull
            public final PlaybackMessageCache invoke() {
                PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, changeQuickRedirect, false, 34333);
                if (patched.isSupported) {
                    return (PlaybackMessageCache) patched.result;
                }
                return PlaybackMessageDataSourceImpl.this.e().a(new LinkedBlockingQueue());
            }
        });
    }

    /**
     * Synthetic accessor that lets nested classes call the private
     * {@code a(ChannelMessage)} of the given data source.
     *
     * @param playbackMessageDataSourceImpl the instance to call
     * @param channelMessage the message to convert
     * @return the converted entity, or the patch result when the {@link PatchProxy} hook
     *         reports {@code isSupported}
     */
    public static final /* synthetic */ MessageEntity a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, ChannelMessage channelMessage) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, channelMessage}, null, f12334a, true, 34305);
        if (patched.isSupported) {
            return (MessageEntity) patched.result;
        }
        return playbackMessageDataSourceImpl.a(channelMessage);
    }

    /**
     * Builds a {@link MessageEntity} from a {@link ChannelMessage}.
     *
     * <p>The entity receives the numeric form of {@code msg_id} (or {@code 0} when reading or
     * parsing it throws), followed by {@code msg_type}, {@code send_timestamp},
     * {@code room_id} and the payload bytes.
     *
     * @param channelMessage the message to convert
     * @return the new entity, or the patch result when the {@link PatchProxy} hook reports
     *         {@code isSupported}
     */
    private final MessageEntity a(ChannelMessage channelMessage) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{channelMessage}, this, f12334a, false, 34295);
        if (patched.isSupported) {
            return (MessageEntity) patched.result;
        }

        long messageId;
        try {
            String rawId = channelMessage.msg_id;
            Intrinsics.checkNotNullExpressionValue(rawId, "msg.msg_id");
            messageId = Long.parseLong(rawId);
        } catch (Throwable ignored) {
            // Best effort: any failure to obtain a numeric id falls back to 0.
            messageId = 0;
        }

        String type = channelMessage.msg_type;
        Intrinsics.checkNotNullExpressionValue(type, "msg_type");
        Long boxedTimestamp = channelMessage.send_timestamp;
        Intrinsics.checkNotNullExpressionValue(boxedTimestamp, "send_timestamp");
        long timestamp = boxedTimestamp.longValue();
        String room = channelMessage.room_id;
        Intrinsics.checkNotNullExpressionValue(room, "room_id");
        return new MessageEntity(messageId, type, timestamp, room, channelMessage.payload.toByteArray(), null, 32, null);
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code a(InputStream, PlaybackInfoEntity, boolean)} of the given data source.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ Completable a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, InputStream inputStream, PlaybackInfoEntity playbackInfoEntity, boolean z2) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, inputStream, playbackInfoEntity, new Byte(z2 ? (byte) 1 : (byte) 0)}, null, f12334a, true, 34303);
        if (patch.isSupported) {
            return (Completable) patch.result;
        }
        return playbackMessageDataSourceImpl.a(inputStream, playbackInfoEntity, z2);
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code b(List<MessageEntity>)} of the given data source.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ Completable a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, List list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, list}, null, f12334a, true, 34306);
        if (patch.isSupported) {
            return (Completable) patch.result;
        }
        return playbackMessageDataSourceImpl.b((List<MessageEntity>) list);
    }

    /**
     * Creates a {@code Completable} driven by the nested callback class {@code t}, which receives
     * the stream, the boolean flag and the playback info entity.
     * NOTE(review): the embedded expression text suggests the Kotlin source was
     * {@code Completable.create { ... }}; what {@code t} does with its inputs is not visible here.
     *
     * @return the created completable, or the patch result when a Robust patch handles the call
     */
    private final Completable a(InputStream inputStream, PlaybackInfoEntity playbackInfoEntity, boolean z2) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{inputStream, playbackInfoEntity, new Byte(z2 ? (byte) 1 : (byte) 0)}, this, f12334a, false, 34291);
        if (!patch.isSupported) {
            Completable created = Completable.a(new t(inputStream, z2, playbackInfoEntity));
            Intrinsics.checkNotNullExpressionValue(created, "Completable.create { emi…)\n                }\n    }");
            return created;
        }
        return (Completable) patch.result;
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code a(InputStream)} of the given data source.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ Observable a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, InputStream inputStream) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, inputStream}, null, f12334a, true, 34304);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        return playbackMessageDataSourceImpl.a(inputStream);
    }

    /**
     * Creates an {@code Observable} of {@link ChannelMessage} driven by the nested
     * {@code ObservableOnSubscribe} class {@code u}, which receives the input stream.
     * NOTE(review): presumably {@code u} decodes messages from the stream; confirm in {@code u}.
     *
     * @return the created observable, or the patch result when a Robust patch handles the call
     */
    private final Observable<ChannelMessage> a(InputStream inputStream) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{inputStream}, this, f12334a, false, 34292);
        if (!patch.isSupported) {
            Observable<ChannelMessage> messages = Observable.a((ObservableOnSubscribe) new u(inputStream));
            Intrinsics.checkNotNullExpressionValue(messages, "Observable.create<Channe…mitter.onComplete()\n    }");
            return messages;
        }
        return (Observable) patch.result;
    }

    /**
     * Wraps the nested callable class {@code v}, constructed with the given list, into an
     * {@code Observable} that emits a list of {@link MessageEntity}.
     * NOTE(review): the embedded expression text suggests {@code Observable.fromCallable} that
     * yields the same list; the work done inside {@code v} is not visible here.
     *
     * @return the created observable, or the patch result when a Robust patch handles the call
     */
    private final Observable<List<MessageEntity>> a(List<MessageEntity> list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{list}, this, f12334a, false, 34293);
        if (!patch.isSupported) {
            Observable<List<MessageEntity>> wrapped = Observable.c(new v(list));
            Intrinsics.checkNotNullExpressionValue(wrapped, "Observable.fromCallable …()\n        messages\n    }");
            return wrapped;
        }
        return (Observable) patch.result;
    }

    /**
     * Builds a {@code Single} list of messages from the given cache, using the nested
     * {@code ObservableOnSubscribe} class {@code r} constructed with the cache and {@code j2}.
     * NOTE(review): the trailing {@code l()} appears to be {@code toList()} per the embedded
     * expression text; confirm against the RxJava mapping.
     *
     * @param j2           value handed to {@code r} (presumably a timestamp; verify against callers)
     * @param messageCache cache handed to {@code r}
     * @return the resulting single, or the patch result when a Robust patch handles the call
     */
    private final Single<List<com.edu.classroom.channel.api.b.a>> a(long j2, MessageCache messageCache) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{new Long(j2), messageCache}, this, f12334a, false, 34299);
        if (!patch.isSupported) {
            Single<List<com.edu.classroom.channel.api.b.a>> collected = Observable.a((ObservableOnSubscribe) new r(messageCache, j2)).l();
            Intrinsics.checkNotNullExpressionValue(collected, "Observable.create<Classr…lete()\n        }.toList()");
            return collected;
        }
        return (Single) patch.result;
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code f(PlaybackInfoEntity)} unless a Robust patch handles the call.
     */
    public static final /* synthetic */ void a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, playbackInfoEntity}, null, f12334a, true, 34307);
        if (!patch.isSupported) {
            playbackMessageDataSourceImpl.f(playbackInfoEntity);
        }
    }

    /**
     * Synthetic static accessor that forwards to the private no-argument {@code f()} of the
     * given data source, which returns its {@link PlaybackMessageCache}.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ PlaybackMessageCache b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl}, null, f12334a, true, 34310);
        if (patch.isSupported) {
            return (PlaybackMessageCache) patch.result;
        }
        return playbackMessageDataSourceImpl.f();
    }

    /**
     * Wraps the nested class {@code w}, constructed with the given entities, into a
     * {@code Completable}.
     * NOTE(review): the embedded expression text suggests {@code Completable.fromAction} and a
     * {@code MsgPersistException} on failure; the persistence work inside {@code w} is not
     * visible here.
     *
     * @return the created completable, or the patch result when a Robust patch handles the call
     */
    private final Completable b(List<MessageEntity> list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{list}, this, f12334a, false, 34294);
        if (!patch.isSupported) {
            Completable persist = Completable.a(new w(list));
            Intrinsics.checkNotNullExpressionValue(persist, "Completable.fromAction {…gPersistException()\n    }");
            return persist;
        }
        return (Completable) patch.result;
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code a(List<MessageEntity>)} of the given data source.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ Observable b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, List list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, list}, null, f12334a, true, 34309);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        return playbackMessageDataSourceImpl.a((List<MessageEntity>) list);
    }

    /**
     * Logs the request, then builds a {@code Single} list of {@link MessageEntity} from the
     * nested {@code ObservableOnSubscribe} class {@code z} constructed with {@code j2}.
     * The log text names this operation {@code queryRecentMessage}.
     *
     * @param j2 value logged as {@code ts} and handed to {@code z}
     * @return the resulting single, or the patch result when a Robust patch handles the call
     */
    private final Single<List<MessageEntity>> b(long j2) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{new Long(j2)}, this, f12334a, false, 34297);
        if (patch.isSupported) {
            return (Single) patch.result;
        }
        String logLine = "PlaybackMessageDataSourceImpl.queryRecentMessage ts=" + j2;
        MessageLog.f10619a.d(logLine);
        Single<List<MessageEntity>> recent = Observable.a((ObservableOnSubscribe) new z(j2)).l();
        Intrinsics.checkNotNullExpressionValue(recent, "Observable.create<Messag…lete()\n        }.toList()");
        return recent;
    }

    /**
     * Synthetic static accessor that forwards to the private instance method
     * {@code e(PlaybackInfoEntity)} unless a Robust patch handles the call.
     */
    public static final /* synthetic */ void b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, playbackInfoEntity}, null, f12334a, true, 34308);
        if (!patch.isSupported) {
            playbackMessageDataSourceImpl.e(playbackInfoEntity);
        }
    }

    /**
     * Synthetic static accessor for the data source's {@code chatCache} field ({@code l}).
     * When the field is still {@code null}, Kotlin's uninitialized-property helper is invoked
     * with the property name {@code "chatCache"}.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ PlaybackChatCache c(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl}, null, f12334a, true, 34311);
        if (patch.isSupported) {
            return (PlaybackChatCache) patch.result;
        }
        PlaybackChatCache chatCache = playbackMessageDataSourceImpl.l;
        if (chatCache == null) {
            Intrinsics.throwUninitializedPropertyAccessException("chatCache");
        }
        return chatCache;
    }

    /**
     * Starts a fetch through {@code this.q} for the entity's {@code getI()} value, chains the
     * nested class {@code a} (constructed with the entity) onto it and applies
     * {@code Schedulers.b()}.
     * NOTE(review): the embedded expression text suggests {@code q} is the message network
     * fetcher and {@code Schedulers.b()} is {@code Schedulers.io()}; confirm against the mapping.
     *
     * @return the resulting completable, or the patch result when a Robust patch handles the call
     */
    private final Completable c(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f12334a, false, 34287);
        if (!patch.isSupported) {
            Completable fetch = this.q.a(playbackInfoEntity.getI()).d(new a(playbackInfoEntity)).b(Schedulers.b());
            Intrinsics.checkNotNullExpressionValue(fetch, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
            return fetch;
        }
        return (Completable) patch.result;
    }

    /**
     * Synthetic static accessor for the data source's {@code boardCache} field ({@code m}).
     * When the field is still {@code null}, Kotlin's uninitialized-property helper is invoked
     * with the property name {@code "boardCache"}.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    public static final /* synthetic */ PlaybackBoardCache d(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl}, null, f12334a, true, 34312);
        if (patch.isSupported) {
            return (PlaybackBoardCache) patch.result;
        }
        PlaybackBoardCache boardCache = playbackMessageDataSourceImpl.m;
        if (boardCache == null) {
            Intrinsics.throwUninitializedPropertyAccessException("boardCache");
        }
        return boardCache;
    }

    /**
     * Starts a fetch through {@code this.q} for the entity's {@code getJ()} value, chains the
     * nested class {@code h} (constructed with the entity) onto it and applies
     * {@code Schedulers.b()}.
     * NOTE(review): the embedded expression text suggests {@code q} is the message network
     * fetcher and {@code Schedulers.b()} is {@code Schedulers.io()}; confirm against the mapping.
     *
     * @return the resulting completable, or the patch result when a Robust patch handles the call
     */
    private final Completable d(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f12334a, false, 34288);
        if (!patch.isSupported) {
            Completable fetch = this.q.a(playbackInfoEntity.getJ()).d(new h(playbackInfoEntity)).b(Schedulers.b());
            Intrinsics.checkNotNullExpressionValue(fetch, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
            return fetch;
        }
        return (Completable) patch.result;
    }

    /**
     * Hands the entity to {@code playbackInfoDao} when either {@code getE()} or {@code getD()}
     * is true; otherwise does nothing.
     * NOTE(review): {@code dao.a(entity).d()} presumably writes the entity and triggers the
     * operation; the meaning of the obfuscated {@code d()} must be confirmed.
     */
    private final void e(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f12334a, false, 34301);
        if (patch.isSupported) {
            return;
        }
        // Neither flag set: nothing to hand to the DAO.
        if (!playbackInfoEntity.getE() && !playbackInfoEntity.getD()) {
            return;
        }
        PlaybackInfoDao infoDao = this.c;
        if (infoDao == null) {
            Intrinsics.throwUninitializedPropertyAccessException("playbackInfoDao");
        }
        infoDao.a(playbackInfoEntity).d();
    }

    /**
     * Returns the lazily created {@link PlaybackMessageCache} held by {@code this.n},
     * or the patch result when a Robust patch handles the call.
     */
    private final PlaybackMessageCache f() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f12334a, false, 34283);
        Object cache;
        if (patch.isSupported) {
            cache = patch.result;
        } else {
            cache = this.n.getValue();
        }
        return (PlaybackMessageCache) cache;
    }

    /**
     * Posts {@code true} to the {@code j} live data when {@code getD()} is true and, in
     * addition, either {@code getE()} is true or {@code getJ()} is an empty string.
     * NOTE(review): presumably this signals that all required messages are ready; confirm
     * the meaning of the obfuscated getters.
     */
    private final void f(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f12334a, false, 34302);
        if (patch.isSupported) {
            return;
        }
        if (!playbackInfoEntity.getD()) {
            return;
        }
        // getE() is false and getJ() is non-empty: do not post yet.
        if (!playbackInfoEntity.getE() && playbackInfoEntity.getJ().length() != 0) {
            return;
        }
        this.j.postValue(true);
    }

    /**
     * Returns the {@code LiveData<Boolean>} held in field {@code k}.
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public LiveData<Boolean> a() {
        LiveData<Boolean> exposed = this.k;
        return exposed;
    }

    /**
     * Builds the processing chain for the entity's {@code getI()} value and returns it as a
     * {@code Completable}.
     *
     * <p>The anonymous callback's class name ({@code fetchRoomMessagesOld$7}) indicates this is
     * the "old" room-message fetch path. The chain starts from {@code this.q}, passes through the
     * nested classes {@code b}..{@code g} with a literal {@code 50} in between, and is then
     * wrapped by {@code com.edu.classroom.base.e.a.a} with a callback that reports a
     * {@code long} value to {@link ESDKMonitor}.
     * NOTE(review): the obfuscated operators are presumably map/filter/buffer(50)/flatMap-style
     * steps, the reported value presumably an elapsed duration, and {@code Schedulers.b()}
     * presumably {@code Schedulers.io()} (per the embedded expression text); confirm against
     * the RxJava mapping.
     *
     * @param entity playback info whose {@code getI()} value is fetched; must not be null
     * @return the resulting completable, or the patch result when a Robust patch handles the call
     */
    @NotNull
    public final Completable a(@NotNull PlaybackInfoEntity entity) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{entity}, this, f12334a, false, 34289);
        if (proxy.isSupported) {
            return (Completable) proxy.result;
        }
        Intrinsics.checkNotNullParameter(entity, "entity");
        // Operator order is significant; each nested class (b, c, d, e, f, g) is one stage.
        Completable b2 = this.q.a(entity.getI()).c(new b()).b(new c()).a((Predicate) d.b).k(new e()).a(50).f(new f()).b(new g(entity));
        Intrinsics.checkNotNullExpressionValue(b2, "messageNetworkFetcher.fe…ty)\n                    }");
        Completable b3 = com.edu.classroom.base.e.a.a(b2, new Function1<Long, Unit>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$7
            public static ChangeQuickRedirect changeQuickRedirect;

            @Override // kotlin.jvm.functions.Function1
            public /* synthetic */ Unit invoke(Long l2) {
                // Boxed bridge: unboxes and delegates to the primitive overload below.
                invoke(l2.longValue());
                return Unit.INSTANCE;
            }

            public final void invoke(long j2) {
                if (PatchProxy.proxy(new Object[]{new Long(j2)}, this, changeQuickRedirect, false, 34320).isSupported) {
                    return;
                }
                // Reports j2 under the "playback_message_database_duration" key.
                ESDKMonitor.a(ESDKMonitor.b, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j2), null, 8, null);
            }
        }).b(Schedulers.b());
        Intrinsics.checkNotNullExpressionValue(b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }

    /**
     * Fetches for the given url unless the entity is already up to date for it.
     *
     * <p>When {@code entity.getD()} is true and {@code entity.getI()} equals {@code url}, no
     * fetch is started: {@code f(entity)} is invoked and an already-complete {@code Completable}
     * is returned. Otherwise the url is stored on the entity via {@code entity.a(url)} and the
     * playback setting {@code getF10201a()} selects {@code c(entity)} over {@code a(entity)}.
     *
     * @param entity playback info to update and fetch for; must not be null
     * @param url    url to fetch; must not be null
     * @return the fetch completable, a completed one when nothing needs fetching, or the patch
     *         result when a Robust patch handles the call
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public Completable a(@NotNull PlaybackInfoEntity entity, @NotNull String url) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{entity, url}, this, f12334a, false, 34285);
        if (patch.isSupported) {
            return (Completable) patch.result;
        }
        Intrinsics.checkNotNullParameter(entity, "entity");
        Intrinsics.checkNotNullParameter(url, "url");
        boolean upToDate = entity.getD() && Intrinsics.areEqual(entity.getI(), url);
        if (upToDate) {
            f(entity);
            Completable done = Completable.a();
            Intrinsics.checkNotNullExpressionValue(done, "Completable.complete()");
            return done;
        }
        entity.a(url);
        if (ClassroomSettingsManager.b.b().getClassroomPlaybackSettings().getF10201a()) {
            return c(entity);
        }
        return a(entity);
    }

    /**
     * Logs a "prefetch message to cache" line for {@code j2}, then takes the result of
     * {@code b(j2)}, passes it through {@code com.edu.classroom.base.e.a.a}, attaches the nested
     * consumer {@code x} and applies {@code y.b}.
     * NOTE(review): {@code x} presumably fills the cache and {@code y.b} presumably maps entities
     * to channel messages; confirm in those classes.
     *
     * @param j2 value logged and forwarded to {@code b(long)}
     * @return the resulting single, or the patch result when a Robust patch handles the call
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public Single<List<com.edu.classroom.channel.api.b.a>> a(long j2) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{new Long(j2)}, this, f12334a, false, 34296);
        if (patch.isSupported) {
            return (Single) patch.result;
        }
        String logLine = "prefetch message to cache: " + j2;
        CommonLog.i$default(MessageLog.f10619a, logLine, null, 2, null);
        Single<List<com.edu.classroom.channel.api.b.a>> prefetched = com.edu.classroom.base.e.a.a(b(j2)).c((Consumer) new x()).e(y.b);
        Intrinsics.checkNotNullExpressionValue(prefetched, "queryRecentMessage(times…)\n            }\n        }");
        return prefetched;
    }

    /**
     * Combines messages read from three caches, in this order: the message cache from
     * {@code f()}, then {@code chatCache}, then {@code boardCache}, using the combiners
     * {@code p.b} and {@code q.b}.
     *
     * <p>Only {@code j3} is passed to the cache lookups; {@code j2} is used for logging only.
     * The embedded expression text shows the last combiner as {@code t1 + t2}.
     *
     * @param j2 logged as {@code start}
     * @param j3 logged as {@code end} and passed to every cache lookup
     * @return the combined single, or the patch result when a Robust patch handles the call
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public Single<List<com.edu.classroom.channel.api.b.a>> a(long j2, long j3) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{new Long(j2), new Long(j3)}, this, f12334a, false, 34298);
        if (patch.isSupported) {
            return (Single) patch.result;
        }
        MessageLog.f10619a.d("getMessages start:" + j2 + " end:" + j3);
        Single<List<com.edu.classroom.channel.api.b.a>> fromMessageCache = a(j3, f());
        PlaybackChatCache chatCache = this.l;
        if (chatCache == null) {
            Intrinsics.throwUninitializedPropertyAccessException("chatCache");
        }
        Single<R> withChat = fromMessageCache.a(a(j3, chatCache), p.b);
        PlaybackBoardCache boardCache = this.m;
        if (boardCache == null) {
            Intrinsics.throwUninitializedPropertyAccessException("boardCache");
        }
        Single<List<com.edu.classroom.channel.api.b.a>> combined = withChat.a(a(j3, boardCache), q.b);
        Intrinsics.checkNotNullExpressionValue(combined, "getMessagesFromCache(end…), { t1, t2 -> t1 + t2 })");
        return combined;
    }

    /**
     * Creates fresh chat and board caches from the given info blocks.
     *
     * <p>A chat fetcher and a board fetcher are obtained from their factories (fields {@code g}
     * and {@code h}); each is paired with a new {@link LinkedBlockingQueue} and stored in the
     * {@code chatCache} ({@code l}) and {@code boardCache} ({@code m}) fields.
     *
     * @param chatInfoBlocks  blocks handed to the chat fetcher factory; must not be null
     * @param boardInfoBlocks blocks handed to the board fetcher factory; must not be null
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public void a(@NotNull List<ChatInfoBlock> chatInfoBlocks, @NotNull List<BoardInfoBlock> boardInfoBlocks) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{chatInfoBlocks, boardInfoBlocks}, this, f12334a, false, 34284);
        if (patch.isSupported) {
            return;
        }
        Intrinsics.checkNotNullParameter(chatInfoBlocks, "chatInfoBlocks");
        Intrinsics.checkNotNullParameter(boardInfoBlocks, "boardInfoBlocks");
        PlaybackChatFetcher.a chatFactory = this.g;
        if (chatFactory == null) {
            Intrinsics.throwUninitializedPropertyAccessException("chatFetcherFactory");
        }
        PlaybackChatFetcher chatFetcher = chatFactory.a(chatInfoBlocks);
        PlaybackBoardFetcher.a boardFactory = this.h;
        if (boardFactory == null) {
            Intrinsics.throwUninitializedPropertyAccessException("boardFetcherFactory");
        }
        PlaybackBoardFetcher boardFetcher = boardFactory.a(boardInfoBlocks);
        this.l = new PlaybackChatCache(chatFetcher, new LinkedBlockingQueue());
        this.m = new PlaybackBoardCache(boardFetcher, new LinkedBlockingQueue());
    }

    /**
     * Wraps the nested class {@code s} into a {@code Completable}.
     * NOTE(review): the embedded expression text suggests {@code Completable.fromAction} ending
     * in a loop that calls {@code it.complete()}; the work done inside {@code s} is not visible
     * here.
     *
     * @return the created completable, or the patch result when a Robust patch handles the call
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public Completable b() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f12334a, false, 34300);
        if (!patch.isSupported) {
            Completable action = Completable.a(new s());
            Intrinsics.checkNotNullExpressionValue(action, "Completable.fromAction {…p { it.complete() }\n    }");
            return action;
        }
        return (Completable) patch.result;
    }

    /**
     * Builds the processing chain for the entity's {@code getJ()} value and returns it as a
     * {@code Completable}.
     *
     * <p>The anonymous callback's class name ({@code fetchSelfMessagesOld$8}) indicates this is
     * the "old" self-message fetch path. The chain starts from {@code this.q}, passes through the
     * nested classes {@code i}..{@code o} (including the consumer {@code m}) with a literal
     * {@code 50} in between, and is then wrapped by {@code com.edu.classroom.base.e.a.a} with a
     * callback that reports a {@code long} value to {@link ESDKMonitor}.
     * NOTE(review): the obfuscated operators are presumably map/filter/buffer(50)/flatMap-style
     * steps, the reported value presumably an elapsed duration, and {@code Schedulers.b()}
     * presumably {@code Schedulers.io()} (per the embedded expression text); confirm against
     * the RxJava mapping.
     *
     * @param entity playback info whose {@code getJ()} value is fetched; must not be null
     * @return the resulting completable, or the patch result when a Robust patch handles the call
     */
    @NotNull
    public final Completable b(@NotNull PlaybackInfoEntity entity) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{entity}, this, f12334a, false, 34290);
        if (proxy.isSupported) {
            return (Completable) proxy.result;
        }
        Intrinsics.checkNotNullParameter(entity, "entity");
        // Operator order is significant; each nested class (i, j, k, l, m, n, o) is one stage.
        Completable b2 = this.q.a(entity.getJ()).c(new i()).b(new j()).a((Predicate) k.b).k(new l()).b((Consumer) new m()).a(50).f(new n()).b(new o(entity));
        Intrinsics.checkNotNullExpressionValue(b2, "messageNetworkFetcher.fe…ty)\n                    }");
        Completable b3 = com.edu.classroom.base.e.a.a(b2, new Function1<Long, Unit>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$8
            public static ChangeQuickRedirect changeQuickRedirect;

            @Override // kotlin.jvm.functions.Function1
            public /* synthetic */ Unit invoke(Long l2) {
                // Boxed bridge: unboxes and delegates to the primitive overload below.
                invoke(l2.longValue());
                return Unit.INSTANCE;
            }

            public final void invoke(long j2) {
                if (PatchProxy.proxy(new Object[]{new Long(j2)}, this, changeQuickRedirect, false, 34329).isSupported) {
                    return;
                }
                // Reports j2 under the "playback_self_message_database_duration" key.
                ESDKMonitor.a(ESDKMonitor.b, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j2), null, 8, null);
            }
        }).b(Schedulers.b());
        Intrinsics.checkNotNullExpressionValue(b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }

    /**
     * Fetches for the given url unless the entity is already up to date for it or the url is
     * empty.
     *
     * <p>A fetch is started only when ({@code entity.getE()} is false or {@code entity.getJ()}
     * differs from {@code url}) and {@code url} is non-empty. In that case the url is stored via
     * {@code entity.b(url)} and the playback setting {@code getF10201a()} selects
     * {@code d(entity)} over {@code b(entity)}. Otherwise {@code f(entity)} is invoked and an
     * already-complete {@code Completable} is returned.
     *
     * @param entity playback info to update and fetch for; must not be null
     * @param url    url to fetch; must not be null, may be empty
     * @return the fetch completable, a completed one when nothing needs fetching, or the patch
     *         result when a Robust patch handles the call
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    @NotNull
    public Completable b(@NotNull PlaybackInfoEntity entity, @NotNull String url) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{entity, url}, this, f12334a, false, 34286);
        if (patch.isSupported) {
            return (Completable) patch.result;
        }
        Intrinsics.checkNotNullParameter(entity, "entity");
        Intrinsics.checkNotNullParameter(url, "url");
        boolean needsFetch = (!entity.getE() || !Intrinsics.areEqual(entity.getJ(), url)) && url.length() != 0;
        if (needsFetch) {
            entity.b(url);
            if (ClassroomSettingsManager.b.b().getClassroomPlaybackSettings().getF10201a()) {
                return d(entity);
            }
            return b(entity);
        }
        f(entity);
        Completable done = Completable.a();
        Intrinsics.checkNotNullExpressionValue(done, "Completable.complete()");
        return done;
    }

    /**
     * Returns the {@code messageDao} field ({@code b}). When the field is still {@code null},
     * Kotlin's uninitialized-property helper is invoked with the property name.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    @NotNull
    public final PlaybackMessageDao c() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f12334a, false, 34267);
        if (patch.isSupported) {
            return (PlaybackMessageDao) patch.result;
        }
        PlaybackMessageDao messageDao = this.b;
        if (messageDao == null) {
            Intrinsics.throwUninitializedPropertyAccessException("messageDao");
        }
        return messageDao;
    }

    /**
     * Returns the {@code processors} field ({@code f}). When the field is still {@code null},
     * Kotlin's uninitialized-property helper is invoked with the property name.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    @NotNull
    public final Set<MessageProcessor> d() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f12334a, false, 34275);
        if (patch.isSupported) {
            return (Set) patch.result;
        }
        Set<MessageProcessor> processors = this.f;
        if (processors == null) {
            Intrinsics.throwUninitializedPropertyAccessException("processors");
        }
        return processors;
    }

    /**
     * Returns the {@code messageCacheFactory} field ({@code i}). When the field is still
     * {@code null}, Kotlin's uninitialized-property helper is invoked with the property name.
     * If a Robust patch handles this call, the patch result is returned instead.
     */
    @NotNull
    public final PlaybackMessageCache.a e() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f12334a, false, 34281);
        if (patch.isSupported) {
            return (PlaybackMessageCache.a) patch.result;
        }
        PlaybackMessageCache.a cacheFactory = this.i;
        if (cacheFactory == null) {
            Intrinsics.throwUninitializedPropertyAccessException("messageCacheFactory");
        }
        return cacheFactory;
    }
}
