Android实践 — ikan项目(二)

  • 转载时请注明出处,谢谢
  • / 0评 / 0

      通过上篇博客的简述可知,获取网络数据只需通过net层的Connector接口获取Retrofit对象,然后用其创建对应的Service对象即可发起网络请求,但是data层处理的数据不仅有网络数据,也有本地数据,其次本地数据存取又可分Sqlite存取和SharedPreferences存取等,这些存取过程都需要在data层处理,同时应尽可能的对依赖层保持低耦合。基于以上考虑,所以在data层中将数据存取抽出DataStore接口,实现为XXCloudDataStore和XXLocalDataStore。CloudDataStore负责网络相关数据存取,而LocalDataStore则负责本地数据存取。在项目中有FeedDataStore的定义如下:

    interface FeedDataStore {
    
        fun getHomeFeeds(feedParamProvider: FeedParamProvider): Observable<List<FeedEntity>>
    
       //xxx
    }
    

    需要本层实现FeedDataStore,CloudFeedDataStore代码如下:

    class CloudFeedDataStore(connector: Connector, tokenInterceptor: TokenInterceptor)
        : BaseCloudDataStore(tokenInterceptor), FeedDataStore {
    
        private val feedService: FeedService = connector
                .getServiceCreator()
                .create(FeedService::class.java)
    
        override fun getHomeFeeds(feedParamProvider: FeedParamProvider): Observable<List<FeedEntity>> {
            return feedService.getHomeFeeds(feedParamProvider.optionalParam.map)
                    .flatMap { dataResponse ->
                        ResponseFlatResult.flatResult(dataResponse)
                    }             .onErrorResumeNext(tokenInterceptor.refreshTokenAndRetry(Observable.defer {
                        feedService.getHomeFeeds(feedParamProvider.optionalParam.map)
                                .flatMap { dataResponse ->
                                    ResponseFlatResult.flatResult(dataResponse)
                                }
                    }
                    ))
        }
    }
    

    CloudFeedDataStore依赖connector对象和tokenInterceptor对象,Connector上篇博客已述,在操作符onErrorResumeNext执行tokenInterceptor.refreshTokenAndRetry,onErrorResumeNext这个操作符相信多数同学都有了解,它是Rxjava提供的操作符,当源Observablesource遇到错误时可以使用onErrorResumeNext操作符返回一个新的Observablesource,而不会执行Observer的onError方法。defer操作符延迟订阅的机制,以此保证新的Observable代码仅被订阅后才会执行。在分析TokenInterceptor前我先解释下JWT机制,有的同学可能还不太了解JWT(Json web token),JWT是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准(RFC 7519)。JWT的声明一般被用来在身份提供者和服务提供者间传递被认证的用户身份信息,以便于从资源服务器获取资源,也可以增加一些额外的其它业务逻辑所必须的声明信息,该token也可直接被用于认证,也可被加密。ikan项目稍微扩展JWT认证方式,在用户执行登录时服务端根据客户端提交的openid生成一对密串(token,refresh_token),它们的区别是过期时间长短不一致,项目规定refresh_token的过期时间为15天,token的过期时间为3天,客户端在请求非登录接口时要传本地缓存的token值给服务端,服务端依据token区分用户和请求是否有效,当token过期时服务端可通过本地缓存的refresh_token重新获取token,当refresh_token也过期时客户端此前的认证状态作废,用户必须要重新登录才能继续进行数据请求。TokenInterceptor的实现如下:

    @Singleton
    class TokenInterceptor @Inject internal constructor(private val bus: Bus, connector: Connector,
                                                        private val tokenCache: TokenCache) {
    
        private val tokenService: TokenService = connector
                .getServiceCreator()
                .create(TokenService::class.java)
    
        fun <T> refreshTokenAndRetry(toBeResumedObservable: Observable<T>): Function<Throwable, out Observable<out T>> {
            return Function { topThrowable ->
                if (isHttp401Error(topThrowable))
                    refreshToken()
                            .doOnError { throwable ->
                                if (isHttp403Error(throwable)) {
                                    bus.post(ReLoginEvent())
                                }
                            }
                            .doOnNext { tokenEntity ->
                                tokenCache.put(tokenEntity)
                            }
                            .flatMap {
                                toBeResumedObservable
                            }
                else
                    Observable.error(topThrowable)
            }
        }
    
        private fun refreshToken(): Observable<TokenEntity> {
            val tokenParamProvider = TokenParamProvider()
            tokenParamProvider.refreshToken(tokenCache.get()?.reToken!!)
    
            return tokenService.refreshToken(tokenParamProvider.optionalParam.map)
                    .flatMap({ tokenEntityDataResponse ->
                        ResponseFlatResult.flatResult(tokenEntityDataResponse)
                    })
        }
    
        private fun isHttp401Error(throwable: Throwable): Boolean {
            return if (throwable is HttpException) {
                throwable.code() == 401
            } else
                false
        }
    
        private fun isHttp403Error(throwable: Throwable): Boolean {
            return if (throwable is HttpException) {
                throwable.code() == 403
            } else
                false
        }
    }
    

    聚焦refreshTokenAndRetry方法前,先看下使用的onErrorResumeNext定义:

    public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction) {
        ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null");
        return RxJavaPlugins.onAssembly(new ObservableOnErrorNext<T>(this, resumeFunction, false));
    }
    

    可以看到onErrorResumeNext接收Function类型,而Function作用是将一种转换成另一种类型,因此在网络请求失败时可以将Throwable转成Observable。回到refreshTokenAndRetry方法,首先根据topThrowable对象判定异常类型是否为token过期,如果是则用本地缓存的refresh_token刷新token,否则返回topThrowable,接着看下refreshToken() .doOnError操作符,该操作符是在执行onError时触发,注意这个操作不会影响downstream接收的exception,在刷新token使用该操作符判定异常状态状态,从来决定是否post重新登录事件。

    ​ 细心的同学应该能注意到,在feedService.getHomeFeeds()和tokenService.refreshToken()都出现了ResponseFlatResult.flatResult()调用,而该方法返回至flatMap操作符中,flatMap方法定义:

    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
    }
    

    该操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。在数个重载方法中辗转调用onAssembly,而在onAssembly方法中onObservableAssembly默认为null,需要全局传入,所以source在本项目中是直接返回的。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    继续看下ResponseFlatResult.flatResult()方法定义:

    object ResponseFlatResult {
    
        fun <T : Any> flatResult(result: DataResponse<T>): Observable<T> {
            return Observable.create { emitter ->
                when (result.statusCode) {
                    ResponseStatus.STATUS_CODE_10000 -> {
                        result.data?.let {
                            emitter.onNext(result.data!!)
                        }
                        emitter.onComplete()
                    }
                    // xxx
                    else -> {
                        emitter.onError(UnKnownException())
                    }
                }
            }
        }
    }
    

    flatResult将DataResponse 类型按Observable 类型,在项目上一篇博客中有过说明,客户端会依据服务端返回的status_code区分接口请求的结果状态,所以在这个方法中将会处理所有服务端定义的业务响应码,然后将对应的异常发射出去。

    ​ 在了解CloudDataStore调用链之后,需要进一步简述LocalDataStore,相对Cloud存取而言,Local的存取简单许多,项目中LocalAccountDataStore的实现如下:

    class LocalAccountDataStore(private val accountCache: AccountCache,
                                private val tokenCache: TokenCache) : AccountDataStore {
    
        override fun login(accountParamProvider: AccountParamProvider): Observable<AccountEntity> = throw  UnsupportedOperationException()
    
        override fun getLogged(): AccountEntity? = accountCache.get()
    
        override fun getDetail(accountParamProvider: AccountParamProvider): Observable<AccountEntity> = throw  UnsupportedOperationException()
    
        override fun update(accountParamProvider: AccountParamProvider): Observable<AccountEntity> = throw  UnsupportedOperationException()
    
        override fun logout() {
            accountCache.evict()
            tokenCache.evict()
        }
    }
    

    因为Local存取项目中分为Sqlite存取和SharedPreferences存取,而LocalAccountDataStore类的职责是存取已登录的用户信息,需要存取的数据都比较简单,所以LocalAccountDataStore就采用了SharedPreferences存取。以下是AccountCache接口的具体实现代码:

    class AccountCacheImpl(context: Context) : AccountCache {
    
        private var ph: PreferenceHelper = PreferenceHelper.getInstance(context)
    
        override fun get(): AccountEntity? {
            val entityString = ph.getString(PH_KEY_ACCOUNT)
    
            return if (!TextUtils.isEmpty(entityString)) {
                Gson().fromJson<AccountEntity>(entityString, AccountEntity::class.java)
            } else {
                null
            }
        }
    
        override fun put(accountEntity: AccountEntity) {
            val jsonString = Gson().toJson(accountEntity)
            ph.put(PH_KEY_ACCOUNT, jsonString)?.commit()
        }
    
        override fun evict() {
            ph.put(PH_KEY_ACCOUNT, "")?.commit()
        }
    
        companion object {
    
            const val PH_KEY_ACCOUNT = "account"
    
            private var INSTANCE: AccountCacheImpl? = null
    
            fun getInstance(context: Context): AccountCacheImpl = INSTANCE ?: AccountCacheImpl(context).also { INSTANCE = it }
        }
    }
    

    AccountCacheImpl代码逻辑比较简单,这里就不做赘述了。着重看下FeedCacheImpl的实现

    class FeedCacheImpl(val context: Context) : FeedCache {
    
        private var connection = DbHelper.getConnection(context)
    
        override fun getFeedSegments(): Observable<List<SegmentEntity>> {
            return RxCupboard.withDefault(connection)
                    .query(SegmentEntity::class.java)
                    .toList()
                    .toObservable()
        }
    
        override fun addFeedSegments(segments: List<SegmentEntity>): Disposable =
                Single.concat(RxCupboard.withDefault(connection).deleteAll(SegmentEntity::class.java),
                        RxCupboard.withDefault(connection).put(segments)).subscribe({})
    }
    

    项目针对本地数据库存取使用的Cupboard,项目仅是对其稍作扩展使其支持Rxjava,需要说明一下Single.concat操作符,该操作符的作用是将几个Observable的数据合并有序发射,为了保证数据库存储的feed类别数据唯一性,所以使用该操作符在存储前将已存储feed类别数据清空再进行存储。最后强调一下,data层对domain层提供DataStore数据接口,除此之外并无其它职责。

    data层源码

    发表评论

    电子邮件地址不会被公开。 必填项已用*标注