2626import java .util .List ;
2727import java .util .Map ;
2828import java .util .Optional ;
29- import java .util .Set ;
3029import java .util .concurrent .TimeUnit ;
3130import java .util .concurrent .atomic .AtomicInteger ;
3231import java .util .function .Consumer ;
3332import java .util .regex .Matcher ;
3433import java .util .regex .Pattern ;
35- import java .util .stream .Collectors ;
3634import okhttp3 .Call ;
3735import okhttp3 .OkHttpClient ;
3836import okhttp3 .Request ;
@@ -138,7 +136,7 @@ public <T> void addFeaturesListener(
138136
139137 if (productFeaturesByteArray != null ) {
140138 try {
141- dl .deserializeAndAccept (productFeaturesByteArray , PollingRateHinter .NOOP );
139+ dl .deserializeAndAccept (name , productFeaturesByteArray , PollingRateHinter .NOOP );
142140 } catch (RuntimeException | IOException e ) {
143141 log .warn ("Error applying features for {}" , name , e );
144142 }
@@ -292,12 +290,6 @@ private void handleAgentResponse(ResponseBody body) {
292290 }
293291 }
294292
295- try {
296- unapplyConfigs (configsToApply , this );
297- } catch (ReportableException e ) {
298- errorMessage = e .getMessage ();
299- }
300-
301293 updateNextState (fleetResponse , inspectedConfigurationKeys , errorMessage );
302294
303295 if (successes == 0 && failures > 0 ) {
@@ -308,35 +300,7 @@ private void handleAgentResponse(ResponseBody body) {
308300 rescheduleBaseOnConfiguration (this .durationHint );
309301 }
310302
311- private void unapplyConfigs (List <String > configsToApply , PollingRateHinter hinter ) {
312- Set <String > activeProducts =
313- configsToApply .stream ()
314- .map (ConfigurationPoller ::extractProductFromKey )
315- .collect (Collectors .toSet ());
316- // it WILL not unapply configurations for products that we're subscribed to
317- // but for which we see no configurations
318- for (ConfigState configState : this .nextClientState .configStates ) {
319- String previousProduct = configState .product ;
320- if (!activeProducts .contains (previousProduct )) {
321- DeserializerAndListener <?> dl = this .listeners .get (previousProduct );
322- if (dl != null ) {
323- log .info ("Unapplying configuration for {}" , previousProduct );
324- try {
325- dl .listener .accept (null , hinter );
326- } catch (Exception e ) {
327- ratelimitedLogger .warn (
328- "Error unapplying configuration for " + previousProduct + ": " + e .getMessage ());
329- }
330- }
331- }
332- }
333- }
334-
335- private boolean processConfigKey (
336- RemoteConfigResponse fleetResponse ,
337- String configKey ,
338- List <String > inspectedConfigurationKeys ,
339- PollingRateHinter pollingRateHinter ) {
303+ private DeserializerAndListener <?> extractDeserializerAndListenerFromKey (String configKey ) {
340304 String productName = extractProductFromKey (configKey );
341305 if (productName == null ) {
342306 throw new ReportableException ("Cannot extract product from key " + configKey );
@@ -352,6 +316,23 @@ private boolean processConfigKey(
352316 + " is not being handled" );
353317 }
354318
319+ return dl ;
320+ }
321+
322+ private boolean notifyConfigurationKeyRemoved (
323+ String configKey , PollingRateHinter pollingRateHinter ) throws IOException {
324+ return extractDeserializerAndListenerFromKey (configKey )
325+ .deserializeAndAccept (configKey , null , pollingRateHinter );
326+ }
327+
328+ private boolean processConfigKey (
329+ RemoteConfigResponse fleetResponse ,
330+ String configKey ,
331+ List <String > inspectedConfigurationKeys ,
332+ PollingRateHinter pollingRateHinter ) {
333+ // find right product from configKey
334+ DeserializerAndListener <?> dl = extractDeserializerAndListenerFromKey (configKey );
335+
355336 // check if the hash of this configuration file actually changed
356337 CachedTargetFile cachedTargetFile = this .cachedTargetFiles .get (configKey );
357338 RemoteConfigResponse .Targets .ConfigTarget target = fleetResponse .getTarget (configKey );
@@ -389,8 +370,7 @@ private boolean processConfigKey(
389370
390371 try {
391372 log .debug ("Applying configuration for {}" , configKey );
392- boolean result = dl .deserializeAndAccept (fileContent , pollingRateHinter );
393- return result ;
373+ return dl .deserializeAndAccept (configKey , fileContent , pollingRateHinter );
394374 } catch (IOException | RuntimeException ex ) {
395375 ratelimitedLogger .warn ("Error handling configuration for " + configKey , ex );
396376 return false ;
@@ -484,6 +464,12 @@ private void updateNextState(
484464 String configKey = cachedConfigKeysIter .next ();
485465 if (!inspectedConfigKeys .contains (configKey )) {
486466 cachedConfigKeysIter .remove ();
467+ try {
468+ log .debug ("Removing configuration for {}" , configKey );
469+ notifyConfigurationKeyRemoved (configKey , this );
470+ } catch (IOException | RuntimeException ex ) {
471+ ratelimitedLogger .warn ("Error handling configuration for " + configKey , ex );
472+ }
487473 }
488474 }
489475 }
@@ -524,7 +510,7 @@ private void loadFromFile(File file, DeserializerAndListener<?> deserializerAndL
524510
525511 boolean res =
526512 deserializerAndListener .deserializeAndAccept (
527- outputStream .toByteArray (), PollingRateHinter .NOOP );
513+ file . getAbsolutePath (), outputStream .toByteArray (), PollingRateHinter .NOOP );
528514 if (!res ) {
529515 ratelimitedLogger .warn ("Failed reading or applying configuration from {}" , file );
530516 } else {
@@ -539,7 +525,7 @@ private void loadFromFile(File file, DeserializerAndListener<?> deserializerAndL
539525 // marked as synchronized only to satisfy spotbugs,
540526 // because this method is only called from synchronized methods anyway
541527 private synchronized boolean featuresChangeListener (
542- FeaturesConfig fconfig , PollingRateHinter hinter ) {
528+ String configKey , FeaturesConfig fconfig , PollingRateHinter hinter ) {
543529 if (fconfig == null ) {
544530 log .warn ("Features configuration was pulled, which is unexpected" );
545531 return true ;
@@ -556,7 +542,7 @@ private synchronized boolean featuresChangeListener(
556542
557543 try {
558544 byte [] productFeaturesByteArray = fconfig .getProductFeaturesByteArray (product );
559- dl .deserializeAndAccept (productFeaturesByteArray , hinter );
545+ dl .deserializeAndAccept (product , productFeaturesByteArray , hinter );
560546 } catch (IOException | RuntimeException e ) {
561547 ratelimitedLogger .warn ("Error processing features for {}" , product );
562548 }
@@ -583,12 +569,19 @@ private static class DeserializerAndListener<T> {
583569 this .listener = listener ;
584570 }
585571
586- boolean deserializeAndAccept (byte [] bytes , PollingRateHinter hinter ) throws IOException {
587- T configuration = this .deserializer .deserialize (bytes );
588- if (configuration == null ) {
589- return false ;
572+ boolean deserializeAndAccept (String configKey , byte [] bytes , PollingRateHinter hinter )
573+ throws IOException {
574+ T configuration = null ;
575+
576+ if (bytes != null ) {
577+ configuration = this .deserializer .deserialize (bytes );
578+ // ensure deserializer return a value.
579+ if (configuration == null ) {
580+ throw new RuntimeException ("Configuration deserializer didn't provide a configuration" );
581+ }
590582 }
591- return this .listener .accept (configuration , hinter );
583+
584+ return this .listener .accept (configKey , configuration , hinter );
592585 }
593586 }
594587
0 commit comments