Skip to content

Commit

Permalink
增加新的同步接口
Browse files Browse the repository at this point in the history
  • Loading branch information
czf0613 committed Apr 2, 2023
1 parent dd72da2 commit 0530099
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 28 deletions.
6 changes: 6 additions & 0 deletions KChatSDK/src/main/java/ltd/kevinc/kchat/KChatEventDelegate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,11 @@ interface KChatEventDelegate {

suspend fun channelClose(e: Throwable?) {

}
}

interface KChatSyncRecordStreamDelegate {
suspend fun processPack(pack: List<Chat.ChatMessageWrapper>) {

}
}
82 changes: 61 additions & 21 deletions KChatSDK/src/main/java/ltd/kevinc/kchat/KChatServiceClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,33 @@ class KChatServiceClient {
* 在app启动时,可以用这个接口进行数据的同步
* @param startTime 同步聊天记录的开始区间,ISO8601时间戳,带时区
* @param endTime 参照startTime即可
* @return 返回的是一个list of object。内部的object是一个联合体,它可能是以下几种类型之一:C2CChatMessage, GroupChatMessage, FriendApply等等,需要通过ChatMessageWrapperContentCase这个枚举进行判断
* @param pageSize 分页查询的每一页大小,不要设置太大的值,手机可能处理不过来
* @param currentPage 以0为基数的页码,超出的话后端接口不会报错,而是返回空
* @return 返回的是一个元组,分别是list of object和totalPages。totalPages可以方便前端进行分页查询
* 内部的object是一个联合体,它可能是以下几种类型之一:C2CChatMessage, GroupChatMessage, FriendApply等等,需要通过ChatMessageWrapperContentCase这个枚举进行判断
*/
suspend fun fetChatRecords(
suspend fun fetchChatRecords(
startTime: String = "2022-01-01T00:00:00.000+08:00",
endTime: String = "2099-12-31T23:59:59.999+08:00"
): List<Chat.ChatMessageWrapper> {
endTime: String = "2099-12-31T23:59:59.999+08:00",
pageSize: Int = 10,
currentPage: Int = 0
): Pair<List<Chat.ChatMessageWrapper>, Int> {
val request = Chat.SyncChatRecordRequest.newBuilder()
.setUserUid(KChatSDKClient.userUid)
.setFromTime(startTime)
.setToTime(endTime)
.setMaxLength(pageSize)
.setPage(currentPage)
.build()

return try {
KChatSDKClient.chatClient.syncChatRecord(request, KChatSDKClient.header).recordsList
val resp = KChatSDKClient.chatClient.syncChatRecord(request, KChatSDKClient.header)
Pair(resp.recordsList, resp.totalPages)
} catch (e: Exception) {
Log.e("KChat.SyncRecord", "failed to fetch data")
e.printStackTrace()

emptyList()
Pair(emptyList(), 0)
}
}

Expand Down Expand Up @@ -72,50 +80,82 @@ class KChatServiceClient {
}

// 为防止GC导致listener被意外中断,这里需要建立一个局部变量进行处理
private lateinit var listener: KChatEventDelegate
private lateinit var chatChannel: Flow<Chat.ChatMessageWrapper>
private lateinit var syncChannel: Flow<Chat.SyncChatRecordReply>

/**
* 这个方法需要传入一个delegate用于接收新消息的回调
* 由于协程的特性,这个delegate被执行的地方是未知的,因此如果需要在回调中刷新UI
* 请务必手动切换线程以避免非主线程刷新UI的bug
*/
suspend fun listenForChatMessage(listener: KChatEventDelegate) {
this.listener = listener

val request = Chat.SubscribeChannelRequest.newBuilder()
.setUserUid(KChatSDKClient.userUid)
.setDeviceTag(KChatSDKClient.deviceId)
.build()

try {
this.chatChannel = KChatSDKClient.chatClient
.subscribeChatMessage(request, KChatSDKClient.header)
} catch (e: Exception) {
Log.e("KChat.Subscribe", "fail to establish a chat channel")
listener.channelClose(e)
}
this.chatChannel = KChatSDKClient.chatClient
.subscribeChatMessage(request, KChatSDKClient.header)

this.chatChannel
.flowOn(Dispatchers.IO)
this.chatChannel.flowOn(Dispatchers.IO)
.onStart {
Log.i("KChat.Subscribe", "start listening for message.")
}
.catch { err ->
this@KChatServiceClient.listener.onError(err)
listener.onError(err)
Log.e("KChat.Subscribe", "network err!")
}
.onCompletion { err ->
this@KChatServiceClient.listener.channelClose(err)
listener.channelClose(err)
Log.e("KChat.Subscribe", "channel closed due to some reason.")
}
.collect { message ->
when (message.contentCase) {
Chat.ChatMessageWrapper.ContentCase.C2CMESSAGE -> listener.onReceiveC2CMessage(
message.c2CMessage
)
else -> this@KChatServiceClient.listener.onError(IllegalArgumentException("unknown message type"))
else -> listener.onError(IllegalArgumentException("unknown message type"))
}
}
}

/**
* @see fetchChatRecords 这个也是用来同步聊天记录的接口,可以查看上面接口的定义
* 不同的是,这个接口会以stream的形式返回数据,因此就可以慢慢处理所有的数据
* 同样的道理,不应该在这个地方设置过大的pageSize,小心手机被卡爆
*
* @see listenForChatMessage 关于delegate的处理,可以看上面的函数
*/
suspend fun fetchChatRecords(
startTime: String = "2022-01-01T00:00:00.000+08:00",
endTime: String = "2099-12-31T23:59:59.999+08:00",
pageSize: Int = 10,
delegate: KChatSyncRecordStreamDelegate
) {
val request = Chat.SyncChatRecordRequest.newBuilder()
.setUserUid(KChatSDKClient.userUid)
.setFromTime(startTime)
.setToTime(endTime)
.setMaxLength(pageSize)
.build()

this.syncChannel = KChatSDKClient.chatClient.syncChatRecordStream(request)

this.syncChannel.flowOn(Dispatchers.IO)
.onStart {
Log.i("KChat.Sync", "start synking message.")
}
.catch { err ->
err.printStackTrace()
Log.e("KChat.Sync", "network err!")
}
.onCompletion { err ->
err?.printStackTrace()
Log.e("KChat.Sync", "sync closed due to some reason.")
}
.collect { message ->
Log.i("KChat.Sync", "pack size: ${message.recordsList.size}")
delegate.processPack(message.recordsList)
}
}
}
18 changes: 16 additions & 2 deletions KChatSDK/src/main/proto/chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ option csharp_namespace = "ChatService.Chat";
package service.chat;

service ChattingService {
// 这个方法暂时是残的,只能定向返回C2C的聊天数据,日后会丰富它的查询条件
// 这个方法日后会丰富它的查询条件
rpc syncChatRecord (SyncChatRecordRequest) returns (SyncChatRecordReply);

// 跟syncChatRecord返回是类似的,只不过会将分页数据通过stream全量传输,以简化客户端实现
rpc syncChatRecordStream (SyncChatRecordRequest) returns (stream SyncChatRecordReply);

// 无需成为好友都可以发送
rpc sendAnyC2CMessage (C2CChatMessage) returns (SendReply);
Expand All @@ -20,13 +23,23 @@ message SyncChatRecordRequest {
string fromTime = 2;
string toTime = 3;

// 4, 5还没有实现,给了也没用,目前会截断100条数据
// 用于实现分页查询,可以按需返回数据页,同时也可以按照订阅stream返回
uint32 maxLength = 4;

// page字段基数是0,从零开始数
uint32 page = 5;
}

message SyncChatRecordReply {
repeated ChatMessageWrapper records = 1;
// 基数是0,从零开始数
uint32 currentPage = 2;
// pageSize实际上就是请求里面那个maxLength字段,它未必等于这个数据的长度
uint32 pageSize = 3;
uint32 totalPages = 4;
}

message HeartBeatPack {
}

message C2CChatMessage {
Expand Down Expand Up @@ -80,6 +93,7 @@ message ChatMessageWrapper {
// 以后要补充例如好友申请、服务器下发消息等等
oneof content {
C2CChatMessage c2cMessage = 1;
HeartBeatPack pingPack = 9;
}
}

Expand Down
6 changes: 3 additions & 3 deletions app/src/main/java/ltd/kevinc/chat/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ class MainActivity : AppCompatActivity() {
val client = KChatServiceClient()

client.listenForChatMessage(object : KChatEventDelegate {
override fun onReceiveC2CMessage(message: Chat.C2CChatMessage) {
override suspend fun onReceiveC2CMessage(message: Chat.C2CChatMessage) {
val body = message.content.toStringUtf8()
println("receiving: $body")
}

override fun channelClose(e: Throwable?) {
override suspend fun channelClose(e: Throwable?) {
println("channel被关闭")
}

override fun onError(e: Throwable) {
override suspend fun onError(e: Throwable) {
e.printStackTrace()
}
})
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ buildscript {
}

plugins {
id 'com.android.application' version '7.3.0' apply false
id 'com.android.library' version '7.3.0' apply false
id 'com.android.application' version '7.3.1' apply false
id 'com.android.library' version '7.3.1' apply false
id 'org.jetbrains.kotlin.android' version '1.7.10' apply false
id 'com.google.protobuf' version '0.8.19' apply false
}
Expand Down

0 comments on commit 0530099

Please sign in to comment.