-
Notifications
You must be signed in to change notification settings - Fork 0
/
export_to_csv.py
167 lines (130 loc) · 4.19 KB
/
export_to_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import json
import logging
import pandas as pd
import pprint
import requests
import sys
import time
from io import StringIO
from simple_salesforce import Salesforce
from utils import salesforce_login as login
# Configure root logging: timestamp-prefixed messages, ERROR level and above only.
logging.basicConfig(format="%(asctime)s : %(message)s", level=logging.ERROR)
def get_sobject_fields(sf: Salesforce, object_name: str) -> list:
    """Return the field names of a Salesforce sObject.

    Calls the REST ``describe`` endpoint for ``object_name`` and collects the
    name of every field whose type is not ``address`` or ``location``.

    Args:
        sf (Salesforce): simple_salesforce.Salesforce object; its ``base_url``
            and ``headers`` are used for the request
        object_name (str): Name of Salesforce sObject
    Returns:
        fields: List of field names on the sObject
    Raises:
        requests.HTTPError: If the describe request returns an error status
    """
    describe_url = f"{sf.base_url}sobjects/{object_name}/describe"
    result = requests.request(
        "GET",
        describe_url,
        headers=sf.headers,
    )
    # Surface the HTTP failure itself rather than an opaque KeyError on
    # "fields" when the response body is an error payload.
    result.raise_for_status()
    describe = result.json()

    # NOTE(review): address/location fields are skipped, presumably because
    # compound fields cannot be selected in a bulk query - confirm.
    return [
        field["name"]
        for field in describe["fields"]
        if field["type"] not in ("address", "location")
    ]
def build_soql_query(fields: list, object_name: str) -> str:
    """Compose a SOQL SELECT statement for one sObject.

    Args:
        fields (list): Field names to select; each item is converted with str()
        object_name (str): Name of Salesforce sObject
    Returns:
        str: SOQL query string of the form "SELECT a,b,c FROM object_name"
    """
    column_list = ",".join(map(str, fields))
    return f"SELECT {column_list} FROM {object_name}"
def bulk_query_request(query: str) -> str:
    """Serialize the JSON body used to create a Bulk API v2 query job.

    Args:
        query (str): SOQL query string
    Returns:
        str: JSON document requesting a "query" operation with CSV output,
            comma column delimiter and LF line endings
    """
    payload = {
        "operation": "query",
        "query": f"{query}",
        "contentType": "CSV",
        "columnDelimiter": "COMMA",
        "lineEnding": "LF",
    }
    return json.dumps(payload)
# Job states in which a query job has not finished yet and must be polled again.
_PENDING_STATES = ("UploadComplete", "InProgress")


def main():
    """
    Example Salesforce batch query: export sObjects to CSV files.

    Steps:
      1. Log in through ``utils.salesforce_login`` (exits with status 1 when
         no session is returned).
      2. Create one Bulk API v2 query job per object name, selecting every
         field returned by ``get_sobject_fields``.
      3. Poll every unfinished job every 10 seconds until none is pending.
      4. For each job that ended in "JobComplete", download the CSV result,
         drop columns that are entirely empty and write ``<object>.csv``.

    Raises:
        requests.HTTPError: If any Salesforce request returns an error status.
    """
    sf = login(
        (
            "",
            "",
        )
    )
    if not sf:
        print("\n*** Not Logged into Salesforce ***\n")
        sys.exit(1)

    object_names = [
        "Account",
        "Contact",
    ]
    # Loop-invariant, so built once; also defined when object_names is empty.
    bulk_v2_query_url = f"{sf.base_url}jobs/query"

    # Submit one query job per object.
    jobs = []
    for object_name in object_names:
        fields = get_sobject_fields(sf, object_name)
        body = bulk_query_request(build_soql_query(fields, object_name))
        result = requests.request(
            "POST",
            bulk_v2_query_url,
            headers=sf.headers,
            data=body,
        )
        # Fail on the HTTP error instead of a KeyError on "id" below.
        result.raise_for_status()
        job_info = result.json()
        jobs.append(
            {
                "object_name": object_name,
                "job_id": job_info["id"],
                "job_state": job_info["state"],
            }
        )

    # Poll until no job is pending. The loop condition looks at every job's
    # own state, so it terminates for an empty job list and does not depend
    # on the order in which jobs finish.
    while any(job["job_state"] in _PENDING_STATES for job in jobs):
        time.sleep(10)
        for job in jobs:
            if job["job_state"] not in _PENDING_STATES:
                continue
            result = requests.request(
                "GET",
                f"{bulk_v2_query_url}/{job['job_id']}",
                headers=sf.headers,
            )
            result.raise_for_status()
            job_info = result.json()
            pprint.pprint(job_info)
            job["job_state"] = job_info["state"]

    # Download and save the results of every successful job.
    for job in jobs:
        if job["job_state"] != "JobComplete":
            print(f"\n*** Job ({job['object_name']}) Unsuccessful ***\n")
            continue

        # NOTE(review): only the first result set is fetched. The Bulk API
        # 2.0 docs describe paging large results via the Sforce-Locator
        # response header - confirm whether paging is needed here.
        result = requests.request(
            "GET",
            f"{bulk_v2_query_url}/{job['job_id']}/results",
            headers=sf.headers,
        )
        result.raise_for_status()
        # Decode explicitly as UTF-8: for a text/* response without a charset
        # requests falls back to ISO-8859-1, which would corrupt non-ASCII
        # field values.
        result.encoding = "utf-8"
        row_count = result.headers["Sforce-NumberOfRecords"]
        csv_text = result.text

        df = pd.read_csv(StringIO(csv_text))
        # Drop columns that contain no data at all.
        df.dropna(how="all", axis=1, inplace=True)
        df.to_csv(f"{job['object_name']}.csv", encoding="utf-8", index=False)

        # Reported next to the server-side record count as a sanity check.
        line_count = len(csv_text.splitlines())
        print(f"\n{job['object_name']} - {row_count} : {line_count}")
# Run the export only when this file is executed as a script.
if __name__ == "__main__":
    main()