@@ -4,8 +4,6 @@ use std::str::FromStr;
4
4
use ndarray:: Zip ;
5
5
use pgrx:: iter:: { SetOfIterator , TableIterator } ;
6
6
use pgrx:: * ;
7
- use pyo3:: prelude:: * ;
8
- use pyo3:: types:: { IntoPyDict , PyDict } ;
9
7
10
8
#[ cfg( feature = "python" ) ]
11
9
use serde_json:: json;
@@ -634,40 +632,6 @@ pub fn transform_string(
634
632
}
635
633
}
636
634
637
- struct TransformStreamIterator {
638
- locals : Py < PyDict > ,
639
- }
640
-
641
- impl TransformStreamIterator {
642
- fn new ( python_iter : Py < PyAny > ) -> Self {
643
- let locals = Python :: with_gil ( |py| -> Result < Py < PyDict > , PyErr > {
644
- Ok ( [ ( "python_iter" , python_iter) ] . into_py_dict ( py) . into ( ) )
645
- } )
646
- . map_err ( |e| error ! ( "{e}" ) )
647
- . unwrap ( ) ;
648
- Self { locals }
649
- }
650
- }
651
-
652
- impl Iterator for TransformStreamIterator {
653
- type Item = String ;
654
- fn next ( & mut self ) -> Option < Self :: Item > {
655
- // We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
656
- Python :: with_gil ( |py| -> Result < Option < String > , PyErr > {
657
- let code = "next(python_iter)" ;
658
- let res: & PyAny = py. eval ( code, Some ( self . locals . as_ref ( py) ) , None ) ?;
659
- if res. is_none ( ) {
660
- Ok ( None )
661
- } else {
662
- let res: String = res. extract ( ) ?;
663
- Ok ( Some ( res) )
664
- }
665
- } )
666
- . map_err ( |e| error ! ( "{e}" ) )
667
- . unwrap ( )
668
- }
669
- }
670
-
671
635
#[ cfg( all( feature = "python" , not( feature = "use_as_lib" ) ) ) ]
672
636
#[ pg_extern( immutable, parallel_safe, name = "transform_stream" ) ]
673
637
#[ allow( unused_variables) ] // cache is maintained for api compatibility
@@ -678,11 +642,11 @@ pub fn transform_stream_json(
678
642
cache : default ! ( bool , false ) ,
679
643
) -> SetOfIterator < ' static , String > {
680
644
// We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
681
- let python_iter = crate :: bindings :: transformers :: transform_stream ( & task . 0 , & args . 0 , input )
682
- . map_err ( |e| error ! ( "{e}" ) )
683
- . unwrap ( ) ;
684
- let res = TransformStreamIterator :: new ( python_iter ) ;
685
- SetOfIterator :: new ( res )
645
+ let python_iter =
646
+ crate :: bindings :: transformers :: transform_stream_iterator ( & task . 0 , & args . 0 , input )
647
+ . map_err ( |e| error ! ( "{e}" ) )
648
+ . unwrap ( ) ;
649
+ SetOfIterator :: new ( python_iter )
686
650
}
687
651
688
652
#[ cfg( all( feature = "python" , not( feature = "use_as_lib" ) ) ) ]
@@ -696,11 +660,11 @@ pub fn transform_stream_string(
696
660
) -> SetOfIterator < ' static , String > {
697
661
let task_json = json ! ( { "task" : task } ) ;
698
662
// We can unwrap this becuase if there is an error the current transaction is aborted in the map_err call
699
- let python_iter = crate :: bindings :: transformers :: transform_stream ( & task_json , & args . 0 , input )
700
- . map_err ( |e| error ! ( "{e}" ) )
701
- . unwrap ( ) ;
702
- let res = TransformStreamIterator :: new ( python_iter ) ;
703
- SetOfIterator :: new ( res )
663
+ let python_iter =
664
+ crate :: bindings :: transformers :: transform_stream_iterator ( & task_json , & args . 0 , input )
665
+ . map_err ( |e| error ! ( "{e}" ) )
666
+ . unwrap ( ) ;
667
+ SetOfIterator :: new ( python_iter )
704
668
}
705
669
706
670
#[ cfg( feature = "python" ) ]
0 commit comments