# Articles.py
import os
import datetime

import requests
from lxml import html

from articles.articlesData import *
from articles.sources import *

# module-level default so the class can also be imported without running the
# __main__ block below, which turns verbose output on
DEBUG = False

# HTTP header parameters sent with every request
headers = {
    'User-Agent': 'Mozilla/5.0',  # required
    'referer': "https://www.investing.com",
}
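
# The indicator definitions (e.g. `sp500` used in __main__) are star-imported
# from articles/articlesData.py, which is not shown here. Judging from the keys
# this class reads, an entry is presumably a dict shaped roughly like this
# hypothetical sketch (illustrative URL and XPaths, not the real config):
#
#   sp500 = {
#       'name_API': 'investing',  # selects the date format and base URL
#       'url_API': 'https://www.investing.com/indices/us-spx-500-news/',  # page number is appended
#       'xpath_articles': '//div[@class="largeTitle"]/article',
#       'xpath_articles_title': '//a[@class="title"]/text()',
#       'xpath_articles_link': '//a[@class="title"]/@href',
#       'xpath_articles_date': '//span[@class="date"]/text()',
#       'listOfWords': ['webinar'],  # articles whose title contains these words are skipped
#   }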

class ArticlesData():
    def __init__(self, indicatorData):
        self.parameters = {}
        self.articles = {}
        self.indicatorData = indicatorData
        self.article_counter = 0
        self.false_flag = 0  # counts articles found outside the desired period

    # set the HTTP headers used for every request
    def setHeaders(self, headers):
        self.headers = headers

    # set the desired time period (from/to)
    def updateStartingEndingDate(self, startingDate, endingDate):
        self.parameters['st_date'] = startingDate
        self.parameters['end_date'] = endingDate

    # download article titles, URLs and dates for the desired time period
    def downloadListOfArticles(self, API_url):
        # fills a dictionary with all the articles in the desired time period
        if DEBUG:
            print("[+] Downloading data from: " + str(API_url))
        page = requests.get(API_url, headers=self.headers)
        tree = html.fromstring(page.content)
        article_titles = tree.xpath(self.indicatorData['xpath_articles'] + self.indicatorData['xpath_articles_title'])
        article_title_links = tree.xpath(self.indicatorData['xpath_articles'] + self.indicatorData['xpath_articles_link'])
        article_dates = tree.xpath(self.indicatorData['xpath_articles'] + self.indicatorData['xpath_articles_date'])
        try:
            article_titles.remove(" ")
        except ValueError:
            pass
        # a sponsored article has no date, which leaves the title list longer
        # than the date list; drop the sponsored entry (usually in position 3)
        if len(article_titles) > len(article_dates):
            del article_titles[3]
            del article_title_links[3]
        # sometimes there is also an ad in position 5, after the sponsored entry
        if len(article_titles) > len(article_dates):
            del article_titles[5]
        # and sometimes another ad in position 7
        if len(article_titles) > len(article_dates):
            del article_titles[7]
        # if even more ads or sponsored entries appear, drop titles from the
        # front until the lists line up again
        while len(article_titles) > len(article_dates):
            del article_titles[0]
        for i in range(len(article_titles)):
            # relative dates like "11 hours ago" or times such as "10:30 am"
            # mean the article was published today
            if "hour" in article_dates[i] or "minute" in article_dates[i] or "second" in article_dates[i] or "am" in article_dates[i] or "pm" in article_dates[i]:
                article_date = datetime.date.today()
            else:
                # the two sources format absolute dates slightly differently:
                # Reuters omits the comma ("Dec 22 2017" vs "Dec 22, 2017")
                if self.indicatorData['name_API'] == 'reuters':
                    article_date = datetime.datetime.strptime(article_dates[i].replace('\xa0-\xa0', ''), "%b %d %Y").date()
                elif self.indicatorData['name_API'] == 'investing':
                    article_date = datetime.datetime.strptime(article_dates[i].replace('\xa0-\xa0', ''), "%b %d, %Y").date()
            # keep the article only if its date falls inside the desired period
            if self.parameters['st_date'] <= article_date <= self.parameters['end_date']:
                article_title = article_titles[i].replace("\n", "").replace("\t", "")
                if self.filterArticle(article_title):
                    self.articles[self.article_counter] = {'article_title': article_title, 'article_title_link': article_title_links[i], 'article_date': article_date}
                    self.article_counter += 1
            else:
                self.false_flag += 1
                # once roughly a full page (~40 articles) is outdated, stop
                if self.false_flag >= len(article_titles):
                    return False
        return True

    # search the next pages as well for articles in the desired time period
    def downloadListOfArticlesRepeatedly(self):
        # page URLs are built by appending the page number to url_API
        page_counter = 1
        while self.downloadListOfArticles(self.indicatorData['url_API'] + str(page_counter)):
            page_counter += 1
        if DEBUG:
            print("[+] Scraped " + str(page_counter) + " pages - Articles found: " + str(len(self.articles)))

    # download the full text of every article in the article dictionary
    def downloadArticleText(self):
        # create the "results" directory if it doesn't exist
        if not os.path.exists("results"):
            os.makedirs("results")
        # for every article in the dictionary, visit its URL and scrape the text
        for key in sorted(self.articles.keys()):
            # relative links point back into the source website itself
            if "http" not in self.articles[key]['article_title_link']:
                if self.indicatorData['name_API'] == 'reuters':
                    article_url = 'https://www.reuters.com' + self.articles[key]['article_title_link']
                elif self.indicatorData['name_API'] == 'investing':
                    article_url = 'https://www.investing.com' + self.articles[key]['article_title_link']
            # absolute links come from another source
            else:
                article_url = self.articles[key]['article_title_link']
            if DEBUG:
                print("[+] Downloading article from: " + article_url)
            article = requests.get(article_url, headers=self.headers)
            tree = html.fromstring(article.content)
            # try the XPath of every known source (file: articles/sources.py)
            for source in sources:
                article_text = ' '.join(tree.xpath(sources[source]['xpath_article']))
                if article_text != "":
                    if DEBUG:
                        print("[+] Source: " + str(source))
                        print("[+] Saving data: results/" + str(self.articles[key]['article_date']) + "/" + self.articles[key]['article_title'] + ".txt")
                    try:
                        self.saveArticle("results/" + str(self.articles[key]['article_date']), self.articles[key]['article_title'].replace("/", " ") + ".txt", article_text)
                    except Exception:
                        # titles can still contain characters that are invalid
                        # in filenames; skip those articles
                        pass
                    # stop at the first source whose XPath matches
                    break

    # save an article as a .txt file
    def saveArticle(self, directory, filename, text):
        os.makedirs(directory, exist_ok=True)
        with open(os.path.join(directory, filename), "w") as file:
            file.write(text)

    # print the article dictionary
    def printListOfArticles(self):
        for key in self.articles:
            print("Title: " + self.articles[key]['article_title'] + "\nURL: " + self.articles[key]['article_title_link'] + "\nDate: " + str(self.articles[key]['article_date']) + "\n")

    # reject articles whose title contains a word from the indicator's word list
    def filterArticle(self, title):
        for title_word in title.split(" "):
            if title_word in self.indicatorData['listOfWords']:
                return False
        return True
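
# `sources` is star-imported from articles/sources.py (not shown). Given how
# downloadArticleText() iterates over it, it is presumably a dict mapping each
# supported source name to the XPath of its article body, along these
# hypothetical lines:
#
#   sources = {
#       'reuters': {'xpath_article': '//div[@class="StandardArticleBody_body"]//p/text()'},
#       'investing': {'xpath_article': '//div[@class="WYSIWYG articlePage"]//p/text()'},
#   }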

if __name__ == "__main__":
    DEBUG = True
    indicator = sp500
    ad = ArticlesData(indicator)
    ad.setHeaders(headers)
    # DATE FORMAT: YYYY, MM, DD
    ad.updateStartingEndingDate(datetime.date(2020, 3, 1), datetime.date(2020, 11, 16))
    ad.downloadListOfArticlesRepeatedly()
    ad.printListOfArticles()
    ad.downloadArticleText()
    print("[+] DONE")