forked from vcflib/vcflib
-
Notifications
You must be signed in to change notification settings - Fork 3
/
vcf2sqlite.py
executable file
·131 lines (113 loc) · 3.59 KB
/
vcf2sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# Push VCF file into SQLite3 database using dbname
import sys
import re
import sqlite3
if len(sys.argv) < 2:
print("usage {} [dbname]").format(sys.argv[0])
print("reads VCF on stdin, and writes output to a sqlite3 db [dbname]")
exit(1)
dbname = sys.argv[1]
# parse the header
# into a mapping from tag -> type
infotypes = {}
infonumbers = {}
for line in sys.stdin:
if line.startswith('##INFO'):
#<ID=XRS,Number=1,Type=Float,
i = re.search("ID=(.*?),", line)
n = re.search("Number=(.*?),", line)
t = re.search("Type=(.*?),", line)
if i and n and t:
iden = i.groups()[0]
number = n.groups()[0]
if number == "A":
number = -1
elif number == "G" or int(number) > 1:
# unclear how to deal with these
continue
else:
number = int(number)
typestr = t.groups()[0]
infotypes[iden] = typestr
infonumbers[iden] = number
else:
continue
elif line.startswith('##'):
continue
else:
break # header line, sample names etc.
# write the table schema
infotype_to_sqltype = {}
infotype_to_sqltype["Flag"] = "boolean"
infotype_to_sqltype["Integer"] = "integer"
infotype_to_sqltype["Float"] = "real"
infotype_to_sqltype["String"] = "text"
tablecmd = """create table alleles"""
specs = ["CHROM text",
"POS integer",
"ID text",
"REF text",
"ALT text",
"QUAL real",
"FILTER text"]
sorted_fields = sorted(infotypes.keys())
for field in sorted_fields:
infotype = infotypes[field]
sqltype = infotype_to_sqltype[infotype]
field = field.replace(".", "_") # escape periods, which are not allowed
specs.append(field + " " + sqltype)
tablecmd += " (" + ", ".join(specs) + ")"
conn = sqlite3.connect(dbname)
conn.execute(tablecmd)
# for each record
# parse the record
# for each allele
for line in sys.stdin:
fields = line.split('\t')
chrom, pos, iden, ref, alts, qual, filt, info = fields[:8]
alts = alts.split(",")
altindex = 0
chrom = "\'" + chrom + "\'"
iden = "\'" + iden + "\'"
ref = "\'" + ref + "\'"
filt = "\'" + filt + "\'"
for alt in alts:
alt = "\'" + alt + "\'"
info_values = {}
for pair in info.split(";"):
if pair.find("=") is not -1:
pair = pair.split("=")
key = pair[0]
value = pair[1]
if key not in infonumbers:
continue
if infonumbers[key] == -1:
values = value.split(",")
value = values[altindex]
info_values[key] = value
else:
# boolean flag
info_values[pair] = "1"
ordered_insertion = []
for field in sorted_fields:
value = "null"
if field in info_values:
value = info_values[field]
if infotypes[field] == "String":
value = "\'" + value + "\'"
else:
# missing flag means "false" for that flag
if infotypes[field] == "Flag":
value = "0"
ordered_insertion.append(value)
cmd = "insert into alleles values (" \
+ ", ".join([chrom, pos, iden, ref, alt, qual, filt]) \
+ ", " \
+ ", ".join(ordered_insertion) + ")"
conn.execute(cmd)
altindex += 1
conn.commit()
# TODO ignoring samples (for now)
# add indexes everywhere?
conn.close()