diff --git a/pom.xml b/pom.xml
index 904b2df..8019e34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,12 @@
guava
13.0.1
+
+ org.mockito
+ mockito-all
+ 1.10.19
+
+
diff --git a/src/main/java/com/epam/data/RoadAccident.java b/src/main/java/com/epam/data/RoadAccident.java
index 9548fa5..8d2d61f 100644
--- a/src/main/java/com/epam/data/RoadAccident.java
+++ b/src/main/java/com/epam/data/RoadAccident.java
@@ -21,6 +21,8 @@ public class RoadAccident {
private String lightConditions;
private String weatherConditions;
private String roadSurfaceConditions;
+ private String forceContact;
+ private String timeosDay;
RoadAccident(RoadAccidentBuilder builder){
this.accidentId = builder.accidentId;
@@ -146,4 +148,24 @@ public String getRoadSurfaceConditions() {
public void setRoadSurfaceConditions(String roadSurfaceConditions) {
this.roadSurfaceConditions = roadSurfaceConditions;
}
+
+
+ public String getForceContact() {
+ return forceContact;
+ }
+
+
+ public void setForceContact(String forceContact) {
+ this.forceContact = forceContact;
+ }
+
+
+ public String getTimeosDay() {
+ return timeosDay;
+ }
+
+
+ public void setTimeosDay(String timeosDay) {
+ this.timeosDay = timeosDay;
+ }
}
diff --git a/src/main/java/com/epam/dataservice/NumberChecker.java b/src/main/java/com/epam/dataservice/NumberChecker.java
new file mode 100644
index 0000000..eef6f9f
--- /dev/null
+++ b/src/main/java/com/epam/dataservice/NumberChecker.java
@@ -0,0 +1,14 @@
+package com.epam.dataservice;
+
+public class NumberChecker {
+ private PoliceForceService policeForceService;
+
+ public NumberChecker(PoliceForceService policeForceService) {
+ this.policeForceService = policeForceService;
+ }
+
+ public boolean isSpecialNumber(String policeForceName) {
+ String contactNumber = policeForceService.getContactNo(policeForceName);
+ return "ABCDEFGH".equals(contactNumber);
+ }
+}
diff --git a/src/main/java/com/epam/dataservice/accident/AccidentProcessor.java b/src/main/java/com/epam/dataservice/accident/AccidentProcessor.java
new file mode 100644
index 0000000..0d973d9
--- /dev/null
+++ b/src/main/java/com/epam/dataservice/accident/AccidentProcessor.java
@@ -0,0 +1,55 @@
+package com.epam.dataservice.accident;
+
+import java.util.concurrent.BlockingQueue;
+
+import com.epam.data.RoadAccident;
+import com.epam.dataservice.PoliceForceService;
+
+public class AccidentProcessor implements Runnable {
+
+ public static String TIME_MORNING = "MORNING";
+ public static String TIME_AFTERNOON = "AFTERNOON";
+ public static String TIME_EVENING = "EVENING";
+ public static String TIME_NIGHT = "NIGHT";
+
+ private BlockingQueue readerQueue;
+ private BlockingQueue daytimeQueue;
+ private BlockingQueue nighttimeQueue;
+ private PoliceForceService policeForceService;
+
+ public AccidentProcessor(BlockingQueue readerQueue, BlockingQueue daytimeQueue,
+ BlockingQueue nighttimeQueue) {
+ this.readerQueue = readerQueue;
+ this.daytimeQueue = daytimeQueue;
+ this.nighttimeQueue = nighttimeQueue;
+ this.policeForceService = new PoliceForceService();
+ }
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("processor running...");
+ while (!Thread.currentThread().isInterrupted()) {
+ RoadAccident roadAccident = this.readerQueue.take();
+ roadAccident.setForceContact(this.policeForceService.getContactNo(roadAccident.getPoliceForce()));
+
+ if (roadAccident.getTime().getHour() >= 18) {
+ roadAccident.setTimeosDay(TIME_NIGHT);
+ nighttimeQueue.put(roadAccident);
+ } else if (roadAccident.getTime().getHour() >= 12) {
+ roadAccident.setTimeosDay(TIME_AFTERNOON);
+ daytimeQueue.put(roadAccident);
+ } else if (roadAccident.getTime().getHour() >= 6) {
+ roadAccident.setTimeosDay(TIME_MORNING);
+ daytimeQueue.put(roadAccident);
+ } else {
+ roadAccident.setTimeosDay(TIME_EVENING);
+ nighttimeQueue.put(roadAccident);
+ }
+ }
+ } catch (InterruptedException e) {
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/epam/dataservice/accident/AccidentReader.java b/src/main/java/com/epam/dataservice/accident/AccidentReader.java
new file mode 100644
index 0000000..2c2b458
--- /dev/null
+++ b/src/main/java/com/epam/dataservice/accident/AccidentReader.java
@@ -0,0 +1,44 @@
+package com.epam.dataservice.accident;
+
+import java.io.FileReader;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+import com.epam.data.RoadAccident;
+import com.epam.dataservice.RoadAccidentParser;
+
+public class AccidentReader implements Runnable {
+
+ private BlockingQueue readerQueue;
+ private String accidentFile;
+ private RoadAccidentParser roadAccidentParser;
+
+ public AccidentReader(BlockingQueue readerQueue, String accidentFile) {
+ this.readerQueue = readerQueue;
+ this.accidentFile = accidentFile;
+ roadAccidentParser = new RoadAccidentParser();
+ }
+
+ @Override
+ public void run() {
+ try {
+ System.out.println("start reading file " + this.accidentFile);
+ CSVParser accidentFileParser = new CSVParser(new FileReader(accidentFile), CSVFormat.EXCEL.withHeader());
+ Iterator accidentFileIterator = accidentFileParser.iterator();
+ while (accidentFileIterator.hasNext()) {
+ RoadAccident accident = roadAccidentParser.parseRecord(accidentFileIterator.next());
+ if (accident != null)
+ readerQueue.put(accident);
+ }
+ accidentFileParser.close();
+ System.out.println("end reading file " + this.accidentFile);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/epam/dataservice/accident/AccidentWriter.java b/src/main/java/com/epam/dataservice/accident/AccidentWriter.java
new file mode 100644
index 0000000..a2a7cce
--- /dev/null
+++ b/src/main/java/com/epam/dataservice/accident/AccidentWriter.java
@@ -0,0 +1,46 @@
+package com.epam.dataservice.accident;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+
+import com.epam.data.RoadAccident;
+
+public class AccidentWriter implements Runnable {
+
+ private BlockingQueue accidentQueue;
+ private String accidentFile;
+
+ public AccidentWriter(BlockingQueue accidentQueue, String accidentFile) {
+ this.accidentQueue = accidentQueue;
+ this.accidentFile = accidentFile;
+ }
+
+ @Override
+ public void run() {
+ CSVPrinter csvPrinter = null;
+ try {
+ FileWriter accidentFileWriter = new FileWriter(this.accidentFile);
+ csvPrinter = new CSVPrinter(accidentFileWriter, CSVFormat.DEFAULT.withRecordSeparator("\n"));
+ csvPrinter.printRecord("Accident_Index", "Longitude", "Latitude", "Accident_Hour");
+ while (true) {
+ RoadAccident accident = accidentQueue.take();
+ csvPrinter.printRecord(accident.getAccidentId(), accident.getLongitude(), accident.getLatitude(), accident.getTime().getHour());
+ }
+ } catch (InterruptedException e) {
+ //will interrupt when queue is empty and no more data come in
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ csvPrinter.flush();
+ csvPrinter.close();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/epam/dataservice/AccidentReaderProcessorWriterTest.java b/src/test/java/com/epam/dataservice/AccidentReaderProcessorWriterTest.java
new file mode 100644
index 0000000..abb97f7
--- /dev/null
+++ b/src/test/java/com/epam/dataservice/AccidentReaderProcessorWriterTest.java
@@ -0,0 +1,81 @@
+package com.epam.dataservice;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.epam.data.RoadAccident;
+import com.epam.dataservice.accident.AccidentProcessor;
+import com.epam.dataservice.accident.AccidentReader;
+import com.epam.dataservice.accident.AccidentWriter;
+
+public class AccidentReaderProcessorWriterTest {
+
+ private static int readerThreadCount = 2;
+ private static int readerQueueCapacity = 10000;
+ private static int processorThreadCount = 5;
+ private static int daytimeQueueCapacity = 10000;
+ private static int nighttimeQueueCapacity = 10000;
+ private static List accidentFileList = new ArrayList();
+ static {
+ accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2009.csv");
+ accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2010.csv");
+ accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2011.csv");
+ accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2012.csv");
+ accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2013.csv");
+ }
+
+ private static ExecutorService file2Queue(BlockingQueue readerQueue) {
+ ExecutorService readerExecutor = Executors.newFixedThreadPool(readerThreadCount);
+ for (String accidentFile : accidentFileList) {
+ readerExecutor.execute(new AccidentReader(readerQueue, accidentFile));
+ }
+ return readerExecutor;
+ }
+
+ private static List processData(BlockingQueue readerQueue,
+ BlockingQueue daytimeQueue, BlockingQueue nighttimeQueue) {
+ List processorThreadList = new ArrayList();
+ for (int i = 0; i < processorThreadCount; i++) {
+ Thread processorThread = new Thread(new AccidentProcessor(readerQueue, daytimeQueue, nighttimeQueue));
+ processorThread.start();
+ processorThreadList.add(processorThread);
+ }
+ return processorThreadList;
+ }
+
+ private static Thread write2File(BlockingQueue accidentQueue, String fileName) {
+ Thread writerThread = new Thread(new AccidentWriter(accidentQueue, fileName));
+ writerThread.start();
+ return writerThread;
+ }
+
+ public static void main(String[] args) throws Exception {
+ BlockingQueue readerQueue = new ArrayBlockingQueue(readerQueueCapacity);
+ BlockingQueue daytimeQueue = new ArrayBlockingQueue(daytimeQueueCapacity);
+ BlockingQueue nighttimeQueue = new ArrayBlockingQueue(nighttimeQueueCapacity);
+
+ ExecutorService readerExecutor = file2Queue(readerQueue);
+ List processorThreadList = processData(readerQueue, daytimeQueue, nighttimeQueue);
+ Thread daytimeWriterThread = write2File(daytimeQueue, "src/main/resources/DaytimeAccidents.csv");
+ Thread nighttimeWriterThread = write2File(nighttimeQueue, "src/main/resources/NighttimeAccidents.csv");
+
+ readerExecutor.shutdown();
+ readerExecutor.awaitTermination(5, TimeUnit.MINUTES);
+ while (!readerQueue.isEmpty())
+ Thread.sleep(1000);
+ for (Thread processorThread : processorThreadList)
+ processorThread.interrupt();
+ while (!daytimeQueue.isEmpty())
+ Thread.sleep(1000);
+ daytimeWriterThread.interrupt();
+ while (!nighttimeQueue.isEmpty())
+ Thread.sleep(1000);
+ nighttimeWriterThread.interrupt();
+ }
+
+}
diff --git a/src/test/java/com/epam/dataservice/IntegrationTest.java b/src/test/java/com/epam/dataservice/IntegrationTest.java
new file mode 100644
index 0000000..68c23ab
--- /dev/null
+++ b/src/test/java/com/epam/dataservice/IntegrationTest.java
@@ -0,0 +1,56 @@
+package com.epam.dataservice;
+
+import static org.junit.Assert.*;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Random;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IntegrationTest {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ AccidentReaderProcessorWriterTest.main(null);
+ }
+
+ @Test
+ public void testAccidentReaderProcessorWriter4Daytime() {
+ try {
+ Reader reader = new FileReader("src/main/resources/DaytimeAccidents.csv");
+ CSVParser records = new CSVParser(reader, CSVFormat.EXCEL.withHeader());
+ int i = 0;
+ for (CSVRecord record : records) {
+ Integer hour = Integer.valueOf(record.get(3));
+ assertTrue("record #" + i + " invalid", hour >= 6 && hour < 18);
+ i++;
+ }
+ records.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testAccidentReaderProcessorWriter4Nighttime() {
+ try {
+ Reader reader = new FileReader("src/main/resources/NighttimeAccidents.csv");
+ CSVParser records = new CSVParser(reader, CSVFormat.EXCEL.withHeader());
+ int i = 0;
+ for (CSVRecord record : records) {
+ Integer hour = Integer.valueOf(record.get(3));
+ assertTrue("record #" + i + " invalid", hour < 6 || hour >= 18);
+ i++;
+ }
+ records.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/com/epam/dataservice/PoliceForceServiceTest.java b/src/test/java/com/epam/dataservice/PoliceForceServiceTest.java
new file mode 100644
index 0000000..45983a1
--- /dev/null
+++ b/src/test/java/com/epam/dataservice/PoliceForceServiceTest.java
@@ -0,0 +1,34 @@
+package com.epam.dataservice;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PoliceForceServiceTest {
+
+ private static PoliceForceService policeForceService;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ policeForceService = new PoliceForceService();
+ }
+
+ @Test
+ public void testGetContactNo() {
+ assertEquals("1316386212", policeForceService.getContactNo("North Yorkshire"));
+ assertEquals("1316386213", policeForceService.getContactNo("West Yorkshire"));
+ assertEquals("1316386214", policeForceService.getContactNo("South Yorkshire"));
+ assertEquals("1316386216", policeForceService.getContactNo("Humberside"));
+ assertEquals("1316386217", policeForceService.getContactNo("Cleveland"));
+ assertEquals("1316386220", policeForceService.getContactNo("West Midlands"));
+ assertEquals("1316386221", policeForceService.getContactNo("Staffordshire"));
+ assertEquals("1316386222", policeForceService.getContactNo("West Mercia"));
+ assertEquals("1316386223", policeForceService.getContactNo("Warwickshire"));
+ assertEquals("1316386230", policeForceService.getContactNo("Derbyshire"));
+ assertEquals("1316386231", policeForceService.getContactNo("Nottinghamshire"));
+ assertEquals("1316386232", policeForceService.getContactNo("Lincolnshire"));
+ assertEquals("1316386233", policeForceService.getContactNo("Leicestershire"));
+ }
+
+}
diff --git a/src/test/java/com/epam/dataservice/PoliceForceServiceTestMock.java b/src/test/java/com/epam/dataservice/PoliceForceServiceTestMock.java
new file mode 100644
index 0000000..5c1abcd
--- /dev/null
+++ b/src/test/java/com/epam/dataservice/PoliceForceServiceTestMock.java
@@ -0,0 +1,41 @@
+package com.epam.dataservice;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PoliceForceServiceTestMock {
+
+ @Mock
+ PoliceForceService policeForceService;
+
+ @Test
+ public void testGetContactNo() {
+ Mockito.when(policeForceService.getContactNo("North Yorkshire")).thenReturn("1234");
+ Mockito.when(policeForceService.getContactNo("North Yorkshire1")).thenReturn("123");
+
+ assertEquals("1234", policeForceService.getContactNo("North Yorkshire"));
+ assertEquals("123", policeForceService.getContactNo("North Yorkshire1"));
+
+ Mockito.verify(policeForceService, Mockito.atLeast(1)).getContactNo("North Yorkshire1");
+ Mockito.verify(policeForceService, Mockito.atMost(1)).getContactNo("North Yorkshire1");
+ Mockito.verify(policeForceService).getContactNo("North Yorkshire");
+ }
+
+ @Test
+ public void testNumberChecker() {
+ NumberChecker numberChecker = new NumberChecker(policeForceService);
+
+ Mockito.when(policeForceService.getContactNo("North Yorkshire")).thenReturn("ABCDEFGH");
+ Mockito.when(policeForceService.getContactNo("North Yorkshire1")).thenReturn("ABCDEFGH1");
+
+ assertTrue(numberChecker.isSpecialNumber("North Yorkshire"));
+ assertFalse(numberChecker.isSpecialNumber("North Yorkshire1"));
+ }
+
+}