Skip to content

Commit

Permalink
implement pivot table block with execution queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Nov 19, 2024
1 parent ab12df0 commit 3e9fe5c
Show file tree
Hide file tree
Showing 12 changed files with 453 additions and 190 deletions.
44 changes: 44 additions & 0 deletions apps/api/src/yjs/v2/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import {
ExecutionQueueItemDropdownInputRenameVariableMetadata,
isDropdownInputBlock,
DropdownInputBlock,
ExecutionQueueItemPivotTableMetadata,
PivotTableBlock,
ExecutionQueueItemPivotTableLoadPageMetadata,
isPivotTableBlock,
} from '@briefer/editor'
import { IPythonExecutor, PythonExecutor } from './python.js'
import { logger } from '../../../logger.js'
Expand All @@ -42,6 +46,7 @@ import {
DropdownInputExecutor,
IDropdownInputExecutor,
} from './dropdown-input.js'
import { IPivotTableExecutor, PivotTableExecutor } from './pivot-table.js'

export class Executor {
private isRunning: boolean = false
Expand All @@ -56,6 +61,7 @@ export class Executor {
private readonly textInputExecutor: ITextInputExecutor,
private readonly dropdownInputExecutor: IDropdownInputExecutor,
private readonly dateInputExecutor: IDateInputExecutor,
private readonly pivotTableExecutor: IPivotTableExecutor,
private readonly workspaceId: string,
private readonly documentId: string,
private readonly blocks: Y.Map<YBlock>,
Expand Down Expand Up @@ -184,6 +190,12 @@ export class Executor {
case 'date-input':
await this.dateInputExecutor.save(item, data.block, data.metadata)
break
case 'pivot-table':
await this.pivotTableExecutor.run(item, data.block, data.metadata)
break
case 'pivot-table-load-page':
await this.pivotTableExecutor.loadPage(item, data.block, data.metadata)
break
default:
exhaustiveCheck(data)
}
Expand Down Expand Up @@ -316,6 +328,27 @@ export class Executor {
return { _tag: 'date-input', metadata, block }
}

case 'pivot-table':
case 'pivot-table-load-page': {
if (!isPivotTableBlock(block)) {
logger().error(
{
workspaceId: this.workspaceId,
documentId: this.documentId,
blockId: item.getBlockId(),
},
'Got wrong block type for pivot table execution'
)
return null
}

switch (metadata._tag) {
case 'pivot-table':
return { _tag: 'pivot-table', metadata, block }
case 'pivot-table-load-page':
return { _tag: 'pivot-table-load-page', metadata, block }
}
}
case 'noop':
return null
}
Expand All @@ -334,6 +367,7 @@ export class Executor {
TextInputExecutor.fromWSSharedDocV2(doc),
DropdownInputExecutor.fromWSSharedDocV2(doc),
DateInputExecutor.fromWSSharedDocV2(doc),
PivotTableExecutor.fromWSSharedDocV2(doc),
doc.workspaceId,
doc.documentId,
doc.blocks,
Expand Down Expand Up @@ -388,5 +422,15 @@ type ExecutionItemData =
metadata: ExecutionQueueItemDateInputMetadata
block: Y.XmlElement<DateInputBlock>
}
| {
_tag: 'pivot-table'
metadata: ExecutionQueueItemPivotTableMetadata
block: Y.XmlElement<PivotTableBlock>
}
| {
_tag: 'pivot-table-load-page'
metadata: ExecutionQueueItemPivotTableLoadPageMetadata
block: Y.XmlElement<PivotTableBlock>
}

function exhaustiveCheck(_param: never) {}
217 changes: 217 additions & 0 deletions apps/api/src/yjs/v2/executor/pivot-table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import {
ExecutionQueueItem,
ExecutionQueueItemPivotTableLoadPageMetadata,
ExecutionQueueItemPivotTableMetadata,
PivotTableBlock,
YBlock,
getPivotTableAttributes,
} from '@briefer/editor'
import * as Y from 'yjs'
import { logger } from '../../../logger.js'
import { DataFrame } from '@briefer/types'
import { createPivotTable } from '../../../python/pivot-table.js'
import { WSSharedDocV2 } from '../index.js'

export type PivotTableEffects = {
createPivotTable: typeof createPivotTable
}

export interface IPivotTableExecutor {
run(
executionItem: ExecutionQueueItem,
block: Y.XmlElement<PivotTableBlock>,
metadata: ExecutionQueueItemPivotTableMetadata
): Promise<void>
loadPage(
executionItem: ExecutionQueueItem,
block: Y.XmlElement<PivotTableBlock>,
metadata: ExecutionQueueItemPivotTableLoadPageMetadata
): Promise<void>
}

export class PivotTableExecutor implements IPivotTableExecutor {
private workspaceId: string
private documentId: string
private dataframes: Y.Map<DataFrame>
private blocks: Y.Map<YBlock>
private effects: PivotTableEffects

constructor(
workspaceId: string,
documentId: string,
dataframes: Y.Map<DataFrame>,
blocks: Y.Map<YBlock>,
effects: PivotTableEffects
) {
this.workspaceId = workspaceId
this.documentId = documentId
this.dataframes = dataframes
this.blocks = blocks
this.effects = effects
}

public async run(
executionItem: ExecutionQueueItem,
block: Y.XmlElement<PivotTableBlock>,
_metadata: ExecutionQueueItemPivotTableMetadata
) {
return this._run(executionItem, block, 'create')
}

public async loadPage(
executionItem: ExecutionQueueItem,
block: Y.XmlElement<PivotTableBlock>,
_metadata: ExecutionQueueItemPivotTableLoadPageMetadata
) {
return this._run(executionItem, block, 'read')
}

private async _run(
executionItem: ExecutionQueueItem,
block: Y.XmlElement<PivotTableBlock>,
operation: 'create' | 'read'
) {
try {
logger().trace(
{
workspaceId: this.workspaceId,
documentId: this.documentId,
blockId: block.getAttribute('id'),
},
'running pivot table block'
)

const dataframeName = block.getAttribute('dataframeName')
if (!dataframeName) {
executionItem.setCompleted()
return
}

const dataframe = this.dataframes.get(dataframeName)
if (!dataframe) {
executionItem.setCompleted()
return
}

const attrs = getPivotTableAttributes(block, this.blocks)

if (operation === 'create') {
const rows = attrs.rows
.map((row) => row.column)
.filter((row) => row !== null)
const cols = attrs.columns
.map((col) => col.column)
.filter((col) => col !== null)
const metrics = attrs.metrics
.map((metric) => metric.column)
.filter((metric) => metric !== null)

if (rows.length === 0 || cols.length === 0 || metrics.length === 0) {
block.setAttribute('updatedAt', new Date().toISOString())
block.setAttribute('error', null)
executionItem.setCompleted()
return
}

const all = [...rows, ...cols, ...metrics]

for (const column of all) {
if (
column &&
dataframe.columns.findIndex(
(c) => c.name.toString() === column.name.toString()
) === -1
) {
block.setAttribute('updatedAt', new Date().toISOString())
block.setAttribute('error', null)
executionItem.setCompleted()
return
}
}
}

let aborted = false
let cleanup = executionItem.observeStatus((status) => {
if (status._tag === 'aborting') {
aborted = true
}
})
const { promise, abort } = await this.effects.createPivotTable(
this.workspaceId,
this.documentId,
dataframe,
attrs.rows,
attrs.columns,
attrs.metrics,
attrs.sort,
attrs.variable.value,
attrs.page,
operation
)

if (aborted) {
await abort()
}

let abortP = Promise.resolve(aborted)
cleanup()
cleanup = executionItem.observeStatus((status) => {
if (status._tag === 'aborting') {
abortP = abort().then(() => true)
}
})

const result = await promise
aborted = await abortP
cleanup()

if (aborted) {
executionItem.setCompleted()
return
}

if (!result.success) {
if (result.reason !== 'aborted') {
block.setAttribute('error', result.reason)
}
} else {
block.setAttribute('updatedAt', new Date().toISOString())
block.setAttribute('error', null)
block.setAttribute('result', result.result)
block.setAttribute('page', result.result.page)
}
executionItem.setCompleted()

logger().trace(
{
workspaceId: this.workspaceId,
documentId: this.documentId,
blockId: block.getAttribute('id'),
},
'pivot table block run completed'
)
} catch (err) {
logger().error(
{
workspaceId: this.workspaceId,
documentId: this.documentId,
blockId: block.getAttribute('id'),
err,
},
'Failed to run pivot table block'
)

block.setAttribute('error', 'unknown')
}
}

public static fromWSSharedDocV2(doc: WSSharedDocV2) {
return new PivotTableExecutor(
doc.workspaceId,
doc.documentId,
doc.dataframes,
doc.blocks,
{ createPivotTable }
)
}
}
1 change: 1 addition & 0 deletions apps/api/src/yjs/v2/executor/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export class SQLExecutor implements ISQLExecutor {

if ((!dataSourceId && !isFileDataSource) || !dataframeName) {
executionItem.setCompleted()
cleanup()
return
}

Expand Down
11 changes: 6 additions & 5 deletions apps/api/src/yjs/v2/executors_/run-all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import {
getResultStatus,
isExecutableBlock,
switchBlockType,
updateDropdownInputValue,
updateInputValue,
} from '@briefer/editor'
import { ISQLExecutor, SQLExecutor } from './blocks/sql.js'
import { IPythonExecutor, PythonExecutor } from './blocks/python.js'
Expand Down Expand Up @@ -323,7 +321,8 @@ export class RunAllExecutor implements IRunAllExecutor {
onFileUpload: () => {},
onDashboardHeader: () => {},
onPivotTable: (block) => {
block.setAttribute('status', 'run-all-enqueued')
// TODO
// block.setAttribute('status', 'run-all-enqueued')
},
})
}
Expand Down Expand Up @@ -365,7 +364,8 @@ export class RunAllExecutor implements IRunAllExecutor {
block.setAttribute('status', 'run-all-running')
},
onPivotTable: (block) => {
block.setAttribute('status', 'run-all-running')
// TODO
// block.setAttribute('status', 'run-all-running')
},
})
}
Expand Down Expand Up @@ -407,7 +407,8 @@ export class RunAllExecutor implements IRunAllExecutor {
block.setAttribute('status', 'idle')
},
onPivotTable: (block) => {
block.setAttribute('status', 'idle')
// TODO
// block.setAttribute('status', 'idle')
},
})
}
Expand Down
Loading

0 comments on commit 3e9fe5c

Please sign in to comment.