@@ -102,7 +102,7 @@ public Observer<T> getObserver() {
102102
103103 }
104104
105- private final ConcurrentHashMap <K , GroupState <K , T >> groups = new ConcurrentHashMap <K , GroupState <K , T >>();
105+ private final ConcurrentHashMap <Object , GroupState <K , T >> groups = new ConcurrentHashMap <Object , GroupState <K , T >>();
106106
107107 private static final NotificationLite <Object > nl = NotificationLite .instance ();
108108
@@ -166,10 +166,18 @@ void requestFromGroupedObservable(long n, GroupState<K, T> group) {
166166 }
167167 }
168168
169+ private Object groupedKey (K key ) {
170+ return key == null ? NULL_KEY : key ;
171+ }
172+
173+ private K getKey (Object groupedKey ) {
174+ return groupedKey == NULL_KEY ? null : (K ) groupedKey ;
175+ }
176+
169177 @ Override
170178 public void onNext (T t ) {
171179 try {
172- final K key = keySelector .call (t );
180+ final Object key = groupedKey ( keySelector .call (t ) );
173181 GroupState <K , T > group = groups .get (key );
174182 if (group == null ) {
175183 // this group doesn't exist
@@ -185,10 +193,10 @@ public void onNext(T t) {
185193 }
186194 }
187195
188- private GroupState <K , T > createNewGroup (final K key ) {
196+ private GroupState <K , T > createNewGroup (final Object key ) {
189197 final GroupState <K , T > groupState = new GroupState <K , T >();
190198
191- GroupedObservable <K , R > go = GroupedObservable .create (key , new OnSubscribe <R >() {
199+ GroupedObservable <K , R > go = GroupedObservable .create (getKey ( key ) , new OnSubscribe <R >() {
192200
193201 @ Override
194202 public void call (final Subscriber <? super R > o ) {
@@ -252,7 +260,7 @@ public void onNext(T t) {
252260 return groupState ;
253261 }
254262
255- private void cleanupGroup (K key ) {
263+ private void cleanupGroup (Object key ) {
256264 GroupState <K , T > removed ;
257265 removed = groups .remove (key );
258266 if (removed != null ) {
@@ -357,4 +365,5 @@ public Object call(Object t) {
357365 }
358366 };
359367
368+ private static final Object NULL_KEY = new Object ();
360369}
0 commit comments