Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ClientOption] Add support for endpoint according to new routing policy #679

Merged
merged 7 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func NewRecorder(httpClient *http.Client) *HostRecorder {

func (hr *HostRecorder) Options(host string, opts ...ably.ClientOption) []ably.ClientOption {
return append(opts,
ably.WithRealtimeHost(host),
ably.WithEndpoint(host),
ably.WithAutoConnect(false),
ably.WithDial(hr.dialWS),
ably.WithHTTPClient(hr.httpClient),
Expand Down
8 changes: 4 additions & 4 deletions ably/auth_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestAuth_JWT_Token_RSA8c(t *testing.T) {
rec, optn := ablytest.NewHttpRecorder()
rest, err := ably.NewREST(
ably.WithToken(jwt),
ably.WithEnvironment(app.Environment),
ably.WithEndpoint(app.Endpoint),
optn[0],
)
assert.NoError(t, err, "rest()=%v", err)
Expand All @@ -414,7 +414,7 @@ func TestAuth_JWT_Token_RSA8c(t *testing.T) {
rest, err := ably.NewREST(
ably.WithAuthURL(ablytest.CREATE_JWT_URL),
ably.WithAuthParams(app.GetJwtAuthParams(30*time.Second, false)),
ably.WithEnvironment(app.Environment),
ably.WithEndpoint(app.Endpoint),
optn[0],
)
assert.NoError(t, err, "rest()=%v", err)
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestAuth_JWT_Token_RSA8c(t *testing.T) {

rec, optn := ablytest.NewHttpRecorder()
rest, err := ably.NewREST(
ably.WithEnvironment(app.Environment),
ably.WithEndpoint(app.Endpoint),
authCallback,
optn[0],
)
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestAuth_JWT_Token_RSA8c(t *testing.T) {
rest, err := ably.NewREST(
ably.WithAuthURL(ablytest.CREATE_JWT_URL),
ably.WithAuthParams(app.GetJwtAuthParams(30*time.Second, true)),
ably.WithEnvironment(app.Environment),
ably.WithEndpoint(app.Endpoint),
optn[0],
)
assert.NoError(t, err, "rest()=%v", err)
Expand Down
4 changes: 2 additions & 2 deletions ably/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestIssue127ErrorResponse(t *testing.T) {
ably.WithKey("xxxxxxx.yyyyyyy:zzzzzzz"),
ably.WithTLS(false),
ably.WithUseTokenAuth(true),
ably.WithRESTHost(endpointURL.Hostname()),
ably.WithEndpoint(endpointURL.Hostname()),
}
port, _ := strconv.ParseInt(endpointURL.Port(), 10, 0)
opts = append(opts, ably.WithPort(int(port)))
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestIssue_154(t *testing.T) {
ably.WithKey("xxxxxxx.yyyyyyy:zzzzzzz"),
ably.WithTLS(false),
ably.WithUseTokenAuth(true),
ably.WithRESTHost(endpointURL.Hostname()),
ably.WithEndpoint(endpointURL.Hostname()),
}
port, _ := strconv.ParseInt(endpointURL.Port(), 10, 0)
opts = append(opts, ably.WithPort(int(port)))
Expand Down
18 changes: 15 additions & 3 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ func NewClientOptions(os ...ClientOption) *clientOptions {
return applyOptionsWithDefaults(os...)
}

func GetEnvFallbackHosts(env string) []string {
return getEnvFallbackHosts(env)
func GetEndpointFallbackHosts(endpoint string) []string {
return getEndpointFallbackHosts(endpoint)
}

func (opts *clientOptions) GetRestHost() string {
Expand All @@ -24,6 +24,14 @@ func (opts *clientOptions) GetRealtimeHost() string {
return opts.getRealtimeHost()
}

func (opts *clientOptions) Validate() error {
return opts.validate()
}

func (opts *clientOptions) GetHostnameFromEndpoint() string {
return opts.getHostnameFromEndpoint()
}

func (opts *clientOptions) ActivePort() (int, bool) {
return opts.activePort()
}
Expand Down Expand Up @@ -192,14 +200,18 @@ func ApplyOptionsWithDefaults(o ...ClientOption) *clientOptions {
return applyOptionsWithDefaults(o...)
}

func IsEndpointFQDN(endpoint string) bool {
return isEndpointFQDN(endpoint)
}

type ConnStateChanges = connStateChanges

type ChannelStateChanges = channelStateChanges

const ConnectionStateTTLErrFmt = connectionStateTTLErrFmt

func DefaultFallbackHosts() []string {
return defaultFallbackHosts()
return defaultOptions.FallbackHosts
}

// PendingItems returns the number of messages waiting for Ack/Nack
Expand Down
2 changes: 1 addition & 1 deletion ably/http_paginated_response_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestHTTPPaginatedFallback(t *testing.T) {
assert.NoError(t, err)
defer app.Close()
opts := app.Options(ably.WithUseBinaryProtocol(false),
ably.WithRESTHost("ably.invalid"),
ably.WithEndpoint("ably.invalid"),
ably.WithFallbackHosts(nil))
client, err := ably.NewREST(opts...)
assert.NoError(t, err)
Expand Down
141 changes: 109 additions & 32 deletions ably/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ const (
protocolJSON = "application/json"
protocolMsgPack = "application/x-msgpack"

// restHost is the primary ably host.
restHost = "rest.ably.io"
// realtimeHost is the primary ably host.
realtimeHost = "realtime.ably.io"
// defaultEndpoint is the default routing policy used to connect to Ably
defaultEndpoint = "main"
defaultPrimaryHost = "main.realtime.ably.net" // REC1a

Port = 80
TLSPort = 443
maxMessageSize = 65536 // 64kb, default value TO3l8
Expand All @@ -37,11 +37,12 @@ const (
)

var defaultOptions = clientOptions{
RESTHost: restHost,
FallbackHosts: defaultFallbackHosts(),
Endpoint: defaultEndpoint,
RESTHost: defaultPrimaryHost,
FallbackHosts: getEndpointFallbackHosts(defaultEndpoint), // REC2c1
HTTPMaxRetryCount: 3,
HTTPRequestTimeout: 10 * time.Second,
RealtimeHost: realtimeHost,
RealtimeHost: defaultPrimaryHost,
TimeoutDisconnect: 30 * time.Second,
ConnectionStateTTL: 120 * time.Second,
RealtimeRequestTimeout: 10 * time.Second, // DF1b
Expand All @@ -58,23 +59,31 @@ var defaultOptions = clientOptions{
LogLevel: LogWarning, // RSC2
}

func defaultFallbackHosts() []string {
return []string{
"a.ably-realtime.com",
"b.ably-realtime.com",
"c.ably-realtime.com",
"d.ably-realtime.com",
"e.ably-realtime.com",
func getPrimaryHost(root string) string {
// REC1b3
if strings.HasPrefix(root, "nonprod:") {
root := strings.TrimPrefix(root, "nonprod:")
return fmt.Sprintf("%s.realtime.ably-nonprod.net", root)
}
return fmt.Sprintf("%s.realtime.ably.net", root)
}

func getEndpointFallbackHosts(endpoint string) []string {
if strings.HasPrefix(endpoint, "nonprod:") { // REC2c3
root := strings.TrimPrefix(endpoint, "nonprod:")
return endpointFallbacks(root, "ably-realtime-nonprod.com")
}
return endpointFallbacks(endpoint, "ably-realtime.com") // REC2c4
}

func getEnvFallbackHosts(env string) []string {
// endpointFallbacks generates a list of fallback hosts based on the given namespace and root.
func endpointFallbacks(root, domain string) []string {
return []string{
fmt.Sprintf("%s-%s", env, "a-fallback.ably-realtime.com"),
fmt.Sprintf("%s-%s", env, "b-fallback.ably-realtime.com"),
fmt.Sprintf("%s-%s", env, "c-fallback.ably-realtime.com"),
fmt.Sprintf("%s-%s", env, "d-fallback.ably-realtime.com"),
fmt.Sprintf("%s-%s", env, "e-fallback.ably-realtime.com"),
fmt.Sprintf("%s.a.fallback.%s", root, domain),
fmt.Sprintf("%s.b.fallback.%s", root, domain),
fmt.Sprintf("%s.c.fallback.%s", root, domain),
fmt.Sprintf("%s.d.fallback.%s", root, domain),
fmt.Sprintf("%s.e.fallback.%s", root, domain),
}
}

Expand Down Expand Up @@ -244,8 +253,11 @@ type clientOptions struct {
// authOptions Embedded an [ably.authOptions] object (TO3j).
authOptions

// RESTHost enables a non-default Ably host to be specified. For development environments only.
// The default value is rest.ably.io (RSC12, TO3k2).
// Endpoint specifies either a routing policy name or fully qualified domain name to connect to Ably.
Endpoint string

// Deprecated: this property is deprecated and will be removed in a future version.
// If the restHost option is specified the primary domain is the value of the restHost option REC1d1).
RESTHost string

// Deprecated: this property is deprecated and will be removed in a future version.
Expand All @@ -257,12 +269,14 @@ type clientOptions struct {
// please specify them here (RSC15b, RSC15a, TO3k6).
FallbackHosts []string

// RealtimeHost enables a non-default Ably host to be specified for realtime connections.
// For development environments only. The default value is realtime.ably.io (RTC1d, TO3k3).
// Deprecated: this property is deprecated and will be removed in a future version.
// If the realtimeHost option is specified the primary domain is the value of the realtimeHost option (REC1d2).
RealtimeHost string

// Environment enables a custom environment to be used with the Ably service.
// Optional: prefixes both hostname with the environment string (RSC15b, TO3k1).
// Deprecated: this property is deprecated and will be removed in a future version.
// If the deprecated environment option is specified then it defines a production routing policy name [name] (REC1c):
// If any one of the deprecated options restHost, realtimeHost are also specified then the options as a set are invalid (REC1c1).
// Otherwise, the primary domain is [name].realtime.ably.net (REC1c2).
Environment string

// Port is used for non-TLS connections and requests
Expand Down Expand Up @@ -415,6 +429,14 @@ type clientOptions struct {
}

func (opts *clientOptions) validate() error {
// REC1b1
if !empty(opts.Endpoint) && (!empty(opts.Environment) || !empty(opts.RealtimeHost) || !empty(opts.RESTHost) || opts.FallbackHostsUseDefault) {
err := errors.New("invalid client option: cannot use endpoint with any of deprecated options environment, realtimeHost, restHost or FallbackHostsUseDefault")
logger := opts.LogHandler
logger.Printf(LogError, "Invalid client options : %v", err.Error())
return err
}

_, err := opts.getFallbackHosts()
if err != nil {
logger := opts.LogHandler
Expand Down Expand Up @@ -451,16 +473,22 @@ func (opts *clientOptions) activePort() (port int, isDefault bool) {
}

func (opts *clientOptions) getRestHost() string {
if !empty(opts.Endpoint) {
return opts.getHostnameFromEndpoint()
}
if !empty(opts.RESTHost) {
return opts.RESTHost
}
if !opts.isProductionEnvironment() {
return opts.Environment + "-" + defaultOptions.RESTHost
return getPrimaryHost(opts.Environment)
}
return defaultOptions.RESTHost
}

func (opts *clientOptions) getRealtimeHost() string {
if !empty(opts.Endpoint) {
return opts.getHostnameFromEndpoint()
}
if !empty(opts.RealtimeHost) {
return opts.RealtimeHost
}
Expand All @@ -470,11 +498,29 @@ func (opts *clientOptions) getRealtimeHost() string {
return opts.RESTHost
}
if !opts.isProductionEnvironment() {
return opts.Environment + "-" + defaultOptions.RealtimeHost
return getPrimaryHost(opts.Environment)
}
return defaultOptions.RealtimeHost
}

// isEndpointFQDN returns true if the given endpoint is a hostname, which may
// be an IPv4 address, IPv6 address or localhost
func isEndpointFQDN(endpoint string) bool {
return strings.Contains(endpoint, ".") || strings.Contains(endpoint, "::") || endpoint == "localhost"
}

// REC1b
func (opts *clientOptions) getHostnameFromEndpoint() string {
endpoint := opts.Endpoint
if empty(endpoint) {
return defaultPrimaryHost
}
if isEndpointFQDN(endpoint) { // REC1b2
return endpoint
}
return getPrimaryHost(endpoint) // REC1b4
}

func empty(s string) bool {
return len(strings.TrimSpace(s)) == 0
}
Expand Down Expand Up @@ -506,6 +552,16 @@ func (opts *clientOptions) realtimeURL(realtimeHost string) (realtimeUrl string)
}

func (opts *clientOptions) getFallbackHosts() ([]string, error) {
if !empty(opts.Endpoint) {
if opts.FallbackHosts == nil {
if isEndpointFQDN(opts.Endpoint) { // REC2c2
return opts.FallbackHosts, nil
}
return getEndpointFallbackHosts(opts.Endpoint), nil
}
return opts.FallbackHosts, nil //REC2a2
}

logger := opts.LogHandler
_, isDefaultPort := opts.activePort()
if opts.FallbackHostsUseDefault {
Expand All @@ -525,7 +581,7 @@ func (opts *clientOptions) getFallbackHosts() ([]string, error) {
if opts.isProductionEnvironment() {
return defaultOptions.FallbackHosts, nil
}
return getEnvFallbackHosts(opts.Environment), nil
return getEndpointFallbackHosts(opts.Environment), nil // REC2c5
}
return opts.FallbackHosts, nil
}
Expand Down Expand Up @@ -1070,9 +1126,23 @@ func WithEchoMessages(echo bool) ClientOption {
}
}

// WithEnvironment is used for setting Environment using [ably.ClientOption].
// Environment enables a custom environment to be used with the Ably service.
// Optional: prefixes both hostname with the environment string (RSC15b, TO3k1).
// WithEndpoint sets a custom endpoint for connecting to the Ably service (see
// [Platform Customization] for more information).
//
// [Platform Customization]: https://ably.com/docs/platform-customization
func WithEndpoint(env string) ClientOption {
return func(os *clientOptions) {
os.Endpoint = env
}
}

// WithEnvironment sets a custom endpoint for connecting to the Ably service
// (see [Platform Customization] for more information).
//
// Deprecated: this option is deprecated and will be removed in a future
// version.
//
// [Platform Customization]: https://ably.com/docs/platform-customization
func WithEnvironment(env string) ClientOption {
return func(os *clientOptions) {
os.Environment = env
Expand Down Expand Up @@ -1130,6 +1200,9 @@ func WithQueueMessages(queue bool) ClientOption {
// WithRESTHost is used for setting RESTHost using [ably.ClientOption].
// RESTHost enables a non-default Ably host to be specified. For development environments only.
// The default value is rest.ably.io (RSC12, TO3k2).
//
// Deprecated: this option is deprecated and will be removed in a future
// version.
func WithRESTHost(host string) ClientOption {
return func(os *clientOptions) {
os.RESTHost = host
Expand All @@ -1149,6 +1222,9 @@ func WithHTTPRequestTimeout(timeout time.Duration) ClientOption {
// WithRealtimeHost is used for setting RealtimeHost using [ably.ClientOption].
// RealtimeHost enables a non-default Ably host to be specified for realtime connections.
// For development environments only. The default value is realtime.ably.io (RTC1d, TO3k3).
//
// Deprecated: this option is deprecated and will be removed in a future
// version.
func WithRealtimeHost(host string) ClientOption {
return func(os *clientOptions) {
os.RealtimeHost = host
Expand Down Expand Up @@ -1331,6 +1407,7 @@ func WithInsecureAllowBasicAuthWithoutTLS() ClientOption {
func applyOptionsWithDefaults(opts ...ClientOption) *clientOptions {
to := defaultOptions
// No need to set hosts by default
to.Endpoint = ""
to.RESTHost = ""
to.RealtimeHost = ""
to.FallbackHosts = nil
Expand Down
Loading
Loading