Skip to content

Commit

Permalink
Merge pull request #62 from bento-platform/refact/fixed-mappings
Browse files Browse the repository at this point in the history
refact!: fixed mappings for variant indices
  • Loading branch information
davidlougheed authored Nov 6, 2024
2 parents d23175f + b433d15 commit 9e94ca8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 16 deletions.
6 changes: 3 additions & 3 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

GOHAN_DEBUG=false
GOHAN_SERVICE_CONTACT=[email protected]
GOHAN_SEMVER=5.0.2
GOHAN_SEMVER=6.0.0
GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization"

# GOOS=linux
Expand Down Expand Up @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api
GOHAN_API_VERSION=latest

GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.09.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.09.01
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.11.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.11.01

GOHAN_API_CONTAINER_NAME=gohan-api
GOHAN_API_SERVICE_HOST=0.0.0.0
Expand Down
63 changes: 63 additions & 0 deletions src/api/models/indexes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,69 @@ type Genotype struct {
Zygosity c.Zygosity `json:"zygosity"`
}

var MAPPING_FIELDS_KEYWORD_IG256 = map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
"ignore_above": 256,
},
}
var MAPPING_TEXT = map[string]interface{}{"type": "text", "fields": MAPPING_FIELDS_KEYWORD_IG256}
var MAPPING_LONG = map[string]interface{}{"type": "long"}
var MAPPING_FLOAT64 = map[string]interface{}{"type": "double"}
var MAPPING_BOOL = map[string]interface{}{"type": "boolean"}
var MAPPING_DATE = map[string]interface{}{"type": "date"}

// This mapping is derived from the one exported by Victor from the ICHANGE instance on 2024-11-01,
// using the following commands:
// ./bentoctl.bash shell gohan-api
// --> inside gohan-api container
// curl -u $GOHAN_ES_USERNAME:$GOHAN_ES_PASSWORD bentov2-gohan-elasticsearch:9200/_mapping
var VARIANT_INDEX_MAPPING = map[string]interface{}{
"properties": map[string]interface{}{
"chrom": MAPPING_TEXT,
"pos": MAPPING_LONG,
"id": MAPPING_TEXT,
"ref": MAPPING_TEXT,
"alt": MAPPING_TEXT,
"format": MAPPING_TEXT,
"qual": MAPPING_LONG,
"filter": MAPPING_TEXT,
"info": map[string]interface{}{
"properties": map[string]interface{}{
"id": MAPPING_TEXT,
"value": MAPPING_TEXT,
},
},
"sample": map[string]interface{}{
"properties": map[string]interface{}{
"id": MAPPING_TEXT,
"variation": map[string]interface{}{
"properties": map[string]interface{}{
"genotype": map[string]interface{}{
"properties": map[string]interface{}{
"phased": MAPPING_BOOL,
"zygosity": MAPPING_LONG,
},
},
"alleles": map[string]interface{}{
"properties": map[string]interface{}{
"left": MAPPING_TEXT,
"right": MAPPING_TEXT,
},
},
"phredScaleLikelyhood": MAPPING_LONG,
"genotypeProbability": MAPPING_FLOAT64,
},
},
},
},
"fileId": MAPPING_TEXT,
"dataset": MAPPING_TEXT,
"assemblyId": MAPPING_TEXT,
"createdTime": MAPPING_DATE,
},
}

type Gene struct {
Name string `json:"name"`
Chrom string `json:"chrom"`
Expand Down
63 changes: 50 additions & 13 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (i *IngestionService) Init() {
esutil.BulkIndexerItem{
// Action field configures the operation to perform (index, create, delete, update)
Action: "index",
Index: fmt.Sprintf("variants-%s", strings.ToLower(queuedVariant.Chrom)),
Index: variantIndexName(queuedVariant.Chrom),

// Body is an `io.Reader` with the payload
Body: bytes.NewReader(variantData),
Expand Down Expand Up @@ -369,6 +369,10 @@ func (i *IngestionService) ProcessVcf(
defer gr.Close()

scanner := bufio.NewScanner(gr)

contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header
var contigMutex = sync.RWMutex{}

var discoveredHeaders bool = false
var headers []string
headerSampleIds := make(map[int]string)
Expand All @@ -382,28 +386,25 @@ func (i *IngestionService) ProcessVcf(
lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel)

for scanner.Scan() {
//fmt.Println(scanner.Text())

// Gather Header row by seeking the CHROM string
// Collect contigs (chromosomes) to create indices
line := scanner.Text()
if !discoveredHeaders {
if line[0:6] == "#CHROM" {
// Split the string by tabs
headers = strings.Split(line, "\t")

for id, header := range headers {
for idx, header := range headers {
// determine if header is a default VCF header.
// if it is not, assume it's a sampleId and keep
// track of it with an id
if !utils.StringInSlice(strings.ToLower(strings.TrimSpace(strings.ReplaceAll(header, "#", ""))), constants.VcfHeaders) {
headerSampleIds[len(constants.VcfHeaders)-id] = header
headerSampleIds[len(constants.VcfHeaders)-idx] = header
}
}

discoveredHeaders = true

fmt.Println("Found the headers: ", headers)
continue
fmt.Printf("Found %d headers: %v\n", len(headers), headers)
}
continue
}
Expand Down Expand Up @@ -435,20 +436,30 @@ func (i *IngestionService) ProcessVcf(
var rowWg sync.WaitGroup
rowWg.Add(len(rowComponents))

for rowIndex, rowComponent := range rowComponents {
go func(i int, rc string, rwg *sync.WaitGroup) {
for colIdx, colVal := range rowComponents {
go func(h int, rc string, rwg *sync.WaitGroup) {
defer rwg.Done()
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1)))
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1)))
value := strings.TrimSpace(rc)

// if not a vcf header, assume it's a sampleId header
if utils.StringInSlice(key, constants.VcfHeaders) {

// filter field type by column name
if key == "chrom" {
// Strip out all non-numeric characters
// Strip out chr prefix
value = strings.ReplaceAll(value, "chr", "")

// We're making contig indices on the fly - check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs "set" (map).
contigMutex.Lock()
_, indexExists := contigs[value]
if !indexExists {
i.MakeVariantIndex(value)
contigs[value] = struct{}{} // add contig to the "set" of created configs
}
contigMutex.Unlock()

// ems if value is valid chromosome
if chromosome.IsValidHumanChromosome(value) {
tmpVariantMapMutex.Lock()
Expand Down Expand Up @@ -554,7 +565,7 @@ func (i *IngestionService) ProcessVcf(
})
tmpSamplesMutex.Unlock()
}
}(rowIndex, rowComponent, &rowWg)
}(colIdx, colVal, &rowWg)
}

rowWg.Wait()
Expand Down Expand Up @@ -865,3 +876,29 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
}
return false
}

func (i *IngestionService) MakeVariantIndex(c string) {
var client = i.ElasticsearchClient
var contigIndex = variantIndexName(c)

res, err := client.Indices.Exists([]string{contigIndex})
if res.StatusCode == 404 {
mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
res, _ := client.Indices.Create(
contigIndex,
client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
)

fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String())
} else if err != nil {
// The actual check didn't work properly (e.g., couldn't contact ES).
fmt.Printf("Contig index %s existence-check got error: %s\n", c, err)
} else {
// The check worked and the index already exists, so we shouldn't try to recreate it.
fmt.Printf("Contig index %s already exists; skipping creation\n", c)
}
}

func variantIndexName(contig string) string {
return fmt.Sprintf("variants-%s", strings.ToLower(contig))
}

0 comments on commit 9e94ca8

Please sign in to comment.