11use arrow:: array:: RecordBatch ;
22use arrow:: ipc:: writer:: StreamWriter ;
33use arrow:: util:: pretty:: pretty_format_batches;
4+ use arrow_digest:: { RecordDigest , RecordDigestV0 } ;
45use arrow_json:: writer:: LineDelimited ;
56use arrow_json:: WriterBuilder ;
67use async_stream:: { stream, try_stream} ;
@@ -18,6 +19,7 @@ use parquet::arrow::async_writer::AsyncFileWriter;
1819use parquet:: arrow:: AsyncArrowWriter ;
1920use parquet:: basic:: Compression ;
2021use parquet:: file:: properties:: WriterProperties ;
22+ use sha2:: Sha256 ;
2123use std:: convert:: Infallible ;
2224use tokio:: sync:: mpsc:: error:: SendError ;
2325use tokio:: sync:: oneshot:: Receiver ;
@@ -96,6 +98,29 @@ pub(crate) fn stream_text_query(
9698 }
9799}
98100
/// Computes a stable hash of a query's complete result set.
///
/// Executes the given `DataFrame` as a record-batch stream and folds every
/// produced `RecordBatch` into an `arrow_digest` `RecordDigestV0` that is
/// initialized from the result schema, with SHA-256 as the underlying hash.
/// The final digest is rendered as an upper-case hexadecimal string.
///
/// # Errors
///
/// Returns `PipelineError::AdHocQueryError` when starting the stream fails
/// (before any batch was produced, so `df` is `None`) or when a batch fails
/// mid-stream (the originating DataFusion error is boxed into `df`).
pub(crate) async fn hash_query_result(df: DataFrame) -> Result<String, PipelineError> {
    // Capture the schema up front: `df` is consumed by `execute_stream`, and
    // the digest is seeded from the schema so a schema change alters the hash.
    let schema = df.schema().inner().clone();
    let stream_executor = execute_stream(df)
        .await
        // Planning/startup failure — no batches exist yet, hence `df: None`.
        .map_err(|e| PipelineError::AdHocQueryError {
            error: e.to_string(),
            df: None,
        })?;
    // Convert per-batch stream errors into pipeline errors, preserving the
    // original DataFusion error (boxed) for diagnostics. The message is taken
    // before `e` is moved into the box.
    let mut stream = stream_executor.map_err(|e| PipelineError::AdHocQueryError {
        error: e.to_string(),
        df: Some(Box::new(e)),
    })?;

    // NOTE(review): the digest is presumably order-sensitive, so the hash is
    // only stable for queries with a deterministic row order — confirm against
    // arrow-digest's RecordDigestV0 semantics.
    let mut digest = RecordDigestV0::<Sha256>::new(&schema);
    while let Some(batch) = stream.next().await {
        // Errors were already mapped above; `PipelineError::from` is the
        // remaining conversion required by `?` here.
        let batch = batch.map_err(PipelineError::from)?;
        digest.update(&batch);
    }

    // Upper-case hex rendering of the finalized digest bytes.
    let hash = digest.finalize();
    Ok(format!("{:X}", hash))
}
123+
99124pub ( crate ) fn stream_json_query (
100125 df : DataFrame ,
101126) -> impl Stream < Item = Result < ByteString , PipelineError > > {
0 commit comments