-
Notifications
You must be signed in to change notification settings - Fork 0
/
TwitterStream.scala
48 lines (41 loc) · 1.54 KB
/
TwitterStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import ch.epfl.data.squall.utilities.{CustomReader, SquallContext, ReaderProvider}
import twitter4j._
import java.util.concurrent.LinkedBlockingQueue
class StatusStreamer(twitterStream: TwitterStream) extends CustomReader {
// Initialization
val queue = new LinkedBlockingQueue[String](1000)
val area = Array(Array(5.9517912865,45.9720796059),
Array(10.4178924561,47.634536498)) // Switzerland
twitterStream.addListener(statusListener)
twitterStream.filter(new FilterQuery().locations(area))
def statusListener = new StatusListener() {
def onStatus(status: Status) { queue.offer(status.getText) }
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {}
def onException(ex: Exception) { ex.printStackTrace }
def onScrubGeo(arg0: Long, arg1: Long) {}
def onStallWarning(warning: StallWarning) {}
}
override def readLine(): String = {
queue.take()
}
// Cleanup
override def close() {
twitterStream.cleanUp
twitterStream.shutdown
}
}
class TwitterProvider extends ReaderProvider {
override def canProvide (context: SquallContext, name: String) = {
name == "twitter"
}
override def getReaderForName (name: String, fileSection: Int, fileParts: Int): CustomReader = {
if (name == "twitter") {
val twitterStream = new TwitterStreamFactory(Util.config).getInstance
new StatusStreamer(twitterStream)
} else {
null
}
}
override def toString(): String = "[Twitter status provider]"
}