Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into feat/#63
  • Loading branch information
mingzooo committed Feb 15, 2024
2 parents d059071 + 822405e commit f715a47
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 70 deletions.
12 changes: 12 additions & 0 deletions src/backend/chatting-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,21 @@ dependencies {

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'

implementation 'org.springframework.cloud:spring-cloud-starter-openfeign:4.1.0'

annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
}

ext {
set('springCloudVersion', "2023.0.0")
}

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}

tasks.named('test') {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package chattingserver.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Slf4j
@EnableAsync
@Configuration
public class AsyncConfig {

@Bean(name = "search")
public Executor pushAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int processors = Runtime.getRuntime().availableProcessors();
log.info("processors count {}", processors);
executor.setThreadNamePrefix("SearchAsync- ");
executor.setCorePoolSize(processors);
executor.setMaxPoolSize(processors * 2);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package chattingserver.config;

import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableFeignClients(basePackages = {"chattingserver.controller"})
public class OpenFeignConfig {

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package chattingserver.config.kafka;

import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.RoomMessageDto;
import chattingserver.dto.response.RoomResponseDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
Expand All @@ -20,11 +19,11 @@ public void listenChat(ChatMessageDto chatMessageDto) {
template.convertAndSend("/chat/topic/room/" + chatMessageDto.getRoomId(), chatMessageDto);
}

// TODO search indexing test용 추후 삭제 예정
@KafkaListener(groupId = "${spring.kafka.consumer.room-consumer.group-id}", topics = "${kafka.topic.room-name}", containerFactory = "kafkaListenerContainerFactory")
public void listenGroupCreation(RoomMessageDto roomMessageDto) {
RoomResponseDto roomResponseDto = roomMessageDto.getRoomResponseDto();
for (Long userId : roomMessageDto.getReceivers()) {
template.convertAndSend("/chat/topic/new-room/" + userId, roomResponseDto);
}
public void listenRoomCreation(IndexingRequestMessageDto indexingRequestMessageDto) {

template.convertAndSend("/chat/topic/search/room/index", indexingRequestMessageDto);

}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package chattingserver.config.kafka;

import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.RoomMessageDto;
import chattingserver.dto.response.ChatMessageResponseDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -51,7 +50,7 @@ public KafkaTemplate<String, ChatMessageDto> chatKafkaTemplate() {
}

@Bean
public ProducerFactory<String, RoomMessageDto> roomProducerFactory() {
public ProducerFactory<String, IndexingRequestMessageDto> roomProducerFactory() {
return new DefaultKafkaProducerFactory<>(roomProducerConfigurations());
}

Expand All @@ -65,7 +64,7 @@ public Map<String, Object> roomProducerConfigurations() {
}

@Bean
public KafkaTemplate<String, RoomMessageDto> roomKafkaTemplate() {
public KafkaTemplate<String, IndexingRequestMessageDto> roomKafkaTemplate() {
return new KafkaTemplate<>(roomProducerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package chattingserver.config.kafka;

import chattingserver.domain.room.User;
import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.RoomMessageDto;
import chattingserver.dto.response.ChatMessageResponseDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import chattingserver.dto.response.RoomResponseDto;
import chattingserver.dto.response.UserListResponseDto;
import chattingserver.service.ChatMessageService;
Expand Down Expand Up @@ -32,23 +31,23 @@ public class Producers {
@Value("${kafka.topic.room-name}")
private String topicRoomName;

private final KafkaTemplate<String, RoomMessageDto> roomKafkaTemplate;
private final KafkaTemplate<String, IndexingRequestMessageDto> roomKafkaTemplate;

private final ChatMessageService chatMessageService;
private final RoomService roomService;

public void sendMessage(ChatMessageDto chatMessageDto) {
if (chatMessageDto.getMessageType() == MessageType.ENTRANCE) {
log.info("producers.sendMessage.if MessageType == ENTRANCE");
if (chatMessageDto.getMessageType() == MessageType.CREATION) {
RoomResponseDto roomResponseDto = roomService.getRoomInfo(chatMessageDto.getRoomId());
List<Long> receivers = roomResponseDto.getUsers().stream().map(UserListResponseDto::getUid).collect(Collectors.toList());
receivers.remove(chatMessageDto.getSenderId());
sendRoomMessage(RoomMessageDto.builder()
.receivers(receivers)
.roomResponseDto(roomResponseDto)
sendRoomMessage(IndexingRequestMessageDto.builder()
.roomId(roomResponseDto.getId())
.roomName(roomResponseDto.getRoomName())
.playlistId(roomResponseDto.getPlaylist().getId())
.thumbnailImage(roomResponseDto.getThumbnailImage())
.build());
} else {
log.info("producers.sendMessage.if MessageType != ENTRANCE");

CompletableFuture<SendResult<String, ChatMessageDto>> completableFuture = chatKafkaTemplate.send(topicChatName, chatMessageDto);
completableFuture.whenComplete((result, ex) -> {
Expand All @@ -63,14 +62,14 @@ public void sendMessage(ChatMessageDto chatMessageDto) {
}
}

public void sendRoomMessage(RoomMessageDto roomMessageDto) {
CompletableFuture<SendResult<String, RoomMessageDto>> completableFuture = roomKafkaTemplate.send(topicRoomName, roomMessageDto);
public void sendRoomMessage(IndexingRequestMessageDto roomMessageDto) {
CompletableFuture<SendResult<String, IndexingRequestMessageDto>> completableFuture = roomKafkaTemplate.send(topicRoomName, roomMessageDto);
completableFuture.whenComplete((result, ex) -> {
if (ex == null) {
log.info("메시지 전송 성공=[" + roomMessageDto.getRoomResponseDto().getId() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
log.info("메시지 전송 성공=[" + roomMessageDto.getRoomId() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
log.info("roomMessageDto={}", roomMessageDto.toString());
} else {
log.info("메시지 전송 불가=[" + roomMessageDto.getRoomResponseDto().getId() + "] 원인 : " + ex.getMessage());
log.info("메시지 전송 불가=[" + roomMessageDto.getRoomId() + "] 원인 : " + ex.getMessage());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class SwaggerConfig {

@Bean
public GroupedOpenApi chatOpenApi() {
String[] paths = {"/api/**", "/chat/**"};
String[] paths = {"/v1/**", "/**/chat"};

return GroupedOpenApi.builder()
.group("CHATTING-SERVER API V1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@Tag(name = "chat", description = "채팅 API")
@RequiredArgsConstructor
@RestController
@RequestMapping("/chat")
@RequestMapping("v1/api/chat")
@Slf4j
public class ChatController {

Expand Down Expand Up @@ -90,7 +90,7 @@ public ResponseEntity<CommonAPIMessage> allMessagesAtRoom(@PathVariable String r
return new ResponseEntity<>(apiMessage, HttpStatus.OK);
}

@Operation(summary = "채팅 메시지 Pagination", description = "내림차순으로 해당 채팅방 메시지 Pagination, 사이즈 N = 12 고정")
@Operation(summary = "채팅 메시지 Pagination", description = "내림차순으로 해당 채팅방 메시지 Pagination, 사이즈 N = 20 고정")
@GetMapping("/history")
public ResponseEntity<CommonAPIMessage> chatMessagePagination(
@RequestParam String roomId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package chattingserver.controller;

import chattingserver.config.kafka.Producers;
import chattingserver.domain.room.User;
import chattingserver.dto.RoomMessageDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import chattingserver.dto.request.ReadMessageUpdateRequestDto;
import chattingserver.dto.request.RoomCreateRequestDto;
import chattingserver.dto.request.UserEntranceRequestDto;
import chattingserver.dto.response.CommonAPIMessage;
import chattingserver.dto.response.JoinedRoomResponseDto;
import chattingserver.dto.response.RoomResponseDto;
import chattingserver.dto.response.UserListResponseDto;
import chattingserver.service.ChatMessageService;
import chattingserver.service.RoomService;
import chattingserver.service.SearchService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -26,17 +25,17 @@

import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

@Tag(name = "room", description = "채팅방 API")
@Slf4j
@RestController
@RequestMapping("/api/v1/rooms")
@RequestMapping("/v1/api/rooms")
@RequiredArgsConstructor
public class RoomController {

private final RoomService roomService;
private final ChatMessageService chatMessageService;
private final SearchService searchService;
private final Producers producers;

@Operation(summary = "채팅방 생성 API", description = "신규 채팅방 생성", responses = {
Expand All @@ -47,15 +46,19 @@ public ResponseEntity<CommonAPIMessage> groupCreation(@Valid @RequestBody RoomCr

RoomResponseDto roomResponseDto = roomService.create(roomCreateRequestDto);

producers.sendRoomMessage(RoomMessageDto.builder()
.receivers(roomResponseDto.getUsers().stream().map(UserListResponseDto::getUid).collect(Collectors.toList()))
.roomResponseDto(roomResponseDto)
producers.sendRoomMessage(IndexingRequestMessageDto.builder()
.roomId(roomResponseDto.getId())
.roomName(roomResponseDto.getRoomName())
.playlistId(roomResponseDto.getPlaylist().getId())
.thumbnailImage(roomResponseDto.getThumbnailImage())
.build());

CommonAPIMessage apiMessage = new CommonAPIMessage();
apiMessage.setMessage(CommonAPIMessage.ResultEnum.success);
apiMessage.setData(roomResponseDto);

searchService.sendIndexingRequestToSearchServer(roomResponseDto);

return new ResponseEntity<>(apiMessage, HttpStatus.CREATED);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package chattingserver.controller;

import chattingserver.dto.request.IndexingRequestMessageDto;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(name = "search-service", url = "localhost:20000")
public interface SearchProxy {
@PostMapping("/search/index/send")
void sendIndexingRequest(@RequestBody IndexingRequestMessageDto dto);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package chattingserver.dto.request;

import lombok.*;


@ToString
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder
public class IndexingRequestMessageDto {
private String roomId;
private String roomName;
private String playlistId;
private String thumbnailImage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ReadMessageUpdateRequestDto {
@NotBlank
private String roomId;
@NotBlank
private String uid;
private Long uid;
@NotBlank
private String messageId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public ChatMessage getLastMessage(String roomId) {

@Override
public List<ChatMessage> findPreviousMessages(String roomId, String readMsgId, int limit) {
ObjectId readMsgObjectId = new ObjectId(readMsgId);
Query query = new Query();
query.addCriteria(Criteria.where("roomId").is(roomId).and("_id").lt(readMsgId)); // 이전 메시지만 가져오도록 쿼리 설정
query.limit(20); // 최대 20개의 메시지 가져오도록 제한 설정
query.addCriteria(Criteria.where("roomId").is(roomId).and("_id").lt(readMsgObjectId));
query.limit(20);
return mongoTemplate.find(query, ChatMessage.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chattingserver.repository;

import chattingserver.domain.room.Room;
import chattingserver.domain.room.User;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
Expand All @@ -24,4 +25,6 @@ public interface RoomRepository extends MongoRepository<Room, String>, RoomRepos

@Override
boolean existsById(String s);


}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package chattingserver.repository;

import chattingserver.domain.room.User;
import chattingserver.dto.request.ReadMessageUpdateRequestDto;
import com.mongodb.client.result.UpdateResult;

public interface RoomRepositoryCustom {

void exitRoom(String roomId, Long uid);
UpdateResult addUserToRoom(String roomId, User user);

UpdateResult updateLastReadMsgId(ReadMessageUpdateRequestDto requestDto);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chattingserver.repository;

import chattingserver.domain.room.Room;
import chattingserver.domain.room.User;
import chattingserver.dto.request.ReadMessageUpdateRequestDto;
import com.mongodb.client.result.UpdateResult;
import org.springframework.data.mongodb.core.MongoTemplate;
Expand Down Expand Up @@ -29,12 +30,18 @@ public void exitRoom(String roomId, Long uid) {
public UpdateResult updateLastReadMsgId(ReadMessageUpdateRequestDto requestDto) {

Query query = Query.query(
Criteria.where("roomId").is(requestDto.getRoomId())
Criteria.where("_id").is(requestDto.getRoomId())
.andOperator(Criteria.where("users").elemMatch(Criteria.where("uid").is(requestDto.getUid())))
);

Update update = new Update().set("users.$.lastReadMessageId", requestDto.getMessageId());
return mongoTemplate.updateFirst(query, update, Room.class);
}

@Override
public UpdateResult addUserToRoom(String roomId, User user) {
Query query = new Query(Criteria.where("_id").is(roomId));
Update update = new Update().addToSet("users", user);
return mongoTemplate.updateFirst(query, update, Room.class);
}
}
Loading

0 comments on commit f715a47

Please sign in to comment.