Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

리뷰 수정 #15

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,52 @@
import com.flab.ccinside.api.trendingpost.application.port.ViewPostEvent;
import com.flab.ccinside.api.trendingpost.application.port.in.PostSystemUsecase;
import com.flab.ccinside.api.trendingpost.application.port.in.UpdateViewCountCommand;
import jakarta.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class InMemoryMessageQueue {

private final PostSystemUsecase postSystemUsecase;
private final BlockingQueue<ViewPostEvent> queue;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private static final Long INITIAL_DELAY = 10L;
private static final Long FIXED_DELAY = 100L;
private static final int THRESHOLD = 100;

@PostConstruct
public void init() {
executorService.submit(
() -> {
try {
while (true) {
var event = queue.take();
log.info("View post event consumed. postId: {}", event.postId());
var command = new UpdateViewCountCommand(event.postId());
postSystemUsecase.updateViewCount(command);
}
} catch (InterruptedException e) {
log.error("consume error: {}", e.getMessage());
throw new RuntimeException(e);
}
});
public InMemoryMessageQueue(
PostSystemUsecase postSystemUsecase,
Queue<ViewPostEvent> queue,
ScheduledExecutorService executorService) {

executorService.scheduleWithFixedDelay(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executorService가 로직을 제대로 실행한다는 것은 어떻게 테스트 할 수 있을까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

말씀해주신대로, 비동기 프로세스 생성에 대한 부분과, 비즈니스 로직 처리에대한 부분을 분리하여 각각 테스트하도록 수정했습니다.
감사합니다 :)

createEventConsumerRunnable(postSystemUsecase, queue),
INITIAL_DELAY,
FIXED_DELAY,
TimeUnit.MILLISECONDS);
}

private Runnable createEventConsumerRunnable(
PostSystemUsecase postSystemUsecase, Queue<ViewPostEvent> queue) {
return () -> {
try {
List<UpdateViewCountCommand> commands = new ArrayList<>();
ViewPostEvent event;
int count = 0;
while ((event = queue.poll()) != null && count < THRESHOLD) {
log.info("View post event consumed. postId: {}", event.postId());
var command = new UpdateViewCountCommand(event.postId());
postSystemUsecase.updateViewCount(command);
commands.add(command);
count++;
}
postSystemUsecase.persistViewCountsInBatch(commands);
} catch (Exception e) {
log.error("error: {}", e.getMessage());
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.flab.ccinside.api.trendingpost.application.port.ViewPostEvent;
import com.flab.ccinside.api.trendingpost.application.port.out.post.AsyncPublishAddViewCountPort;
import java.util.concurrent.BlockingQueue;
import java.util.Queue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand All @@ -12,7 +12,7 @@
@RequiredArgsConstructor
public class InMemoryPostQueue implements AsyncPublishAddViewCountPort {

private final BlockingQueue<ViewPostEvent> queue;
private final Queue<ViewPostEvent> queue;

@Override
public void add(ViewPostEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PostJpaRepository extends JpaRepository<PostEntity, Long> {
public interface PostJpaRepository extends JpaRepository<PostEntity, Long>, PostRepositorySupport {

List<PostEntity> findByGalleryNo(Long galleryNo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,10 @@ public void modify(Post post) {
postRepository.save(entity);
});
}

@Override
public void modifyInBatch(List<Post> posts) {
var postEntities = posts.stream().map(mapper::map).toList();
postRepository.saveAllByBatch(postEntities);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.flab.ccinside.api.trendingpost.adapter.out.post.persistence;

import java.util.List;

public interface PostRepositorySupport {

void saveAllByBatch(List<PostEntity> posts);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.flab.ccinside.api.trendingpost.adapter.out.post.persistence;

import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class PostRepositorySupportImpl implements PostRepositorySupport {

@PersistenceContext private final EntityManager em;

@Override
public void saveAllByBatch(List<PostEntity> posts) {
posts.forEach(em::merge);
em.flush();
em.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import com.flab.ccinside.api.trendingpost.application.port.out.post.HandlePostViewPort;
import com.flab.ccinside.api.trendingpost.application.port.out.post.LoadPostPort;
import com.flab.ccinside.api.trendingpost.application.port.out.post.PersistPostViewPort;
import com.flab.ccinside.api.trendingpost.exception.PostNotFoundException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -27,17 +28,22 @@ public void updateViewCount(UpdateViewCountCommand command) {
}

@Transactional
@Scheduled(fixedRate = 30000)
public void persistViewCounts() {
var allPosts = loadPostPort.loadAllPosts();
log.info("Posts view count persisted. size: {}", allPosts.size());
@Override
public void persistViewCountsInBatch(List<UpdateViewCountCommand> commands) {
var posts =
commands.stream()
.map(
command -> {
var post =
loadPostPort
.loadPost(command.postId())
.orElseThrow(PostNotFoundException::new);
var view = handlePostViewPort.getView(command.postId());
var postViewPersistCommand = new PostViewPersistCommand(view);
return post.persistViewCount(postViewPersistCommand);
})
.toList();

allPosts.forEach(
post -> {
var view = handlePostViewPort.getView(post.getId());
var command = new PostViewPersistCommand(view);
var persistedPost = post.persistViewCount(command);
persistPostViewPort.modify(persistedPost);
});
persistPostViewPort.modifyInBatch(posts);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.flab.ccinside.api.trendingpost.application.port.in;

import java.util.List;

public interface PostSystemUsecase {

void updateViewCount(UpdateViewCountCommand command);

void persistViewCountsInBatch(List<UpdateViewCountCommand> commands);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.flab.ccinside.api.trendingpost.application.port.out.post;

import com.flab.ccinside.api.trendingpost.domain.Post;
import java.util.List;

public interface PersistPostViewPort {

void modify(Post post);

void modifyInBatch(List<Post> posts);
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.flab.ccinside.api.trendingpost.config;

import com.flab.ccinside.api.trendingpost.application.port.ViewPostEvent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InMemoryQueueConfig {
@Bean
public Queue<ViewPostEvent> queue() {
return new ConcurrentLinkedQueue<>();
}

@Bean
public BlockingQueue<ViewPostEvent> queue() {
return new LinkedBlockingQueue<>();
public ScheduledExecutorService executorService() {
return Executors.newScheduledThreadPool(1);
}
}
23 changes: 20 additions & 3 deletions api/src/test/groovy/application/PostUserServiceSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package application

import com.flab.ccinside.api.trendingpost.application.PostMapper
import com.flab.ccinside.api.trendingpost.application.PostUserService
import com.flab.ccinside.api.trendingpost.application.port.in.CreatePostCommand
import com.flab.ccinside.api.trendingpost.application.port.out.PostId
import com.flab.ccinside.api.trendingpost.application.port.out.post.AsyncPublishAddViewCountPort
import com.flab.ccinside.api.trendingpost.application.port.out.post.CreatePostPort
Expand Down Expand Up @@ -30,8 +31,6 @@ class PostUserServiceSpec extends Specification {
@SpringBean
AsyncPublishAddViewCountPort publishAddViewCountPort = Mock()



def "게시글 조회시, 조회 이벤트 발행 - 정상"() {
given:
var event = ViewPostEventFixture.VIEW_POSE_EVENT
Expand All @@ -42,9 +41,27 @@ class PostUserServiceSpec extends Specification {
def got = postUserService.viewPostDetail(PostId.from(1L));

then:
1 * publishAddViewCountPort.add({it == event})
1 * publishAddViewCountPort.add({ it == event })
got.postTitle() == "test post title"
got.authorNo() == 1L
got.postNo() == 1L
}

def "게시글 생성 - 정상"() {
given:
def post = PostFixture.POST
var command = new CreatePostCommand(post.getPostTitle(), post.getAuthorNo(), post.getGalleryNo(), post.getViewCount())

when:
postUserService.create(command)

then:
1 * createPostPort.createPost({
it.postTitle == post.getPostTitle()
it.authorNo == post.getAuthorNo()
it.galleryNo == post.getGalleryNo()
it.viewCount == post.viewCount
})

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.flab.ccinside.api.trendingpost.adapter.in.message

import com.flab.ccinside.api.trendingpost.application.port.ViewPostEvent
import com.flab.ccinside.api.trendingpost.application.port.in.PostSystemUsecase
import com.flab.ccinside.api.trendingpost.application.port.in.UpdateViewCountCommand
import com.flab.ccinside.api.trendingpost.application.port.out.PostId
import com.flab.ccinside.api.trendingpost.config.InMemoryQueueConfig
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification


@ContextConfiguration(classes = [InMemoryMessageQueue, InMemoryQueueConfig])
class InMemoryMessageQueueSpec extends Specification {

@Autowired
Queue<ViewPostEvent> queue

@SpringBean
ScheduledExecutorService executorService = Mock()

@SpringBean
PostSystemUsecase postSystemUsecase = Mock()

def "인메모리 메시지큐 이벤트 소비 및 행위 검증- 정상"() {
given:
def event = new ViewPostEvent(PostId.from(1L))

postSystemUsecase.updateViewCount(_) >> {}
postSystemUsecase.persistViewCountsInBatch(_) >> {}
1 * executorService.scheduleWithFixedDelay(_, _, _, _) >> { Runnable runnable, long l1, long l2, TimeUnit unit -> runnable.run()}

queue.add(event)

when:
def inMemoryMessageQueue = new InMemoryMessageQueue(postSystemUsecase, queue, executorService)

then:
1 * postSystemUsecase.updateViewCount({it == new UpdateViewCountCommand(PostId.from(1L))})
1 * postSystemUsecase.persistViewCountsInBatch({it == List.of(new UpdateViewCountCommand(PostId.from(1L)))})
}

def "executorService 실제 동작 여부 - 정상" () {
when:
def inMemoryMessageQueue = new InMemoryMessageQueue(postSystemUsecase, queue, executorService)

then:
1 * executorService.scheduleWithFixedDelay(_, 10L, 100L, TimeUnit.MILLISECONDS)

}

def "임계치를 초과하는 이벤트 처리 테스트 - 정상"() {
given:
int eventCount = 101
(0..eventCount).each { i ->
queue.add(new ViewPostEvent(PostId.from(i)))
}

executorService.scheduleWithFixedDelay(_, _, _, _) >> { Runnable runnable, long l1, long l2, TimeUnit unit ->
runnable.run()
}

when:
def inMemoryMessageQueue = new InMemoryMessageQueue(postSystemUsecase, queue, executorService)

then:
100 * postSystemUsecase.updateViewCount(_)
1 * postSystemUsecase.persistViewCountsInBatch({ commands ->
commands.size() == 100
})

queue.size() == 1
}

}
2 changes: 1 addition & 1 deletion api/src/test/groovy/fixture/PostFixture.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import java.time.format.DateTimeFormatter

class PostFixture {

static final Post POST = new Post(PostId.from(1L), "test post title", 1L, 1L, LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))
static final Post POST = new Post(PostId.from(1L), "test post title", 1L, 1L, 0, LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))
}
Loading