package com.ekoapp.ekosdk.internal.api.event;

import com.ekoapp.ekosdk.internal.api.dto.EkoMessageAndUserListDto;
import com.ekoapp.ekosdk.internal.api.dto.EkoMessageDto;
import com.ekoapp.ekosdk.internal.api.dto.EkoUserDto;
import com.ekoapp.ekosdk.internal.api.mapper.EkoMessageAndUserEventMapper;
import com.ekoapp.ekosdk.internal.data.UserDatabase;
import com.ekoapp.ekosdk.internal.data.dao.EkoChannelDao;
import com.ekoapp.ekosdk.internal.data.dao.EkoMessageDao;
import com.ekoapp.ekosdk.internal.util.EkoGson;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.observable.ObservableBufferTimed;
import io.reactivex.internal.operators.observable.ObservableFilter;
import io.reactivex.internal.util.ArrayListSupplier;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import o.C0411;
import o.C0418;
import o.C0431;
import org.joda.time.DateTime;

/* loaded from: classes.dex */
public abstract class MessageListener extends SocketEventListener {
    private static final int BUFFER_MAX_COUNT = 100;
    private static final long BUFFER_TIME_SPAN_IN_MILLIS = 500;
    private static final int RETAIN_MESSAGES_PER_CHANNEL = 1000;
    private final Gson gson = EkoGson.get();
    private final EkoMessageDao messageDao = UserDatabase.get().messageDao();
    private final Relay<EkoMessageAndUserListDto> relay = PublishRelay.m12280();
    private final EkoChannelDao channelDao = UserDatabase.get().channelDao();
    private Predicate<List<EkoMessageAndUserListDto>> hasItem = C0418.f25262;
    private Consumer<List<EkoMessageAndUserListDto>> persist = C0411.f25252;
    private Consumer<List<EkoMessageAndUserListDto>> trim = new C0431(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageListener() {
        Relay<EkoMessageAndUserListDto> relay = this.relay;
        TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        Scheduler m13910 = Schedulers.m13910();
        Callable m13824 = ArrayListSupplier.m13824();
        ObjectHelper.m13681(timeUnit, "unit is null");
        ObjectHelper.m13681(m13910, "scheduler is null");
        ObjectHelper.m13681(m13824, "bufferSupplier is null");
        ObjectHelper.m13676(100, "count");
        Observable m13884 = RxJavaPlugins.m13884(new ObservableBufferTimed(relay, timeUnit, m13910, m13824));
        Predicate<List<EkoMessageAndUserListDto>> predicate = this.hasItem;
        ObjectHelper.m13681(predicate, "predicate is null");
        Observable m138842 = RxJavaPlugins.m13884(new ObservableFilter(m13884, predicate));
        Consumer<List<EkoMessageAndUserListDto>> consumer = this.persist;
        Consumer<? super Throwable> m13670 = Functions.m13670();
        Action action = Functions.f22530;
        Observable m13598 = m138842.m13598(consumer, m13670, action, action);
        Consumer<List<EkoMessageAndUserListDto>> consumer2 = this.trim;
        Consumer<? super Throwable> m136702 = Functions.m13670();
        Action action2 = Functions.f22530;
        m13598.m13598(consumer2, m136702, action2, action2).m13597(Functions.m13670(), Functions.f22524, Functions.f22530, Functions.m13670());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ boolean lambda$new$0(List list) throws Exception {
        return !list.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ void lambda$new$1(List list) throws Exception {
        EkoMessageAndUserListDto ekoMessageAndUserListDto = (EkoMessageAndUserListDto) list.get(0);
        int size = list.size();
        if (size > 1) {
            List<EkoMessageDto> messages = ekoMessageAndUserListDto.getMessages();
            List<EkoUserDto> users = ekoMessageAndUserListDto.getUsers();
            for (int i = 1; i < size; i++) {
                EkoMessageAndUserListDto ekoMessageAndUserListDto2 = (EkoMessageAndUserListDto) list.get(i);
                messages.addAll(ekoMessageAndUserListDto2.getMessages());
                users.addAll(ekoMessageAndUserListDto2.getUsers());
            }
        }
        EkoMessageAndUserEventMapper.MAPPER.map((EkoMessageAndUserEventMapper) ekoMessageAndUserListDto);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$new$2(List list) throws Exception {
        this.messageDao.retainLatestFromChannel(((EkoMessageAndUserListDto) list.get(0)).getMessages().get(0).getChannelId(), 1000);
    }

    private void onMessageEvent(List<EkoMessageDto> list) {
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        for (EkoMessageDto ekoMessageDto : list) {
            newConcurrentMap.put(ekoMessageDto.getChannelId(), ekoMessageDto.getChannelId());
        }
        Iterator it = newConcurrentMap.values().iterator();
        while (it.hasNext()) {
            this.channelDao.updateLastActivity((String) it.next(), DateTime.now());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.ekoapp.ekosdk.internal.api.event.SocketEventListener
    public void onEvent(String str, Object... objArr) {
        super.onEvent(str, objArr);
        EkoMessageAndUserListDto ekoMessageAndUserListDto = (EkoMessageAndUserListDto) this.gson.fromJson(objArr[0].toString(), EkoMessageAndUserListDto.class);
        this.relay.mo4253((Relay<EkoMessageAndUserListDto>) ekoMessageAndUserListDto);
        onMessageEvent(ekoMessageAndUserListDto.getMessages());
    }
}
