package com.taobao.message.service.base.conversation;

import com.taobao.message.datasdk.kit.DataSDKOpenPointConfig;
import com.taobao.message.datasdk.kit.chain.AbstractChainExecutor;
import com.taobao.message.datasdk.kit.chain.core.Observable;
import com.taobao.message.datasdk.kit.chain.core.OnSubscribe;
import com.taobao.message.datasdk.kit.chain.core.Subscriber;
import com.taobao.message.datasdk.kit.chain.core.functions.Func1;
import com.taobao.message.datasdk.kit.provider.ripple.openpoint.ConversationReportOpenPointProvider;
import com.taobao.message.datasdk.ripple.datasource.model.ReportConversationData;
import com.taobao.message.datasdk.ripple.datasource.node.conversationreport.ConversationReportNode;
import com.taobao.message.kit.threadpool.BaseRunnable;
import com.taobao.message.kit.util.CollectionUtil;
import com.taobao.message.service.base.util.ConversationClassify;
import com.taobao.message.service.inter.conversation.model.Conversation;
import com.taobao.message.service.inter.tool.ValueUtil;
import com.taobao.message.service.inter.tool.callback.DataCallback;
import com.taobao.message.service.inter.tool.event.Event;
import com.taobao.message.service.inter.tool.support.EventChannelSupport;
import com.taobao.message.service.inter.tool.support.IdentifierSupport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import tm.ewy;

/* loaded from: classes7.dex */
/**
 * Report node that publishes a batch of reported conversations on the event channel and
 * forwards the same batch to the downstream subscriber.
 *
 * <p>When the {@code "needComposeData"} context flag is {@code true} (the default), the reported
 * conversations are first split into groups by {@link ConversationClassify#classify}, and each
 * group is passed through the {@link ConversationReportOpenPointProvider}s configured for that
 * group's key before being merged into the final result.
 */
public class AllConversationReportNode extends ConversationReportNode {
    static {
        ewy.a(1770607383);
    }

    public AllConversationReportNode(IdentifierSupport identifierSupport, EventChannelSupport eventChannelSupport) {
        super(identifierSupport, eventChannelSupport);
    }

    /**
     * Publishes the reported conversations, optionally composing them through open-point providers.
     *
     * <ul>
     *   <li>Empty report: {@code subscriber.onError} is called with a {@link RuntimeException}.</li>
     *   <li>{@code needComposeData == false}: the reported list is posted and delivered untouched.</li>
     *   <li>Otherwise: one task per classified group is handed to a
     *       {@link AbstractChainExecutor.ChainScheduler}; this method blocks until every group has
     *       finished (or the wait times out / is interrupted) and then posts and delivers whatever
     *       has been collected so far.</li>
     * </ul>
     *
     * @param reportConversationData the report carrying the conversations plus event type, name and version
     * @param map                    call context; only the {@code "needComposeData"} flag is read here
     * @param subscriber             receives the final conversation list followed by completion
     */
    @Override // com.taobao.message.datasdk.ripple.datasource.node.conversationreport.ConversationReportNode
    public void handle(ReportConversationData reportConversationData, Map<String, Object> map, Subscriber<? super List<Conversation>> subscriber) {
        List<Conversation> reported = reportConversationData.getConversations();
        if (CollectionUtil.isEmpty(reported)) {
            subscriber.onError(new RuntimeException("report conversations is empty"));
            return;
        }
        if (!ValueUtil.getBoolean(map, "needComposeData", true)) {
            postEventAndDeliver(reportConversationData, reported, subscriber);
            return;
        }

        Map<String, List<Conversation>> classified = ConversationClassify.classify(reported);
        final CountDownLatch pendingGroups = new CountDownLatch(classified.size());
        // Shared between the group tasks and this thread; every access goes through
        // synchronized (composed) because the tasks are presumably run on scheduler threads
        // and provider callbacks may arrive on arbitrary threads.
        final List<Conversation> composed = new ArrayList<Conversation>();

        for (final Map.Entry<String, List<Conversation>> group : classified.entrySet()) {
            new AbstractChainExecutor.ChainScheduler().run(new BaseRunnable() {
                @Override // com.taobao.message.kit.threadpool.BaseRunnable
                public void execute() {
                    composeGroup(group.getKey(), group.getValue(), composed, pendingGroups);
                }
            });
        }

        try {
            // NOTE(review): 2000 SECONDS is roughly 33 minutes; this may have been meant as
            // milliseconds. Kept unchanged because the intent cannot be confirmed from here.
            pendingGroups.await(2000L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; still deliver what was collected so far.
            Thread.currentThread().interrupt();
        }

        // Hand out a snapshot so that groups finishing after a timeout/interrupt cannot mutate
        // the list that has already been posted and delivered.
        List<Conversation> result;
        synchronized (composed) {
            result = new ArrayList<Conversation>(composed);
        }
        postEventAndDeliver(reportConversationData, result, subscriber);
    }

    /** Posts the conversations as an event carrying the report's version, then emits them and completes. */
    private void postEventAndDeliver(ReportConversationData reportConversationData, List<Conversation> conversations, Subscriber<? super List<Conversation>> subscriber) {
        Event event = Event.obtain(reportConversationData.getType(), reportConversationData.getName(), conversations);
        event.version = reportConversationData.getVersion();
        this.eventChannelSupport.postEvent(event);
        subscriber.onNext(conversations);
        subscriber.onCompleted();
    }

    /** Runs one classified group through its configured providers and merges the output into {@code sink}. */
    private void composeGroup(String groupKey, final List<Conversation> conversations, final List<Conversation> sink, final CountDownLatch pendingGroups) {
        List<ConversationReportOpenPointProvider> providers = DataSDKOpenPointConfig.getDataSDKOpenPointConfig(this.identifierSupport.getIdentifier(), groupKey).getConversationReportOpenPointProviders();
        if (CollectionUtil.isEmpty(providers)) {
            // Nothing to compose for this group: pass its conversations through unchanged.
            appendTo(sink, conversations);
            pendingGroups.countDown();
            return;
        }

        Observable<List<Conversation>> chain = Observable.create(new OnSubscribe<List<Conversation>>() {
            @Override // com.taobao.message.datasdk.kit.chain.core.functions.Action1
            public void call(Subscriber<? super List<Conversation>> downstream) {
                downstream.onNext(conversations);
                downstream.onCompleted();
            }
        });
        // Each provider consumes the output of the previous one, in configuration order.
        for (ConversationReportOpenPointProvider provider : providers) {
            chain = chain.flatMap(providerStage(provider));
        }

        chain.subscribe(new Subscriber<List<Conversation>>() {
            @Override // com.taobao.message.datasdk.kit.chain.core.Observer
            public void onCompleted() {
                pendingGroups.countDown();
            }

            @Override // com.taobao.message.datasdk.kit.chain.core.Observer
            public void onError(Throwable th) {
                // A failed group still releases the latch; its conversations are simply not merged.
                pendingGroups.countDown();
            }

            @Override // com.taobao.message.datasdk.kit.chain.core.Observer
            public void onNext(List<Conversation> list) {
                appendTo(sink, list);
            }
        });
    }

    /**
     * Builds the flatMap stage for a single provider. The stage falls back to emitting its input
     * unchanged when the provider returns {@code false} from {@code handle} or reports an error.
     */
    private static Func1<List<Conversation>, Observable<? extends List<Conversation>>> providerStage(final ConversationReportOpenPointProvider provider) {
        return new Func1<List<Conversation>, Observable<? extends List<Conversation>>>() {
            @Override // com.taobao.message.datasdk.kit.chain.core.functions.Func1
            public Observable<? extends List<Conversation>> call(final List<Conversation> input) {
                return Observable.create(new OnSubscribe<List<Conversation>>() {
                    @Override // com.taobao.message.datasdk.kit.chain.core.functions.Action1
                    public void call(final Subscriber<? super List<Conversation>> downstream) {
                        boolean handled = provider.handle(input, new DataCallback<List<Conversation>>() {
                            @Override // com.taobao.message.service.inter.tool.callback.DataCallback
                            public void onComplete() {
                                downstream.onCompleted();
                            }

                            @Override // com.taobao.message.service.inter.tool.callback.DataCallback
                            public void onData(List<Conversation> data) {
                                downstream.onNext(data);
                            }

                            @Override // com.taobao.message.service.inter.tool.callback.DataCallback
                            public void onError(String str, String str2, Object obj) {
                                // Provider failure is non-fatal: continue the chain with the untouched input.
                                downstream.onNext(input);
                                downstream.onCompleted();
                            }
                        });
                        if (!handled) {
                            // The provider declined this batch; forward it as-is.
                            downstream.onNext(input);
                            downstream.onCompleted();
                        }
                    }
                });
            }
        };
    }

    /** Appends {@code items} to the shared {@code sink} under its lock; a {@code null} batch is ignored. */
    private static void appendTo(List<Conversation> sink, List<Conversation> items) {
        if (items == null) {
            return;
        }
        synchronized (sink) {
            sink.addAll(items);
        }
    }

    // Untyped overload emitted by the decompiler for the erased INode signature; it only
    // casts and delegates to the typed handle(...) above.
    @Override // com.taobao.message.datasdk.ripple.datasource.node.conversationreport.ConversationReportNode, com.taobao.message.datasdk.kit.chain.INode
    public /* bridge */ /* synthetic */ void handle(Object obj, Map map, Subscriber subscriber) {
        handle((ReportConversationData) obj, (Map<String, Object>) map, (Subscriber<? super List<Conversation>>) subscriber);
    }
}
