Skip to content

Commit 789e9a2

Browse files
committed
fix(cursor): return NotEnoughData
It fixes regression introduced in #169. Previously, the cursor checked if unhandled bytes are left. Return this behaviour to detect schema mismatch. Closes #185
1 parent f3771b0 commit 789e9a2

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

src/cursor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ impl<T> RowCursor<T> {
126126

127127
match self.raw.next().await? {
128128
Some(chunk) => self.bytes.extend(chunk),
129+
None if self.bytes.remaining() > 0 => {
130+
// If some data is left, we have an incomplete row in the buffer.
131+
// This is usually a schema mismatch on the client side.
132+
return Err(Error::NotEnoughData);
133+
}
129134
None => return Ok(None),
130135
}
131136
}

src/rowbinary/de.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ use std::{convert::TryFrom, mem, str};
22

33
use crate::error::{Error, Result};
44
use bytes::Buf;
5-
use serde::de::{EnumAccess, VariantAccess};
65
use serde::{
7-
de::{DeserializeSeed, Deserializer, SeqAccess, Visitor},
6+
de::{DeserializeSeed, Deserializer, EnumAccess, SeqAccess, VariantAccess, Visitor},
87
Deserialize,
98
};
109

tests/it/cursor_error.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use clickhouse::{Client, Compression};
1+
use serde::Deserialize;
2+
3+
use clickhouse::{error::Error, Client, Compression, Row};
24

35
#[tokio::test]
46
async fn deferred() {
@@ -96,3 +98,40 @@ async fn deferred_lz4() {
9698
assert_ne!(i, 0); // we're interested only in errors during processing
9799
assert!(err.to_string().contains("TIMEOUT_EXCEEDED"));
98100
}
101+
102+
// See #185.
103+
#[tokio::test]
104+
async fn invalid_schema() {
105+
#[derive(Debug, Row, Deserialize)]
106+
#[allow(dead_code)]
107+
struct MyRow {
108+
no: u32,
109+
dec: Option<String>, // valid schema: u64-based types
110+
}
111+
112+
let client = prepare_database!();
113+
114+
client
115+
.query(
116+
"CREATE TABLE test(no UInt32, dec Nullable(Decimal64(4)))
117+
ENGINE = MergeTree
118+
ORDER BY no",
119+
)
120+
.execute()
121+
.await
122+
.unwrap();
123+
124+
client
125+
.query("INSERT INTO test VALUES (1, 1.1), (2, 2.2), (3, 3.3)")
126+
.execute()
127+
.await
128+
.unwrap();
129+
130+
let err = client
131+
.query("SELECT ?fields FROM test")
132+
.fetch_all::<MyRow>()
133+
.await
134+
.unwrap_err();
135+
136+
assert!(matches!(err, Error::NotEnoughData));
137+
}

0 commit comments

Comments
 (0)