Skip to content

Commit c59e514

Browse files
DerekBumoleg-jukovec
authored andcommitted
api: create different responses for different requests
Different implementations of the `Response` interface created. Special types of responses are used with special requests. Thus `Response` interface was simplified: some special methods were moved to the corresponding implementations. This means that if a user wants to access this methods, the response should be casted to its actual type. `SelectResponse`, `ExecuteResponse`, `PrepareResponse`, `PushResponse` are part of a public API. `Pos()`, `MetaData()`, `SQLInfo()` methods created for them to get specific info. `Future` constructors now accept `Request` as their argument. `Future` methods `AppendPush` and `SetResponse` accept response `Header` and data as their arguments. `IsPush()` method is used to return the information if the current response is a `PushResponse`. `PushCode` constant is removed. To get information, if the current response is a push response, `IsPush()` method could be used instead. After this patch, operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`, `Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` return response data instead of an actual responses. After this patch, operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`, `Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Pooler` return response data instead of an actual responses. Part of #237
1 parent 4784b55 commit c59e514

25 files changed

+1326
-708
lines changed

CHANGELOG.md

+14-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
3030
- Support connection via an existing socket fd (#321)
3131
- `Header` struct for the response header (#237). It can be accessed via
3232
`Header()` method of the `Response` interface.
33+
- `Response` method added to the `Request` interface (#237)
34+
- New `LogAppendPushFailed` connection log constant (#237).
35+
It is logged when connection fails to append a push response.
3336

3437
### Changed
3538

@@ -70,12 +73,21 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
7073
- Rename `pool.GetPoolInfo` to `pool.GetInfo`. Change return type to
7174
`map[string]ConnectionInfo` (#321)
7275
- `Response` is now an interface (#237)
73-
- All responses are now implementations of the `Response` interface (#237)
76+
- All responses are now implementations of the `Response` interface (#237).
77+
`SelectResponse`, `ExecuteResponse`, `PrepareResponse`, `PushResponse` are part
78+
of a public API. `Pos()`, `MetaData()`, `SQLInfo()` methods created for them
79+
to get specific info.
80+
Special types of responses are used with special requests.
7481
- `IsPush()` method is added to the response iterator (#237). It returns
7582
the information if the current response is a `PushResponse`.
7683
`PushCode` constant is removed.
7784
- Method `Get` for `Future` now returns response data (#237). To get the actual
78-
response new `GetResponse` method has been added.
85+
response new `GetResponse` method has been added. Methods `AppendPush` and
86+
`SetResponse` accept response `Header` and data as their arguments.
87+
- `Future` constructors now accept `Request` as their argument (#237)
88+
- Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
89+
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` and `Pooler`
90+
return response data instead of an actual responses (#237)
7991

8092
### Deprecated
8193

README.md

+18
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ The subpackage has been deleted. You could use `pool` instead.
202202
unique string ID, which allows them to be distinguished.
203203
* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been changed
204204
to `map[string]ConnectionInfo`.
205+
* Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
206+
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Pooler` return
207+
response data instead of an actual responses.
205208

206209
#### crud package
207210

@@ -257,6 +260,7 @@ longer accept `ops` argument (operations) as an `interface{}`. `*Operations`
257260
needs to be passed instead.
258261
* `UpdateRequest` and `UpsertRequest` structs no longer accept `interface{}`
259262
for an `ops` field. `*Operations` needs to be used instead.
263+
* `Response` method added to the `Request` interface.
260264

261265
#### Response changes
262266

@@ -265,11 +269,25 @@ for an `ops` field. `*Operations` needs to be used instead.
265269
`Header()` method.
266270
* `ResponseIterator` interface now has `IsPush()` method.
267271
It returns true if the current response is a push response.
272+
* For each request type, a different response type is created. They all
273+
implement a `Response` interface. `SelectResponse`, `PrepareResponse`,
274+
`ExecuteResponse`, `PushResponse` are a part of a public API.
275+
`Pos()`, `MetaData()`, `SQLInfo()` methods created for them to get specific info.
276+
Special types of responses are used with special requests.
268277

269278
#### Future changes
270279

271280
* Method `Get` now returns response data instead of the actual response.
272281
* New method `GetResponse` added to get an actual response.
282+
* `Future` constructors now accept `Request` as their argument.
283+
* Methods `AppendPush` and `SetResponse` accepts response `Header` and data
284+
as their arguments.
285+
286+
#### Connector changes
287+
288+
Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
289+
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` return
290+
response data instead of an actual responses.
273291

274292
#### Connect function
275293

box_error_test.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,7 @@ func TestErrorTypeEval(t *testing.T) {
304304

305305
for name, testcase := range tupleCases {
306306
t.Run(name, func(t *testing.T) {
307-
resp, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
308-
require.Nil(t, err)
309-
data, err := resp.Decode()
307+
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
310308
require.Nil(t, err)
311309
require.NotNil(t, data)
312310
require.Equal(t, len(data), 1)
@@ -438,14 +436,11 @@ func TestErrorTypeSelect(t *testing.T) {
438436
_, err := conn.Eval(insertEval, []interface{}{})
439437
require.Nilf(t, err, "Tuple has been successfully inserted")
440438

441-
var resp Response
442439
var offset uint32 = 0
443440
var limit uint32 = 1
444-
resp, err = conn.Select(space, index, offset, limit, IterEq,
441+
data, err := conn.Select(space, index, offset, limit, IterEq,
445442
[]interface{}{testcase.tuple.pk})
446443
require.Nil(t, err)
447-
data, err := resp.Decode()
448-
require.Nil(t, err)
449444
require.NotNil(t, data)
450445
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
451446
tpl, ok := data[0].([]interface{})

connection.go

+25-8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ const (
6161
LogUnexpectedResultId
6262
// LogWatchEventReadFailed is logged when failed to read a watch event.
6363
LogWatchEventReadFailed
64+
// LogAppendPushFailed is logged when failed to append a push response.
65+
LogAppendPushFailed
6466
)
6567

6668
// ConnEvent is sent throw Notify channel specified in Opts.
@@ -103,6 +105,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
103105
case LogWatchEventReadFailed:
104106
err := v[0].(error)
105107
log.Printf("tarantool: unable to parse watch event: %s", err)
108+
case LogAppendPushFailed:
109+
err := v[0].(error)
110+
log.Printf("tarantool: unable to append a push response: %s", err)
106111
default:
107112
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
108113
log.Print(args...)
@@ -818,10 +823,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
818823
return
819824
}
820825

821-
resp := &ConnResponse{header: header, buf: buf}
822826
var fut *Future = nil
823827
if iproto.Type(header.Code) == iproto.IPROTO_EVENT {
824-
if event, err := readWatchEvent(&resp.buf); err == nil {
828+
if event, err := readWatchEvent(&buf); err == nil {
825829
events <- event
826830
} else {
827831
err = ClientError{
@@ -833,11 +837,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
833837
continue
834838
} else if header.Code == uint32(iproto.IPROTO_CHUNK) {
835839
if fut = conn.peekFuture(header.RequestId); fut != nil {
836-
fut.AppendPush(resp)
840+
if err := fut.AppendPush(header, &buf); err != nil {
841+
err = ClientError{
842+
ErrProtocolError,
843+
fmt.Sprintf("failed to append push response: %s", err),
844+
}
845+
conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
846+
}
837847
}
838848
} else {
839849
if fut = conn.fetchFuture(header.RequestId); fut != nil {
840-
fut.SetResponse(resp)
850+
if err := fut.SetResponse(header, &buf); err != nil {
851+
fut.SetError(fmt.Errorf("failed to set response: %w", err))
852+
}
841853
conn.markDone(fut)
842854
}
843855
}
@@ -873,8 +885,10 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
873885
}
874886
}
875887

876-
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
888+
func (conn *Connection) newFuture(req Request) (fut *Future) {
889+
ctx := req.Ctx()
877890
fut = NewFuture()
891+
fut.SetRequest(req)
878892
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
879893
select {
880894
case conn.rlimit <- struct{}{}:
@@ -984,7 +998,7 @@ func (conn *Connection) decrementRequestCnt() {
984998
func (conn *Connection) send(req Request, streamId uint64) *Future {
985999
conn.incrementRequestCnt()
9861000

987-
fut := conn.newFuture(req.Ctx())
1001+
fut := conn.newFuture(req)
9881002
if fut.ready == nil {
9891003
conn.decrementRequestCnt()
9901004
return fut
@@ -1053,8 +1067,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10531067

10541068
if req.Async() {
10551069
if fut = conn.fetchFuture(reqid); fut != nil {
1056-
resp := &ConnResponse{}
1057-
fut.SetResponse(resp)
1070+
header := Header{
1071+
RequestId: reqid,
1072+
Code: OkCode,
1073+
}
1074+
fut.SetResponse(header, nil)
10581075
conn.markDone(fut)
10591076
}
10601077
}

connector.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -13,41 +13,41 @@ type Connector interface {
1313

1414
// Deprecated: the method will be removed in the next major version,
1515
// use a PingRequest object + Do() instead.
16-
Ping() (Response, error)
16+
Ping() ([]interface{}, error)
1717
// Deprecated: the method will be removed in the next major version,
1818
// use a SelectRequest object + Do() instead.
1919
Select(space, index interface{}, offset, limit uint32, iterator Iter,
20-
key interface{}) (Response, error)
20+
key interface{}) ([]interface{}, error)
2121
// Deprecated: the method will be removed in the next major version,
2222
// use an InsertRequest object + Do() instead.
23-
Insert(space interface{}, tuple interface{}) (Response, error)
23+
Insert(space interface{}, tuple interface{}) ([]interface{}, error)
2424
// Deprecated: the method will be removed in the next major version,
2525
// use a ReplicaRequest object + Do() instead.
26-
Replace(space interface{}, tuple interface{}) (Response, error)
26+
Replace(space interface{}, tuple interface{}) ([]interface{}, error)
2727
// Deprecated: the method will be removed in the next major version,
2828
// use a DeleteRequest object + Do() instead.
29-
Delete(space, index interface{}, key interface{}) (Response, error)
29+
Delete(space, index interface{}, key interface{}) ([]interface{}, error)
3030
// Deprecated: the method will be removed in the next major version,
3131
// use a UpdateRequest object + Do() instead.
32-
Update(space, index interface{}, key interface{}, ops *Operations) (Response, error)
32+
Update(space, index interface{}, key interface{}, ops *Operations) ([]interface{}, error)
3333
// Deprecated: the method will be removed in the next major version,
3434
// use a UpsertRequest object + Do() instead.
35-
Upsert(space interface{}, tuple interface{}, ops *Operations) (Response, error)
35+
Upsert(space interface{}, tuple interface{}, ops *Operations) ([]interface{}, error)
3636
// Deprecated: the method will be removed in the next major version,
3737
// use a CallRequest object + Do() instead.
38-
Call(functionName string, args interface{}) (Response, error)
38+
Call(functionName string, args interface{}) ([]interface{}, error)
3939
// Deprecated: the method will be removed in the next major version,
4040
// use a Call16Request object + Do() instead.
41-
Call16(functionName string, args interface{}) (Response, error)
41+
Call16(functionName string, args interface{}) ([]interface{}, error)
4242
// Deprecated: the method will be removed in the next major version,
4343
// use a Call17Request object + Do() instead.
44-
Call17(functionName string, args interface{}) (Response, error)
44+
Call17(functionName string, args interface{}) ([]interface{}, error)
4545
// Deprecated: the method will be removed in the next major version,
4646
// use an EvalRequest object + Do() instead.
47-
Eval(expr string, args interface{}) (Response, error)
47+
Eval(expr string, args interface{}) ([]interface{}, error)
4848
// Deprecated: the method will be removed in the next major version,
4949
// use an ExecuteRequest object + Do() instead.
50-
Execute(expr string, args interface{}) (Response, error)
50+
Execute(expr string, args interface{}) ([]interface{}, error)
5151

5252
// Deprecated: the method will be removed in the next major version,
5353
// use a SelectRequest object + Do() instead.

crud/common.go

+7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ package crud
5555

5656
import (
5757
"context"
58+
"io"
5859

5960
"github.com/tarantool/go-iproto"
6061

@@ -84,6 +85,12 @@ func (req baseRequest) Async() bool {
8485
return req.impl.Async()
8586
}
8687

88+
// Response creates a response for the baseRequest.
89+
func (req baseRequest) Response(header tarantool.Header,
90+
body io.Reader) (tarantool.Response, error) {
91+
return req.impl.Response(header, body)
92+
}
93+
8794
type spaceRequest struct {
8895
baseRequest
8996
space string

dial.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -404,16 +404,11 @@ func identify(w writeFlusher, r io.Reader) (ProtocolInfo, error) {
404404
}
405405
data, err := resp.Decode()
406406
if err != nil {
407-
switch err := err.(type) {
408-
case Error:
409-
if err.Code == iproto.ER_UNKNOWN_REQUEST_TYPE {
410-
// IPROTO_ID requests are not supported by server.
411-
return info, nil
412-
}
413-
return info, err
414-
default:
415-
return info, fmt.Errorf("decode response body error: %w", err)
407+
if iproto.Error(resp.Header().Code) == iproto.ER_UNKNOWN_REQUEST_TYPE {
408+
// IPROTO_ID requests are not supported by server.
409+
return info, nil
416410
}
411+
return info, err
417412
}
418413

419414
if len(data) == 0 {
@@ -511,12 +506,12 @@ func readResponse(r io.Reader) (Response, error) {
511506

512507
respBytes, err := read(r, lenbuf[:])
513508
if err != nil {
514-
return &ConnResponse{}, fmt.Errorf("read error: %w", err)
509+
return &BaseResponse{}, fmt.Errorf("read error: %w", err)
515510
}
516511

517512
buf := smallBuf{b: respBytes}
518513
header, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
519-
resp := &ConnResponse{header: header, buf: buf}
514+
resp := &BaseResponse{header: header, buf: buf}
520515
if err != nil {
521516
return resp, fmt.Errorf("decode response header error: %w", err)
522517
}

0 commit comments

Comments
 (0)