-
Notifications
You must be signed in to change notification settings - Fork 38
fix: fix EOF error when decoding columns with empty string #154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| package client | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "encoding/binary" | ||
| "testing" | ||
| ) | ||
|
|
||
| func buildNullIndicatorBytes(nulls []bool) []byte { | ||
| var buf bytes.Buffer | ||
| hasNull := false | ||
| for _, n := range nulls { | ||
| if n { | ||
| hasNull = true | ||
| break | ||
| } | ||
| } | ||
| if !hasNull { | ||
| buf.WriteByte(0) | ||
| return buf.Bytes() | ||
| } | ||
| buf.WriteByte(1) | ||
| packed := make([]byte, (len(nulls)+7)/8) | ||
| for i, n := range nulls { | ||
| if n { | ||
| packed[i/8] |= 0b10000000 >> (uint(i) % 8) | ||
| } | ||
| } | ||
| buf.Write(packed) | ||
| return buf.Bytes() | ||
| } | ||
|
|
||
| func TestBinaryArrayColumnDecoder_EmptyString(t *testing.T) { | ||
| var buf bytes.Buffer | ||
| buf.Write(buildNullIndicatorBytes([]bool{false})) | ||
| _ = binary.Write(&buf, binary.BigEndian, int32(0)) | ||
|
|
||
| col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 1) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.GetPositionCount() != 1 { | ||
| t.Fatalf("expected positionCount=1, got %d", col.GetPositionCount()) | ||
| } | ||
| if col.IsNull(0) { | ||
| t.Fatal("row 0 should not be null") | ||
| } | ||
| val, err := col.GetBinary(0) | ||
| if err != nil { | ||
| t.Fatalf("GetBinary(0) failed: %v", err) | ||
| } | ||
| if len(val.values) != 0 { | ||
| t.Fatalf("expected empty string, got %q", string(val.values)) | ||
| } | ||
| } | ||
|
|
||
| func TestBinaryArrayColumnDecoder_NullThenEmptyString(t *testing.T) { | ||
| var buf bytes.Buffer | ||
| buf.Write(buildNullIndicatorBytes([]bool{true, false})) | ||
| _ = binary.Write(&buf, binary.BigEndian, int32(0)) | ||
|
|
||
| col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 2) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if !col.IsNull(0) { | ||
| t.Error("row 0 should be null") | ||
| } | ||
| if col.IsNull(1) { | ||
| t.Error("row 1 should not be null") | ||
| } | ||
| val, err := col.GetBinary(1) | ||
| if err != nil { | ||
| t.Fatalf("GetBinary(1) failed: %v", err) | ||
| } | ||
| if len(val.values) != 0 { | ||
| t.Fatalf("expected empty string, got %q", string(val.values)) | ||
| } | ||
| } | ||
|
|
||
| func TestBinaryArrayColumnDecoder_WithNull(t *testing.T) { | ||
| var buf bytes.Buffer | ||
| buf.Write(buildNullIndicatorBytes([]bool{false, true, false})) | ||
| writeText := func(s string) { | ||
| _ = binary.Write(&buf, binary.BigEndian, int32(len(s))) | ||
| buf.WriteString(s) | ||
| } | ||
| writeText("hello") | ||
| writeText("world") | ||
|
|
||
| col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 3) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.IsNull(0) { | ||
| t.Error("row 0 should not be null") | ||
| } | ||
| if v, _ := col.GetBinary(0); string(v.values) != "hello" { | ||
| t.Errorf("row 0: expected \"hello\", got %q", string(v.values)) | ||
| } | ||
| if !col.IsNull(1) { | ||
| t.Error("row 1 should be null") | ||
| } | ||
| if col.IsNull(2) { | ||
| t.Error("row 2 should not be null") | ||
| } | ||
| if v, _ := col.GetBinary(2); string(v.values) != "world" { | ||
| t.Errorf("row 2: expected \"world\", got %q", string(v.values)) | ||
| } | ||
| } | ||
|
|
||
| func TestInt64ArrayColumnDecoder_WithNull(t *testing.T) { | ||
| var buf bytes.Buffer | ||
| buf.Write(buildNullIndicatorBytes([]bool{false, true, false})) | ||
| _ = binary.Write(&buf, binary.BigEndian, int64(100)) | ||
| _ = binary.Write(&buf, binary.BigEndian, int64(200)) | ||
|
|
||
| col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), INT64, 3) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.IsNull(0) { | ||
| t.Error("row 0 should not be null") | ||
| } | ||
| if v, _ := col.GetLong(0); v != 100 { | ||
| t.Errorf("row 0: expected 100, got %d", v) | ||
| } | ||
| if !col.IsNull(1) { | ||
| t.Error("row 1 should be null") | ||
| } | ||
| if col.IsNull(2) { | ||
| t.Error("row 2 should not be null") | ||
| } | ||
| if v, _ := col.GetLong(2); v != 200 { | ||
| t.Errorf("row 2: expected 200, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestColumnDecoder_ZeroPositionCount(t *testing.T) { | ||
| empty := func() *bytes.Reader { return bytes.NewReader([]byte{}) } | ||
|
|
||
| t.Run("Int32ArrayColumnDecoder", func(t *testing.T) { | ||
| col, err := (&Int32ArrayColumnDecoder{}).ReadColumn(empty(), INT32, 0) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.GetPositionCount() != 0 { | ||
| t.Errorf("expected positionCount=0, got %d", col.GetPositionCount()) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("Int64ArrayColumnDecoder", func(t *testing.T) { | ||
| col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(empty(), INT64, 0) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.GetPositionCount() != 0 { | ||
| t.Errorf("expected positionCount=0, got %d", col.GetPositionCount()) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("ByteArrayColumnDecoder", func(t *testing.T) { | ||
| col, err := (&ByteArrayColumnDecoder{}).ReadColumn(empty(), BOOLEAN, 0) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.GetPositionCount() != 0 { | ||
| t.Errorf("expected positionCount=0, got %d", col.GetPositionCount()) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("BinaryArrayColumnDecoder", func(t *testing.T) { | ||
| col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(empty(), TEXT, 0) | ||
| if err != nil { | ||
| t.Fatalf("ReadColumn failed: %v", err) | ||
| } | ||
| if col.GetPositionCount() != 0 { | ||
| t.Errorf("expected positionCount=0, got %d", col.GetPositionCount()) | ||
| } | ||
| }) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -569,10 +569,12 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD | |
| request.SessionId = s.sessionId | ||
| request.StatementId = s.requestStatementId | ||
| resp, err = s.client.ExecuteQueryStatementV2(context.Background(), &request) | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| if err == nil && resp != nil { | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| } | ||
| } | ||
| } | ||
| return nil, err | ||
|
|
@@ -597,10 +599,12 @@ func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common. | |
| if s.reconnect() { | ||
| request.SessionId = s.sessionId | ||
| resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request) | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| if err == nil && resp != nil { | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| } | ||
| } | ||
|
Comment on lines
+602
to
608
|
||
| } | ||
| return nil, err | ||
|
|
@@ -626,10 +630,12 @@ func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat | |
| if s.reconnect() { | ||
| request.SessionId = s.sessionId | ||
| resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request) | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| if err == nil && resp != nil { | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| } | ||
| } | ||
|
Comment on lines
+633
to
639
|
||
| } | ||
| return nil, err | ||
|
|
@@ -653,10 +659,12 @@ func (s *Session) ExecuteFastLastDataQueryForOnePrefixPath(prefixes []string, ti | |
| if s.reconnect() { | ||
| request.SessionId = s.sessionId | ||
| resp, err = s.client.ExecuteFastLastDataQueryForOnePrefixPath(context.Background(), &request) | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| if err == nil && resp != nil { | ||
| if statusErr := VerifySuccess(resp.Status); statusErr == nil { | ||
| return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList()) | ||
| } else { | ||
| return nil, statusErr | ||
| } | ||
| } | ||
|
Comment on lines
+662
to
668
|
||
| } | ||
| return nil, err | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,3 +125,7 @@ func (s *SessionDataSet) GetColumnNames() []string { | |
| func (s *SessionDataSet) GetColumnTypes() []string { | ||
| return s.ioTDBRpcDataSet.columnTypeList | ||
| } | ||
|
|
||
| func (s *SessionDataSet) GetCurrentRowTime() int64 { | ||
| return s.ioTDBRpcDataSet.GetCurrentRowTime() | ||
|
Comment on lines
+129
to
+130
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new
if err == nil && resp != nil { ... }guard avoids a panic, but iferr == nilandresp == nilthis function will fall through and return(nil, nil)(since the finalreturn nil, errreturns a nil error). Please handle theresp == nilcase explicitly (both for the initial RPC call and the reconnect retry) and return a non-nil error when the RPC returns a nil response.