Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

North Zhang -- Homework#4 #27

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>guava</artifactId>
<version>13.0.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
</dependency>

</dependencies>


Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/epam/data/RoadAccident.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class RoadAccident {
private String lightConditions;
private String weatherConditions;
private String roadSurfaceConditions;
private String forceContact;
private String timeosDay;

RoadAccident(RoadAccidentBuilder builder){
this.accidentId = builder.accidentId;
Expand Down Expand Up @@ -146,4 +148,24 @@ public String getRoadSurfaceConditions() {
public void setRoadSurfaceConditions(String roadSurfaceConditions) {
this.roadSurfaceConditions = roadSurfaceConditions;
}


public String getForceContact() {
return forceContact;
}


public void setForceContact(String forceContact) {
this.forceContact = forceContact;
}


public String getTimeosDay() {
return timeosDay;
}


public void setTimeosDay(String timeosDay) {
this.timeosDay = timeosDay;
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/epam/dataservice/NumberChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.epam.dataservice;

public class NumberChecker {
private PoliceForceService policeForceService;

public NumberChecker(PoliceForceService policeForceService) {
this.policeForceService = policeForceService;
}

public boolean isSpecialNumber(String policeForceName) {
String contactNumber = policeForceService.getContactNo(policeForceName);
return "ABCDEFGH".equals(contactNumber);
}
}
55 changes: 55 additions & 0 deletions src/main/java/com/epam/dataservice/accident/AccidentProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.epam.dataservice.accident;

import java.util.concurrent.BlockingQueue;

import com.epam.data.RoadAccident;
import com.epam.dataservice.PoliceForceService;

public class AccidentProcessor implements Runnable {

public static String TIME_MORNING = "MORNING";
public static String TIME_AFTERNOON = "AFTERNOON";
public static String TIME_EVENING = "EVENING";
public static String TIME_NIGHT = "NIGHT";

private BlockingQueue<RoadAccident> readerQueue;
private BlockingQueue<RoadAccident> daytimeQueue;
private BlockingQueue<RoadAccident> nighttimeQueue;
private PoliceForceService policeForceService;

public AccidentProcessor(BlockingQueue<RoadAccident> readerQueue, BlockingQueue<RoadAccident> daytimeQueue,
BlockingQueue<RoadAccident> nighttimeQueue) {
this.readerQueue = readerQueue;
this.daytimeQueue = daytimeQueue;
this.nighttimeQueue = nighttimeQueue;
this.policeForceService = new PoliceForceService();
}

@Override
public void run() {
try {
System.out.println("processor running...");
while (!Thread.currentThread().isInterrupted()) {
RoadAccident roadAccident = this.readerQueue.take();
roadAccident.setForceContact(this.policeForceService.getContactNo(roadAccident.getPoliceForce()));

if (roadAccident.getTime().getHour() >= 18) {
roadAccident.setTimeosDay(TIME_NIGHT);
nighttimeQueue.put(roadAccident);
} else if (roadAccident.getTime().getHour() >= 12) {
roadAccident.setTimeosDay(TIME_AFTERNOON);
daytimeQueue.put(roadAccident);
} else if (roadAccident.getTime().getHour() >= 6) {
roadAccident.setTimeosDay(TIME_MORNING);
daytimeQueue.put(roadAccident);
} else {
roadAccident.setTimeosDay(TIME_EVENING);
nighttimeQueue.put(roadAccident);
}
}
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
}
44 changes: 44 additions & 0 deletions src/main/java/com/epam/dataservice/accident/AccidentReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.epam.dataservice.accident;

import java.io.FileReader;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import com.epam.data.RoadAccident;
import com.epam.dataservice.RoadAccidentParser;

public class AccidentReader implements Runnable {

private BlockingQueue<RoadAccident> readerQueue;
private String accidentFile;
private RoadAccidentParser roadAccidentParser;

public AccidentReader(BlockingQueue<RoadAccident> readerQueue, String accidentFile) {
this.readerQueue = readerQueue;
this.accidentFile = accidentFile;
roadAccidentParser = new RoadAccidentParser();
}

@Override
public void run() {
try {
System.out.println("start reading file " + this.accidentFile);
CSVParser accidentFileParser = new CSVParser(new FileReader(accidentFile), CSVFormat.EXCEL.withHeader());
Iterator<CSVRecord> accidentFileIterator = accidentFileParser.iterator();
while (accidentFileIterator.hasNext()) {
RoadAccident accident = roadAccidentParser.parseRecord(accidentFileIterator.next());
if (accident != null)
readerQueue.put(accident);
}
accidentFileParser.close();
System.out.println("end reading file " + this.accidentFile);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
46 changes: 46 additions & 0 deletions src/main/java/com/epam/dataservice/accident/AccidentWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.epam.dataservice.accident;

import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

import com.epam.data.RoadAccident;

public class AccidentWriter implements Runnable {

private BlockingQueue<RoadAccident> accidentQueue;
private String accidentFile;

public AccidentWriter(BlockingQueue<RoadAccident> accidentQueue, String accidentFile) {
this.accidentQueue = accidentQueue;
this.accidentFile = accidentFile;
}

@Override
public void run() {
CSVPrinter csvPrinter = null;
try {
FileWriter accidentFileWriter = new FileWriter(this.accidentFile);
csvPrinter = new CSVPrinter(accidentFileWriter, CSVFormat.DEFAULT.withRecordSeparator("\n"));
csvPrinter.printRecord("Accident_Index", "Longitude", "Latitude", "Accident_Hour");
while (true) {
RoadAccident accident = accidentQueue.take();
csvPrinter.printRecord(accident.getAccidentId(), accident.getLongitude(), accident.getLatitude(), accident.getTime().getHour());
}
} catch (InterruptedException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

North, never live empty Exception. At least leave comment why it's empty. It's extremely tough to find error in that case

//will interrupt when queue is empty and no more data come in
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
csvPrinter.flush();
csvPrinter.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.epam.dataservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.epam.data.RoadAccident;
import com.epam.dataservice.accident.AccidentProcessor;
import com.epam.dataservice.accident.AccidentReader;
import com.epam.dataservice.accident.AccidentWriter;

public class AccidentReaderProcessorWriterTest {

private static int readerThreadCount = 2;
private static int readerQueueCapacity = 10000;
private static int processorThreadCount = 5;
private static int daytimeQueueCapacity = 10000;
private static int nighttimeQueueCapacity = 10000;
private static List<String> accidentFileList = new ArrayList<String>();
static {
accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2009.csv");
accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2010.csv");
accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2011.csv");
accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2012.csv");
accidentFileList.add("src/main/resources/DfTRoadSafety_Accidents_2013.csv");
}

private static ExecutorService file2Queue(BlockingQueue<RoadAccident> readerQueue) {
ExecutorService readerExecutor = Executors.newFixedThreadPool(readerThreadCount);
for (String accidentFile : accidentFileList) {
readerExecutor.execute(new AccidentReader(readerQueue, accidentFile));
}
return readerExecutor;
}

private static List<Thread> processData(BlockingQueue<RoadAccident> readerQueue,
BlockingQueue<RoadAccident> daytimeQueue, BlockingQueue<RoadAccident> nighttimeQueue) {
List<Thread> processorThreadList = new ArrayList<Thread>();
for (int i = 0; i < processorThreadCount; i++) {
Thread processorThread = new Thread(new AccidentProcessor(readerQueue, daytimeQueue, nighttimeQueue));
processorThread.start();
processorThreadList.add(processorThread);
}
return processorThreadList;
}

private static Thread write2File(BlockingQueue<RoadAccident> accidentQueue, String fileName) {
Thread writerThread = new Thread(new AccidentWriter(accidentQueue, fileName));
writerThread.start();
return writerThread;
}

public static void main(String[] args) throws Exception {
BlockingQueue<RoadAccident> readerQueue = new ArrayBlockingQueue<RoadAccident>(readerQueueCapacity);
BlockingQueue<RoadAccident> daytimeQueue = new ArrayBlockingQueue<RoadAccident>(daytimeQueueCapacity);
BlockingQueue<RoadAccident> nighttimeQueue = new ArrayBlockingQueue<RoadAccident>(nighttimeQueueCapacity);

ExecutorService readerExecutor = file2Queue(readerQueue);
List<Thread> processorThreadList = processData(readerQueue, daytimeQueue, nighttimeQueue);
Thread daytimeWriterThread = write2File(daytimeQueue, "src/main/resources/DaytimeAccidents.csv");
Thread nighttimeWriterThread = write2File(nighttimeQueue, "src/main/resources/NighttimeAccidents.csv");

readerExecutor.shutdown();
readerExecutor.awaitTermination(5, TimeUnit.MINUTES);
while (!readerQueue.isEmpty())
Thread.sleep(1000);
for (Thread processorThread : processorThreadList)
processorThread.interrupt();
while (!daytimeQueue.isEmpty())
Thread.sleep(1000);
daytimeWriterThread.interrupt();
while (!nighttimeQueue.isEmpty())
Thread.sleep(1000);
nighttimeWriterThread.interrupt();
}

}
56 changes: 56 additions & 0 deletions src/test/java/com/epam/dataservice/IntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.epam.dataservice;

import static org.junit.Assert.*;

import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Random;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.junit.BeforeClass;
import org.junit.Test;

public class IntegrationTest {

@BeforeClass
public static void setUpBeforeClass() throws Exception {
AccidentReaderProcessorWriterTest.main(null);
}

@Test
public void testAccidentReaderProcessorWriter4Daytime() {
try {
Reader reader = new FileReader("src/main/resources/DaytimeAccidents.csv");
CSVParser records = new CSVParser(reader, CSVFormat.EXCEL.withHeader());
int i = 0;
for (CSVRecord record : records) {
Integer hour = Integer.valueOf(record.get(3));
assertTrue("record #" + i + " invalid", hour >= 6 && hour < 18);
i++;
}
records.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Test
public void testAccidentReaderProcessorWriter4Nighttime() {
try {
Reader reader = new FileReader("src/main/resources/NighttimeAccidents.csv");
CSVParser records = new CSVParser(reader, CSVFormat.EXCEL.withHeader());
int i = 0;
for (CSVRecord record : records) {
Integer hour = Integer.valueOf(record.get(3));
assertTrue("record #" + i + " invalid", hour < 6 || hour >= 18);
i++;
}
records.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
34 changes: 34 additions & 0 deletions src/test/java/com/epam/dataservice/PoliceForceServiceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.epam.dataservice;

import static org.junit.Assert.assertEquals;

import org.junit.BeforeClass;
import org.junit.Test;

public class PoliceForceServiceTest {

private static PoliceForceService policeForceService;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
policeForceService = new PoliceForceService();
}

@Test
public void testGetContactNo() {
assertEquals("1316386212", policeForceService.getContactNo("North Yorkshire"));
assertEquals("1316386213", policeForceService.getContactNo("West Yorkshire"));
assertEquals("1316386214", policeForceService.getContactNo("South Yorkshire"));
assertEquals("1316386216", policeForceService.getContactNo("Humberside"));
assertEquals("1316386217", policeForceService.getContactNo("Cleveland"));
assertEquals("1316386220", policeForceService.getContactNo("West Midlands"));
assertEquals("1316386221", policeForceService.getContactNo("Staffordshire"));
assertEquals("1316386222", policeForceService.getContactNo("West Mercia"));
assertEquals("1316386223", policeForceService.getContactNo("Warwickshire"));
assertEquals("1316386230", policeForceService.getContactNo("Derbyshire"));
assertEquals("1316386231", policeForceService.getContactNo("Nottinghamshire"));
assertEquals("1316386232", policeForceService.getContactNo("Lincolnshire"));
assertEquals("1316386233", policeForceService.getContactNo("Leicestershire"));
}

}
Loading