from decimal import Decimal import unittest from pandas import Timedelta, Timestamp from feldera import PipelineBuilder from tests import TEST_CLIENT, unique_pipeline_name from feldera.runtime_config import RuntimeConfig from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS class TestUDF(unittest.TestCase): def test_local(self): sql = """ CREATE TYPE my_struct AS ( i INT, s VARCHAR ); CREATE TABLE t ( i INT, ti TINYINT, si SMALLINT, bi BIGINT, r REAL, d DOUBLE, bin VARBINARY, dt DATE, t TIME, ts TIMESTAMP, a INT ARRAY, m MAP, v VARIANT, b BOOLEAN, dc DECIMAL(7,2), s VARCHAR, ms MY_STRUCT ) with ('materialized' = 'true'); CREATE FUNCTION bool2bool(i BOOLEAN) RETURNS BOOLEAN; CREATE FUNCTION nbool2nbool(i BOOLEAN NOT NULL) RETURNS BOOLEAN NOT NULL; CREATE FUNCTION i2i(i INT) RETURNS INT; CREATE FUNCTION ni2ni(i INT NOT NULL) RETURNS INT NOT NULL; CREATE FUNCTION ti2ti(i TINYINT) RETURNS TINYINT; CREATE FUNCTION nti2nti(i TINYINT NOT NULL) RETURNS TINYINT NOT NULL; CREATE FUNCTION si2si(i SMALLINT) RETURNS SMALLINT; CREATE FUNCTION nsi2nsi(i SMALLINT NOT NULL) RETURNS SMALLINT NOT NULL; CREATE FUNCTION bi2bi(i BIGINT) RETURNS BIGINT; CREATE FUNCTION nbi2nbi(i BIGINT NOT NULL) RETURNS BIGINT NOT NULL; CREATE FUNCTION r2r(i REAL) RETURNS REAL; CREATE FUNCTION nr2nr(i REAL NOT NULL) RETURNS REAL NOT NULL; CREATE FUNCTION d2d(i DOUBLE) RETURNS DOUBLE; CREATE FUNCTION nd2nd(i DOUBLE NOT NULL) RETURNS DOUBLE NOT NULL; CREATE FUNCTION bin2bin(i VARBINARY) RETURNS VARBINARY; CREATE FUNCTION nbin2nbin(i VARBINARY NOT NULL) RETURNS VARBINARY NOT NULL; CREATE FUNCTION date2date(i DATE) RETURNS DATE; CREATE FUNCTION ndate2ndate(i DATE NOT NULL) RETURNS DATE NOT NULL; CREATE FUNCTION ts2ts(i TIMESTAMP) RETURNS TIMESTAMP; CREATE FUNCTION nts2nts(i TIMESTAMP NOT NULL) RETURNS TIMESTAMP NOT NULL; CREATE FUNCTION t2t(i TIME) RETURNS TIME; CREATE FUNCTION nt2nt(i TIME NOT NULL) RETURNS TIME NOT NULL; CREATE FUNCTION arr2arr(i INT ARRAY) RETURNS INT ARRAY; CREATE FUNCTION narr2narr(i INT ARRAY NOT NULL) RETURNS INT ARRAY NOT NULL; CREATE FUNCTION map2map(i MAP) RETURNS MAP; CREATE FUNCTION nmap2nmap(i MAP NOT NULL) RETURNS MAP NOT NULL; CREATE FUNCTION var2var(i VARIANT) RETURNS VARIANT; CREATE FUNCTION nvar2nvar(i VARIANT NOT NULL) RETURNS VARIANT NOT NULL; CREATE FUNCTION dec2dec(i DECIMAL(7, 2)) RETURNS DECIMAL(7, 2); CREATE FUNCTION ndec2ndec(i DECIMAL(7, 2) NOT NULL) RETURNS DECIMAL(7, 2) NOT NULL; CREATE FUNCTION str2str(i VARCHAR) RETURNS VARCHAR; CREATE FUNCTION nstr2nstr(i VARCHAR NOT NULL) RETURNS VARCHAR NOT NULL; CREATE FUNCTION struct2struct(i my_struct) RETURNS my_struct; CREATE FUNCTION nstruct2nstruct(i my_struct NOT NULL) RETURNS my_struct NOT NULL; CREATE MATERIALIZED VIEW v AS SELECT bool2bool(b), nbool2nbool(COALESCE(b, FALSE)), i2i(i), ni2ni(COALESCE(i, 0)), ti2ti(ti), nti2nti(COALESCE(ti, 0)), si2si(si), nsi2nsi(COALESCE(si, 0)), bi2bi(bi), nbi2nbi(COALESCE(bi, 0)), r2r(r), nr2nr(COALESCE(r, 0.0)), d2d(d), nd2nd(COALESCE(d, 0.0)), bin2bin(bin), nbin2nbin(COALESCE(bin, x'')), date2date(dt), ndate2ndate(COALESCE(dt, DATE '2023-01-01')), ts2ts(ts), nts2nts(COALESCE(ts, TIMESTAMP '2023-01-01 00:00:00')), t2t(t), nt2nt(COALESCE(t, TIME '00:00:00')), arr2arr(a), narr2narr(a), map2map(m), nmap2nmap(m), var2var(v), nvar2nvar(COALESCE(v, VARIANTNULL())), dec2dec(dc), ndec2ndec(COALESCE(dc, 0)), str2str(s), nstr2nstr(COALESCE(s, '')) FROM t; """ udfs = """ use feldera_sqllib::F32; use crate::*; pub fn bool2bool(i: Option) -> Result, Box> { Ok(i) } pub fn nbool2nbool(i: bool) -> Result> { Ok(i) } pub fn i2i(i: Option) -> Result, Box> { Ok(i) } pub fn ni2ni(i: i32) -> Result> { Ok(i) } pub fn ti2ti(i: Option) -> Result, Box> { Ok(i) } pub fn nti2nti(i: i8) -> Result> { Ok(i) } pub fn si2si(i: Option) -> Result, Box> { Ok(i) } pub fn nsi2nsi(i: i16) -> Result> { Ok(i) } pub fn bi2bi(i: Option) -> Result, Box> { Ok(i) } pub fn nbi2nbi(i: i64) -> Result> { Ok(i) } pub fn r2r(i: Option) -> Result, Box> { Ok(i) } pub fn nr2nr(i: F32) -> Result> { Ok(i) } pub fn d2d(i: Option) -> Result, Box> { Ok(i) } pub fn nd2nd(i: F64) -> Result> { Ok(i) } pub fn bin2bin(i: Option) -> Result, Box> { Ok(i) } pub fn nbin2nbin(i: ByteArray) -> Result> { Ok(i) } pub fn date2date(i: Option) -> Result, Box> { Ok(i) } pub fn ndate2ndate(i: Date) -> Result> { Ok(i) } pub fn ts2ts(i: Option) -> Result, Box> { Ok(i) } pub fn nts2nts(i: Timestamp) -> Result> { Ok(i) } pub fn t2t(i: Option