具有Retrofit + RxJava的Android Twitter API状态/过滤器:永远不会命中onNext或onError

I try to consume the Streaming API from Twitter: statuses/filter as the following link(1) (1)https://stream.twitter.com/1.1/statuses/filter.json?track=twitter

However, after getting a correct token on request_Token(2) response, and make the request to statuses/filter, I do get a response 200 but still without hitting onNext or onError endlessly.

(2) https://api.twitter.com/oauth/request_token

2020-05-09 13:09:26.023 28542-28617/com.example.myapplication D/OkHttp: <-- 200 https://stream.twitter.com/1.1/statuses/filter.json?track=foo&follow=1234 (1002ms)

我不确定代码中是否缺少某些内容或错误。 请给我一些反馈吗?这是我的代码:

package com.example.myapplication

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import hu.akarnokd.rxjava3.retrofit.RxJava3CallAdapterFactory
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.ResponseBody
import okhttp3.logging.HttpLoggingInterceptor
import retrofit2.Retrofit
import retrofit2.converter.scalars.ScalarsConverterFactory
import retrofit2.http.*
import se.akerfeldt.okhttp.signpost.OkHttpOAuthConsumer
import se.akerfeldt.okhttp.signpost.SigningInterceptor
import java.util.concurrent.TimeUnit


class MainActivity : AppCompatActivity() {

    companion object {
        private const val REQUEST_TOKEN = "https://api.twitter.com/oauth/request_token"
        private const val consumerKeyValue = "{{consumerKeyValue}}"
        private const val consumerKeySecretValue = "{{consumerKeySecretValue}}"
        private const val accessTokenValue = "{{accessTokenValue}}"
        private const val accessTokenSecretValue = "{{accessTokenSecretValue}}"
    }

    val disposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
    }

    override fun onResume() {
        super.onResume()
        disposable.add(calls())
    }

    private fun calls(): Disposable {
        return retrofit()
            .requestToken(REQUEST_TOKEN)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .toFlowable(BackpressureStrategy.BUFFER)
            .flatMap { getData(it.string().split("&")) }
            .subscribe({
                print("onNext: $it")
            }, {
                print("onError: ${it.localizedMessage}")
            }, {
                print("onComplete")
            })
    }

    private fun getData(resArray: List<String>): Flowable<ResponseBody> {
        val oauthToken = resArray.first().split("=")
        val oauthTokenSecret = resArray[1].split("=")
        return retrofit().requestFiltered(HashMap<String, String>().apply {
            this[oauthToken.first()] = oauthToken.last()
            this[oauthTokenSecret.first()] = oauthTokenSecret.last()
        })
    }

    private fun retrofit(): TwitterService {
        return Retrofit.Builder()
            .baseUrl("https://stream.twitter.com/1.1/")
            .addConverterFactory(ScalarsConverterFactory.create())
            .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
            .client(getClient())
            .build()
            .create(TwitterService::class.java)
    }

    private fun getClient(): OkHttpClient {
        val clientBuilder = OkHttpClient.Builder()
            .apply {
                connectTimeout(10, TimeUnit.SECONDS) // connect timeout
                writeTimeout(30, TimeUnit.SECONDS) // write timeout
                readTimeout(30, TimeUnit.SECONDS) // read timeout
            }
        if (BuildConfig.DEBUG) {
            val loggingInterceptor = HttpLoggingInterceptor()
            loggingInterceptor.level = HttpLoggingInterceptor.Level.BODY
            clientBuilder.addInterceptor(loggingInterceptor)
        }

        val consumer = OkHttpOAuthConsumer(consumerKeyValue, consumerKeySecretValue)
        consumer.setTokenWithSecret(accessTokenValue, accessTokenSecretValue)
        clientBuilder.addInterceptor(SigningInterceptor(consumer))

        clientBuilder
            .addNetworkInterceptor { chain ->
                val requestBuilder = chain.request().newBuilder()
                requestBuilder.addHeader("Content-Type", "application/json")
                chain.proceed(requestBuilder.build())
            }
        clientBuilder.interceptors().add(object : Interceptor {
            override fun intercept(chain: Interceptor.Chain): okhttp3.Response {
                val request = chain.request()
                Log.e("OkHttp3 ${request.method}", request.body.toString())
                return chain.proceed(request)
            }

        })
        return clientBuilder.build()
    }

    interface TwitterService {

        @Headers("Content-Type: text/html;charset=utf-8")
        @GET
        fun requestToken(@Url url: String): Observable<ResponseBody>

        @Headers("Cache-Control: max-age=640000")
        @POST("statuses/filter.json")
        @Streaming
        fun requestFiltered(
            @HeaderMap headers: HashMap<String, String>,
            @Query("track") track: String = "foo",
            @Query("follow") follow: String = "1234"
        ): Flowable<ResponseBody>

    }

}

// Dependencies:
    implementation 'com.squareup.retrofit2:retrofit:2.8.1'
    implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
    implementation 'com.squareup.retrofit2:converter-scalars:2.8.1'
    implementation 'com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
    implementation 'com.squareup.okhttp3:okhttp:4.6.0'
    implementation 'com.squareup.okhttp3:logging-interceptor:4.6.0'
    implementation 'se.akerfeldt:okhttp-signpost:1.1.0'
    implementation 'oauth.signpost:signpost-core:1.2.1.2'

谢谢大家。

评论