package im.zico.fancy.data.repository.remote;

import android.text.TextUtils;
import android.util.Log;
import com.alibaba.fastjson.JSON;
import im.zico.fancy.api.StreamingApiService;
import im.zico.fancy.api.model.StreamingEvent;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import java.io.IOException;
import javax.inject.Singleton;
import okhttp3.ResponseBody;
import okio.BufferedSource;

/**
 * Remote data source that exposes the streaming endpoint of {@link StreamingApiService}
 * as an {@link Observable} of {@link StreamingEvent}s.
 *
 * <p>Each response body is read line by line; every non-empty line is parsed as JSON into a
 * {@link StreamingEvent}.
 *
 * <p>NOTE(review): this file appears to be decompiler output. The public static
 * {@code lambda$...} methods keep their generated names and visibility because the companion
 * classes {@code StreamingApi$$Lambda$0} / {@code StreamingApi$$Lambda$1} (not visible here)
 * presumably call them - confirm before renaming or narrowing access.
 */
@Singleton
public class StreamingApi {
    /** Upper bound on automatic re-subscriptions after the stream fails. */
    private static final long MAX_RETRIES = 1000L;

    private final StreamingApiService streamingApiService;

    /**
     * @param streamingApiService service used to open the streaming connection
     */
    public StreamingApi(StreamingApiService streamingApiService) {
        this.streamingApiService = streamingApiService;
    }

    /**
     * Reads {@code bufferedSource} line by line and emits one {@link StreamingEvent} per
     * non-empty line until the source is exhausted, an I/O error occurs, or the emitter is
     * disposed. The source is always closed before this method returns.
     *
     * <p>Disposal is only observed between lines: a read that is already blocked waiting for
     * data is not interrupted by disposing the emitter.
     *
     * @param bufferedSource    line-oriented source of JSON-encoded events; closed on return
     * @param observableEmitter emitter receiving the parsed events and the terminal signal
     * @throws Exception declared for compatibility with existing callers; I/O failures are
     *                   delivered through {@code observableEmitter} instead of being thrown
     */
    public static final void lambda$null$0$StreamingApi(BufferedSource bufferedSource, ObservableEmitter<StreamingEvent> observableEmitter) throws Exception {
        try {
            // Stop as soon as the downstream is gone instead of parsing events nobody receives.
            while (!observableEmitter.isDisposed() && !bufferedSource.exhausted()) {
                String line = bufferedSource.readUtf8Line();
                // TextUtils.isEmpty also covers the null returned at end of stream.
                if (!TextUtils.isEmpty(line)) {
                    observableEmitter.onNext(JSON.parseObject(line, StreamingEvent.class));
                }
            }
            // No-op if the emitter has been disposed in the meantime.
            observableEmitter.onComplete();
        } catch (IOException e) {
            // Signalling an error on a disposed emitter would be reported to the global
            // RxJava error handler as undeliverable, so only forward it while subscribed.
            if (!observableEmitter.isDisposed()) {
                observableEmitter.onError(e);
            }
        } finally {
            // Release the underlying connection on every exit path (completion, error, disposal).
            closeQuietly(bufferedSource);
        }
    }

    /**
     * Converts a streaming response body into an observable of parsed events. The body is
     * read lazily, when the returned source is subscribed to.
     *
     * @param responseBody body of the streaming HTTP response
     * @return a source emitting one {@link StreamingEvent} per non-empty line of the body
     * @throws Exception declared for compatibility with existing callers
     */
    public static final ObservableSource<StreamingEvent> lambda$streaming$1$StreamingApi(ResponseBody responseBody) throws Exception {
        Log.d("DEBUG", "Streaming: ");
        final BufferedSource source = responseBody.source();
        return Observable.create(new ObservableOnSubscribe<StreamingEvent>() {
            @Override
            public void subscribe(ObservableEmitter<StreamingEvent> observableEmitter) throws Exception {
                StreamingApi.lambda$null$0$StreamingApi(source, observableEmitter);
            }
        });
    }

    /**
     * Identity mapping; returns the event it was given.
     *
     * @param streamingEvent event to pass through
     * @return {@code streamingEvent}, unchanged
     * @throws Exception declared for compatibility with existing callers; never thrown here
     */
    public static final StreamingEvent lambda$streaming$2$StreamingApi(StreamingEvent streamingEvent) throws Exception {
        return streamingEvent;
    }

    /**
     * Opens the streaming endpoint and emits the events it delivers. When the stream fails,
     * it is re-subscribed up to {@link #MAX_RETRIES} times before the error reaches the caller.
     *
     * @return an observable of streaming events
     */
    public Observable<StreamingEvent> streaming() {
        // The two function instances live in generated companion classes outside this file;
        // presumably they delegate to lambda$streaming$1/2 above - TODO confirm.
        return this.streamingApiService.streaming()
                .flatMap(StreamingApi$$Lambda$0.$instance)
                .map(StreamingApi$$Lambda$1.$instance)
                .retry(MAX_RETRIES);
    }

    /** Closes {@code source}, ignoring any failure raised while closing. */
    private static void closeQuietly(BufferedSource source) {
        try {
            source.close();
        } catch (IOException ignored) {
            // Best effort: the stream is finished either way, and a close failure after the
            // terminal signal could not be delivered to the subscriber.
        }
    }
}
