Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
finish tests
  • Loading branch information
chillaq committed Jan 21, 2026
commit f2ad152ed3eb99ed3fa304378503ce82f0efcf50
2 changes: 1 addition & 1 deletion splitio/events/events_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def start(self):

self._running = True
_LOGGER.debug('Starting SDK Event Task worker')
asyncio.get_running_loop().create_task(self._run())
asyncio.get_running_loop().create_task(self._run(), name="EventsTaskWorker")

async def stop(self, stop_flag=None):
"""Stop worker."""
Expand Down
106 changes: 105 additions & 1 deletion tests/client/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,4 +1109,108 @@ async def _make_factory_with_apikey(apikey, *_, **__):
await asyncio.sleep(0.5)
assert factory.destroyed
assert len(build_redis.mock_calls) == 2


@pytest.mark.asyncio
async def test_internal_ready_event_notification(self, mocker):
"""Test that a client with in-memory storage is sending internal events correctly."""
# Setup synchronizer
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None):
synchronizer = mocker.Mock(spec=SynchronizerAsync)
async def sync_all(*_):
return None
synchronizer.sync_all = sync_all

def start_periodic_fetching():
pass
synchronizer.start_periodic_fetching = start_periodic_fetching

def start_periodic_data_recording():
pass
synchronizer.start_periodic_data_recording = start_periodic_data_recording

self._ready_flag = ready_flag
self._synchronizer = synchronizer
self._streaming_enabled = False
self._telemetry_runtime_producer = telemetry_runtime_producer

mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer)

async def synchronize_config(*_):
await asyncio.sleep(2)
pass
mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config)

async def record_ready_time(*_):
pass
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time)

async def record_active_and_redundant_factories(*_):
pass
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories)

# Start factory and make assertions
factory = await get_factory_async('some_api_key', config={'streamingEmabled': False})
for task in asyncio.all_tasks():
if task.get_name() == "EventsTaskWorker":
task.cancel()
try:
await factory.block_until_ready(3)
except:
pass
await asyncio.sleep(.2)
event = await factory._internal_events_queue.get()
assert event.internal_event == SdkInternalEvent.SDK_READY
assert event.metadata == None
await factory.destroy()

@pytest.mark.asyncio
async def test_internal_timeout_event_notification(self, mocker):
"""Test that a client with in-memory storage is sending internal events correctly."""
# Setup synchronizer
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None):
synchronizer = mocker.Mock(spec=SynchronizerAsync)
async def sync_all(*_):
return None
synchronizer.sync_all = sync_all

def start_periodic_fetching():
pass
synchronizer.start_periodic_fetching = start_periodic_fetching

def start_periodic_data_recording():
pass
synchronizer.start_periodic_data_recording = start_periodic_data_recording

self._ready_flag = ready_flag
self._synchronizer = synchronizer
self._streaming_enabled = False
self._telemetry_runtime_producer = telemetry_runtime_producer

mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer)

async def synchronize_config(*_):
await asyncio.sleep(3)
pass
mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config)

async def record_ready_time(*_):
pass
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time)

async def record_active_and_redundant_factories(*_):
pass
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories)

# Start factory and make assertions
factory = await get_factory_async('some_api_key', config={'streamingEmabled': False})
for task in asyncio.all_tasks():
if task.get_name() == "EventsTaskWorker":
task.cancel()
try:
await factory.block_until_ready(1)
except:
pass
event = await factory._internal_events_queue.get()
assert event.internal_event == SdkInternalEvent.SDK_TIMED_OUT
assert event.metadata == None
await factory.destroy()
168 changes: 168 additions & 0 deletions tests/integration/test_client_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,174 @@ def _ready_callback(self, metadata):
def _timeout_callback(self, metadata):
self.timeout_flag = True

class InMemoryEventsNotificationAsyncTests(object):
"""Inmemory storage-based events notification tests."""

ready_flag = False
timeout_flag = False

@pytest.mark.asyncio
async def test_sdk_timeout_fire(self):
"""Prepare storages with test data."""
factory2 = await get_factory_async('some_api_key')
client = factory2.client()
await client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback)
try:
await factory2.block_until_ready(1)
except Exception as e:
pass

await asyncio.sleep(1)
assert self.timeout_flag

"""Shut down the factory."""
await factory2.destroy()

@pytest.mark.asyncio
async def test_sdk_ready(self):
"""Prepare storages with test data."""
events_queue = asyncio.Queue()
split_storage = InMemorySplitStorageAsync(events_queue)
segment_storage = InMemorySegmentStorageAsync(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)

split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
with open(split_fn, 'r') as flo:
data = json.loads(flo.read())
for split in data['ff']['d']:
await split_storage.update([splits.from_raw(split)], [], 0)

for rbs in data['rbs']['d']:
await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
await segment_storage.put(segments.from_raw(data))

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
await segment_storage.put(segments.from_raw(data))

telemetry_storage = await InMemoryTelemetryStorageAsync.create()
telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

storages = {
'splits': split_storage,
'segments': segment_storage,
'rule_based_segments': rb_segment_storage,
'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer),
'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer),
}
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue)

# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
try:
factory = SplitFactoryAsync('some_api_key',
storages,
True,
recorder,
events_queue,
events_manager,
None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
) # pylint:disable=attribute-defined-outside-init
internal_events_task.start()
except:
pass

client = factory.client()
await client.on(SdkEvent.SDK_READY, self._ready_callback)
await factory.block_until_ready(5)
assert self.ready_flag

"""Shut down the factory."""
await internal_events_task.stop()
await factory.destroy()

@pytest.mark.asyncio
async def test_sdk_ready_fire_later(self):
"""Prepare storages with test data."""
events_queue = asyncio.Queue()
split_storage = InMemorySplitStorageAsync(events_queue)
segment_storage = InMemorySegmentStorageAsync(events_queue)
rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)

split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
with open(split_fn, 'r') as flo:
data = json.loads(flo.read())
for split in data['ff']['d']:
await split_storage.update([splits.from_raw(split)], [], 0)

for rbs in data['rbs']['d']:
await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
await segment_storage.put(segments.from_raw(data))

segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
with open(segment_fn, 'r') as flo:
data = json.loads(flo.read())
await segment_storage.put(segments.from_raw(data))

telemetry_storage = await InMemoryTelemetryStorageAsync.create()
telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage)
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()

storages = {
'splits': split_storage,
'segments': segment_storage,
'rule_based_segments': rb_segment_storage,
'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer),
'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer),
}
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue)

# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
try:
factory = SplitFactoryAsync('some_api_key',
storages,
True,
recorder,
events_queue,
events_manager,
None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
) # pylint:disable=attribute-defined-outside-init
internal_events_task.start()
except:
pass

client = factory.client()
await factory.block_until_ready(5)
await client.on(SdkEvent.SDK_READY, self._ready_callback)

"""Shut down the factory."""
await internal_events_task.stop()
await factory.destroy()

async def _ready_callback(self, metadata):
self.ready_flag = True

async def _timeout_callback(self, metadata):
self.timeout_flag = True

class InMemoryIntegrationAsyncTests(object):
"""Inmemory storage-based integration tests."""

Expand Down
75 changes: 75 additions & 0 deletions tests/storage/test_inmemory_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,36 @@ async def test_flag_sets_withut_config_sets(self):
await storage.update([split3], [], 1)
assert await storage.get_feature_flags_by_sets(['set05']) == ['split3']
assert await storage.get_feature_flags_by_sets(['set04', 'set05']) == ['split3']

@pytest.mark.asyncio
async def test_internal_event_notification(self, mocker):
"""Test retrieving a list of all split names."""
split1 = mocker.Mock()
name1_prop = mocker.PropertyMock()
name1_prop.return_value = 'split1'
type(split1).name = name1_prop
split2 = mocker.Mock()
name2_prop = mocker.PropertyMock()
name2_prop.return_value = 'split2'
type(split2).name = name2_prop
sets_property = mocker.PropertyMock()
sets_property.return_value = ['set_1']
type(split1).sets = sets_property
type(split2).sets = sets_property
events_queue = asyncio.Queue()
storage = InMemorySplitStorageAsync(events_queue)
await storage.update([split1, split2], [], -1)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.FLAGS_UPDATED
assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE
assert event.metadata.get_names() == {'split1', 'split2'}

await storage.kill_locally('split1', 'default_treatment', 3)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.FLAG_KILLED_NOTIFICATION
assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE
assert event.metadata.get_names() == {'split1'}

class InMemorySegmentStorageTests(object):
"""In memory segment storage tests."""

Expand Down Expand Up @@ -855,6 +884,23 @@ async def test_segment_update(self):
assert not await storage.segment_contains('some_segment', 'key3')
assert await storage.get_change_number('some_segment') == 456

@pytest.mark.asyncio
async def test_internal_event_notification(self):
"""Test updating a segment."""
events_queue = asyncio.Queue()
storage = InMemorySegmentStorageAsync(events_queue)
segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123)
await storage.put(segment)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
assert len(event.metadata.get_names()) == 0

await storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
assert len(event.metadata.get_names()) == 0

class InMemoryImpressionsStorageTests(object):
"""InMemory impressions storage test cases."""
Expand Down Expand Up @@ -2027,3 +2073,32 @@ async def test_contains(self):
assert await storage.contains(["segment1"])
assert await storage.contains(["segment1", "segment3"])
assert not await storage.contains(["segment5"])

@pytest.mark.asyncio
async def test_internal_event_notification(self, mocker):
"""Test storing and retrieving splits works."""
events_queue = asyncio.Queue()
rbs_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)

segment1 = mocker.Mock(spec=RuleBasedSegment)
name_property = mocker.PropertyMock()
name_property.return_value = 'some_segment'
type(segment1).name = name_property

segment2 = mocker.Mock()
name2_prop = mocker.PropertyMock()
name2_prop.return_value = 'segment2'
type(segment2).name = name2_prop

await rbs_storage.update([segment1, segment2], [], -1)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
assert len(event.metadata.get_names()) == 0

await rbs_storage.update([], ['some_segment'], -1)
event = await events_queue.get()
assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
assert len(event.metadata.get_names()) == 0

Loading