package com.edu.classroom.message.repo.datasource;

import com.bytedance.boost_multidex.Constants;
import com.edu.classroom.base.log.CommonLog;
import com.edu.classroom.base.network.IRetrofit;
import com.edu.classroom.base.rxjava.RxjavaExKt;
import com.edu.classroom.channel.api.MessageLog;
import com.edu.classroom.channel.api.model.ClassroomMessage;
import com.edu.classroom.message.MessageProcessor;
import com.edu.classroom.message.MsgFetchException;
import com.edu.classroom.message.MsgParseException;
import com.edu.classroom.message.MsgPersistException;
import com.edu.classroom.message.NoStatusMsgException;
import com.edu.classroom.message.repo.cache.MessageCache;
import com.edu.classroom.message.repo.cache.PlaybackChatCache;
import com.edu.classroom.message.repo.cache.PlaybackMessageCache;
import com.edu.classroom.message.repo.db.dao.PlaybackMessageDao;
import com.edu.classroom.message.repo.db.entity.MessageEntity;
import com.edu.classroom.message.repo.db.entity.PlaybackInfoEntity;
import com.edu.classroom.message.repo.fetcher.PlaybackChatFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageDbFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageDbFetcherKt;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageNetworkFetcher;
import com.edu.classroom.message.repo.model.ChatInfoBlock;
import com.meituan.robust.ChangeQuickRedirect;
import com.meituan.robust.PatchProxy;
import com.meituan.robust.PatchProxyResult;
import com.squareup.wire.ProtoReader;
import edu.classroom.channel.ChannelMessage;
import io.reactivex.Observable;
import io.reactivex.b;
import io.reactivex.functions.a;
import io.reactivex.functions.e;
import io.reactivex.functions.f;
import io.reactivex.q;
import io.reactivex.r;
import io.reactivex.s;
import io.reactivex.w;
import java.io.InputStream;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import kotlin.Lazy;
import kotlin.Metadata;
import kotlin.h;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.aa;
import kotlin.jvm.internal.n;
import kotlin.jvm.internal.y;
import kotlin.reflect.KProperty;
import okio.Okio;

@Metadata(bv = {1, 0, 3}, d1 = {"\u0000¤\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\"\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\t\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\t\u0018\u00002\u00020\u0001B#\b\u0007\u0012\b\b\u0001\u0010\u0002\u001a\u00020\u0003\u0012\b\b\u0001\u0010\u0004\u001a\u00020\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0010\u0010*\u001a\u00020+2\u0006\u0010,\u001a\u00020-H\u0016J\u0010\u0010.\u001a\u00020+2\u0006\u0010,\u001a\u00020-H\u0016J$\u0010/\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020201002\u0006\u00103\u001a\u0002042\u0006\u00105\u001a\u000204H\u0016J$\u00106\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020201002\u0006\u00105\u001a\u0002042\u0006\u0010\b\u001a\u000207H\u0002J\u0016\u00108\u001a\u0002092\f\u0010:\u001a\b\u0012\u0004\u0012\u00020;01H\u0016J\b\u0010<\u001a\u00020+H\u0016J\u001e\u0010=\u001a\u0010\u0012\f\u0012\n @*\u0004\u0018\u00010?0?0>2\u0006\u0010A\u001a\u00020BH\u0002J\u0016\u0010C\u001a\u00020+2\f\u0010D\u001a\b\u0012\u0004\u0012\u00020E01H\u0002J\u001c\u0010F\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020201002\u0006\u0010G\u001a\u000204H\u0016J\u001e\u0010H\u001a\u0010\u0012\f\u0012\n 
@*\u0004\u0018\u00010E0E002\u0006\u0010I\u001a\u000204H\u0002J\u001c\u0010J\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020E01002\u0006\u0010I\u001a\u000204H\u0002J\b\u0010K\u001a\u000209H\u0016J\u0010\u0010L\u001a\u00020E2\u0006\u0010M\u001a\u00020?H\u0002R\u001b\u0010\b\u001a\u00020\t8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\f\u0010\r\u001a\u0004\b\n\u0010\u000bR\u000e\u0010\u000e\u001a\u00020\u000fX\u0082.¢\u0006\u0002\n\u0000R\u001e\u0010\u0010\u001a\u00020\u00118\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u0012\u0010\u0013\"\u0004\b\u0014\u0010\u0015R\u001e\u0010\u0016\u001a\u00020\u00178\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u0018\u0010\u0019\"\u0004\b\u001a\u0010\u001bR\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n\u0000R)\u0010\u001c\u001a\r\u0012\t\u0012\u00070\u001e¢\u0006\u0002\b\u001f0\u001d8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b \u0010!\"\u0004\b\"\u0010#R\u001e\u0010$\u001a\u00020%8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b&\u0010'\"\u0004\b(\u0010)R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000¨\u0006N"}, d2 = {"Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSourceImpl;", "Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSource;", "roomId", "", "userId", "messageNetworkFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;", "(Ljava/lang/String;Ljava/lang/String;Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;)V", "cache", "Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "getCache", "()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "cache$delegate", "Lkotlin/Lazy;", "chatCache", "Lcom/edu/classroom/message/repo/cache/PlaybackChatCache;", "messageDao", "Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "getMessageDao", 
"()Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "setMessageDao", "(Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;)V", "messageDbFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "getMessageDbFetcher", "()Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "setMessageDbFetcher", "(Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;)V", "processors", "", "Lcom/edu/classroom/message/MessageProcessor;", "Lkotlin/jvm/JvmSuppressWildcards;", "getProcessors", "()Ljava/util/Set;", "setProcessors", "(Ljava/util/Set;)V", "retrofit", "Lcom/edu/classroom/base/network/IRetrofit;", "getRetrofit", "()Lcom/edu/classroom/base/network/IRetrofit;", "setRetrofit", "(Lcom/edu/classroom/base/network/IRetrofit;)V", "downloadRoomMessage", "Lio/reactivex/Completable;", "entity", "Lcom/edu/classroom/message/repo/db/entity/PlaybackInfoEntity;", "downloadSelfMessage", "getMessages", "Lio/reactivex/Single;", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "start", "", "end", "getMessagesFromCache", "Lcom/edu/classroom/message/repo/cache/MessageCache;", "init", "", "chatInfoBlocks", "Lcom/edu/classroom/message/repo/model/ChatInfoBlock;", "onComplete", "parseMessages", "Lio/reactivex/Observable;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "inputStream", "Ljava/io/InputStream;", "persistMessages", "messages", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "prefetchMessageToCache", Constants.KEY_TIME_STAMP, "queryLatestFsmMessage", "ts", "queryLatestMessage", "release", "transformToMessageEntity", "msg", "message_release"}, k = 1, mv = {1, 1, 16})
/* loaded from: classes3.dex */
public final class PlaybackMessageDataSourceImpl implements PlaybackMessageDataSource {

    /* renamed from: a, reason: collision with root package name */
    // Hot-patch redirect slot handed to every PatchProxy.proxy(...) call made by this class.
    public static ChangeQuickRedirect f14407a;

    /* renamed from: b, reason: collision with root package name */
    // Kotlin property metadata for the delegated "cache" property; only read in e().
    static final /* synthetic */ KProperty[] f14408b = {aa.a(new y(aa.a(PlaybackMessageDataSourceImpl.class), "cache", "getCache()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;"))};

    /* renamed from: c, reason: collision with root package name */
    // "messageDao": assigned from outside the visible code; read through b(), which calls
    // n.b("messageDao") when the field is still null.
    public PlaybackMessageDao f14409c;

    /* renamed from: d, reason: collision with root package name */
    // Presumably the "messageDbFetcher" property listed in @Metadata; not used in the visible code.
    public PlaybackMessageDbFetcher f14410d;
    // "retrofit": read by a(List<ChatInfoBlock>) when the chat cache is built.
    public IRetrofit e;
    // Presumably the "processors" property listed in @Metadata; its accessor d() is outside this view.
    public Set<MessageProcessor> f;
    // Lazy holder whose value is the PlaybackMessageCache returned by e().
    private final Lazy g;
    // "chatCache": created in a(List<ChatInfoBlock>); null until that method has run.
    private PlaybackChatCache h;
    // Constructor argument "roomId".
    private final String i;
    // Constructor argument "userId".
    private final String j;
    // Constructor argument "messageNetworkFetcher"; used to download message files.
    private final PlaybackMessageNetworkFetcher k;

    public PlaybackMessageDataSourceImpl(String str, String str2, PlaybackMessageNetworkFetcher playbackMessageNetworkFetcher) {
        n.b(str, "roomId");
        n.b(str2, "userId");
        n.b(playbackMessageNetworkFetcher, "messageNetworkFetcher");
        this.i = str;
        this.j = str2;
        this.k = playbackMessageNetworkFetcher;
        this.g = h.a((Function0) new PlaybackMessageDataSourceImpl$cache$2(this));
    }

    /**
     * Synthetic static bridge that lets the anonymous inner classes reach the private
     * {@code a(ChannelMessage)} conversion.
     *
     * @return the patched result when a hot patch handles the call, otherwise the result of the
     *     instance method
     */
    public static final /* synthetic */ MessageEntity a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, ChannelMessage channelMessage) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, channelMessage}, null, f14407a, true, 5795);
        if (!patched.isSupported) {
            return playbackMessageDataSourceImpl.a(channelMessage);
        }
        return (MessageEntity) patched.result;
    }

    /**
     * Converts a decoded {@link ChannelMessage} into the database entity form.
     *
     * <p>Fields are read and checked one at a time, in the order msg_id, msg_type,
     * send_timestamp, room_id, so the first missing field is the one reported by {@code n.a}
     * (presumably Kotlin's non-null expression check).
     *
     * @param channelMessage the wire message to convert
     * @return a new {@link MessageEntity} carrying the numeric id, type, timestamp, room id and
     *     raw payload bytes
     * @throws NumberFormatException if {@code msg_id} is not a parsable long
     */
    private final MessageEntity a(ChannelMessage channelMessage) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{channelMessage}, this, f14407a, false, 5787);
        if (patched.isSupported) {
            return (MessageEntity) patched.result;
        }

        final String rawId = channelMessage.msg_id;
        n.a((Object) rawId, "msg_id");
        final long id = Long.parseLong(rawId);

        final String type = channelMessage.msg_type;
        n.a((Object) type, "msg_type");

        final Long boxedTimestamp = channelMessage.send_timestamp;
        n.a((Object) boxedTimestamp, "send_timestamp");
        final long timestamp = boxedTimestamp.longValue();

        final String room = channelMessage.room_id;
        n.a((Object) room, "room_id");

        final byte[] payloadBytes = channelMessage.payload.toByteArray();
        // Trailing (null, 32, null) looks like a Kotlin default-argument constructor call
        // (mask + marker); NOTE(review): confirm against MessageEntity.
        return new MessageEntity(id, type, timestamp, room, payloadBytes, null, 32, null);
    }

    /**
     * Synthetic static bridge that lets the anonymous inner classes reach the private
     * {@code a(InputStream)} parser.
     *
     * @return the patched result when a hot patch handles the call, otherwise the observable
     *     produced by the instance method
     */
    public static final /* synthetic */ Observable a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, InputStream inputStream) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, inputStream}, null, f14407a, true, 5794);
        if (!patched.isSupported) {
            return playbackMessageDataSourceImpl.a(inputStream);
        }
        return (Observable) patched.result;
    }

    /**
     * Wraps a downloaded message file in an observable that decodes it lazily.
     *
     * <p>On subscription the stream is read as one protobuf message; every tag encountered is
     * decoded with {@code ChannelMessage.ADAPTER} and emitted, then the observable completes.
     * Any failure is rethrown as {@link MsgParseException} when it is a
     * {@link ProtocolException}, and as {@link MsgFetchException} otherwise.
     *
     * <p>The stream is closed once decoding finishes, whether it succeeded or failed, so the
     * underlying resource is not left open.
     *
     * @param inputStream the raw message file contents; consumed and closed by the subscription
     * @return an observable of the decoded messages
     */
    private final Observable<ChannelMessage> a(final InputStream inputStream) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{inputStream}, this, f14407a, false, 5785);
        if (proxy.isSupported) {
            return (Observable) proxy.result;
        }
        Observable<ChannelMessage> a2 = Observable.a(new r<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14446a;

            @Override // io.reactivex.r
            public final void a(q<ChannelMessage> qVar) {
                if (PatchProxy.proxy(new Object[]{qVar}, this, f14446a, false, 5816).isSupported) {
                    return;
                }
                n.b(qVar, "emitter");
                ProtoReader protoReader = new ProtoReader(Okio.buffer(Okio.source(inputStream)));
                try {
                    long beginMessage = protoReader.beginMessage();
                    while (protoReader.nextTag() != -1) {
                        qVar.a((q<ChannelMessage>) ChannelMessage.ADAPTER.decode(protoReader));
                    }
                    protoReader.endMessageAndGetUnknownFields(beginMessage);
                    qVar.a();
                } catch (Throwable th) {
                    if (!(th instanceof ProtocolException)) {
                        throw new MsgFetchException(th);
                    }
                    throw new MsgParseException(th);
                } finally {
                    // Everything has been read (or decoding has failed) by now; release the
                    // stream instead of leaking it.
                    try {
                        inputStream.close();
                    } catch (java.io.IOException ignored) {
                        // Best-effort cleanup: a failed close must not mask the decode outcome.
                    }
                }
            }
        });
        n.a((Object) a2, "Observable.create<Channe…mitter.onComplete()\n    }");
        return a2;
    }

    /**
     * Synthetic static bridge that lets the anonymous inner classes reach the private
     * {@code b(List)} persistence step.
     *
     * @return the patched result when a hot patch handles the call, otherwise the completable
     *     produced by the instance method
     */
    public static final /* synthetic */ b a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, List list) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl, list}, null, f14407a, true, 5796);
        if (!patched.isSupported) {
            return playbackMessageDataSourceImpl.b((List<MessageEntity>) list);
        }
        return (b) patched.result;
    }

    /**
     * Drains from a cache every message whose {@code l()} value is at most {@code j} and
     * returns them as a single list.
     *
     * <p>On subscription the cache head is inspected with {@code c()} and removed with
     * {@code b()} for as long as it qualifies; afterwards {@code b(j)} is invoked on the cache
     * (its effect is not visible here) and the stream completes.
     *
     * <p>{@code MessageCache.this} below is the decompiler's rendering of what is presumably the
     * captured {@code messageCache} argument.
     *
     * @param j inclusive upper bound compared against each message's {@code l()} value
     * @param messageCache the cache to drain
     * @return a single emitting the drained messages in removal order
     */
    private final w<List<ClassroomMessage>> a(final long j, final MessageCache messageCache) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j), messageCache}, this, f14407a, false, 5792);
        if (patched.isSupported) {
            return (w) patched.result;
        }
        w<List<ClassroomMessage>> collected = Observable.a(new r<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$getMessagesFromCache$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14441a;

            @Override // io.reactivex.r
            public final void a(q<ClassroomMessage> qVar) {
                if (PatchProxy.proxy(new Object[]{qVar}, this, f14441a, false, 5814).isSupported) {
                    return;
                }
                n.b(qVar, "emitter");
                // Inspect the head, take it while it is within the bound, then inspect again.
                for (ClassroomMessage head = MessageCache.this.c(); head != null && head.l() <= j; head = MessageCache.this.c()) {
                    final ClassroomMessage taken = MessageCache.this.b();
                    if (taken == null) {
                        n.a();
                    }
                    qVar.a((q<ClassroomMessage>) taken);
                }
                MessageCache.this.b(j);
                qVar.a();
            }
        }).k();
        n.a((Object) collected, "Observable.create<Classr…lete()\n        }.toList()");
        return collected;
    }

    /**
     * Synthetic static bridge exposing the lazily created message cache ({@code e()}) to the
     * anonymous inner classes.
     *
     * @return the patched result when a hot patch handles the call, otherwise the cache
     */
    public static final /* synthetic */ PlaybackMessageCache b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl}, null, f14407a, true, 5797);
        if (!patched.isSupported) {
            return playbackMessageDataSourceImpl.e();
        }
        return (PlaybackMessageCache) patched.result;
    }

    /**
     * Builds a completable that writes a batch of entities through the message DAO.
     *
     * <p>When run, the batch is handed to the DAO's {@code a(list)}; if the length of the array
     * it returns differs from the batch size, the action fails with
     * {@link MsgPersistException}.
     *
     * @param list the entities to persist
     * @return a completable performing the write when subscribed
     */
    private final b b(final List<MessageEntity> list) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{list}, this, f14407a, false, 5786);
        if (patched.isSupported) {
            return (b) patched.result;
        }
        b persistAction = b.a(new a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$persistMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14448a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f14448a, false, 5817).isSupported) {
                    return;
                }
                final int written = PlaybackMessageDataSourceImpl.this.b().a(list).length;
                if (written != list.size()) {
                    throw new MsgPersistException();
                }
            }
        });
        n.a((Object) persistAction, "Completable.fromAction {…gPersistException()\n    }");
        return persistAction;
    }

    /**
     * Looks up the status messages for a timestamp and returns them as a list.
     *
     * <p>On subscription the DAO is queried for this room's {@code "fsm"} entry at {@code j};
     * a missing entry fails the stream with {@link NoStatusMsgException}. The
     * {@code "user_state"} entry is queried next and is emitted only when present. The resulting
     * list therefore always starts with the fsm entity.
     *
     * @param j the timestamp passed to the DAO lookups (exact matching rule is defined by the
     *     DAO, which is not visible here)
     * @return a single emitting one or two entities
     */
    private final w<List<MessageEntity>> b(final long j) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j)}, this, f14407a, false, 5790);
        if (patched.isSupported) {
            return (w) patched.result;
        }
        MessageLog.f12293a.b("PlaybackMessageDataSourceImpl.queryLatestMessage ts=" + j);
        w<List<MessageEntity>> statusMessages = Observable.a(new r<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$queryLatestMessage$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14460a;

            @Override // io.reactivex.r
            public final void a(q<MessageEntity> qVar) {
                if (PatchProxy.proxy(new Object[]{qVar}, this, f14460a, false, 5822).isSupported) {
                    return;
                }
                n.b(qVar, "emitter");

                // The fsm entry is mandatory.
                final MessageEntity fsm = PlaybackMessageDataSourceImpl.this.b().a(PlaybackMessageDataSourceImpl.this.i, "fsm", j);
                if (fsm == null) {
                    throw new NoStatusMsgException(j);
                }
                qVar.a((q<MessageEntity>) fsm);

                // The user_state entry is optional.
                final MessageEntity userState = PlaybackMessageDataSourceImpl.this.b().a(PlaybackMessageDataSourceImpl.this.i, "user_state", j);
                if (userState != null) {
                    qVar.a((q<MessageEntity>) userState);
                }
                qVar.a();
            }
        }).k();
        n.a((Object) statusMessages, "Observable.create<Messag…lete()\n        }.toList()");
        return statusMessages;
    }

    /**
     * Synthetic static bridge exposing the chat cache field to the anonymous inner classes.
     *
     * <p>When the field is still null, {@code n.b("chatCache")} is invoked first (presumably
     * Kotlin's uninitialized-lateinit failure).
     *
     * @return the patched result when a hot patch handles the call, otherwise the chat cache
     */
    public static final /* synthetic */ PlaybackChatCache c(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackMessageDataSourceImpl}, null, f14407a, true, 5798);
        if (patched.isSupported) {
            return (PlaybackChatCache) patched.result;
        }
        final PlaybackChatCache chatCache = playbackMessageDataSourceImpl.h;
        if (chatCache != null) {
            return chatCache;
        }
        n.b("chatCache");
        return chatCache;
    }

    /**
     * Returns the message cache, resolving the lazy holder on demand.
     *
     * @return the patched result when a hot patch handles the call, otherwise the value held by
     *     the lazy field {@code g}
     */
    private final PlaybackMessageCache e() {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f14407a, false, 5781);
        if (patched.isSupported) {
            return (PlaybackMessageCache) patched.result;
        }
        final Lazy holder = this.g;
        // Leftover of the Kotlin property delegate call; the value itself is not used.
        final KProperty cacheProperty = f14408b[0];
        return (PlaybackMessageCache) holder.a();
    }

    /**
     * Builds a completable that signals completion to every registered message processor.
     *
     * <p>When run, each processor returned by {@code d()} has its no-argument {@code a()} invoked
     * in iteration order. No intermediate collection is built; the calls are made purely for
     * their side effects.
     *
     * @return a completable performing the notifications when subscribed
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b a() {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[0], this, f14407a, false, 5793);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        b a2 = b.a(new a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$onComplete$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14444a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f14444a, false, 5815).isSupported) {
                    return;
                }
                for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.d()) {
                    messageProcessor.a();
                }
            }
        });
        n.a((Object) a2, "Completable.fromAction {…p { it.complete() }\n    }");
        return a2;
    }

    /**
     * Downloads, decodes and persists the message file referenced by {@code getH()} of the
     * given playback entity.
     *
     * <p>Pipeline, in order:
     * <ol>
     *   <li>fetch the file as an {@link InputStream} and decode it into {@link ChannelMessage}s;</li>
     *   <li>hand every decoded message to each registered processor;</li>
     *   <li>drop messages of type {@code "fsm_version"} and {@code "user_state_version"};</li>
     *   <li>convert the rest to {@link MessageEntity} and persist them in batches of 50;</li>
     *   <li>once everything is stored, call {@code a(true)} on the entity.</li>
     * </ol>
     * The work is subscribed on the IO scheduler (per the generated expression label below).
     *
     * <p>If {@code getF14513d()} already returns true the method completes immediately without
     * downloading. NOTE(review): that flag presumably means "room messages already downloaded"
     * and is presumably the one set by {@code a(true)}; confirm against PlaybackInfoEntity.
     *
     * @param playbackInfoEntity the playback record describing what to download
     * @return a completable representing the whole download-and-store operation
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b a(final PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f14407a, false, 5783);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        n.b(playbackInfoEntity, "entity");
        if (playbackInfoEntity.getF14513d()) {
            b a2 = b.a();
            n.a((Object) a2, "Completable.complete()");
            return a2;
        }
        // Stage 1: downloaded stream -> decoded messages.
        b b2 = this.k.a(playbackInfoEntity.getH()).b((f<? super InputStream, ? extends s<? extends R>>) new f<T, s<? extends R>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14413a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final Observable<ChannelMessage> apply(InputStream inputStream) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{inputStream}, this, f14413a, false, 5800);
                if (proxy2.isSupported) {
                    return (Observable) proxy2.result;
                }
                n.b(inputStream, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream);
            }
        // Stage 2: let every processor see each message (runs once per decoded message).
        }).a(new e<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$2

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14415a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(ChannelMessage channelMessage) {
                if (PatchProxy.proxy(new Object[]{channelMessage}, this, f14415a, false, 5801).isSupported) {
                    return;
                }
                // Side effects only: no result list is collected for this per-message callback.
                for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.d()) {
                    n.a((Object) channelMessage, "msg");
                    messageProcessor.a(channelMessage);
                }
            }
        // Stage 3: version markers are processed above but never stored.
        }).a(new io.reactivex.functions.h<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$3

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14417a;

            @Override // io.reactivex.functions.h
            public final boolean a(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f14417a, false, 5802);
                if (proxy2.isSupported) {
                    return ((Boolean) proxy2.result).booleanValue();
                }
                n.b(channelMessage, "it");
                return (n.a((Object) channelMessage.msg_type, (Object) "fsm_version") ^ true) && (n.a((Object) channelMessage.msg_type, (Object) "user_state_version") ^ true);
            }
        // Stage 4a: wire message -> database entity.
        }).g(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$4

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14419a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final MessageEntity apply(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f14419a, false, 5803);
                if (proxy2.isSupported) {
                    return (MessageEntity) proxy2.result;
                }
                n.b(channelMessage, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, channelMessage);
            }
        // Stage 4b: group into lists of 50 and persist each list.
        }).a(50).d(new f<List<MessageEntity>, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$5

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14421a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final b apply(List<MessageEntity> list) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{list}, this, f14421a, false, 5804);
                if (proxy2.isSupported) {
                    return (b) proxy2.result;
                }
                n.b(list, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, list);
            }
        // Stage 5: flag the entity once all batches are stored.
        }).b(new a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadRoomMessage$6

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14423a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f14423a, false, 5805).isSupported) {
                    return;
                }
                PlaybackInfoEntity.this.a(true);
            }
        }).b(io.reactivex.schedulers.a.b());
        n.a((Object) b2, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b2;
    }

    /**
     * Loads the status messages for a timestamp and primes both caches with the first result.
     *
     * <p>The entities come from {@code b(j)} (wrapped by {@code RxjavaExKt.a}, whose effect is
     * not visible here). On success, {@code c(...)} is called on the message cache and then on
     * the chat cache with {@code getF14509d()} of the first entity; the entities are then
     * converted to {@link ClassroomMessage}s via {@code PlaybackMessageDbFetcherKt.a}.
     *
     * @param j the timestamp to prefetch for
     * @return a single emitting the converted status messages
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public w<List<ClassroomMessage>> a(long j) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j)}, this, f14407a, false, 5788);
        if (patched.isSupported) {
            return (w) patched.result;
        }
        CommonLog.a(MessageLog.f12293a, "prefetch message to cache: " + j, null, 2, null);
        w<List<ClassroomMessage>> prefetched = RxjavaExKt.a(b(j)).c(new e<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$prefetchMessageToCache$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14451a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(List<MessageEntity> list) {
                if (PatchProxy.proxy(new Object[]{list}, this, f14451a, false, 5818).isSupported) {
                    return;
                }
                // Message cache first, chat cache second; each reads the first entity itself.
                final PlaybackMessageCache messageCache = PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this);
                messageCache.c(list.get(0).getF14509d());
                final PlaybackChatCache chatCache = PlaybackMessageDataSourceImpl.c(PlaybackMessageDataSourceImpl.this);
                chatCache.c(list.get(0).getF14509d());
            }
        }).d(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$prefetchMessageToCache$2

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14453a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final List<ClassroomMessage> apply(List<MessageEntity> list) {
                final PatchProxyResult outcome = PatchProxy.proxy(new Object[]{list}, this, f14453a, false, 5819);
                if (outcome.isSupported) {
                    return (List) outcome.result;
                }
                n.b(list, "list");
                final List<MessageEntity> entities = list;
                final ArrayList converted = new ArrayList(kotlin.collections.n.a((Iterable) entities, 10));
                for (MessageEntity entity : entities) {
                    converted.add(PlaybackMessageDbFetcherKt.a(entity));
                }
                return converted;
            }
        });
        n.a((Object) prefetched, "queryLatestMessage(times…)\n            }\n        }");
        return prefetched;
    }

    /**
     * Returns the cached room messages followed by the cached chat messages up to {@code j2}.
     *
     * <p>Both caches are drained with {@code a(long, MessageCache)} using {@code j2} as the
     * bound, and the two lists are concatenated, room messages first. {@code j} is only written
     * to the log; it does not affect the result.
     *
     * @param j start of the requested range (logged only)
     * @param j2 end of the requested range, used as the drain bound for both caches
     * @return a single emitting the combined list
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public w<List<ClassroomMessage>> a(long j, long j2) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j), new Long(j2)}, this, f14407a, false, 5791);
        if (patched.isSupported) {
            return (w) patched.result;
        }
        MessageLog.f12293a.b("getMessages start:" + j + " end:" + j2);

        final w<List<ClassroomMessage>> roomMessages = a(j2, e());

        final PlaybackChatCache chat = this.h;
        if (chat == null) {
            n.b("chatCache");
        }
        final w<List<ClassroomMessage>> chatMessages = a(j2, chat);

        w merged = roomMessages.a(chatMessages, new io.reactivex.functions.b<List<? extends ClassroomMessage>, List<? extends ClassroomMessage>, List<? extends ClassroomMessage>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$getMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f14439a;

            @Override // io.reactivex.functions.b
            public final List<ClassroomMessage> a(List<? extends ClassroomMessage> list, List<? extends ClassroomMessage> list2) {
                final PatchProxyResult outcome = PatchProxy.proxy(new Object[]{list, list2}, this, f14439a, false, 5813);
                if (outcome.isSupported) {
                    return (List) outcome.result;
                }
                n.b(list, "t1");
                n.b(list2, "t2");
                // t1 + t2: first list followed by the second.
                return kotlin.collections.n.d((Collection) list, (Iterable) list2);
            }
        });
        n.a((Object) merged, "getMessagesFromCache(end…on { t1, t2 -> t1 + t2 })");
        return merged;
    }

    /**
     * Creates the chat cache from the given chat block descriptors.
     *
     * <p>A {@link PlaybackChatFetcher} is built from the retrofit field and the blocks, paired
     * with a fresh {@link LinkedBlockingQueue}, and stored as the chat cache. If the retrofit
     * field is still null, {@code n.b("retrofit")} is invoked first (presumably Kotlin's
     * uninitialized-lateinit failure).
     *
     * @param list the chat info blocks to fetch from
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public void a(List<ChatInfoBlock> list) {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[]{list}, this, f14407a, false, 5782);
        if (patched.isSupported) {
            return;
        }
        n.b(list, "chatInfoBlocks");

        final IRetrofit api = this.e;
        if (api == null) {
            n.b("retrofit");
        }

        final PlaybackChatFetcher chatFetcher = new PlaybackChatFetcher(api, list);
        final LinkedBlockingQueue queue = new LinkedBlockingQueue();
        this.h = new PlaybackChatCache(chatFetcher, queue);
    }

    /**
     * Returns the message DAO.
     *
     * <p>When the field is still null, {@code n.b("messageDao")} is invoked first (presumably
     * Kotlin's uninitialized-lateinit failure).
     *
     * @return the patched result when a hot patch handles the call, otherwise the DAO field
     */
    public final PlaybackMessageDao b() {
        final PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f14407a, false, 5773);
        if (patched.isSupported) {
            return (PlaybackMessageDao) patched.result;
        }
        final PlaybackMessageDao dao = this.f14409c;
        if (dao != null) {
            return dao;
        }
        n.b("messageDao");
        return dao;
    }

    /**
     * Builds a completable that fetches the messages referenced by {@code playbackInfoEntity},
     * feeds them to the registered processors and hands them on for storage in batches.
     * (The anonymous class names below show the original method was {@code downloadSelfMessage}.)
     *
     * <p>The pipeline is only built when {@code playbackInfoEntity.getE()} is false and
     * {@code playbackInfoEntity.getI()} is a non-empty string; otherwise {@code b.a()} is returned
     * (per the check string, {@code Completable.complete()}).
     *
     * <p>Pipeline stages, in order (RxJava operator names are obfuscated; the names given here are
     * assumptions to confirm against the mapping):
     * <ol>
     *   <li>{@code this.k.a(getI())} supplies an {@link InputStream}, which is turned into an
     *       {@code Observable<ChannelMessage>} by the static helper {@code a(this, inputStream)};</li>
     *   <li>each message is passed to every {@link MessageProcessor} returned by {@code d()};</li>
     *   <li>a predicate rejects messages whose {@code msg_type} is {@code "fsm_version"} or
     *       {@code "user_state_version"} (presumably a filter);</li>
     *   <li>each remaining message is converted to a {@link MessageEntity} via
     *       {@code a(this, channelMessage)};</li>
     *   <li>{@code a(this.j)} is called on each entity;</li>
     *   <li>{@code .a(50)} groups entities into lists (presumably {@code buffer(50)}), and each list is
     *       passed to {@code a(this, list)}, which returns a completable;</li>
     *   <li>an action calls {@code b(true)} on the entity (presumably run on completion);</li>
     *   <li>the chain is scheduled with {@code io.reactivex.schedulers.a.b()} (per the check string,
     *       {@code subscribeOn(Schedulers.io())}).</li>
     * </ol>
     *
     * @param playbackInfoEntity playback info whose {@code getE()} / {@code getI()} values drive the download
     * @return the download completable, an already-complete completable, or the patched result when
     *         the Robust patch hook reports the call as handled
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b b(final PlaybackInfoEntity playbackInfoEntity) {
        // Robust hot-patch hook: when a patch handles this call, return its result directly.
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f14407a, false, 5784);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        // Presumably the Kotlin non-null parameter check (obfuscated Intrinsics) - confirm.
        n.b(playbackInfoEntity, "entity");
        // Only build the pipeline when getE() is false and getI() is non-empty.
        if (!playbackInfoEntity.getE()) {
            if (!(playbackInfoEntity.getI().length() == 0)) {
                b b2 = this.k.a(playbackInfoEntity.getI()).b((f<? super InputStream, ? extends s<? extends R>>) new f<T, s<? extends R>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$1

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14425a;

                    // Stage 1: turn the fetched stream into an observable of channel messages.
                    @Override // io.reactivex.functions.f
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final Observable<ChannelMessage> apply(InputStream inputStream) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{inputStream}, this, f14425a, false, 5806);
                        if (proxy2.isSupported) {
                            return (Observable) proxy2.result;
                        }
                        n.b(inputStream, "it");
                        return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream);
                    }
                }).a(new e<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$2

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14427a;

                    // Stage 2: hand every message to each registered processor.
                    @Override // io.reactivex.functions.e
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final void accept(ChannelMessage channelMessage) {
                        if (PatchProxy.proxy(new Object[]{channelMessage}, this, f14427a, false, 5807).isSupported) {
                            return;
                        }
                        Set<MessageProcessor> d2 = PlaybackMessageDataSourceImpl.this.d();
                        // NOTE(review): this list only collects kotlin.y.f26434a values and is never read
                        // in this method - looks like a leftover of a Kotlin map {} call; confirm.
                        ArrayList arrayList = new ArrayList(kotlin.collections.n.a(d2, 10));
                        for (MessageProcessor messageProcessor : d2) {
                            n.a((Object) channelMessage, "msg");
                            messageProcessor.a(channelMessage);
                            arrayList.add(kotlin.y.f26434a);
                        }
                    }
                }).a(new io.reactivex.functions.h<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$3

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14429a;

                    // Stage 3: accept only messages that are neither "fsm_version" nor "user_state_version".
                    @Override // io.reactivex.functions.h
                    public final boolean a(ChannelMessage channelMessage) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f14429a, false, 5808);
                        if (proxy2.isSupported) {
                            return ((Boolean) proxy2.result).booleanValue();
                        }
                        n.b(channelMessage, "it");
                        // n.a(Object, Object) is presumably an equality check (Intrinsics.areEqual) - confirm.
                        return (n.a((Object) channelMessage.msg_type, (Object) "fsm_version") ^ true) && (n.a((Object) channelMessage.msg_type, (Object) "user_state_version") ^ true);
                    }
                }).g(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$4

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14431a;

                    // Stage 4: convert each channel message into a MessageEntity.
                    @Override // io.reactivex.functions.f
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final MessageEntity apply(ChannelMessage channelMessage) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f14431a, false, 5809);
                        if (proxy2.isSupported) {
                            return (MessageEntity) proxy2.result;
                        }
                        n.b(channelMessage, "it");
                        return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, channelMessage);
                    }
                }).a(new e<MessageEntity>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$5

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14433a;

                    // Stage 5: pass the data source's field j to each entity via a(String).
                    @Override // io.reactivex.functions.e
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final void accept(MessageEntity messageEntity) {
                        String str;
                        if (PatchProxy.proxy(new Object[]{messageEntity}, this, f14433a, false, 5810).isSupported) {
                            return;
                        }
                        str = PlaybackMessageDataSourceImpl.this.j;
                        messageEntity.a(str);
                    }
                }).a(50).d(new f<List<MessageEntity>, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$6

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14435a;

                    // Stage 6: hand each batch (presumably up to 50 entities) to the list helper a(this, list).
                    @Override // io.reactivex.functions.f
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final b apply(List<MessageEntity> list) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{list}, this, f14435a, false, 5811);
                        if (proxy2.isSupported) {
                            return (b) proxy2.result;
                        }
                        n.b(list, "it");
                        return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, list);
                    }
                }).b(new a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$downloadSelfMessage$7

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f14437a;

                    // Stage 7: call b(true) on the entity (presumably when the chain completes).
                    @Override // io.reactivex.functions.a
                    public final void run() {
                        if (PatchProxy.proxy(new Object[0], this, f14437a, false, 5812).isSupported) {
                            return;
                        }
                        // NOTE(review): `PlaybackInfoEntity.this` presumably denotes the captured
                        // playbackInfoEntity argument (decompiler rendering) - confirm.
                        PlaybackInfoEntity.this.b(true);
                    }
                }).b(io.reactivex.schedulers.a.b());
                // Presumably a non-null check on the expression result; the string is its description.
                n.a((Object) b2, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
                return b2;
            }
        }
        // Nothing to download: return an already-completed completable.
        b a2 = b.a();
        n.a((Object) a2, "Completable.complete()");
        return a2;
    }

    /**
     * Returns the database-backed message fetcher held in field {@code f14410d}.
     *
     * <p>If the Robust patch hook reports the call as handled, the hook's result is returned
     * instead.
     *
     * @return the DB fetcher (or the patched result)
     */
    public final PlaybackMessageDbFetcher c() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f14407a, false, 5775);
        if (!patch.isSupported) {
            PlaybackMessageDbFetcher dbFetcher = this.f14410d;
            if (dbFetcher == null) {
                // Presumably throws for an uninitialized lateinit property - confirm.
                n.b("messageDbFetcher");
            }
            return dbFetcher;
        }
        return (PlaybackMessageDbFetcher) patch.result;
    }

    /**
     * Returns the set of message processors held in field {@code f}.
     *
     * <p>If the Robust patch hook reports the call as handled, the hook's result is returned
     * instead.
     *
     * @return the processors (or the patched result)
     */
    public final Set<MessageProcessor> d() {
        PatchProxyResult patch = PatchProxy.proxy(new Object[0], this, f14407a, false, 5779);
        if (!patch.isSupported) {
            Set<MessageProcessor> processors = this.f;
            if (processors == null) {
                // Presumably throws for an uninitialized lateinit property - confirm.
                n.b("processors");
            }
            return processors;
        }
        return (Set) patch.result;
    }
}
