Skip to content

Commit

Permalink
Fix thread usage in SynchronizedItemStreamReaderTests
Browse files Browse the repository at this point in the history
along with its builder test class SynchronizedItemStreamReaderBuilderTests

Resolves #837
  • Loading branch information
dimitrisli authored and fmbenhassine committed Jun 12, 2023
1 parent 0bfe31e commit f078745
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 227 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.support;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;

import static org.mockito.Mockito.verify;

/**
* Common parent class for {@link SynchronizedItemStreamReaderTests} and
* {@link org.springframework.batch.item.support.builder.SynchronizedItemStreamReaderBuilderTests}
*
* @author Dimitrios Liapis
* @author Mahmoud Ben Hassine
*
*/
@ExtendWith(MockitoExtension.class)
public abstract class AbstractSynchronizedItemStreamReaderTests {

@Mock
protected ItemStreamReader<Object> delegate;

private SynchronizedItemStreamReader<Object> synchronizedItemStreamReader;

private final ExecutionContext testExecutionContext = new ExecutionContext();

abstract protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader();

@BeforeEach
void init() {
this.synchronizedItemStreamReader = createNewSynchronizedItemStreamReader();
}

@Test
void testDelegateReadIsCalled() throws Exception {
this.synchronizedItemStreamReader.read();
verify(this.delegate).read();
}

@Test
void testDelegateOpenIsCalled() {
this.synchronizedItemStreamReader.open(this.testExecutionContext);
verify(this.delegate).open(this.testExecutionContext);
}

@Test
void testDelegateUpdateIsCalled() {
this.synchronizedItemStreamReader.update(this.testExecutionContext);
verify(this.delegate).update(this.testExecutionContext);
}

@Test
void testDelegateCloseIsClosed() {
this.synchronizedItemStreamReader.close();
verify(this.delegate).close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,131 +15,33 @@
*/
package org.springframework.batch.item.support;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashSet;
import java.util.Set;

import org.junit.jupiter.api.Test;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.lang.Nullable;
import org.springframework.beans.factory.InitializingBean;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* @author Matthew Ouyang
* @author Mahmoud Ben Hassine
* @author Dimitrios Liapis
*
*/
class SynchronizedItemStreamReaderTests {

/**
* A simple class used to test the SynchronizedItemStreamReader. It simply returns the
* number of times the read method has been called, manages some state variables and
* updates an ExecutionContext.
*
* @author Matthew Ouyang
*
*/
private class TestItemReader extends AbstractItemStreamItemReader<Integer> implements ItemStreamReader<Integer> {

private int cursor = 0;

private boolean isClosed = false;

public static final String HAS_BEEN_OPENED = "hasBeenOpened";

public static final String UPDATE_COUNT_KEY = "updateCount";

@Nullable
public Integer read() throws Exception, ParseException, NonTransientResourceException {
cursor = cursor + 1;
return cursor;
}

public void close() {
this.isClosed = true;
}

public void open(ExecutionContext executionContext) {
this.isClosed = false;
executionContext.put(HAS_BEEN_OPENED, true);
executionContext.remove(UPDATE_COUNT_KEY);
}

public void update(ExecutionContext executionContext) {

if (!executionContext.containsKey(UPDATE_COUNT_KEY)) {
executionContext.putInt(UPDATE_COUNT_KEY, 0);
}

executionContext.putInt(UPDATE_COUNT_KEY, executionContext.getInt(UPDATE_COUNT_KEY) + 1);
}

public boolean isClosed() {
return this.isClosed;
}
public class SynchronizedItemStreamReaderTests extends AbstractSynchronizedItemStreamReaderTests {

@Override
protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader() {
SynchronizedItemStreamReader<Object> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(delegate);
return synchronizedItemStreamReader;
}

@Test
void testMultipleThreads() throws Exception {

// Initialized an ExecutionContext and a SynchronizedItemStreamReader to test.
final ExecutionContext executionContext = new ExecutionContext();

final TestItemReader testItemReader = new TestItemReader();
final SynchronizedItemStreamReader<Integer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
synchronizedItemStreamReader.setDelegate(testItemReader);

// Open the ItemReader and make sure it's initialized properly.
synchronizedItemStreamReader.open(executionContext);
assertEquals(true, executionContext.get(TestItemReader.HAS_BEEN_OPENED));
assertFalse(testItemReader.isClosed());

/*
* Set up SIZE threads that read from the reader and updates the execution
* context.
*/
final Set<Integer> ecSet = new HashSet<>();
final int SIZE = 20;
Thread[] threads = new Thread[SIZE];
for (int i = 0; i < SIZE; i++) {
threads[i] = new Thread() {
public void run() {
try {
ecSet.add(synchronizedItemStreamReader.read());
synchronizedItemStreamReader.update(executionContext);
}
catch (Exception ignore) {
}
}
};
}

// Start the threads and block until all threads are done.
for (Thread thread : threads) {
thread.run();
}
for (Thread thread : threads) {
thread.join();
}
testItemReader.close();

/*
* Ensure cleanup happens as expected: status variable is set correctly and
* ExecutionContext variable is set properly. Lastly, the Set<Integer> should have
* 1 to 20 which may not always be the case if the read is not synchronized.
*/
for (int i = 1; i <= SIZE; i++) {
assertTrue(ecSet.contains(i));
}
assertTrue(testItemReader.isClosed());
assertEquals(SIZE, executionContext.getInt(TestItemReader.UPDATE_COUNT_KEY));
void testDelegateIsNotNullWhenPropertiesSet() {
final Exception expectedException = assertThrows(IllegalStateException.class,
() -> ((InitializingBean) new SynchronizedItemStreamReader<>()).afterPropertiesSet());
assertEquals("A delegate item reader is required", expectedException.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,135 +16,36 @@

package org.springframework.batch.item.support.builder;

import java.util.HashSet;
import java.util.Set;

import org.junit.jupiter.api.Test;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractSynchronizedItemStreamReaderTests;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.lang.Nullable;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* @author Glenn Renfro
* @author Mahmoud Ben Hassine
* @author Dimitrios Liapis
*/
class SynchronizedItemStreamReaderBuilderTests {

@Test
void testMultipleThreads() throws Exception {

// Initialized an ExecutionContext and a SynchronizedItemStreamReader to test.
final ExecutionContext executionContext = new ExecutionContext();

final SynchronizedItemStreamReaderBuilderTests.TestItemReader testItemReader = new SynchronizedItemStreamReaderBuilderTests.TestItemReader();
final SynchronizedItemStreamReader<Integer> synchronizedItemStreamReader = new SynchronizedItemStreamReaderBuilder<Integer>()
.delegate(testItemReader)
.build();
public class SynchronizedItemStreamReaderBuilderTests extends AbstractSynchronizedItemStreamReaderTests {

// Open the ItemReader and make sure it's initialized properly.
synchronizedItemStreamReader.open(executionContext);
assertEquals(true,
executionContext.get(SynchronizedItemStreamReaderBuilderTests.TestItemReader.HAS_BEEN_OPENED));
assertFalse(testItemReader.isClosed());

/*
* Set up SIZE threads that read from the reader and updates the execution
* context.
*/
final Set<Integer> ecSet = new HashSet<>();
final int SIZE = 20;
Thread[] threads = new Thread[SIZE];
for (int i = 0; i < SIZE; i++) {
threads[i] = new Thread() {
public void run() {
try {
ecSet.add(synchronizedItemStreamReader.read());
synchronizedItemStreamReader.update(executionContext);
}
catch (Exception ignore) {
}
}
};
}

// Start the threads and block until all threads are done.
for (Thread thread : threads) {
thread.run();
}
for (Thread thread : threads) {
thread.join();
}
testItemReader.close();

/*
* Ensure cleanup happens as expected: status variable is set correctly and
* ExecutionContext variable is set properly. Lastly, the Set<Integer> should have
* 1 to 20 which may not always be the case if the read is not synchronized.
*/
for (int i = 1; i <= SIZE; i++) {
assertTrue(ecSet.contains(i));
}
assertTrue(testItemReader.isClosed());
assertEquals(SIZE,
executionContext.getInt(SynchronizedItemStreamReaderBuilderTests.TestItemReader.UPDATE_COUNT_KEY));
@Override
protected SynchronizedItemStreamReader<Object> createNewSynchronizedItemStreamReader() {
return new SynchronizedItemStreamReaderBuilder<>().delegate(delegate).build();
}

/**
* A simple class used to test the SynchronizedItemStreamReader. It simply returns the
* number of times the read method has been called, manages some state variables and
* updates an ExecutionContext.
*
* @author Matthew Ouyang
*
*/
private class TestItemReader extends AbstractItemStreamItemReader<Integer> implements ItemStreamReader<Integer> {

private int cursor = 0;

private boolean isClosed = false;

public static final String HAS_BEEN_OPENED = "hasBeenOpened";

public static final String UPDATE_COUNT_KEY = "updateCount";

@Nullable
public Integer read() throws Exception, ParseException, NonTransientResourceException {
cursor = cursor + 1;
return cursor;
}

public void close() {
this.isClosed = true;
}

public void open(ExecutionContext executionContext) {
this.isClosed = false;
executionContext.put(HAS_BEEN_OPENED, true);
executionContext.remove(UPDATE_COUNT_KEY);
}

public void update(ExecutionContext executionContext) {

if (!executionContext.containsKey(UPDATE_COUNT_KEY)) {
executionContext.putInt(UPDATE_COUNT_KEY, 0);
}

executionContext.putInt(UPDATE_COUNT_KEY, executionContext.getInt(UPDATE_COUNT_KEY) + 1);
}
@Test
void testBuilderDelegateIsNotNull() {
// given
final SynchronizedItemStreamReaderBuilder<Object> builder = new SynchronizedItemStreamReaderBuilder<>();

public boolean isClosed() {
return this.isClosed;
}
// when
final Exception expectedException = assertThrows(IllegalArgumentException.class, builder::build);

// then
assertEquals("A delegate is required", expectedException.getMessage());
}

}

0 comments on commit f078745

Please sign in to comment.