From 84c492a7a8bc9bb8d3fe6dbbcf52e80a353c6e65 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 31 Jan 2023 23:55:28 -0700 Subject: [PATCH 01/15] bump to lens with ethermint/injective protos --- go.mod | 2 ++ go.sum | 13 +++---------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index aaf55305a..f8c53cf46 100644 --- a/go.mod +++ b/go.mod @@ -181,3 +181,5 @@ require ( pgregory.net/rapid v0.5.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) + +replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 diff --git a/go.sum b/go.sum index 8d892b17f..a48e09f27 100644 --- a/go.sum +++ b/go.sum @@ -318,12 +318,6 @@ github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZg github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.1-0.20201022092350-68b0159b7869 h1:kRpU4zq+Pzh4feET49aEWPOzwQy3U2SsbZEQ7QEcif0= github.com/gogo/googleapis v1.4.1-0.20201022092350-68b0159b7869/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c= -github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= -github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= -github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= -github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= @@ -532,8 +526,6 @@ github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b h1:FQ7+9fxhyp82ks9vAuy github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj4XXDkbjdmlbI505rUPBs6WBMYg2pXks= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= -github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= @@ -716,6 +708,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/regen-network/gocuke v0.6.2 h1:pHviZ0kKAq2U2hN2q3smKNxct6hS0mGByFMHGnWA97M= +github.com/regen-network/protobuf v1.3.3-alpha.regen.1 h1:OHEc+q5iIAXpqiqFKeLpu5NwTIkVXUs48vFMwzqpqY4= +github.com/regen-network/protobuf v1.3.3-alpha.regen.1/go.mod h1:2DjTFR1HhMQhiWC5sZ4OhQ3+NtdbZ6oBDKQwq5Ou+FI= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= 
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1092,10 +1086,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -1208,6 +1200,7 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200324203455-a04cca1dde73/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= From 5808ec06361ab4147c1ba771420ae76a55bcc9c1 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 00:22:22 -0700 Subject: [PATCH 02/15] multiple txs per block --- go.mod | 2 - go.sum | 13 +- relayer/chains/cosmos/provider.go | 6 +- relayer/chains/cosmos/tx.go | 239 +++++++++++++++- relayer/processor/path_end_runtime.go | 8 +- relayer/processor/path_processor_internal.go | 271 +++++++++++++------ relayer/processor/types_internal.go | 37 +-- relayer/provider/provider.go | 3 +- 8 files changed, 455 insertions(+), 124 deletions(-) diff --git a/go.mod b/go.mod index f8c53cf46..aaf55305a 100644 --- a/go.mod +++ b/go.mod @@ -181,5 +181,3 @@ require ( pgregory.net/rapid v0.5.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) - -replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 diff --git a/go.sum b/go.sum index a48e09f27..8d892b17f 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,12 @@ github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZg github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.4.1-0.20201022092350-68b0159b7869 h1:kRpU4zq+Pzh4feET49aEWPOzwQy3U2SsbZEQ7QEcif0= 
github.com/gogo/googleapis v1.4.1-0.20201022092350-68b0159b7869/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= @@ -526,6 +532,8 @@ github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b h1:FQ7+9fxhyp82ks9vAuy github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj4XXDkbjdmlbI505rUPBs6WBMYg2pXks= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= @@ -708,8 +716,6 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/regen-network/gocuke v0.6.2 h1:pHviZ0kKAq2U2hN2q3smKNxct6hS0mGByFMHGnWA97M= -github.com/regen-network/protobuf v1.3.3-alpha.regen.1 h1:OHEc+q5iIAXpqiqFKeLpu5NwTIkVXUs48vFMwzqpqY4= -github.com/regen-network/protobuf v1.3.3-alpha.regen.1/go.mod h1:2DjTFR1HhMQhiWC5sZ4OhQ3+NtdbZ6oBDKQwq5Ou+FI= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1086,8 +1092,10 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools 
v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -1200,7 +1208,6 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200324203455-a04cca1dde73/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index 692e73628..1e74082c3 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -134,10 +134,14 @@ func (h CosmosIBCHeader) ConsensusState() ibcexported.ConsensusState { return &tmclient.ConsensusState{ Timestamp: h.SignedHeader.Time, Root: commitmenttypes.NewMerkleRoot(h.SignedHeader.AppHash), - NextValidatorsHash: h.ValidatorSet.Hash(), + NextValidatorsHash: h.SignedHeader.NextValidatorsHash, } } +func (h CosmosIBCHeader) NextValidatorsHash() []byte { + return h.SignedHeader.NextValidatorsHash +} + func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig { return cc.PCfg } diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index aa5fa303f..342f6b8a5 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -26,19 +26,23 @@ import ( tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride" "github.com/cosmos/relayer/v2/relayer/provider" + lensclient "github.com/strangelove-ventures/lens/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/light" + coretypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "go.uber.org/zap" ) // Variables used for retries var ( - rtyAttNum = uint(5) - rtyAtt = retry.Attempts(rtyAttNum) - rtyDel = retry.Delay(time.Millisecond * 400) - rtyErr = retry.LastErrorOnly(true) - numRegex = regexp.MustCompile("[0-9]+") + rtyAttNum = uint(5) + rtyAtt = retry.Attempts(rtyAttNum) + rtyDel = retry.Delay(time.Millisecond * 400) + rtyErr = retry.LastErrorOnly(true) + numRegex = regexp.MustCompile("[0-9]+") + defaultBroadcastWaitTimeout = 10 * time.Minute + errUnknown = "unknown" ) // Default IBC settings @@ -210,6 +214,222 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela return rlyResp, true, nil } +func (cc *CosmosProvider) SendMessagesToMempool(ctx context.Context, msgs []provider.RelayerMessage, memo string) error { + var fees sdk.Coins + + // Guard against account sequence 
number mismatch errors by locking for the specific wallet for + // the account sequence query all the way through the transaction broadcast success/fail. + cc.txMu.Lock() + defer cc.txMu.Unlock() + + txBytes, sequence, f, err := cc.buildMessages(ctx, msgs, memo) + fees = f + if err != nil { + errMsg := err.Error() + + // Account sequence mismatch errors can happen on the simulated transaction also. + if strings.Contains(errMsg, sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + return err + } + + // Occasionally the client will be out of date, + // and we will receive an RPC error like: + // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: channel handshake open try failed: failed channel state verification for client (07-tendermint-0): client state height < proof height ({0 58} < {0 59}), please ensure the client has been updated: invalid height: invalid request + // or + // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: receive packet verification failed: couldn't verify counterparty packet commitment: failed packet commitment verification for client (07-tendermint-0): client state height < proof height ({0 142} < {0 143}), please ensure the client has been updated: invalid height: invalid request + // + // No amount of retrying will fix this. The client needs to be updated. + // Unfortunately, the entirety of that error message originates on the server, + // so there is not an obvious way to access a more structured error value. + // + // If this logic should ever fail due to the string values of the error messages on the server + // changing from the client's version of the library, + // at worst this will run more unnecessary retries. + if strings.Contains(errMsg, sdkerrors.ErrInvalidHeight.Error()) { + cc.log.Info( + "Skipping retry due to invalid height error", + zap.Error(err), + ) + return err + } + + // On a fast retry, it is possible to have an invalid connection state. + // Retrying that message also won't fix the underlying state mismatch, + // so log it and mark it as unrecoverable. + if strings.Contains(errMsg, conntypes.ErrInvalidConnectionState.Error()) { + cc.log.Info( + "Skipping retry due to invalid connection state", + zap.Error(err), + ) + return err + } + + // Also possible to have an invalid channel state on a fast retry. + if strings.Contains(errMsg, chantypes.ErrInvalidChannelState.Error()) { + cc.log.Info( + "Skipping retry due to invalid channel state", + zap.Error(err), + ) + return err + } + + // If the message reported an invalid proof, back off. + // NOTE: this error string ("invalid proof") will match other errors too, + // but presumably it is safe to stop retrying in those cases as well. + if strings.Contains(errMsg, commitmenttypes.ErrInvalidProof.Error()) { + cc.log.Info( + "Skipping retry due to invalid proof", + zap.Error(err), + ) + return err + } + + // Invalid packets should not be retried either. 
+ if strings.Contains(errMsg, chantypes.ErrInvalidPacket.Error()) { + cc.log.Info( + "Skipping retry due to invalid packet", + zap.Error(err), + ) + return err + } + + return err + } + + if err := cc.broadcastTx(ctx, txBytes, msgs, fees, defaultBroadcastWaitTimeout); err != nil { + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + } + + return err + } + + // we had a potentially successful tx with this sequence, so update it to the next + cc.updateNextAccountSequence(sequence + 1) + + return nil +} + +// broadcastTx broadcasts a TX and then waits for the TX to be included in the block. +// The waiting will either be canceled after the waitTimeout has run out or the context +// exited. +func (cc *CosmosProvider) broadcastTx( + ctx context.Context, + tx []byte, + msgs []provider.RelayerMessage, + fees sdk.Coins, + waitTimeout time.Duration, +) error { + // broadcast tx sync waits for check tx to pass + // NOTE: this can return w/ a timeout + // need to investigate if this will leave the tx + // in the mempool or we can retry the broadcast at that + // point + + syncRes, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx) + if err != nil { + if syncRes == nil { + // There are some cases where BroadcastTxSync will return an error but the associated + // ResultBroadcastTx will be nil. + return err + } + rlyResp := &provider.RelayerTxResponse{ + TxHash: syncRes.Hash.String(), + Code: syncRes.Code, + } + cc.LogFailedTx(rlyResp, nil, msgs) + return err + } + + // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace + // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go + err = errors.Unwrap(sdkerrors.ABCIError(syncRes.Codespace, syncRes.Code, "error broadcasting transaction")) + if err.Error() != errUnknown { + return err + } + + // TODO: maybe we need to check if the node has tx indexing enabled? + // if not, we need to find a new way to block until inclusion in a block + + go func() { + ctx := context.Background() // TODO: update + resp, err := cc.waitForBlockInclusion(ctx, syncRes, waitTimeout) + if err != nil { + cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) + return + } + rlyResp := &provider.RelayerTxResponse{ + Height: resp.Height, + TxHash: resp.TxHash, + Code: resp.Code, + Data: resp.Data, + Events: parseEventsFromTxResponse(resp), + } + + // transaction was executed, log the success or failure using the tx response code + // NOTE: error is nil, logic should use the returned error to determine if the + // transaction was successfully executed. 
+ cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) + + if resp.Code != 0 { + cc.LogFailedTx(rlyResp, nil, msgs) + return + } + + cc.LogSuccessTx(resp, msgs) + }() + + return nil +} + +func (cc *CosmosProvider) waitForBlockInclusion( + ctx context.Context, + syncRes *coretypes.ResultBroadcastTx, + waitTimeout time.Duration, +) (*sdk.TxResponse, error) { + exitAfter := time.After(waitTimeout) + for { + select { + case <-exitAfter: + return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, lensclient.ErrTimeoutAfterWaitingForTxBroadcast) + // TODO: this is potentially less than optimal and may + // be better as something configurable + case <-time.After(time.Millisecond * 100): + resTx, err := cc.ChainClient.RPCClient.Tx(ctx, syncRes.Hash, false) + if err == nil { + return cc.mkTxResult(resTx) + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } + +} + +func (cc *CosmosProvider) mkTxResult(resTx *coretypes.ResultTx) (*sdk.TxResponse, error) { + txb, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx) + if err != nil { + return nil, err + } + p, ok := txb.(intoAny) + if !ok { + return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txb) + } + any := p.AsAny() + // TODO: maybe don't make up the time here? + // we can fetch the block for the block time buts thats + // more round trips + // TODO: logs get rendered as base64 encoded, need to fix this somehow + return sdk.NewResponseResultTx(resTx, any, time.Now().Format(time.RFC3339)), nil +} + +// Deprecated: this interface is used only internally for scenario we are +// deprecating (StdTxConfig support) +type intoAny interface { + AsAny() *codectypes.Any +} + func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent { var events []provider.RelayerEvent @@ -243,6 +463,13 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel txf = txf.WithMemo(memo) } + sequence := txf.Sequence() + cc.updateNextAccountSequence(sequence) + if sequence < cc.nextAccountSeq { + sequence = cc.nextAccountSeq + txf = txf.WithSequence(sequence) + } + // TODO: Make this work with new CalculateGas method // TODO: This is related to GRPC client stuff? 
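The sequence handling added to buildMessages above (updateNextAccountSequence plus the comparison against nextAccountSeq) is what lets several transactions be signed back to back within one block even while the node still reports a stale account sequence. A minimal, self-contained sketch of that idea follows; the sequenceGuard type and its method names are illustrative only, not the relayer's actual CosmosProvider fields.

package main

import (
	"fmt"
	"sync"
)

// sequenceGuard is an illustrative stand-in for the provider state used above:
// it remembers the highest account sequence already handed out so that a stale
// value returned by the node is not reused while earlier txs sit in the mempool.
type sequenceGuard struct {
	mu      sync.Mutex
	nextSeq uint64
}

// next reconciles the sequence reported by the node with the cached value and
// returns the sequence the next transaction should be signed with.
func (g *sequenceGuard) next(queried uint64) uint64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	if queried > g.nextSeq {
		g.nextSeq = queried
	}
	return g.nextSeq
}

// markBroadcast records that a tx using seq was broadcast, so the following tx
// must use seq+1 even before the node's account query catches up.
func (g *sequenceGuard) markBroadcast(seq uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if seq+1 > g.nextSeq {
		g.nextSeq = seq + 1
	}
}

func main() {
	g := &sequenceGuard{}
	seq := g.next(7) // node reports sequence 7
	g.markBroadcast(seq)
	fmt.Println(g.next(7)) // node is still stale, but the next tx uses 8
}
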
// https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297 @@ -296,7 +523,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel return nil, 0, sdk.Coins{}, err } - return txBytes, txf.Sequence(), fees, nil + return txBytes, sequence, fees, nil } // handleAccountSequenceMismatchError will parse the error string, e.g.: diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index beb33c0e0..0f5877001 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -658,7 +658,7 @@ func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTra channelProcessingCache[sequence] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } } @@ -680,7 +680,7 @@ func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMess msgProcessCache[connectionKey] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } } @@ -702,7 +702,7 @@ func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToT msgProcessCache[channelKey] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } } @@ -718,6 +718,6 @@ func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessag pathEnd.clientICQProcessing[queryID] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index c443f2004..156bd72f0 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1,6 +1,7 @@ package processor import ( + "bytes" "context" "errors" "fmt" @@ -428,6 +429,10 @@ func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst * clientID := dst.info.ClientID clientConsensusHeight := dst.clientState.ConsensusHeight trustedConsensusHeight := dst.clientTrustedState.ClientState.ConsensusHeight + var trustedNextValidatorsHash []byte + if dst.clientTrustedState.IBCHeader != nil { + trustedNextValidatorsHash = dst.clientTrustedState.IBCHeader.NextValidatorsHash() + } // If the client state height is not equal to the client trusted state height and the client state height is // the latest block, we cannot send a MsgUpdateClient until another block is observed on the counterparty. 
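The hunk below only treats the client update as blocked when the counterparty's newest header is still at the trusted height and also carries a different next-validators hash than the trusted state, using the NextValidatorsHash accessor this patch series adds to IBCHeader. A standalone sketch of that predicate follows; the fakeHeader type and the mustWaitForNextHeader name are illustrative, not part of the relayer.

package main

import (
	"bytes"
	"fmt"
)

// header is a pared-down stand-in exposing only the two accessors the check needs.
type header interface {
	Height() uint64
	NextValidatorsHash() []byte
}

type fakeHeader struct {
	height   uint64
	nextVals []byte
}

func (h fakeHeader) Height() uint64             { return h.height }
func (h fakeHeader) NextValidatorsHash() []byte { return h.nextVals }

// mustWaitForNextHeader mirrors the condition in assembleMsgUpdateClient below:
// wait for another counterparty block only when the newest header is still at
// the trusted height and reports a different next-validators hash than the
// trusted state.
func mustWaitForNextHeader(latest header, trustedHeight uint64, trustedNextVals []byte) bool {
	return latest.Height() == trustedHeight &&
		!bytes.Equal(latest.NextValidatorsHash(), trustedNextVals)
}

func main() {
	vals := []byte{0x01}
	latest := fakeHeader{height: 100, nextVals: vals}

	fmt.Println(mustWaitForNextHeader(latest, 100, vals))         // false: same next validator set, update can be assembled
	fmt.Println(mustWaitForNextHeader(latest, 100, []byte{0x02})) // true: need the next block's header first
}
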
@@ -454,9 +459,10 @@ func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst * IBCHeader: header, } trustedConsensusHeight = clientConsensusHeight + trustedNextValidatorsHash = header.NextValidatorsHash() } - if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight { + if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight && !bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) { return nil, fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ "need to wait for next block's header before we can assemble and send a new MsgUpdateClient", trustedConsensusHeight.RevisionHeight) @@ -485,12 +491,22 @@ func (pp *PathProcessor) updateClientTrustedState(src *pathEndRuntime, dst *path // need to assemble new trusted state ibcHeader, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight+1] if !ok { + if ibcHeaderCurrent, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight]; ok { + if bytes.Equal(dst.clientTrustedState.IBCHeader.NextValidatorsHash(), ibcHeaderCurrent.NextValidatorsHash()) { + src.clientTrustedState = provider.ClientTrustedState{ + ClientState: src.clientState, + IBCHeader: ibcHeaderCurrent, + } + return + } + } pp.log.Debug("No cached IBC header for client trusted height", zap.String("chain_id", src.info.ChainID), zap.String("client_id", src.info.ClientID), zap.Uint64("height", src.clientState.ConsensusHeight.RevisionHeight+1), ) return + } src.clientTrustedState = provider.ClientTrustedState{ ClientState: src.clientState, @@ -740,11 +756,12 @@ func (pp *PathProcessor) assembleMessage( switch m := msg.(type) { case packetIBCMessage: message, err = pp.assemblePacketMessage(ctx, m, src, dst) - om.pktMsgs[i] = packetMessageToTrack{ - msg: m, - assembled: err == nil, + tracker := packetMessageToTrack{ + msg: m, } if err == nil { + tracker.m = message + om.pktMsgs[i] = tracker dst.log.Debug("Will send packet message", zap.String("event_type", m.eventType), zap.Uint64("sequence", m.info.Sequence), @@ -756,11 +773,10 @@ func (pp *PathProcessor) assembleMessage( } case connectionIBCMessage: message, err = pp.assembleConnectionMessage(ctx, m, src, dst) - om.connMsgs[i] = connectionMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := connectionMessageToTrack{msg: m} if err == nil { + tracker.m = message + om.connMsgs[i] = tracker dst.log.Debug("Will send connection message", zap.String("event_type", m.eventType), zap.String("connection_id", m.info.ConnID), @@ -768,11 +784,10 @@ func (pp *PathProcessor) assembleMessage( } case channelIBCMessage: message, err = pp.assembleChannelMessage(ctx, m, src, dst) - om.chanMsgs[i] = channelMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := channelMessageToTrack{msg: m} if err == nil { + tracker.m = message + om.chanMsgs[i] = tracker dst.log.Debug("Will send channel message", zap.String("event_type", m.eventType), zap.String("channel_id", m.info.ChannelID), @@ -781,11 +796,10 @@ func (pp *PathProcessor) assembleMessage( } case clientICQMessage: message, err = pp.assembleClientICQMessage(ctx, m, src, dst) - om.clientICQMsgs[i] = clientICQMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := clientICQMessageToTrack{msg: m} if err == nil { + tracker.m = message + om.clientICQMsgs[i] = tracker dst.log.Debug("Will send ICQ message", zap.String("type", m.info.Type), zap.String("query_id", string(m.info.QueryID)), @@ -796,7 +810,6 @@ func (pp *PathProcessor) assembleMessage( 
pp.log.Error("Error assembling channel message", zap.Error(err)) return } - om.Append(message) } func (pp *PathProcessor) assembleAndSendMessages( @@ -831,136 +844,228 @@ func (pp *PathProcessor) assembleAndSendMessages( return nil } } - om := outgoingMessages{ - msgs: make( - []provider.RelayerMessage, - 0, - len(messages.packetMessages)+len(messages.connectionMessages)+len(messages.channelMessages)+len(messages.clientICQMessages), - ), - } + om := outgoingMessages{} msgUpdateClient, err := pp.assembleMsgUpdateClient(ctx, src, dst) if err != nil { return err } - om.Append(msgUpdateClient) + om.msgUpdateClient = msgUpdateClient // Each assembleMessage call below will make a query on the source chain, so these operations can run in parallel. var wg sync.WaitGroup - // connection messages are highest priority om.connMsgs = make([]connectionMessageToTrack, len(messages.connectionMessages)) for i, msg := range messages.connectionMessages { wg.Add(1) go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) } - wg.Wait() + om.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) + for i, msg := range messages.channelMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } - if len(om.msgs) == 1 { - om.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) - // only assemble and send channel handshake messages if there are no conn handshake messages - // this prioritizes connection handshake messages, useful if a connection handshake needs to occur before a channel handshake - for i, msg := range messages.channelMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } + om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) + for i, msg := range messages.clientICQMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } - wg.Wait() + om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) + for i, msg := range messages.packetMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) } - if len(om.msgs) == 1 { - om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) - // only assemble and send ICQ messages if there are no conn or chan handshake messages - for i, msg := range messages.clientICQMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) - } + wg.Wait() - wg.Wait() + successCount := 0 + for _, m := range om.connMsgs { + if m.m != nil { + successCount++ + } } - - if len(om.msgs) == 1 { - om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) - // only assemble and send packet messages if there are no handshake messages - for i, msg := range messages.packetMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + for _, m := range om.chanMsgs { + if m.m != nil { + successCount++ + } + } + for _, m := range om.clientICQMsgs { + if m.m != nil { + successCount++ + } + } + for _, m := range om.pktMsgs { + if m.m != nil { + successCount++ } - - wg.Wait() } - if len(om.msgs) == 1 && !needsClientUpdate { + if successCount == 0 && !needsClientUpdate { // only msgUpdateClient, don't need to send return errors.New("all messages failed to assemble") } - for _, m := range om.connMsgs { - dst.trackProcessingConnectionMessage(m) + for _, t := range om.connMsgs { + dst.trackProcessingConnectionMessage(t) + if t.m == nil { + continue + } + go pp.sendConnectionMessage(ctx, src, dst, om.msgUpdateClient, t) } - for _, m := range om.chanMsgs 
{ - dst.trackProcessingChannelMessage(m) + for _, t := range om.chanMsgs { + dst.trackProcessingChannelMessage(t) + if t.m == nil { + continue + } + go pp.sendChannelMessage(ctx, src, dst, om.msgUpdateClient, t) } - for _, m := range om.clientICQMsgs { - dst.trackProcessingClientICQMessage(m) + for _, t := range om.clientICQMsgs { + dst.trackProcessingClientICQMessage(t) + if t.m == nil { + continue + } + go pp.sendClientICQMessage(ctx, src, dst, om.msgUpdateClient, t) } - for _, m := range om.pktMsgs { - dst.trackProcessingPacketMessage(m) + for _, t := range om.pktMsgs { + dst.trackProcessingPacketMessage(t) + if t.m == nil { + continue + } + go pp.sendPacketMessage(ctx, src, dst, om.msgUpdateClient, t) } - go pp.sendMessages(ctx, src, dst, &om, pp.memo) - return nil } -func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om *outgoingMessages, memo string) { +func (pp *PathProcessor) sendPacketMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker packetMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - _, txSuccess, err := dst.chainProvider.SendMessages(ctx, om.msgs, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) if err != nil { if errors.Is(err, chantypes.ErrRedundantTx) { - pp.log.Debug("Packet(s) already handled by another relayer", + pp.log.Debug("Packet already handled by another relayer", zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), + zap.String("event_type", tracker.msg.eventType), zap.Error(err), ) return } - pp.log.Error("Error sending messages", + pp.log.Error("Error sending packet message", zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), + zap.String("event_type", tracker.msg.eventType), zap.Error(err), ) return } - if !txSuccess { - dst.log.Error("Error sending messages, transaction was not successful") + + // TODO tx not in committed block yet, so can't guarantee that tx was successful + + if pp.metrics == nil { return } - if pp.metrics == nil { + var channel, port string + if tracker.msg.eventType == chantypes.EventTypeRecvPacket { + channel = tracker.msg.info.DestChannel + port = tracker.msg.info.DestPort + } else { + channel = tracker.msg.info.SourceChannel + port = tracker.msg.info.SourcePort + } + pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) +} + +func (pp *PathProcessor) sendChannelMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker channelMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + if err != nil { + pp.log.Error("Error sending channel handshake message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.String("event_type", 
tracker.msg.eventType), + zap.Error(err), + ) return } - for _, m := range om.pktMsgs { - var channel, port string - if m.msg.eventType == chantypes.EventTypeRecvPacket { - channel = m.msg.info.DestChannel - port = m.msg.info.DestPort - } else { - channel = m.msg.info.SourceChannel - port = m.msg.info.SourcePort - } - pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, m.msg.eventType) +} + +func (pp *PathProcessor) sendConnectionMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker connectionMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + if err != nil { + pp.log.Error("Error sending connection handshake message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.String("event_type", tracker.msg.eventType), + zap.Error(err), + ) + return + } +} + +func (pp *PathProcessor) sendClientICQMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker clientICQMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + if err != nil { + pp.log.Error("Error sending client ICQ message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.String("event_type", tracker.msg.info.Type), + zap.Error(err), + ) + return } } diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index a949a6aad..573a47f26 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -3,7 +3,6 @@ package processor import ( "strconv" "strings" - "sync" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" @@ -207,12 +206,11 @@ func channelInfoChannelKey(c provider.ChannelInfo) ChannelKey { // outgoingMessages is a slice of relayer messages that can be // appended to concurrently. type outgoingMessages struct { - mu sync.Mutex - msgs []provider.RelayerMessage - pktMsgs []packetMessageToTrack - connMsgs []connectionMessageToTrack - chanMsgs []channelMessageToTrack - clientICQMsgs []clientICQMessageToTrack + msgUpdateClient provider.RelayerMessage + pktMsgs []packetMessageToTrack + connMsgs []connectionMessageToTrack + chanMsgs []channelMessageToTrack + clientICQMsgs []clientICQMessageToTrack } // MarshalLogObject satisfies the zapcore.ObjectMarshaler interface @@ -247,33 +245,24 @@ func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } -// Append acquires a lock on om's mutex and then appends msg. -// When there are no more possible concurrent calls to Append, -// it is safe to directly access om.msgs. 
-func (om *outgoingMessages) Append(msg provider.RelayerMessage) { - om.mu.Lock() - defer om.mu.Unlock() - om.msgs = append(om.msgs, msg) -} - type packetMessageToTrack struct { - msg packetIBCMessage - assembled bool + msg packetIBCMessage + m provider.RelayerMessage } type connectionMessageToTrack struct { - msg connectionIBCMessage - assembled bool + msg connectionIBCMessage + m provider.RelayerMessage } type channelMessageToTrack struct { - msg channelIBCMessage - assembled bool + msg channelIBCMessage + m provider.RelayerMessage } type clientICQMessageToTrack struct { - msg clientICQMessage - assembled bool + msg clientICQMessage + m provider.RelayerMessage } // orderFromString parses a string into a channel order byte. diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 3abf0235d..4288f6b6f 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -49,7 +49,7 @@ type LatestBlock struct { type IBCHeader interface { Height() uint64 ConsensusState() ibcexported.ConsensusState - // require conversion implementation for third party chains + NextValidatorsHash() []byte } // ClientState holds the current state of a client from a single chain's perspective @@ -367,6 +367,7 @@ type ChainProvider interface { SendMessage(ctx context.Context, msg RelayerMessage, memo string) (*RelayerTxResponse, bool, error) SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error) + SendMessagesToMempool(ctx context.Context, msgs []RelayerMessage, memo string) error ChainName() string ChainId() string From a23f1ecfff8c8337304e023960b45622ff77c9eb Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 00:39:31 -0700 Subject: [PATCH 03/15] Fix mockIBCHeader --- relayer/processor/types_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/relayer/processor/types_test.go b/relayer/processor/types_test.go index 28dc697c7..39224dc74 100644 --- a/relayer/processor/types_test.go +++ b/relayer/processor/types_test.go @@ -12,6 +12,7 @@ type mockIBCHeader struct{} func (h mockIBCHeader) Height() uint64 { return 0 } func (h mockIBCHeader) ConsensusState() ibcexported.ConsensusState { return nil } +func (h mockIBCHeader) NextValidatorsHash() []byte { return nil } func TestIBCHeaderCachePrune(t *testing.T) { cache := make(processor.IBCHeaderCache) From e1a5fa391d06fa946532b3ee10780670c11617fc Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 00:46:37 -0700 Subject: [PATCH 04/15] Fix NPE --- relayer/processor/path_processor_internal.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 156bd72f0..b0b91c0de 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -491,14 +491,14 @@ func (pp *PathProcessor) updateClientTrustedState(src *pathEndRuntime, dst *path // need to assemble new trusted state ibcHeader, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight+1] if !ok { - if ibcHeaderCurrent, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight]; ok { - if bytes.Equal(dst.clientTrustedState.IBCHeader.NextValidatorsHash(), ibcHeaderCurrent.NextValidatorsHash()) { - src.clientTrustedState = provider.ClientTrustedState{ - ClientState: src.clientState, - IBCHeader: ibcHeaderCurrent, - } - return + if ibcHeaderCurrent, ok := 
dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight]; ok && + dst.clientTrustedState.IBCHeader != nil && + bytes.Equal(dst.clientTrustedState.IBCHeader.NextValidatorsHash(), ibcHeaderCurrent.NextValidatorsHash()) { + src.clientTrustedState = provider.ClientTrustedState{ + ClientState: src.clientState, + IBCHeader: ibcHeaderCurrent, } + return } pp.log.Debug("No cached IBC header for client trusted height", zap.String("chain_id", src.info.ChainID), From aa9894d9fd11e5d82af6b258e61636e7d8bf54c5 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 00:56:05 -0700 Subject: [PATCH 05/15] Fix UpdateFeesSpent race --- relayer/chains/cosmos/tx.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index 342f6b8a5..c39c58691 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -349,6 +349,8 @@ func (cc *CosmosProvider) broadcastTx( return err } + cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) + // TODO: maybe we need to check if the node has tx indexing enabled? // if not, we need to find a new way to block until inclusion in a block @@ -370,7 +372,6 @@ func (cc *CosmosProvider) broadcastTx( // transaction was executed, log the success or failure using the tx response code // NOTE: error is nil, logic should use the returned error to determine if the // transaction was successfully executed. - cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) if resp.Code != 0 { cc.LogFailedTx(rlyResp, nil, msgs) From f20bfc35b616090242950dc11387c877056f9540 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 16:18:24 -0700 Subject: [PATCH 06/15] Fix needsClientUpdate --- interchaintest/client_threshold_test.go | 7 ++-- relayer/processor/path_processor_internal.go | 35 +++++++++++++++++--- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/interchaintest/client_threshold_test.go b/interchaintest/client_threshold_test.go index 8b608ca68..5c7d92c1c 100644 --- a/interchaintest/client_threshold_test.go +++ b/interchaintest/client_threshold_test.go @@ -81,9 +81,10 @@ func TestScenarioClientThresholdUpdate(t *testing.T) { // Build interchain require.NoError(t, ic.Build(ctx, eRep, interchaintest.InterchainBuildOptions{ - TestName: t.Name(), - Client: client, - NetworkID: network, + TestName: t.Name(), + Client: client, + NetworkID: network, + BlockDatabaseFile: interchaintest.DefaultBlockDatabaseFilepath(), })) t.Cleanup(func() { _ = ic.Close() diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index b0b91c0de..72a8056cc 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -902,11 +902,6 @@ func (pp *PathProcessor) assembleAndSendMessages( } } - if successCount == 0 && !needsClientUpdate { - // only msgUpdateClient, don't need to send - return errors.New("all messages failed to assemble") - } - for _, t := range om.connMsgs { dst.trackProcessingConnectionMessage(t) if t.m == nil { @@ -939,9 +934,39 @@ func (pp *PathProcessor) assembleAndSendMessages( go pp.sendPacketMessage(ctx, src, dst, om.msgUpdateClient, t) } + if successCount == 0 { + if needsClientUpdate { + go pp.sendClientUpdate(ctx, src, dst, om.msgUpdateClient) + return nil + } + // only msgUpdateClient, don't need to send + return errors.New("all messages failed to assemble") + } + return nil } +func (pp *PathProcessor) sendClientUpdate( + ctx context.Context, + src, dst *pathEndRuntime, + 
msgUpdateClient provider.RelayerMessage, +) { + ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + err := dst.chainProvider.SendMessagesToMempool(ctx, []provider.RelayerMessage{msgUpdateClient}, pp.memo) + if err != nil { + pp.log.Error("Error sending client update message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + return + } +} + func (pp *PathProcessor) sendPacketMessage( ctx context.Context, src, dst *pathEndRuntime, From 7f672bd143f8617f0c9fae075f67d65f4a927047 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 23:07:06 -0700 Subject: [PATCH 07/15] Make configurable. Cleanup logging --- cmd/chains.go | 2 + interchaintest/relayer.go | 2 + relayer/chains/cosmos/log.go | 5 +- relayer/chains/cosmos/provider.go | 37 +-- relayer/chains/cosmos/tx.go | 249 ++++++++----------- relayer/processor/path_processor_internal.go | 122 +++++++-- relayer/provider/provider.go | 29 ++- 7 files changed, 258 insertions(+), 188 deletions(-) diff --git a/cmd/chains.go b/cmd/chains.go index 92429eb99..8e29ec5b1 100644 --- a/cmd/chains.go +++ b/cmd/chains.go @@ -13,6 +13,7 @@ import ( "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" "github.com/spf13/cobra" registry "github.com/strangelove-ventures/lens/client/chain_registry" "go.uber.org/zap" @@ -476,6 +477,7 @@ func addChainsFromRegistry(ctx context.Context, a *appState, chains []string) er OutputFormat: chainConfig.OutputFormat, SignModeStr: chainConfig.SignModeStr, ExtraCodecs: chainConfig.ExtraCodecs, + Broadcast: provider.BroadcastModeSingle, } prov, err := pcfg.NewProvider( diff --git a/interchaintest/relayer.go b/interchaintest/relayer.go index 6c9be1189..983ae9561 100644 --- a/interchaintest/relayer.go +++ b/interchaintest/relayer.go @@ -14,6 +14,7 @@ import ( "github.com/cosmos/relayer/v2/internal/relayertest" "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" interchaintestcosmos "github.com/strangelove-ventures/interchaintest/v7/chain/cosmos" "github.com/strangelove-ventures/interchaintest/v7/ibc" "github.com/stretchr/testify/require" @@ -77,6 +78,7 @@ func (r *Relayer) AddChainConfiguration(ctx context.Context, _ ibc.RelayerExecRe Timeout: "10s", OutputFormat: "json", SignModeStr: "direct", + Broadcast: provider.BroadcastModeSingle, }, }) diff --git a/relayer/chains/cosmos/log.go b/relayer/chains/cosmos/log.go index 2a85a1d94..75e43a982 100644 --- a/relayer/chains/cosmos/log.go +++ b/relayer/chains/cosmos/log.go @@ -64,7 +64,10 @@ func (cc *CosmosProvider) LogFailedTx(res *provider.RelayerTxResponse, err error } } - if res.Code != 0 && res.Data != "" { + if res.Code != 0 { + if sdkErr := cc.sdkError(res.Codespace, res.Code); err != nil { + fields = append(fields, zap.NamedError("sdk_error", sdkErr)) + } fields = append(fields, zap.Object("response", res)) cc.log.Warn( "Sent transaction but received failure response", diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index 1e74082c3..ec579843d 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -31,20 +31,21 @@ var ( const tendermintEncodingThreshold = "v0.37.0-alpha" type CosmosProviderConfig struct { - Key string `json:"key" 
yaml:"key"` - ChainName string `json:"-" yaml:"-"` - ChainID string `json:"chain-id" yaml:"chain-id"` - RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` - AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` - KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` - GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` - GasPrices string `json:"gas-prices" yaml:"gas-prices"` - MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` - Debug bool `json:"debug" yaml:"debug"` - Timeout string `json:"timeout" yaml:"timeout"` - OutputFormat string `json:"output-format" yaml:"output-format"` - SignModeStr string `json:"sign-mode" yaml:"sign-mode"` - ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` + Key string `json:"key" yaml:"key"` + ChainName string `json:"-" yaml:"-"` + ChainID string `json:"chain-id" yaml:"chain-id"` + RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` + AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` + KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` + GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` + GasPrices string `json:"gas-prices" yaml:"gas-prices"` + MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` + Debug bool `json:"debug" yaml:"debug"` + Timeout string `json:"timeout" yaml:"timeout"` + OutputFormat string `json:"output-format" yaml:"output-format"` + SignModeStr string `json:"sign-mode" yaml:"sign-mode"` + ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` + Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` } func (pc CosmosProviderConfig) Validate() error { @@ -54,6 +55,10 @@ func (pc CosmosProviderConfig) Validate() error { return nil } +func (pc CosmosProviderConfig) BroadcastMode() provider.BroadcastMode { + return pc.Broadcast +} + // NewProvider validates the CosmosProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) { if err := pc.Validate(); err != nil { @@ -71,6 +76,10 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb } pc.ChainName = chainName + if pc.Broadcast == "" { + pc.Broadcast = provider.BroadcastModeBatch + } + return &CosmosProvider{ log: log, ChainClient: *cc, diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index c39c58691..fd51c7490 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -192,11 +192,12 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela } rlyResp := &provider.RelayerTxResponse{ - Height: resp.Height, - TxHash: resp.TxHash, - Code: resp.Code, - Data: resp.Data, - Events: parseEventsFromTxResponse(resp), + Height: resp.Height, + TxHash: resp.TxHash, + Codespace: resp.Codespace, + Code: resp.Code, + Data: resp.Data, + Events: parseEventsFromTxResponse(resp), } // transaction was executed, log the success or failure using the tx response code @@ -214,90 +215,34 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela return rlyResp, true, nil } -func (cc *CosmosProvider) SendMessagesToMempool(ctx context.Context, msgs []provider.RelayerMessage, memo string) error { - var fees sdk.Coins +// SendMessagesToMempool simulates and broadcasts a transaction with the given msgs and memo. 
+// This method will return once the transaction has entered the mempool. +// In an async goroutine, will wait for the tx to be included in the block unless asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. +func (cc *CosmosProvider) SendMessagesToMempool( + ctx context.Context, + msgs []provider.RelayerMessage, + memo string, + asyncCtx context.Context, + asyncCallback func(*provider.RelayerTxResponse, error), +) error { // Guard against account sequence number mismatch errors by locking for the specific wallet for // the account sequence query all the way through the transaction broadcast success/fail. cc.txMu.Lock() defer cc.txMu.Unlock() - txBytes, sequence, f, err := cc.buildMessages(ctx, msgs, memo) - fees = f + txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo) if err != nil { - errMsg := err.Error() - // Account sequence mismatch errors can happen on the simulated transaction also. - if strings.Contains(errMsg, sdkerrors.ErrWrongSequence.Error()) { + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { cc.handleAccountSequenceMismatchError(err) - return err - } - - // Occasionally the client will be out of date, - // and we will receive an RPC error like: - // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: channel handshake open try failed: failed channel state verification for client (07-tendermint-0): client state height < proof height ({0 58} < {0 59}), please ensure the client has been updated: invalid height: invalid request - // or - // rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: receive packet verification failed: couldn't verify counterparty packet commitment: failed packet commitment verification for client (07-tendermint-0): client state height < proof height ({0 142} < {0 143}), please ensure the client has been updated: invalid height: invalid request - // - // No amount of retrying will fix this. The client needs to be updated. - // Unfortunately, the entirety of that error message originates on the server, - // so there is not an obvious way to access a more structured error value. - // - // If this logic should ever fail due to the string values of the error messages on the server - // changing from the client's version of the library, - // at worst this will run more unnecessary retries. - if strings.Contains(errMsg, sdkerrors.ErrInvalidHeight.Error()) { - cc.log.Info( - "Skipping retry due to invalid height error", - zap.Error(err), - ) - return err - } - - // On a fast retry, it is possible to have an invalid connection state. - // Retrying that message also won't fix the underlying state mismatch, - // so log it and mark it as unrecoverable. - if strings.Contains(errMsg, conntypes.ErrInvalidConnectionState.Error()) { - cc.log.Info( - "Skipping retry due to invalid connection state", - zap.Error(err), - ) - return err - } - - // Also possible to have an invalid channel state on a fast retry. - if strings.Contains(errMsg, chantypes.ErrInvalidChannelState.Error()) { - cc.log.Info( - "Skipping retry due to invalid channel state", - zap.Error(err), - ) - return err - } - - // If the message reported an invalid proof, back off. - // NOTE: this error string ("invalid proof") will match other errors too, - // but presumably it is safe to stop retrying in those cases as well. 
- if strings.Contains(errMsg, commitmenttypes.ErrInvalidProof.Error()) { - cc.log.Info( - "Skipping retry due to invalid proof", - zap.Error(err), - ) - return err - } - - // Invalid packets should not be retried either. - if strings.Contains(errMsg, chantypes.ErrInvalidPacket.Error()) { - cc.log.Info( - "Skipping retry due to invalid packet", - zap.Error(err), - ) - return err } return err } - if err := cc.broadcastTx(ctx, txBytes, msgs, fees, defaultBroadcastWaitTimeout); err != nil { + if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback); err != nil { if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { cc.handleAccountSequenceMismatchError(err) } @@ -305,47 +250,50 @@ func (cc *CosmosProvider) SendMessagesToMempool(ctx context.Context, msgs []prov return err } - // we had a potentially successful tx with this sequence, so update it to the next + // we had a successful tx broadcast with this sequence, so update it to the next cc.updateNextAccountSequence(sequence + 1) return nil } -// broadcastTx broadcasts a TX and then waits for the TX to be included in the block. -// The waiting will either be canceled after the waitTimeout has run out or the context -// exited. +// sdkError will return the Cosmos SDK registered error for a given codespace/code combo if registered, otherwise nil. +func (cc *CosmosProvider) sdkError(codespace string, code uint32) error { + // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace + // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go + err := errors.Unwrap(sdkerrors.ABCIError(codespace, code, "error broadcasting transaction")) + if err.Error() != errUnknown { + return err + } + return nil +} + +// broadcastTx broadcasts a transaction with the given raw bytes and then, in an async goroutine, waits for the tx to be included in the block. +// The wait will end after either the asyncTimeout has run out or the asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. func (cc *CosmosProvider) broadcastTx( - ctx context.Context, - tx []byte, - msgs []provider.RelayerMessage, - fees sdk.Coins, - waitTimeout time.Duration, + ctx context.Context, // context for tx broadcast + tx []byte, // raw tx to be broadcasted + msgs []provider.RelayerMessage, // used for logging only + fees sdk.Coins, // used for metrics + + asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast + asyncTimeout time.Duration, // timeout for waiting for block inclusion + asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion ) error { - // broadcast tx sync waits for check tx to pass - // NOTE: this can return w/ a timeout - // need to investigate if this will leave the tx - // in the mempool or we can retry the broadcast at that - // point - - syncRes, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx) + res, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx) if err != nil { - if syncRes == nil { + if res == nil { // There are some cases where BroadcastTxSync will return an error but the associated // ResultBroadcastTx will be nil. 
return err } rlyResp := &provider.RelayerTxResponse{ - TxHash: syncRes.Hash.String(), - Code: syncRes.Code, + TxHash: res.Hash.String(), + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data.String(), } - cc.LogFailedTx(rlyResp, nil, msgs) - return err - } - - // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace - // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go - err = errors.Unwrap(sdkerrors.ABCIError(syncRes.Codespace, syncRes.Code, "error broadcasting transaction")) - if err.Error() != errUnknown { + cc.LogFailedTx(rlyResp, err, msgs) return err } @@ -354,39 +302,65 @@ func (cc *CosmosProvider) broadcastTx( // TODO: maybe we need to check if the node has tx indexing enabled? // if not, we need to find a new way to block until inclusion in a block - go func() { - ctx := context.Background() // TODO: update - resp, err := cc.waitForBlockInclusion(ctx, syncRes, waitTimeout) - if err != nil { - cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) - return - } - rlyResp := &provider.RelayerTxResponse{ - Height: resp.Height, - TxHash: resp.TxHash, - Code: resp.Code, - Data: resp.Data, - Events: parseEventsFromTxResponse(resp), - } + go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallback) - // transaction was executed, log the success or failure using the tx response code - // NOTE: error is nil, logic should use the returned error to determine if the - // transaction was successfully executed. + return nil +} - if resp.Code != 0 { - cc.LogFailedTx(rlyResp, nil, msgs) - return +// waitForTx waits for a transaction to be included in a block, logs success/fail, then invokes callback. +// This is intended to be called as an async goroutine. +func (cc *CosmosProvider) waitForTx( + ctx context.Context, + txHash []byte, + msgs []provider.RelayerMessage, // used for logging only + waitTimeout time.Duration, + callback func(*provider.RelayerTxResponse, error), +) { + res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout) + if err != nil { + cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) + if callback != nil { + callback(nil, err) } + return + } - cc.LogSuccessTx(resp, msgs) - }() + rlyResp := &provider.RelayerTxResponse{ + Height: res.Height, + TxHash: res.TxHash, + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data, + Events: parseEventsFromTxResponse(res), + } - return nil + // transaction was executed, log the success or failure using the tx response code + // NOTE: error is nil, logic should use the returned error to determine if the + // transaction was successfully executed. + + if res.Code != 0 { + // Check for any registered SDK errors + err := cc.sdkError(res.Codespace, res.Code) + if err == nil { + err = fmt.Errorf("transaction failed to execute") + } + if callback != nil { + callback(nil, err) + } + cc.LogFailedTx(rlyResp, nil, msgs) + return + } + + if callback != nil { + callback(rlyResp, nil) + } + cc.LogSuccessTx(res, msgs) } +// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation. 
func (cc *CosmosProvider) waitForBlockInclusion( ctx context.Context, - syncRes *coretypes.ResultBroadcastTx, + txHash []byte, waitTimeout time.Duration, ) (*sdk.TxResponse, error) { exitAfter := time.After(waitTimeout) @@ -394,39 +368,32 @@ func (cc *CosmosProvider) waitForBlockInclusion( select { case <-exitAfter: return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, lensclient.ErrTimeoutAfterWaitingForTxBroadcast) - // TODO: this is potentially less than optimal and may - // be better as something configurable + // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. case <-time.After(time.Millisecond * 100): - resTx, err := cc.ChainClient.RPCClient.Tx(ctx, syncRes.Hash, false) + res, err := cc.ChainClient.RPCClient.Tx(ctx, txHash, false) if err == nil { - return cc.mkTxResult(resTx) + return cc.mkTxResult(res) } case <-ctx.Done(): return nil, ctx.Err() } } - } +// mkTxResult decodes a tendermint transaction into an SDK TxResponse. func (cc *CosmosProvider) mkTxResult(resTx *coretypes.ResultTx) (*sdk.TxResponse, error) { - txb, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx) + txbz, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx) if err != nil { return nil, err } - p, ok := txb.(intoAny) + p, ok := txbz.(intoAny) if !ok { - return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txb) + return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz) } any := p.AsAny() - // TODO: maybe don't make up the time here? - // we can fetch the block for the block time buts thats - // more round trips - // TODO: logs get rendered as base64 encoded, need to fix this somehow - return sdk.NewResponseResultTx(resTx, any, time.Now().Format(time.RFC3339)), nil + return sdk.NewResponseResultTx(resTx, any, ""), nil } -// Deprecated: this interface is used only internally for scenario we are -// deprecating (StdTxConfig support) type intoAny interface { AsAny() *codectypes.Any } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 72a8056cc..bed19d60c 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -902,11 +902,18 @@ func (pp *PathProcessor) assembleAndSendMessages( } } + broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch + batchMsgs := []provider.RelayerMessage{om.msgUpdateClient} + for _, t := range om.connMsgs { dst.trackProcessingConnectionMessage(t) if t.m == nil { continue } + if broadcastBatch { + batchMsgs = append(batchMsgs, t.m) + continue + } go pp.sendConnectionMessage(ctx, src, dst, om.msgUpdateClient, t) } @@ -915,6 +922,10 @@ func (pp *PathProcessor) assembleAndSendMessages( if t.m == nil { continue } + if broadcastBatch { + batchMsgs = append(batchMsgs, t.m) + continue + } go pp.sendChannelMessage(ctx, src, dst, om.msgUpdateClient, t) } @@ -923,7 +934,12 @@ func (pp *PathProcessor) assembleAndSendMessages( if t.m == nil { continue } + if broadcastBatch { + batchMsgs = append(batchMsgs, t.m) + continue + } go pp.sendClientICQMessage(ctx, src, dst, om.msgUpdateClient, t) + } for _, t := range om.pktMsgs { @@ -931,9 +947,17 @@ func (pp *PathProcessor) assembleAndSendMessages( if t.m == nil { continue } + if broadcastBatch { + batchMsgs = append(batchMsgs, t.m) + continue + } go pp.sendPacketMessage(ctx, src, dst, om.msgUpdateClient, t) } + if broadcastBatch { + go pp.sendBatchMessages(ctx, src, dst, batchMsgs, 
om.pktMsgs) + } + if successCount == 0 { if needsClientUpdate { go pp.sendClientUpdate(ctx, src, dst, om.msgUpdateClient) @@ -951,10 +975,10 @@ func (pp *PathProcessor) sendClientUpdate( src, dst *pathEndRuntime, msgUpdateClient provider.RelayerMessage, ) { - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(ctx, []provider.RelayerMessage{msgUpdateClient}, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, []provider.RelayerMessage{msgUpdateClient}, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending client update message", zap.String("src_chain_id", src.info.ChainID), @@ -967,6 +991,54 @@ func (pp *PathProcessor) sendClientUpdate( } } +func (pp *PathProcessor) sendBatchMessages( + ctx context.Context, + src, dst *pathEndRuntime, + msgs []provider.RelayerMessage, + pktMsgs []packetMessageToTrack, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || pp.metrics == nil { + return + } + for _, tracker := range pktMsgs { + var channel, port string + if tracker.msg.eventType == chantypes.EventTypeRecvPacket { + channel = tracker.msg.info.DestChannel + port = tracker.msg.info.DestPort + } else { + channel = tracker.msg.info.SourceChannel + port = tracker.msg.info.SourcePort + } + pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) + } + }) + if err != nil { + if errors.Is(err, chantypes.ErrRedundantTx) { + pp.log.Debug("Packet(s) already handled by another relayer", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + return + } + pp.log.Error("Error sending batch of messages", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + return + } +} + func (pp *PathProcessor) sendPacketMessage( ctx context.Context, src, dst *pathEndRuntime, @@ -975,10 +1047,24 @@ func (pp *PathProcessor) sendPacketMessage( ) { msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || pp.metrics == nil { + return + } + var channel, port string + if tracker.msg.eventType == chantypes.EventTypeRecvPacket { + channel = tracker.msg.info.DestChannel + port = tracker.msg.info.DestPort + } else { + channel = tracker.msg.info.SourceChannel + port = tracker.msg.info.SourcePort + } + pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) + }) if err != nil { if errors.Is(err, chantypes.ErrRedundantTx) { pp.log.Debug("Packet already handled by another 
relayer", @@ -1001,22 +1087,6 @@ func (pp *PathProcessor) sendPacketMessage( ) return } - - // TODO tx not in committed block yet, so can't guarantee that tx was successful - - if pp.metrics == nil { - return - } - - var channel, port string - if tracker.msg.eventType == chantypes.EventTypeRecvPacket { - channel = tracker.msg.info.DestChannel - port = tracker.msg.info.DestPort - } else { - channel = tracker.msg.info.SourceChannel - port = tracker.msg.info.SourcePort - } - pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) } func (pp *PathProcessor) sendChannelMessage( @@ -1027,10 +1097,10 @@ func (pp *PathProcessor) sendChannelMessage( ) { msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending channel handshake message", zap.String("src_chain_id", src.info.ChainID), @@ -1052,10 +1122,10 @@ func (pp *PathProcessor) sendConnectionMessage( ) { msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending connection handshake message", zap.String("src_chain_id", src.info.ChainID), @@ -1077,10 +1147,10 @@ func (pp *PathProcessor) sendClientICQMessage( ) { msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(ctx, msgs, pp.memo) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending client ICQ message", zap.String("src_chain_id", src.info.ChainID), diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 4288f6b6f..a49d19a90 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -18,9 +18,17 @@ import ( "go.uber.org/zap/zapcore" ) +type BroadcastMode string + +const ( + BroadcastModeSingle BroadcastMode = "single" + BroadcastModeBatch BroadcastMode = "batch" +) + type ProviderConfig interface { NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (ChainProvider, error) Validate() error + BroadcastMode() BroadcastMode } type RelayerMessage interface { @@ -29,11 +37,12 @@ type RelayerMessage interface { } type RelayerTxResponse struct { - Height int64 - TxHash string - Code uint32 - Data string - Events []RelayerEvent + Height int64 + TxHash string + Codespace string + Code uint32 + Data string + Events []RelayerEvent } type RelayerEvent struct { @@ -193,6 +202,7 @@ func (es loggableEvents) MarshalLogArray(enc zapcore.ArrayEncoder) error { func (r RelayerTxResponse) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddInt64("height", r.Height) enc.AddString("tx_hash", r.TxHash) + enc.AddString("codespace", r.Codespace) enc.AddUint32("code", r.Code) enc.AddString("data", r.Data) enc.AddArray("events", 
loggableEvents(r.Events)) @@ -367,7 +377,14 @@ type ChainProvider interface { SendMessage(ctx context.Context, msg RelayerMessage, memo string) (*RelayerTxResponse, bool, error) SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error) - SendMessagesToMempool(ctx context.Context, msgs []RelayerMessage, memo string) error + SendMessagesToMempool( + ctx context.Context, + msgs []RelayerMessage, + memo string, + + asyncCtx context.Context, + asyncCallback func(*RelayerTxResponse, error), + ) error ChainName() string ChainId() string From c260ff748d3a10c4d5f97afae8e2f489042749ab Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sun, 5 Feb 2023 23:12:39 -0700 Subject: [PATCH 08/15] Revert unnecessary test changes --- interchaintest/client_threshold_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/interchaintest/client_threshold_test.go b/interchaintest/client_threshold_test.go index 5c7d92c1c..8b608ca68 100644 --- a/interchaintest/client_threshold_test.go +++ b/interchaintest/client_threshold_test.go @@ -81,10 +81,9 @@ func TestScenarioClientThresholdUpdate(t *testing.T) { // Build interchain require.NoError(t, ic.Build(ctx, eRep, interchaintest.InterchainBuildOptions{ - TestName: t.Name(), - Client: client, - NetworkID: network, - BlockDatabaseFile: interchaintest.DefaultBlockDatabaseFilepath(), + TestName: t.Name(), + Client: client, + NetworkID: network, })) t.Cleanup(func() { _ = ic.Close() From d66c94466361f89f0d0362caf1256e954da4edb6 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 6 Feb 2023 17:03:37 -0700 Subject: [PATCH 09/15] Add fallback mechanism to single if failure to broadcast as batch --- cmd/chains.go | 2 +- interchaintest/relayer.go | 2 +- relayer/processor/path_end_runtime.go | 18 ++++++++---- relayer/processor/path_processor_internal.go | 30 ++++++++++---------- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/cmd/chains.go b/cmd/chains.go index 8e29ec5b1..11e3b99e7 100644 --- a/cmd/chains.go +++ b/cmd/chains.go @@ -477,7 +477,7 @@ func addChainsFromRegistry(ctx context.Context, a *appState, chains []string) er OutputFormat: chainConfig.OutputFormat, SignModeStr: chainConfig.SignModeStr, ExtraCodecs: chainConfig.ExtraCodecs, - Broadcast: provider.BroadcastModeSingle, + Broadcast: provider.BroadcastModeBatch, } prov, err := pcfg.NewProvider( diff --git a/interchaintest/relayer.go b/interchaintest/relayer.go index 983ae9561..cb93ca948 100644 --- a/interchaintest/relayer.go +++ b/interchaintest/relayer.go @@ -78,7 +78,7 @@ func (r *Relayer) AddChainConfiguration(ctx context.Context, _ ibc.RelayerExecRe Timeout: "10s", OutputFormat: "json", SignModeStr: "direct", - Broadcast: provider.BroadcastModeSingle, + Broadcast: provider.BroadcastModeBatch, }, }) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 0f5877001..d8482fb1e 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -625,7 +625,7 @@ func (pathEnd *pathEndRuntime) shouldSendClientICQMessage(message provider.Clien return true } -func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) uint64 { eventType := t.msg.eventType sequence := t.msg.info.Sequence channelKey, err := t.msg.channelKey() @@ -636,7 +636,7 @@ func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTra zap.Uint64("sequence", sequence), 
zap.Error(err), ) - return + return 0 } msgProcessCache, ok := pathEnd.packetProcessing[channelKey] if !ok { @@ -660,9 +660,11 @@ func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTra retryCount: retryCount, assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMessageToTrack) uint64 { eventType := t.msg.eventType connectionKey := connectionInfoConnectionKey(t.msg.info).Counterparty() msgProcessCache, ok := pathEnd.connProcessing[eventType] @@ -682,9 +684,11 @@ func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMess retryCount: retryCount, assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToTrack) uint64 { eventType := t.msg.eventType channelKey := channelInfoChannelKey(t.msg.info).Counterparty() msgProcessCache, ok := pathEnd.channelProcessing[eventType] @@ -704,9 +708,11 @@ func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToT retryCount: retryCount, assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) uint64 { retryCount := uint64(0) queryID := t.msg.info.QueryID @@ -720,4 +726,6 @@ func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessag retryCount: retryCount, assembled: t.m != nil, } + + return retryCount } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index bed19d60c..6470a6815 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -880,25 +880,25 @@ func (pp *PathProcessor) assembleAndSendMessages( wg.Wait() - successCount := 0 + assembled := 0 for _, m := range om.connMsgs { if m.m != nil { - successCount++ + assembled++ } } for _, m := range om.chanMsgs { if m.m != nil { - successCount++ + assembled++ } } for _, m := range om.clientICQMsgs { if m.m != nil { - successCount++ + assembled++ } } for _, m := range om.pktMsgs { if m.m != nil { - successCount++ + assembled++ } } @@ -906,11 +906,11 @@ func (pp *PathProcessor) assembleAndSendMessages( batchMsgs := []provider.RelayerMessage{om.msgUpdateClient} for _, t := range om.connMsgs { - dst.trackProcessingConnectionMessage(t) + retries := dst.trackProcessingConnectionMessage(t) if t.m == nil { continue } - if broadcastBatch { + if broadcastBatch && retries == 0 { batchMsgs = append(batchMsgs, t.m) continue } @@ -918,11 +918,11 @@ func (pp *PathProcessor) assembleAndSendMessages( } for _, t := range om.chanMsgs { - dst.trackProcessingChannelMessage(t) + retries := dst.trackProcessingChannelMessage(t) if t.m == nil { continue } - if broadcastBatch { + if broadcastBatch && retries == 0 { batchMsgs = append(batchMsgs, t.m) continue } @@ -930,11 +930,11 @@ func (pp *PathProcessor) assembleAndSendMessages( } for _, t := range om.clientICQMsgs { - dst.trackProcessingClientICQMessage(t) + retries := dst.trackProcessingClientICQMessage(t) if t.m == nil { continue } - if broadcastBatch { + if broadcastBatch && retries == 0 { batchMsgs = append(batchMsgs, t.m) continue } @@ -943,22 +943,22 @@ func (pp *PathProcessor) 
assembleAndSendMessages( } for _, t := range om.pktMsgs { - dst.trackProcessingPacketMessage(t) + retries := dst.trackProcessingPacketMessage(t) if t.m == nil { continue } - if broadcastBatch { + if broadcastBatch && retries == 0 { batchMsgs = append(batchMsgs, t.m) continue } go pp.sendPacketMessage(ctx, src, dst, om.msgUpdateClient, t) } - if broadcastBatch { + if len(batchMsgs) > 1 { go pp.sendBatchMessages(ctx, src, dst, batchMsgs, om.pktMsgs) } - if successCount == 0 { + if assembled == 0 { if needsClientUpdate { go pp.sendClientUpdate(ctx, src, dst, om.msgUpdateClient) return nil From 62e6d78430b7708ec45731b02c6d72fe33649c4f Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Fri, 10 Feb 2023 15:01:07 -0700 Subject: [PATCH 10/15] Improve logging --- relayer/processor/path_processor_internal.go | 142 +++++++++++++------ 1 file changed, 97 insertions(+), 45 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 6470a6815..6a939de34 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -3,6 +3,7 @@ package processor import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "sort" @@ -13,6 +14,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -762,7 +764,7 @@ func (pp *PathProcessor) assembleMessage( if err == nil { tracker.m = message om.pktMsgs[i] = tracker - dst.log.Debug("Will send packet message", + dst.log.Debug("Assembled packet message", zap.String("event_type", m.eventType), zap.Uint64("sequence", m.info.Sequence), zap.String("src_channel", m.info.SourceChannel), @@ -777,7 +779,7 @@ func (pp *PathProcessor) assembleMessage( if err == nil { tracker.m = message om.connMsgs[i] = tracker - dst.log.Debug("Will send connection message", + dst.log.Debug("Assembled connection message", zap.String("event_type", m.eventType), zap.String("connection_id", m.info.ConnID), ) @@ -788,7 +790,7 @@ func (pp *PathProcessor) assembleMessage( if err == nil { tracker.m = message om.chanMsgs[i] = tracker - dst.log.Debug("Will send channel message", + dst.log.Debug("Assembled channel message", zap.String("event_type", m.eventType), zap.String("channel_id", m.info.ChannelID), zap.String("port_id", m.info.PortID), @@ -800,7 +802,7 @@ func (pp *PathProcessor) assembleMessage( if err == nil { tracker.m = message om.clientICQMsgs[i] = tracker - dst.log.Debug("Will send ICQ message", + dst.log.Debug("Assembled ICQ message", zap.String("type", m.info.Type), zap.String("query_id", string(m.info.QueryID)), ) @@ -978,6 +980,8 @@ func (pp *PathProcessor) sendClientUpdate( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() + dst.log.Debug("Will relay client update") + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, []provider.RelayerMessage{msgUpdateClient}, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending client update message", @@ -1000,6 +1004,8 @@ func (pp *PathProcessor) sendBatchMessages( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() + dst.log.Debug("Will relay batch of messages", zap.Int("count", len(msgs))) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, func(rtr *provider.RelayerTxResponse, err error) { // only increment metrics counts for successful packets if err != nil || pp.metrics == 
nil { @@ -1018,23 +1024,18 @@ func (pp *PathProcessor) sendBatchMessages( } }) if err != nil { - if errors.Is(err, chantypes.ErrRedundantTx) { - pp.log.Debug("Packet(s) already handled by another relayer", - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Error(err), - ) - return - } - pp.log.Error("Error sending batch of messages", + errFields := []zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), zap.Error(err), - ) + } + if errors.Is(err, chantypes.ErrRedundantTx) { + pp.log.Debug("Packet(s) already handled by another relayer", errFields...) + return + } + pp.log.Error("Error sending batch of messages", errFields...) return } } @@ -1050,7 +1051,22 @@ func (pp *PathProcessor) sendPacketMessage( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, func(rtr *provider.RelayerTxResponse, err error) { + packetFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("src_port", tracker.msg.info.SourcePort), + zap.String("src_channel", tracker.msg.info.SourceChannel), + zap.String("dst_port", tracker.msg.info.DestPort), + zap.String("dst_channel", tracker.msg.info.DestChannel), + zap.Uint64("sequence", tracker.msg.info.Sequence), + zap.String("timeout_height", fmt.Sprintf("%d-%d", tracker.msg.info.TimeoutHeight.RevisionNumber, tracker.msg.info.TimeoutHeight.RevisionHeight)), + zap.Uint64("timeout_timestamp", tracker.msg.info.TimeoutTimestamp), + zap.String("data", base64.StdEncoding.EncodeToString(tracker.msg.info.Data)), + zap.String("ack", base64.StdEncoding.EncodeToString(tracker.msg.info.Ack)), + } + + dst.log.Debug("Will relay packet message", packetFields...) + + callback := func(rtr *provider.RelayerTxResponse, err error) { // only increment metrics counts for successful packets if err != nil || pp.metrics == nil { return @@ -1064,29 +1080,26 @@ func (pp *PathProcessor) sendPacketMessage( port = tracker.msg.info.SourcePort } pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) - }) + } + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, callback) if err != nil { - if errors.Is(err, chantypes.ErrRedundantTx) { - pp.log.Debug("Packet already handled by another relayer", - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.String("event_type", tracker.msg.eventType), - zap.Error(err), - ) - return - } - pp.log.Error("Error sending packet message", + errFields := append([]zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.String("event_type", tracker.msg.eventType), - zap.Error(err), - ) + }, packetFields...) + errFields = append(errFields, zap.Error(err)) + + if errors.Is(err, chantypes.ErrRedundantTx) { + pp.log.Debug("Packet already handled by another relayer", errFields...) + return + } + pp.log.Error("Error sending packet message", errFields...) 
return } + dst.log.Debug("Packet message broadcast completed", packetFields...) } func (pp *PathProcessor) sendChannelMessage( @@ -1100,18 +1113,34 @@ func (pp *PathProcessor) sendChannelMessage( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() + channelFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("port_id", tracker.msg.info.PortID), + zap.String("channel_id", tracker.msg.info.ChannelID), + zap.String("counterparty_port_id", tracker.msg.info.CounterpartyPortID), + zap.String("counterparty_channel_id", tracker.msg.info.CounterpartyChannelID), + zap.String("connection_id", tracker.msg.info.ConnID), + zap.String("counterparty_connection_id", tracker.msg.info.CounterpartyConnID), + zap.String("order", tracker.msg.info.Order.String()), + zap.String("version", tracker.msg.info.Version), + } + + dst.log.Debug("Will relay channel message", channelFields...) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { - pp.log.Error("Error sending channel handshake message", + errFields := []zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.String("event_type", tracker.msg.eventType), - zap.Error(err), - ) + } + errFields = append(errFields, channelFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending channel handshake message", errFields...) return } + dst.log.Debug("Channel handshake message broadcast completed", channelFields...) } func (pp *PathProcessor) sendConnectionMessage( @@ -1125,18 +1154,31 @@ func (pp *PathProcessor) sendConnectionMessage( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() + connFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("client_id", tracker.msg.info.ClientID), + zap.String("counterparty_client_id", tracker.msg.info.CounterpartyClientID), + zap.String("connection_id", tracker.msg.info.ConnID), + zap.String("counterparty_connection_id", tracker.msg.info.CounterpartyConnID), + zap.String("counterparty_commitment_prefix", tracker.msg.info.CounterpartyCommitmentPrefix.String()), + } + + dst.log.Debug("Will relay connection message", connFields...) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { - pp.log.Error("Error sending connection handshake message", + errFields := []zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.String("event_type", tracker.msg.eventType), - zap.Error(err), - ) + } + errFields = append(errFields, connFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending connection handshake message", errFields...) return } + dst.log.Debug("Connection handshake message broadcast completed", connFields...) 
} func (pp *PathProcessor) sendClientICQMessage( @@ -1150,18 +1192,28 @@ func (pp *PathProcessor) sendClientICQMessage( broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() + icqFields := []zapcore.Field{ + zap.String("type", tracker.msg.info.Type), + zap.String("query_id", string(tracker.msg.info.QueryID)), + zap.String("request", string(tracker.msg.info.Request)), + } + + dst.log.Debug("Will relay Stride ICQ message", icqFields...) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) if err != nil { - pp.log.Error("Error sending client ICQ message", + errFields := []zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.String("event_type", tracker.msg.info.Type), - zap.Error(err), - ) + } + errFields = append(errFields, icqFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending client ICQ message", errFields...) return } + dst.log.Debug("Stride ICQ message broadcast completed", icqFields...) } func (pp *PathProcessor) assemblePacketMessage( From da25b41908078e06c3e4904514845bc64496ffef Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sat, 11 Feb 2023 00:41:36 -0700 Subject: [PATCH 11/15] Add backoff to update client retry to avoid many back to back at threshold --- relayer/processor/path_end_runtime.go | 2 ++ relayer/processor/path_processor_internal.go | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index d8482fb1e..333a00bb0 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -46,6 +46,8 @@ type pathEndRuntime struct { // inSync indicates whether queries are in sync with latest height of the chain. 
inSync bool + lastClientUpdateHeight uint64 + metrics *PrometheusMetrics } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 6a939de34..8215831a9 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -832,10 +832,11 @@ func (pp *PathProcessor) assembleAndSendMessages( consensusHeightTime = dst.clientState.ConsensusTime } clientUpdateThresholdMs := pp.clientUpdateThresholdTime.Milliseconds() - if (float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || - (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs) { + if (dst.latestBlock.Height-blocksToRetrySendAfter) > dst.lastClientUpdateHeight && + ((float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || + (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs)) { needsClientUpdate = true - pp.log.Info("Client close to expiration", + pp.log.Info("Client update threshold condition met", zap.String("chain_id:", dst.info.ChainID), zap.String("client_id:", dst.info.ClientID), zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()), @@ -982,6 +983,8 @@ func (pp *PathProcessor) sendClientUpdate( dst.log.Debug("Will relay client update") + dst.lastClientUpdateHeight = dst.latestBlock.Height + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, []provider.RelayerMessage{msgUpdateClient}, pp.memo, ctx, nil) if err != nil { pp.log.Error("Error sending client update message", @@ -993,6 +996,7 @@ func (pp *PathProcessor) sendClientUpdate( ) return } + dst.log.Debug("Client update broadcast completed") } func (pp *PathProcessor) sendBatchMessages( @@ -1038,6 +1042,7 @@ func (pp *PathProcessor) sendBatchMessages( pp.log.Error("Error sending batch of messages", errFields...) 
return } + dst.log.Debug("Batch messages broadcast completed") } func (pp *PathProcessor) sendPacketMessage( From 1fcb35d80da00cc3d0fe4e0bf031ccdf9bba0c69 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Sat, 11 Feb 2023 00:45:13 -0700 Subject: [PATCH 12/15] Add error when node does not have tx indexer enabled --- relayer/chains/cosmos/tx.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index fd51c7490..a33c1b9eb 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -374,6 +374,9 @@ func (cc *CosmosProvider) waitForBlockInclusion( if err == nil { return cc.mkTxResult(res) } + if strings.Contains(err.Error(), "transaction indexing is disabled") { + return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") + } case <-ctx.Done(): return nil, ctx.Err() } From e7a340a789b027bd66f30387798537553e78a130 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 13 Feb 2023 00:34:59 -0700 Subject: [PATCH 13/15] Fix data race --- relayer/processor/path_end_runtime.go | 4 +++- relayer/processor/path_processor_internal.go | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 333a00bb0..5adfabef2 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -2,6 +2,7 @@ package processor import ( "context" + "sync" "time" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" @@ -46,7 +47,8 @@ type pathEndRuntime struct { // inSync indicates whether queries are in sync with latest height of the chain. inSync bool - lastClientUpdateHeight uint64 + lastClientUpdateHeight uint64 + lastClientUpdateHeightMu sync.Mutex metrics *PrometheusMetrics } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 8215831a9..5ad31c962 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -832,7 +832,12 @@ func (pp *PathProcessor) assembleAndSendMessages( consensusHeightTime = dst.clientState.ConsensusTime } clientUpdateThresholdMs := pp.clientUpdateThresholdTime.Milliseconds() - if (dst.latestBlock.Height-blocksToRetrySendAfter) > dst.lastClientUpdateHeight && + + dst.lastClientUpdateHeightMu.Lock() + enoughBlocksPassed := (dst.latestBlock.Height - blocksToRetrySendAfter) > dst.lastClientUpdateHeight + dst.lastClientUpdateHeightMu.Unlock() + + if enoughBlocksPassed && ((float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs)) { needsClientUpdate = true @@ -983,7 +988,9 @@ func (pp *PathProcessor) sendClientUpdate( dst.log.Debug("Will relay client update") + dst.lastClientUpdateHeightMu.Lock() dst.lastClientUpdateHeight = dst.latestBlock.Height + dst.lastClientUpdateHeightMu.Unlock() err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, []provider.RelayerMessage{msgUpdateClient}, pp.memo, ctx, nil) if err != nil { From b68af8fe0023963742e3026c6b0065049fa1c82c Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 13 Feb 2023 01:55:51 -0700 Subject: [PATCH 14/15] Fix race in mergeCacheData --- relayer/processor/path_end_runtime.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/relayer/processor/path_end_runtime.go 
b/relayer/processor/path_end_runtime.go index 5adfabef2..e48ad6944 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -291,8 +291,11 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache } func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) { - pathEnd.inSync = d.InSync + pathEnd.lastClientUpdateHeightMu.Lock() pathEnd.latestBlock = d.LatestBlock + pathEnd.lastClientUpdateHeightMu.Unlock() + + pathEnd.inSync = d.InSync pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState if d.ClientState.ConsensusHeight != pathEnd.clientState.ConsensusHeight { From b27e5681accea7eb0a0feefc0e3b7d8addd71396 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 13 Feb 2023 01:07:15 -0700 Subject: [PATCH 15/15] Add query for poa chains --- relayer/chains/cosmos/provider.go | 14 ++---------- relayer/chains/cosmos/query.go | 38 +++++++++++++++++-------------- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index a1ded512d..6dc37f460 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -220,19 +220,9 @@ func (cc *CosmosProvider) Address() (string, error) { } func (cc *CosmosProvider) TrustingPeriod(ctx context.Context) (time.Duration, error) { - res, err := cc.QueryStakingParams(ctx) - - var unbondingTime time.Duration + unbondingTime, err := cc.QueryUnbondingPeriod(ctx) if err != nil { - // Attempt ICS query - consumerUnbondingPeriod, consumerErr := cc.queryConsumerUnbondingPeriod(ctx) - if consumerErr != nil { - return 0, - fmt.Errorf("failed to query unbonding period as both standard and consumer chain: %s: %w", err.Error(), consumerErr) - } - unbondingTime = consumerUnbondingPeriod - } else { - unbondingTime = res.UnbondingTime + return 0, err } // We want the trusting period to be 85% of the unbonding time. 
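The TrustingPeriod change above delegates the unbonding-time lookup to QueryUnbondingPeriod and keeps the existing 85% scaling noted in the comment that closes the hunk. A rough, self-contained sketch of that scaling follows; the exact rounding used by the provider may differ and is an assumption here.

package main

import (
	"fmt"
	"time"
)

// trustingPeriodFromUnbonding returns roughly 85% of the unbonding time,
// matching the intent of the comment in the hunk above; the rounding
// behavior is an assumption, not the provider's exact implementation.
func trustingPeriodFromUnbonding(unbonding time.Duration) time.Duration {
	return time.Duration(float64(unbonding) * 0.85)
}

func main() {
	fmt.Println(trustingPeriodFromUnbonding(21 * 24 * time.Hour)) // e.g. a 21-day unbonding period
}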
diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 19769f638..da7ac1607 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -178,24 +178,24 @@ func (cc *CosmosProvider) QueryBalanceWithAddress(ctx context.Context, address s return coins, nil } -func (cc *CosmosProvider) queryConsumerUnbondingPeriod(ctx context.Context) (time.Duration, error) { +func (cc *CosmosProvider) querySubspaceUnbondingPeriod(subspace string, ctx context.Context) (time.Duration, error) { queryClient := proposal.NewQueryClient(cc) - params := proposal.QueryParamsRequest{Subspace: "ccvconsumer", Key: "UnbondingPeriod"} + params := proposal.QueryParamsRequest{Subspace: subspace, Key: "UnbondingPeriod"} resICS, err := queryClient.Params(ctx, ¶ms) if err != nil { - return 0, fmt.Errorf("failed to make ccvconsumer params request: %w", err) + return 0, fmt.Errorf("failed to make %s params request: %w", subspace, err) } if resICS.Param.Value == "" { - return 0, fmt.Errorf("ccvconsumer unbonding period is empty") + return 0, fmt.Errorf("%s unbonding period is empty", subspace) } unbondingPeriod, err := strconv.ParseUint(strings.ReplaceAll(resICS.Param.Value, `"`, ""), 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse unbonding period from ccvconsumer param: %w", err) + return 0, fmt.Errorf("failed to parse unbonding period from %s param: %w", subspace, err) } return time.Duration(unbondingPeriod), nil @@ -203,22 +203,26 @@ func (cc *CosmosProvider) queryConsumerUnbondingPeriod(ctx context.Context) (tim // QueryUnbondingPeriod returns the unbonding period of the chain func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Duration, error) { - req := stakingtypes.QueryParamsRequest{} - queryClient := stakingtypes.NewQueryClient(cc) - - res, err := queryClient.Params(ctx, &req) - if err != nil { - // Attempt ICS query - consumerUnbondingPeriod, consumerErr := cc.queryConsumerUnbondingPeriod(ctx) - if consumerErr != nil { - return 0, - fmt.Errorf("failed to query unbonding period as both standard and consumer chain: %s: %w", err.Error(), consumerErr) - } + res, err := cc.QueryStakingParams(ctx) + if err == nil { + return res.UnbondingTime, nil + } + // Attempt ICS query + consumerUnbondingPeriod, consumerErr := cc.querySubspaceUnbondingPeriod("ccvconsumer", ctx) + if consumerErr == nil { return consumerUnbondingPeriod, nil } - return res.Params.UnbondingTime, nil + poaUnbondingPeriod, poaErr := cc.querySubspaceUnbondingPeriod("poa", ctx) + if poaErr == nil { + return poaUnbondingPeriod, nil + } + + return 0, fmt.Errorf( + "failed to query unbonding period as both standard, consumer, and poa chain: %s, %s, %s", + err.Error(), consumerErr.Error(), poaErr.Error(), + ) } // QueryTendermintProof performs an ABCI query with the given key and returns
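Taken together, the query.go hunks above make QueryUnbondingPeriod try the standard staking params first, then fall back to the ccvconsumer and poa param subspaces, combining the errors when every source fails. A minimal, self-contained sketch of that fallback pattern follows; the query functions below are placeholders for the helpers shown in the diff, not the provider's API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// unbondingPeriodWithFallback tries each source in order and returns the first
// success, combining the errors otherwise. It mirrors the
// staking -> ccvconsumer -> poa order in the patch; the sources are stubs.
func unbondingPeriodWithFallback(sources ...func() (time.Duration, error)) (time.Duration, error) {
	var errs []error
	for _, query := range sources {
		d, err := query()
		if err == nil {
			return d, nil
		}
		errs = append(errs, err)
	}
	return 0, fmt.Errorf("failed to query unbonding period: %w", errors.Join(errs...))
}

func main() {
	d, err := unbondingPeriodWithFallback(
		func() (time.Duration, error) { return 0, errors.New("staking params unavailable") },
		func() (time.Duration, error) { return 0, errors.New("ccvconsumer param subspace unavailable") },
		func() (time.Duration, error) { return 14 * 24 * time.Hour, nil }, // e.g. a poa subspace result
	)
	fmt.Println(d, err)
}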