Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterized Graphs (aka Modules) #109

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/mixture-of-agents/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
"GPT-4o Mini",
"Llama 3.1 8B",
"Mixtral 8x7B",
]
];
const individualResults = "{{ individual }}";
const aggResults = "{{ summaries }}";

Expand All @@ -190,7 +190,7 @@
contentArea.textContent =
individualResults[currentLayer][currentIndex].trim();

cardTitle.textContent = `${modelNames[currentIndex]} - Layer ${ currentLayer + 1 }`;
cardTitle.textContent = `${modelNames[currentIndex]} - Layer ${currentLayer + 1}`;
} else {
contentArea.textContent = aggResults[currentLayer].trim();
cardTitle.textContent = `MoA Layer ${currentLayer + 1}`;
Expand Down
62 changes: 62 additions & 0 deletions examples/module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env -S npx ts-node --transpileOnly

import { Substrate, Box, Module, sb } from "substrate";

async function main() {
const SUBSTRATE_API_KEY = process.env["SUBSTRATE_API_KEY"];
const substrate = new Substrate({ apiKey: SUBSTRATE_API_KEY });

const x = sb.var({ type: "string", default: "hello" });
const y = sb.var({ type: "string" });
const z = sb.var({ type: "object", properties: {} });

const a = new Box({ value: { a: x, z: z, array: [x, x, x] } }, { id: "A" });
const b = new Box(
{ value: { b: sb.interpolate`x=${a.future.value.get("a")}, y=${y}` } },
{ id: "B" },
);

// publish the module on substrate.run
const publication = await substrate.module.publish({
name: "my reusable graph",
nodes: [a, b],
inputs: { x, y, z },
});
console.log("published:", publication.json);

// using the module from JSON
const mod = new Module({
module_json: substrate.module.serialize({
nodes: [a, b],
inputs: { x, y, z },
}),
inputs: {
// when commented will use "hello" because it is defined as the default above
// x: 123,
y: "yyy",
z: {
arr: ["123"],
},
},
});

// using the module from publication/module id
// const mod = new Module({
// module_id: publication.id,
// inputs: { y: "yyy", z: { arr: ["123"] } },
// });

const c = new Box(
{
value: {
"1": mod.future.get("A.value.z.arr[0]"),
"2": mod.future.get("B.value.b"),
},
},
{ id: "C" },
);

const res = await substrate.run(mod, c);
console.log(JSON.stringify(res.json, null, 2));
}
main();
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"vitest": "^1.0.4"
},
"dependencies": {
"@types/json-schema": "^7.0.15",
"@types/node-fetch": "^2.6.11",
"node-fetch": "2.7.0",
"pako": "^2.1.0"
Expand Down
39 changes: 38 additions & 1 deletion src/Future.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { idGenerator } from "substrate/idGenerator";
import { Node } from "substrate/Node";
import { type JSONSchema7 } from "json-schema";

type Accessor = "item" | "attr";
type TraceOperation = {
Expand Down Expand Up @@ -122,7 +123,7 @@ export class JQ extends Directive {
rawValue: (val: JQCompatible) => ({ future_id: null, val }),
};

override next(...items: TraceProp[]) {
override next(..._items: TraceProp[]) {
return new JQ(this.query, this.target);
}

Expand Down Expand Up @@ -315,3 +316,39 @@ export class FutureAnyObject extends Future<Object> {
return super._result();
}
}

export class Variable extends Directive {
items: any[]; // NOTE: unused field (will remove this from direcitve in a later refactor)
name: string | null;

constructor(items: any[]) {
super();
this.items = items;
}

override next(...args: any[]) {
return new Variable(args);
}

override async result(): Promise<any> {
return;
}

override toJSON() {
return {
type: "variable",
source: "input",
name: this.name,
};
}
}

export class FutureVariable extends Future<any> {
declare _directive: Variable;
schema: JSONSchema7;

constructor(schema?: JSONSchema7, id: string = newFutureId()) {
super(new Variable([]), id);
this.schema = schema ?? {};
}
}
36 changes: 36 additions & 0 deletions src/Module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Node, Options } from "substrate/Node";
import { FutureVariable } from "substrate/Future";

type ModuleId = `mod_${string}`;

export type SerializableModule = {
nodes: Node[];
inputs: ModuleInputs;
};

export type NewModule = SerializableModule & { name: string };
export type UpdateModule = NewModule & { id: ModuleId };
export type PublishableModule = NewModule; // | UpdateModule; (TODO: implement update module)
export type PublishedModule = {
id: ModuleId;
uri: string;
};

export type ModuleInputs = Record<string, FutureVariable>;

type ModuleIn =
| {
module_json: any;
inputs: Record<string, any>;
}
| {
module_id: ModuleId;
inputs: Record<string, any>;
};

export class Module extends Node {
constructor(args: ModuleIn, options?: Options) {
super(args, options);
this.node = "Module";
}
}
85 changes: 84 additions & 1 deletion src/Substrate.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { SubstrateError, RequestTimeoutError } from "substrate/Error";
import { VERSION } from "substrate/version";
import OpenAPIjson from "substrate/openapi.json";
import { SubstrateResponse } from "substrate/SubstrateResponse";
import {
asSubstratePublishedModuleResponse,
SubstrateResponse,
} from "substrate/SubstrateResponse";
import { SubstrateStreamingResponse } from "substrate/SubstrateStreamingResponse";
import { Node } from "substrate/Node";
import { Future } from "substrate/Future";
import { getPlatformProperties } from "substrate/Platform";
import { deflate } from "pako";
import { randomString } from "substrate/idGenerator";
import { SerializableModule, PublishableModule } from "substrate/Module";

type Configuration = {
/**
Expand Down Expand Up @@ -291,4 +295,83 @@ export class Substrate {

return headers;
}

module = {
/**
* Returns an object that represents a publishable "module" or code that can be used to construct
* a `Module` node.
*/
serialize: ({ nodes, inputs }: SerializableModule) => {
const inputIdToName = {};
const inputNameToSchema = {};

for (let name in inputs) {
let input = inputs[name];
// @ts-ignore
inputIdToName[input._id] = name;
// @ts-ignore
inputNameToSchema[name] = input.schema;
}

const dag = Substrate.serialize(...nodes);

// update variable name bindings in dag using inputs
dag.futures = dag.futures.map((future: any) => {
if (future.directive.type === "variable" && !future.directive.name) {
// @ts-ignore
future.directive.name = inputIdToName[future.id];
}
return future;
});

return {
dag,
inputs: inputNameToSchema,
api_version: this.apiVersion,
};
},

/**
* Publishes a module on substrate.run
*
*/
publish: async (
publishable: PublishableModule,
endpoint: string = "https://www.substrate.run/api/modules",
) => {
/**
* NOTE: Because the Module publishing API lives in another app and subdomain, the `baseUrl` configuration
* will not be applied to this request like we do with `.run`
*/
const serialized = this.module.serialize({
nodes: publishable.nodes,
inputs: publishable.inputs,
});

const body = {
module: { name: publishable.name },
module_version: serialized,
};

const requestOptions = {
method: "POST",
headers: this.headers(),
body: JSON.stringify(body),
};

const request = new Request(endpoint, requestOptions);
const requestId = request.headers.get("x-substrate-request-id");
const apiResponse = await fetch(request);

if (apiResponse.ok) {
const json = await apiResponse.json();
const res = new SubstrateResponse(request, apiResponse, json);
return asSubstratePublishedModuleResponse(res);
} else {
throw new SubstrateError(
`[Request failed] status=${apiResponse.status} statusText=${apiResponse.statusText} requestId=${requestId}`,
);
}
},
};
}
17 changes: 17 additions & 0 deletions src/SubstrateResponse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AnyNode, NodeOutput } from "substrate/Nodes";
import { NodeError } from "substrate/Error";
import { PublishedModule } from "substrate/Module";

/**
* Response to a run request.
Expand Down Expand Up @@ -39,3 +40,19 @@ export class SubstrateResponse {
return node.output() as NodeOutput<T>;
}
}

// TODO: create an alternate SubstrateResponse for non-compose responses
// For now using a type assertions and modifying the object.
export type SubstratePublishModuleResponse = Omit<
SubstrateResponse,
"get" | "getError"
> & { json: PublishedModule };
export const asSubstratePublishedModuleResponse = (
res: SubstrateResponse,
): SubstratePublishModuleResponse => {
// @ts-ignore
delete res.get;
// @ts-ignore
delete res.getError;
return res as SubstratePublishModuleResponse;
};
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ export {
DeleteVectors,
} from "substrate/Nodes";

export { Module } from "substrate/Module";

export { sb } from "substrate/sb";
export { Substrate };
import { Substrate } from "substrate/Substrate";
Expand Down
19 changes: 18 additions & 1 deletion src/sb.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
import { FutureAnyObject, FutureString } from "substrate/Future";
import {
FutureAnyObject,
FutureVariable,
FutureString,
} from "substrate/Future";
import { StreamingResponse } from "substrate/SubstrateStreamingResponse";

export const sb = {
concat: FutureString.concat,
jq: FutureAnyObject.jq,
interpolate: FutureString.interpolate,
/**
* `var` is used to specify a variable that can be bound to a name when creating a `module` (re-usable Substrate Graph)
* * Input types and validation paramters may optionally be described using a JSON Schema object.
* * Default values may also be specified here and will be used if user input is not provided for this input.
*/
var: (schema: FutureVariable["schema"]) => {
// NOTE: using `any` as the return type here for now to ease using
// this in general node input args or helper functions.
//
// Once we ship our Future type reorganization work, we can just
// use this as-is (Future<any>)
return new FutureVariable(schema) as any;
},
streaming: {
fromSSEResponse: StreamingResponse.fromReponse,
},
Expand Down
Loading