-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnect.go
168 lines (143 loc) · 4.51 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package inngestgo
import (
"context"
"fmt"
"net/url"
"github.com/inngest/inngest/pkg/execution/state"
"github.com/inngest/inngest/pkg/publicerr"
"github.com/inngest/inngestgo/connect"
"github.com/inngest/inngestgo/internal/middleware"
"github.com/inngest/inngestgo/internal/sdkrequest"
)
const (
defaultMaxWorkerConcurrency = 1_000
)
type ConnectOpts struct {
Apps []Client
// InstanceID represents a stable identifier to be used for identifying connected SDKs.
// This can be a hostname or other identifier that remains stable across restarts.
//
// If nil, this defaults to the current machine's hostname.
InstanceID *string
RewriteGatewayEndpoint func(endpoint url.URL) (url.URL, error)
// MaxConcurrency defines the maximum number of requests the worker can process at once.
// This affects goroutines available to handle connnect workloads, as well as flow control.
// Defaults to 1000.
MaxConcurrency int
}
func Connect(ctx context.Context, opts ConnectOpts) (connect.WorkerConnection, error) {
concurrency := opts.MaxConcurrency
if concurrency < 1 {
concurrency = defaultMaxWorkerConcurrency
}
connectPlaceholder := url.URL{
Scheme: "ws",
Host: "connect",
}
if opts.InstanceID == nil {
return nil, fmt.Errorf("missing required Instance ID")
}
apps := make([]connect.ConnectApp, len(opts.Apps))
invokers := make(map[string]connect.FunctionInvoker, len(opts.Apps))
for i, a := range opts.Apps {
app, ok := a.(*apiClient)
if !ok {
return nil, fmt.Errorf("invalid handler passed")
}
appName := app.AppID()
fns, err := createFunctionConfigs(appName, app.h.GetFunctions(), connectPlaceholder, true)
if err != nil {
return nil, fmt.Errorf("error creating function configs: %w", err)
}
apps[i] = connect.ConnectApp{
AppName: appName,
Functions: fns,
AppVersion: app.h.GetAppVersion(),
}
invokers[appName] = app.h
}
if len(opts.Apps) < 1 {
return nil, fmt.Errorf("must specify at least one app")
}
defaultClient, ok := opts.Apps[0].(*apiClient)
if !ok {
return nil, fmt.Errorf("invalid handler passed")
}
var hashedKey []byte
var hashedFallbackKey []byte
signingKey := defaultClient.h.GetSigningKey()
if signingKey == "" {
if !defaultClient.h.isDev() {
// Signing key is only required in cloud mode.
return nil, fmt.Errorf("signing key is required")
}
} else {
var err error
hashedKey, err = hashedSigningKey([]byte(signingKey))
if err != nil {
return nil, fmt.Errorf("failed to hash signing key: %w", err)
}
if fallbackKey := defaultClient.h.GetSigningKeyFallback(); fallbackKey != "" {
hashedFallbackKey, err = hashedSigningKey([]byte(fallbackKey))
if err != nil {
return nil, fmt.Errorf("failed to hash fallback signing key: %w", err)
}
}
}
return connect.Connect(ctx, connect.Opts{
Apps: apps,
Env: defaultClient.Env,
Capabilities: capabilities,
HashedSigningKey: hashedKey,
HashedSigningKeyFallback: hashedFallbackKey,
MaxConcurrency: concurrency,
APIBaseUrl: defaultClient.h.GetAPIBaseURL(),
IsDev: defaultClient.h.isDev(),
DevServerUrl: DevServerURL(),
InstanceID: opts.InstanceID,
Platform: Ptr(platform()),
SDKVersion: SDKVersion,
SDKLanguage: SDKLanguage,
RewriteGatewayEndpoint: opts.RewriteGatewayEndpoint,
}, invokers, defaultClient.Logger)
}
func (h *handler) getServableFunctionBySlug(slug string) ServableFunction {
h.l.RLock()
var fn ServableFunction
for _, f := range h.funcs {
if f.FullyQualifiedID() == slug {
fn = f
break
}
}
h.l.RUnlock()
return fn
}
func (h *handler) InvokeFunction(ctx context.Context, slug string, stepId *string, request sdkrequest.Request) (any, []state.GeneratorOpcode, error) {
fn := h.getServableFunctionBySlug(slug)
if fn == nil {
// XXX: This is a 500 within the JS SDK. We should probably change
// the JS SDK's status code to 410. 404 indicates that the overall
// API for serving Inngest isn't found.
return nil, nil, publicerr.Error{
Message: fmt.Sprintf("function not found: %s", slug),
Status: 410,
}
}
cImpl, ok := h.client.(*apiClient)
if !ok {
return nil, nil, fmt.Errorf("invalid client")
}
mw := middleware.NewMiddlewareManager().Add(cImpl.Middleware...)
// Invoke function, always complete regardless of
resp, ops, err := invoke(
context.Background(),
h.client,
mw,
fn,
h.GetSigningKey(),
&request,
stepId,
)
return resp, ops, err
}