# import_tsv.coffee
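# Streams tab-separated rows from stdin into Elasticsearch via the bulk API.
# A hypothetical invocation (the index/type/prefix names here are examples,
# not anything the script requires):
#
#   ./import_tsv.coffee --index filings --type filing --names FILER < data.tsv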
elasticsearch = require "elasticsearch"
CSV = require "csv"
debug = require("debug")("es-import-tsv")
tz = require "timezone"
argv = require('yargs')
    .demand(['index','type'])
    .describe
        server:     "Elasticsearch Server"
        index:      "Elasticsearch Index"
        type:       "ES Type"
        zone:       "Timezone"
        verbose:    "Show Debugging Logs"
        names:      "Field prefixes with NAMF, NAML"
    .boolean(['verbose'])
    .default
        server:     "localhost:9200"
        verbose:    false
        zone:       "America/Los_Angeles"
    .argv
if argv.verbose
    (require "debug").enable("es-import-tsv")
    debug = require("debug")("es-import-tsv")
zone = tz(require("timezone/#{argv.zone}"))
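# pin the client to the Elasticsearch 1.4 API this script was written against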
es = new elasticsearch.Client host:argv.server, apiVersion:"1.4"
csv = CSV.parse delimiter:"\t", columns:true, highWaterMark:1024
argv.names = argv.names.split(",") if argv.names
#----------
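# Importer receives arrays of row objects from CleanAndBatch and writes each
# array to Elasticsearch as a single bulk request, logging a running count.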
class Importer extends require("stream").Writable
    constructor: ->
        super objectMode:true, highWaterMark:100
        @_count = 0

    _write: (batch,encoding,cb) ->
        # the bulk body alternates action and document lines; an empty index
        # action inherits the index/type passed to es.bulk below
        bulk = []
        for obj in batch
            bulk.push index:{}
            bulk.push obj

        es.bulk index:argv.index, type:argv.type, body:bulk, (err,resp) =>
            if err
                console.error "Error inserting into ES: #{err}"

            @_count += batch.length
            console.log "Rows inserted: #{@_count}"
            cb()
#----------
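# CleanAndBatch nulls blank values, converts *_DATE fields to UTC timestamps,
# parses AMOUNT/*_YTD fields as numbers, joins NAMF/NAML pairs into a single
# _NAME field for each --names prefix, and emits rows in batches of 5000.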
class CleanAndBatch extends require("stream").Transform
    constructor: ->
        super objectMode:true, highWaterMark:100

        @DATE_MATCH   = /_DATE$/
        @NUMBER_MATCH = /(?:AMOUNT|_YTD)$/
        @BLANK_MATCH  = /^\s?$/

        @_batch = []
    _transform: (obj,encoding,cb) ->
        for k,v of obj
            obj[k] = v = null if @BLANK_MATCH.test(v)

            if v && @DATE_MATCH.test k
                # parse as JS date, but then pass into tz for timezone. we insert UTC into ES
                d = new Date(v)
                new_v = tz(
                    zone(
                        [d.getFullYear(),d.getMonth(),d.getDate(),d.getHours(),d.getMinutes()]
                        ,argv.zone
                    ),
                    "UTC",
                    "%Y-%m-%dT%H:%M"
                )
                obj[k] = new_v

            else if v && @NUMBER_MATCH.test k
                obj[k] = parseFloat(v)

            else
                # assume a string

        # join first/last name columns into a single combined field
        for prefix in argv.names||[]
            if obj["#{prefix}_NAMF"] || obj["#{prefix}_NAML"]
                obj["#{prefix}_NAME"] = [obj["#{prefix}_NAMF"],obj["#{prefix}_NAML"]].join(" ")

        @_batch.push obj

        if @_batch.length >= 5000
            b = @_batch.splice(0)
            @push b

        cb()

    _flush: (cb) ->
        # emit whatever is still buffered when the input ends, so the final
        # partial batch is not dropped
        @push @_batch.splice(0) if @_batch.length > 0
        cb()
#----------
importer = new Importer
cleaner = new CleanAndBatch

# create the index/type if it doesn't exist and apply our dynamic mapping
es.indices.existsType index:argv.index, type:argv.type, (err,exists) ->
    throw err if err

    _go = ->
        process.stdin.pipe(csv).pipe(cleaner).pipe(importer)

    if !exists
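        # the dynamic template gives every string field a not_analyzed .raw
        # sub-field, so values can be full-text searched and aggregated exactly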
        mapping =
            dynamic_templates: [
                strings:
                    match: "*"
                    match_mapping_type: "string"
                    mapping:
                        type: "string"
                        fields:
                            raw: { type:"string", index:"not_analyzed" }
            ]
        es.indices.create index:argv.index, (err,res) ->
            throw err if err

            es.indices.putMapping index:argv.index, type:argv.type, body:mapping, (err,res) ->
                throw err if err
                _go()
    else
        _go()