Skip to content

Commit

Permalink
Merge pull request #485 from zazuko/sp-destroyable-transform
Browse files Browse the repository at this point in the history
Fix DestroyableTransform issues
  • Loading branch information
ludovicm67 authored Aug 27, 2024
2 parents 37e42ba + 7d6a17a commit 11f74a2
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 327 deletions.
5 changes: 5 additions & 0 deletions .changeset/green-cherries-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@zazuko/trifid-plugin-sparql-proxy": patch
---

Fix issues in case of DestroyableTransform
5 changes: 5 additions & 0 deletions .changeset/lazy-glasses-attend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trifid-handler-fetch": patch
---

Upgrade Oxigraph to 0.4.0-rc.1.
677 changes: 367 additions & 310 deletions package-lock.json

Large diffs are not rendered by default.

26 changes: 19 additions & 7 deletions packages/handler-fetch/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@ export const performOxigraphQuery = async (store, query) => {

try {
if (isConstructQuery) {
contentType = 'application/n-quads'
results = store.query(query, {
const queryResults = store.query(query, {
use_default_graph_as_union: true,
}).map((quad) => quad.toString()).join('.\n')
if (results) {
results = `${results}.\n`
})
if (Array.isArray(queryResults)) {
contentType = 'application/n-quads'
results = queryResults.map((quad) => quad.toString()).join('.\n')
if (results) {
results = `${results}.\n`
}
} else {
contentType = 'text/plain'
results = 'Something went wrong while getting the query results (expected array).'
}
} else {
contentType = 'application/sparql-results+json'
results = store.query(query, {
const queryResults = store.query(query, {
use_default_graph_as_union: true,
results_format: 'json',
})
if (typeof queryResults === 'string') {
contentType = 'application/sparql-results+json'
results = queryResults
} else {
contentType = 'text/plain'
results = 'Something went wrong while getting the query results (expected string).'
}
}
} catch (error) {
contentType = 'text/plain'
Expand Down
2 changes: 1 addition & 1 deletion packages/handler-fetch/lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const handleConfig = async (config) => {
store.load(data, {
format: contentType,
base_iri: baseIri,
to_named_graph: graphNameIri,
to_graph_name: graphNameIri,
})
parentPort.postMessage({
type: 'log',
Expand Down
2 changes: 1 addition & 1 deletion packages/handler-fetch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"prepack": "npm run build"
},
"dependencies": {
"oxigraph": "^0.4.0-alpha.7",
"oxigraph": "^0.4.0-rc.1",
"uuid": "^10.0.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/handler-fetch/test/handler-fetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('trifid-handler-fetch', () => {
`, {
format: 'text/turtle',
base_iri: 'http://example.org/',
to_named_graph: oxigraph.defaultGraph(),
to_graph_name: oxigraph.defaultGraph(),
})
})

Expand Down
15 changes: 9 additions & 6 deletions packages/sparql-proxy/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// @ts-check

import { Readable } from 'node:stream'
import { ReadableStream } from 'node:stream/web'
import { performance } from 'node:perf_hooks'
import { Worker } from 'node:worker_threads'
import { sparqlGetRewriteConfiguration } from 'trifid-core'
import replaceStream from 'string-replace-stream'
import rdf from '@zazuko/env-node'
import ReplaceStream from './lib/ReplaceStream.js'

const defaultConfiguration = {
endpointUrl: '',
Expand Down Expand Up @@ -253,14 +254,16 @@ const factory = async (trifid) => {
/** @type {any} */
let responseStream = response.body
if (rewriteResponse && options.rewriteResults) {
const replaceStream = new ReplaceStream(rewriteResponse.origin, rewriteResponse.replacement)
responseStream = Readable
.from(responseStream)
.pipe(replaceStream(
rewriteResponse.origin,
rewriteResponse.replacement,
))
.pipe(replaceStream)
responseStream = Readable
.from(responseStream)
}
if (responseStream instanceof ReadableStream) {
responseStream = Readable.fromWeb(responseStream)
}
responseStream = Readable.fromWeb(responseStream)

reply
.status(response.status)
Expand Down
42 changes: 42 additions & 0 deletions packages/sparql-proxy/lib/ReplaceStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Transform } from 'node:stream'

class ReplaceStream extends Transform {
constructor(searchStr, replaceStr, options = {}) {
super(options)
this.searchStr = searchStr
this.replaceStr = replaceStr
this.tailPiece = '' // Holds trailing data from previous chunk
this.searchStrLen = searchStr.length
}

_transform (chunk, encoding, callback) {
const data = this.tailPiece + chunk.toString() // Combine previous tail with new chunk
let lastIndex = 0
let index

const pieces = []

// Search for occurrences of searchStr
while ((index = data.indexOf(this.searchStr, lastIndex)) !== -1) {
pieces.push(data.slice(lastIndex, index)) // Push the data before the match
pieces.push(this.replaceStr) // Push the replacement string
lastIndex = index + this.searchStrLen // Move the index past the match
}

// Save the remaining data after the last match as tailPiece
this.tailPiece = data.slice(lastIndex)

// Push the processed data
this.push(pieces.join(''))

callback()
}

_flush (callback) {
// Push out any remaining data in tailPiece, processing any matches in it
this.push(this.tailPiece.replace(new RegExp(this.searchStr, 'g'), this.replaceStr))
callback()
}
}

export default ReplaceStream
2 changes: 1 addition & 1 deletion packages/sparql-proxy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
"dependencies": {
"@vocabulary/sd": "^1.0.4",
"@zazuko/env-node": "^2.1.3",
"string-replace-stream": "^0.0.2",
"trifid-core": "^5.0.0"
},
"devDependencies": {
"@types/node": "^22.4.1",
"chai": "^5.1.1",
"mocha": "^10.7.3",
"sinon": "^18.0.0",
"sinon-chai": "^4.0.0"
},
Expand Down
137 changes: 137 additions & 0 deletions packages/sparql-proxy/test/ReplaceStream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { PassThrough } from 'node:stream'
import { expect } from 'chai'
import { describe, it } from 'mocha'
import ReplaceStream from '../lib/ReplaceStream.js'

describe('ReplaceStream', () => {
it('should replace a single occurrence of a string', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

const receivedData = []

input.pipe(replaceStream).pipe(output)

output.setEncoding('utf8')
output.on('data', (data) => {
receivedData.push(data)
})

output.on('end', () => {
expect(receivedData.join('')).to.equal('new string')
done()
})

input.write('old string')
input.end()
})

it('should replace multiple occurrences of a string', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

const receivedData = []

input.pipe(replaceStream).pipe(output)

output.setEncoding('utf8')
output.on('data', (data) => {
receivedData.push(data)
})

output.on('end', () => {
expect(receivedData.join('')).to.equal('new string with new value')
done()
})

input.write('old string with old value')
input.end()
})

it('should replace strings that span across chunks', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

const receivedData = []

input.pipe(replaceStream).pipe(output)

output.setEncoding('utf8')
output.on('data', (data) => {
receivedData.push(data)
})

output.on('end', () => {
expect(receivedData.join('')).to.equal('new and new')
done()
})

input.write('ol')
input.write('d and ol')
input.write('d')
input.end()
})

it('should handle no occurrences of the string', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

input.pipe(replaceStream).pipe(output)

output.setEncoding('utf8')
output.on('data', (data) => {
expect(data).to.equal('no match here')
})

output.on('end', done)

input.write('no match here')
input.end()
})

it('should handle empty input', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

input.pipe(replaceStream).pipe(output)

output.setEncoding('utf8')
output.on('data', (data) => {
expect(data).to.equal('')
})

output.on('end', done)

input.end()
})

it('should handle large input with multiple replacements', (done) => {
const replaceStream = new ReplaceStream('old', 'new')
const input = new PassThrough()
const output = new PassThrough()

const largeInput = 'old '.repeat(1000)
const expectedOutput = 'new '.repeat(1000)

input.pipe(replaceStream).pipe(output)

let result = ''
output.setEncoding('utf8')
output.on('data', (data) => {
result += data
})

output.on('end', () => {
expect(result).to.equal(expectedOutput)
done()
})

input.write(largeInput)
input.end()
})
})

0 comments on commit 11f74a2

Please sign in to comment.