Skip to content

Commit

Permalink
Merge pull request #75 from sgdevcamp2023/feat/#70
Browse files Browse the repository at this point in the history
Feat/#70
  • Loading branch information
suakang17 authored Feb 16, 2024
2 parents 61dbe4f + 89b7525 commit 0566d3b
Show file tree
Hide file tree
Showing 40 changed files with 871 additions and 361 deletions.
12 changes: 12 additions & 0 deletions src/backend/chatting-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ dependencies {

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'

implementation 'org.springframework.cloud:spring-cloud-starter-openfeign:4.1.0'

annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'

// Eureka
Expand All @@ -56,6 +58,16 @@ dependencies {
implementation 'net.logstash.logback:logstash-logback-encoder:6.3'
}

ext {
set('springCloudVersion', "2023.0.0")
}

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}

tasks.named('test') {
useJUnitPlatform()
}
2 changes: 2 additions & 0 deletions src/backend/chatting-server/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ services:
volumes:
- chatting-mongodbdata:/data/db:cached
- ./init-mongo.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
networks:
- lalala-network

chatting-mongo-express:
image: mongo-express:1.0.2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package chattingserver.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Slf4j
@EnableAsync
@Configuration
public class AsyncConfig {

@Bean(name = "search")
public Executor pushAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int processors = Runtime.getRuntime().availableProcessors();
log.info("processors count {}", processors);
executor.setThreadNamePrefix("SearchAsync- ");
executor.setCorePoolSize(processors);
executor.setMaxPoolSize(processors * 2);
executor.setQueueCapacity(50);
executor.setKeepAliveSeconds(60);
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package chattingserver.config;

import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableFeignClients(basePackages = {"chattingserver.controller"})
public class OpenFeignConfig {

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package chattingserver.config;

import org.apache.catalina.filters.CorsFilter;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebConfig implements WebMvcConfigurer {

@Override
public void addCorsMappings(CorsRegistry registry) {
registry
.addMapping("/**")
.allowedOrigins("*")
registry.addMapping("/**")
.allowedOriginPatterns("http://localhost:3000")
.allowedMethods("*")
.allowedHeaders("*")
.allowCredentials(false)
.maxAge(60000);
.allowedHeaders("*");
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package chattingserver.config.kafka;

import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -24,14 +26,10 @@ public void listenChat(ChatMessageDto chatMessageDto) {
template.convertAndSend("/chat/topic/room/" + chatMessageDto.getRoomId(), chatMessageDto);
}

@KafkaListener(
groupId = "${spring.kafka.consumer.room-consumer.group-id}",
topics = "${spring.kafka.topic.room-name}",
containerFactory = "kafkaListenerContainerFactory")
public void listenGroupCreation(RoomMessageDto roomMessageDto) {
RoomResponseDto roomResponseDto = roomMessageDto.getRoomResponseDto();
for (Long userId : roomMessageDto.getReceivers()) {
template.convertAndSend("/chat/topic/new-room/" + userId, roomResponseDto);
}
// TODO search indexing test용 추후 삭제 예정
@KafkaListener(groupId = "${spring.kafka.consumer.room-consumer.group-id}", topics = "${kafka.topic.room-name}", containerFactory = "kafkaListenerContainerFactory")
public void listenRoomCreation(IndexingRequestMessageDto indexingRequestMessageDto) {

template.convertAndSend("/chat/topic/search/room/index", indexingRequestMessageDto);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package chattingserver.config.kafka;

import java.util.HashMap;
import java.util.Map;

import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -36,7 +37,7 @@ public class KafkaProducerConfig {
private String chatValueSerializer;

@Bean
public ProducerFactory<String, ChatMessageDto> producerFactory() {
public ProducerFactory<String, ChatMessageDto> chatProducerFactory() {
return new DefaultKafkaProducerFactory<>(chatProducerConfigurations());
}

Expand All @@ -51,11 +52,11 @@ public Map<String, Object> chatProducerConfigurations() {

@Bean
public KafkaTemplate<String, ChatMessageDto> chatKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
return new KafkaTemplate<>(chatProducerFactory());
}

@Bean
public ProducerFactory<String, RoomMessageDto> roomProducerFactory() {
public ProducerFactory<String, IndexingRequestMessageDto> roomProducerFactory() {
return new DefaultKafkaProducerFactory<>(roomProducerConfigurations());
}

Expand All @@ -69,7 +70,7 @@ public Map<String, Object> roomProducerConfigurations() {
}

@Bean
public KafkaTemplate<String, RoomMessageDto> roomKafkaTemplate() {
public KafkaTemplate<String, IndexingRequestMessageDto> roomKafkaTemplate() {
return new KafkaTemplate<>(roomProducerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
package chattingserver.config.kafka;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import chattingserver.domain.room.User;
import chattingserver.dto.ChatMessageDto;
import chattingserver.dto.RoomMessageDto;
import chattingserver.dto.request.IndexingRequestMessageDto;
import chattingserver.dto.response.RoomResponseDto;
import chattingserver.dto.response.UserListResponseDto;
import chattingserver.service.ChatMessageService;
import chattingserver.service.RoomService;
import chattingserver.util.constant.MessageType;
Expand All @@ -32,58 +21,46 @@ public class Producers {
@Value("${spring.kafka.topic.room-name}")
private String topicRoomName;

private final KafkaTemplate<String, RoomMessageDto> roomKafkaTemplate;
private final KafkaTemplate<String, IndexingRequestMessageDto> roomKafkaTemplate;

private final ChatMessageService chatMessageService;
private final RoomService roomService;

public void sendMessage(ChatMessageDto chatMessageDto) {
if (chatMessageDto.getMessageType() == MessageType.ENTRANCE) {
if (chatMessageDto.getMessageType() == MessageType.CREATION) {
RoomResponseDto roomResponseDto = roomService.getRoomInfo(chatMessageDto.getRoomId());
List<Long> receivers =
roomResponseDto.getUsers().stream().map(User::getUid).collect(Collectors.toList());
List<Long> receivers = roomResponseDto.getUsers().stream().map(UserListResponseDto::getUid).collect(Collectors.toList());
receivers.remove(chatMessageDto.getSenderId());
sendRoomMessage(
RoomMessageDto.builder().receivers(receivers).roomResponseDto(roomResponseDto).build());
sendRoomMessage(IndexingRequestMessageDto.builder()
.roomId(roomResponseDto.getId())
.roomName(roomResponseDto.getRoomName())
.playlistId(roomResponseDto.getPlaylist().getId())
.thumbnailImage(roomResponseDto.getThumbnailImage())
.build());
} else {
CompletableFuture<SendResult<String, ChatMessageDto>> completableFuture =
chatKafkaTemplate.send(topicChatName, chatMessageDto);
completableFuture.whenComplete(
(result, ex) -> {
if (ex == null) {
log.info(
"채팅방 id: {}, 발신자 id: {}, 메시지: {}",
chatMessageDto.getRoomId(),
chatMessageDto.getSenderId(),
chatMessageDto.getContent());
} else {
log.error("메시지 전송 불가=[" + chatMessageDto.getContent() + "] 원인 : " + ex.getMessage());
chatMessageService.deleteChat(chatMessageDto.getId());
log.info("삭제된 메시지={}", chatMessageDto.getId());
}
});

CompletableFuture<SendResult<String, ChatMessageDto>> completableFuture = chatKafkaTemplate.send(topicChatName, chatMessageDto);
completableFuture.whenComplete((result, ex) -> {
if (ex == null) {
log.info("채팅방 id: {}, 발신자 id: {}, 메시지: {}", chatMessageDto.getRoomId(), chatMessageDto.getSenderId(), chatMessageDto.getContent());
} else {
log.error("메시지 전송 불가=[" + chatMessageDto.getContent() + "] 원인 : " + ex.getMessage());
chatMessageService.deleteChat(chatMessageDto.getId());
log.info("삭제된 메시지={}", chatMessageDto.getId());
}
});
}
}

public void sendRoomMessage(RoomMessageDto roomMessageDto) {
CompletableFuture<SendResult<String, RoomMessageDto>> completableFuture =
roomKafkaTemplate.send(topicRoomName, roomMessageDto);
completableFuture.whenComplete(
(result, ex) -> {
if (ex == null) {
log.info(
"메시지 전송 성공=["
+ roomMessageDto.getRoomResponseDto().getId()
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
} else {
log.info(
"메시지 전송 불가=["
+ roomMessageDto.getRoomResponseDto().getId()
+ "] 원인 : "
+ ex.getMessage());
}
});
public void sendRoomMessage(IndexingRequestMessageDto roomMessageDto) {
CompletableFuture<SendResult<String, IndexingRequestMessageDto>> completableFuture = roomKafkaTemplate.send(topicRoomName, roomMessageDto);
completableFuture.whenComplete((result, ex) -> {
if (ex == null) {
log.info("메시지 전송 성공=[" + roomMessageDto.getRoomId() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
log.info("roomMessageDto={}", roomMessageDto.toString());
} else {
log.info("메시지 전송 불가=[" + roomMessageDto.getRoomId() + "] 원인 : " + ex.getMessage());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class SwaggerConfig {

@Bean
public GroupedOpenApi chatOpenApi() {
String[] paths = {"/api/**", "/chat/**"};
String[] paths = {"/v1/**", "/**/chat"};

return GroupedOpenApi.builder().group("CHATTING-SERVER API V1").pathsToMatch(paths).build();
}
Expand Down
Loading

0 comments on commit 0566d3b

Please sign in to comment.