diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index d4a7de6ee80..3d229555b79 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -101,7 +101,9 @@ fn columns_to_iter_recursive<'a, I: 'a>( where I: DataPages, { - use DataType::*; + use crate::datatypes::PhysicalType::*; + use crate::datatypes::PrimitiveType::*; + if init.is_empty() && is_primitive(&field.data_type) { return Ok(Box::new( page_iter_to_arrays( @@ -114,13 +116,13 @@ where )); } - Ok(match field.data_type().to_logical_type() { + Ok(match field.data_type().to_physical_type() { Boolean => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size) } - Int8 => { + Primitive(Int8) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -131,7 +133,7 @@ where |x: i32| x as i8, ) } - Int16 => { + Primitive(Int16) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -142,7 +144,7 @@ where |x: i32| x as i16, ) } - Int32 => { + Primitive(Int32) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -153,7 +155,7 @@ where |x: i32| x, ) } - Int64 => { + Primitive(Int64) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -164,7 +166,7 @@ where |x: i64| x, ) } - UInt8 => { + Primitive(UInt8) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -175,7 +177,7 @@ where |x: i32| x as u8, ) } - UInt16 => { + Primitive(UInt16) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -186,7 +188,7 @@ where |x: i32| x as u16, ) } - UInt32 => { + Primitive(UInt32) => { init.push(InitNested::Primitive(field.is_nullable)); let type_ = types.pop().unwrap(); match type_.physical_type { @@ -212,7 +214,7 @@ where } } } - UInt64 => { + Primitive(UInt64) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -223,7 +225,7 @@ where |x: i64| x as u64, ) } - Float32 => { + Primitive(Float32) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -234,7 +236,7 @@ where |x: f32| x, ) } - Float64 => { + Primitive(Float64) => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( @@ -285,63 +287,68 @@ where chunk_size, ) } - List(inner) | LargeList(inner) | FixedSizeList(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array); - Ok((nested, array)) - }); - Box::new(iter) as _ - } - Struct(fields) => { - let columns = fields - .iter() - .rev() - .map(|f| { - let mut init = init.clone(); - init.push(InitNested::Struct(field.is_nullable)); - let n = n_columns(&f.data_type); - let columns = columns.drain(columns.len() - n..).collect(); - let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) - }) - .collect::>>()?; - let columns = columns.into_iter().rev().collect(); - Box::new(struct_::StructIterator::new(columns, fields.clone())) - } - Map(inner, _) => { - init.push(InitNested::List(field.is_nullable)); - let iter = columns_to_iter_recursive( - columns, - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - Box::new(iter.map(move |x| { - let (nested, inner) = x?; - let array = MapArray::new( - field.data_type().clone(), - vec![0, inner.len() as i32].into(), - inner, - None, - ); - Ok((nested, array.boxed())) - })) - } - other => { - return Err(Error::nyi(format!( - "Deserializing type {other:?} from parquet" - ))) - } + + _ => match field.data_type().to_logical_type() { + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array); + Ok((nested, array)) + }); + Box::new(iter) as _ + } + DataType::Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); + let n = n_columns(&f.data_type); + let columns = columns.drain(columns.len() - n..).collect(); + let types = types.drain(types.len() - n..).collect(); + columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(struct_::StructIterator::new(columns, fields.clone())) + } + DataType::Map(inner, _) => { + init.push(InitNested::List(field.is_nullable)); + let iter = columns_to_iter_recursive( + columns, + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + Box::new(iter.map(move |x| { + let (nested, inner) = x?; + let array = MapArray::new( + field.data_type().clone(), + vec![0, inner.len() as i32].into(), + inner, + None, + ); + Ok((nested, array.boxed())) + })) + } + other => { + return Err(Error::nyi(format!( + "Deserializing type {other:?} from parquet" + ))) + } + }, }) } diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 8f75ea6ad9c..ea6dacf609b 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -320,8 +320,11 @@ fn timestamp<'a, I: 'a + DataPages>( ) -> Result> { if physical_type == &PhysicalType::Int96 { let iter = primitive::Iter::new(pages, data_type, chunk_size, int96_to_i64_ns); - - let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + let logical_type = PrimitiveLogicalType::Timestamp { + unit: ParquetTimeUnit::Nanoseconds, + is_adjusted_to_utc: false, + }; + let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit); return match (factor, is_multiplier) { (1, _) => Ok(dyn_iter(iden(iter))), (a, true) => Ok(dyn_iter(op(iter, move |x| x * a))), @@ -353,7 +356,11 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); + let logical_type = PrimitiveLogicalType::Timestamp { + unit: ParquetTimeUnit::Nanoseconds, + is_adjusted_to_utc: false, + }; + let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit); return match (factor, is_multiplier) { (a, true) => Ok(dyn_iter(primitive::DictIter::::new( pages,