-
Notifications
You must be signed in to change notification settings - Fork 0
/
TweetsFetcher.py
83 lines (70 loc) · 2.95 KB
/
TweetsFetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import json
from kafka import KafkaProducer
import tweepy
import configparser
import time
class TweeterStreamListener(tweepy.StreamListener):
    """Stream listener that forwards hashtagged tweets to Kafka.

    Every incoming message is reduced to a dict of the form
    ``{'text': <tweet text>, 'tags': [<hashtag>, ...]}`` and, when at least
    one hashtag is present, sent to the ``twitterstream`` topic as
    UTF-8 encoded JSON.
    """

    def __init__(self, api):
        """Store the API handle and create the Kafka producer.

        Args:
            api: the ``tweepy.API`` instance this listener is bound to.
        """
        # Zero-argument super() so that StreamListener.__init__ really runs.
        # The previous ``super(tweepy.StreamListener, self)`` started the MRO
        # lookup *after* StreamListener and therefore skipped it entirely.
        super().__init__(api)
        self.api = api
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        )

    def on_data(self, data):
        """Handle one raw message from the Twitter stream.

        The message text and hashtags are printed, and the extracted record
        is pushed to the Kafka queue when it carries at least one hashtag.

        Args:
            data: the raw JSON string delivered by the stream.

        Returns:
            True to keep consuming the stream, False when handing the record
            to the Kafka producer raised (which asks tweepy to stop).
        """
        extracted_data = {}
        data_dict = json.loads(data)

        # Top-level entities describe the (possibly truncated) short text.
        entities = data_dict.get('entities')

        if 'extended_tweet' in data_dict:
            extended = data_dict['extended_tweet']
            print(extended['full_text'].encode('utf-8'))
            extracted_data['text'] = extended['full_text']
            # Hashtags beyond the truncated text are only listed in the
            # extended tweet's own entities; fall back to the top-level ones
            # if that object is absent.
            entities = extended.get('entities', entities)
        elif 'text' in data_dict:
            print(data_dict['text'].encode('utf-8'))
            extracted_data['text'] = data_dict['text']

        extracted_data['tags'] = []
        if entities and 'hashtags' in entities:
            for tag in entities['hashtags']:
                print(tag['text'])
                extracted_data['tags'].append(tag['text'])
        print('-' * 70)

        # Only records with at least one hashtag are forwarded.
        if extracted_data['tags']:
            try:
                # NOTE(review): kafka-python's send() is asynchronous, so
                # presumably only failures raised synchronously are caught
                # here - confirm whether delivery errors need an errback.
                self.producer.send('twitterstream', extracted_data)
            except Exception as e:
                print('exception when pushing data to kafka queue')
                print(e)
                return False
        return True

    def on_error(self, status):
        """Report a stream error and keep the stream alive.

        Args:
            status: the error status passed in by tweepy.

        Returns:
            True, so the stream is not stopped by this listener.
        """
        print('error occured when recieving twitter stream')
        # Surface the status so the failure can actually be diagnosed.
        print(status)
        return True

    def on_timeout(self):
        """Report a stream timeout and keep the stream alive.

        Returns:
            True, so the stream is not stopped by this listener.
        """
        print('timeout occured when receiving twitter stream')
        return True
# Load the Twitter API credentials from the local INI-style file.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips missing files and returns the list of
# files it managed to parse, so fail loudly here instead of with an opaque
# KeyError on the first credential lookup.
if not config.read('twitter_creds.text'):
    raise FileNotFoundError("could not read 'twitter_creds.text'")

consumer_key = config['DEFAULT']['consumerKey']
consumer_skey = config['DEFAULT']['consumerSecretKey']
access_key = config['DEFAULT']['accessToken']
access_skey = config['DEFAULT']['accessTokenSecret']

# creating auth obj
auth = tweepy.OAuthHandler(consumer_key, consumer_skey)
auth.set_access_token(access_key, access_skey)
api = tweepy.API(auth)

# creating the stream and binding it to listener
listener = TweeterStreamListener(api)
stream = tweepy.Stream(auth, listener=listener, tweet_mode='extended')

# Common English words used as track terms.
# A comma was missing after 'about', which made Python concatenate it with
# 'will' into the single term 'aboutwill', so neither word was tracked.
tracks = ['the', 'is', 'i', 'me', 'we', 'was', 'so', 'just', 'on', 'your', 'about',
          'will', 'today', 'got', 'when', 'you', 'u', 'he', 'she', 'his', 'her',
          'up', 'down', 'awesome', 'bad', 'help', 'said', 'ur', 'yours', 'our']
#tracks = ['corona', 'covid', 'india', 'china', 'italy']

try:
    # NOTE(review): filter() is called without an async option, so it
    # presumably blocks until the stream ends and the sleep below only runs
    # afterwards - confirm whether a timed 10 second capture was intended.
    stream.filter(track=tracks, languages=['en'])
    time.sleep(10)
finally:
    # Always disconnect and drain the producer, even when the stream raised
    # or the script was interrupted, so buffered records are not lost.
    stream.disconnect()
    listener.producer.flush()
    listener.producer.close()