Skip to content

Commit 391889b

Browse files
committed
add HostFilterPolicy and tests
1 parent 9aaf1ec commit 391889b

5 files changed

Lines changed: 375 additions & 3 deletions

File tree

CHANGELOG.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
Features
55
--------
66
* Add idle_heartbeat_timeout cluster option to tune how long to wait for heartbeat responses. (PYTHON-762)
7+
* Add HostFilterPolicy (PYTHON-761)
78

89
Bug Fixes
910
---------
@@ -19,6 +20,10 @@ Other
1920
* Bump Cython dependency version to 0.25.2 (PYTHON-754)
2021
* Fix DeprecationWarning when using lz4 (PYTHON-769)
2122

23+
Other
24+
-----
25+
* Deprecate WhiteListRoundRobinPolicy (PYTHON-759)
26+
2227
3.10.0
2328
======
2429
May 24, 2017

cassandra/policies.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from random import randint, shuffle
1818
from threading import Lock
1919
import socket
20+
from warnings import warn
2021

2122
from cassandra import ConsistencyLevel, OperationTimedOut
2223

@@ -396,6 +397,10 @@ def on_remove(self, *args, **kwargs):
396397

397398
class WhiteListRoundRobinPolicy(RoundRobinPolicy):
398399
"""
400+
|wlrrp| **is deprecated. It will be removed in 4.0.** It can effectively be
401+
reimplemented using :class:`.HostFilterPolicy`. For more information, see
402+
PYTHON-758_.
403+
399404
A subclass of :class:`.RoundRobinPolicy` which evenly
400405
distributes queries across all nodes in the cluster,
401406
regardless of what datacenter the nodes may be in, but
@@ -405,12 +410,25 @@ class WhiteListRoundRobinPolicy(RoundRobinPolicy):
405410
https://datastax-oss.atlassian.net/browse/JAVA-145
406411
Where connection errors occur when connection
407412
attempts are made to private IP addresses remotely
413+
414+
.. |wlrrp| raw:: html
415+
416+
<b><code>WhiteListRoundRobinPolicy</code></b>
417+
418+
.. _PYTHON-758: https://datastax-oss.atlassian.net/browse/PYTHON-758
419+
408420
"""
409421
def __init__(self, hosts):
410422
"""
411423
The `hosts` parameter should be a sequence of hosts to permit
412424
connections to.
413425
"""
426+
msg = ('WhiteListRoundRobinPolicy is deprecated. '
427+
'It will be removed in 4.0. '
428+
'It can effectively be reimplemented using HostFilterPolicy.')
429+
warn(msg, DeprecationWarning)
430+
# DeprecationWarnings are silent by default so we also log the message
431+
log.warning(msg)
414432

415433
self._allowed_hosts = hosts
416434
self._allowed_hosts_resolved = [endpoint[4][0] for a in self._allowed_hosts
@@ -441,6 +459,116 @@ def on_add(self, host):
441459
RoundRobinPolicy.on_add(self, host)
442460

443461

462+
class HostFilterPolicy(LoadBalancingPolicy):
463+
"""
464+
A :class:`.LoadBalancingPolicy` subclass configured with a child policy,
465+
and a single-argument predicate. This policy defers to the child policy for
466+
hosts where ``predicate(host)`` is truthy. Hosts for which
467+
``predicate(host)`` is falsey will be considered :attr:`.IGNORED`, and will
468+
not be used in a query plan.
469+
470+
This can be used in the cases where you need a whitelist or blacklist
471+
policy, e.g. to prepare for decommissioning nodes or for testing:
472+
473+
.. code-block:: python
474+
475+
def address_is_ignored(host):
476+
return host.address in [ignored_address0, ignored_address1]
477+
478+
blacklist_filter_policy = HostFilterPolicy(
479+
child_policy=RoundRobinPolicy(),
480+
predicate=lambda host: not address_is_ignored(host)
481+
)
482+
483+
cluster = Cluster(
484+
primary_host,
485+
load_balancing_policy=blacklist_filter_policy,
486+
)
487+
488+
Please note that whitelist and blacklist policies are not recommended for
489+
general, day-to-day use. You probably want something like
490+
:class:`.DCAwareRoundRobinPolicy`, which prefers a local DC but has
491+
fallbacks, over a brute-force method like whitelisting or blacklisting.
492+
"""
493+
494+
def __init__(self, child_policy, predicate):
495+
"""
496+
:param child_policy: an instantiated :class:`.LoadBalancingPolicy`
497+
that this one will defer to.
498+
:param predicate: a one-parameter function that takes a :class:`.Host`.
499+
If it returns a falsey value, the :class:`.Host` will
500+
be :attr:`.IGNORED` and not returned in query plans.
501+
"""
502+
super(HostFilterPolicy, self).__init__()
503+
self._child_policy = child_policy
504+
self._predicate = predicate
505+
506+
def on_up(self, host, *args, **kwargs):
507+
if self.predicate(host):
508+
return self._child_policy.on_up(host, *args, **kwargs)
509+
510+
def on_down(self, host, *args, **kwargs):
511+
if self.predicate(host):
512+
return self._child_policy.on_down(host, *args, **kwargs)
513+
514+
def on_add(self, host, *args, **kwargs):
515+
if self.predicate(host):
516+
return self._child_policy.on_add(host, *args, **kwargs)
517+
518+
def on_remove(self, host, *args, **kwargs):
519+
if self.predicate(host):
520+
return self._child_policy.on_remove(host, *args, **kwargs)
521+
522+
@property
523+
def predicate(self):
524+
"""
525+
A predicate, set on object initialization, that takes a :class:`.Host`
526+
and returns a value. If the value is falsey, the :class:`.Host` is
527+
:attr:`~HostDistance.IGNORED`. If the value is truthy,
528+
:class:`.HostFilterPolicy` defers to the child policy to determine the
529+
host's distance.
530+
531+
This is a read-only value set in ``__init__``, implemented as a
532+
``property``.
533+
"""
534+
return self._predicate
535+
536+
def distance(self, host):
537+
"""
538+
Checks if ``predicate(host)``, then returns
539+
:attr:`~HostDistance.IGNORED` if falsey, and defers to the child policy
540+
otherwise.
541+
"""
542+
if self.predicate(host):
543+
return self._child_policy.distance(host)
544+
else:
545+
return HostDistance.IGNORED
546+
547+
def populate(self, cluster, hosts):
548+
self._child_policy.populate(
549+
cluster=cluster,
550+
hosts=[h for h in hosts if self.predicate(h)]
551+
)
552+
553+
def make_query_plan(self, working_keyspace=None, query=None):
554+
"""
555+
Defers to the child policy's
556+
:meth:`.LoadBalancingPolicy.make_query_plan`. Since host changes (up,
557+
down, addition, and removal) are only propagated to the child policy for
558+
hosts where ``predicate(host)`` is truthy, the child policy will only ever
559+
return hosts for which ``predicate(host)`` was truthy when that change occurred.
560+
"""
561+
child_qp = self._child_policy.make_query_plan(
562+
working_keyspace=working_keyspace, query=query
563+
)
564+
for host in child_qp:
565+
if self.predicate(host):
566+
yield host
567+
568+
def check_supported(self):
569+
return self._child_policy.check_supported()
570+
571+
444572
class ConvictionPolicy(object):
445573
"""
446574
A policy which decides when hosts should be considered down

docs/api/cassandra/policies.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ Load Balancing
2424
.. autoclass:: TokenAwarePolicy
2525
:members:
2626

27+
.. autoclass:: HostFilterPolicy
28+
29+
.. we document these methods manually so we can specify a param to predicate
30+
31+
.. automethod:: predicate(host)
32+
.. automethod:: distance
33+
.. automethod:: make_query_plan
34+
2735
Translating Server Node Addresses
2836
---------------------------------
2937

tests/integration/long/test_loadbalancingpolicies.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from cassandra.concurrent import execute_concurrent_with_args
2424
from cassandra.metadata import murmur3
2525
from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy,
26-
TokenAwarePolicy, WhiteListRoundRobinPolicy)
26+
TokenAwarePolicy, WhiteListRoundRobinPolicy,
27+
HostFilterPolicy)
2728
from cassandra.query import SimpleStatement
2829

2930
from tests.integration import use_singledc, use_multidc, remove_cluster, PROTOCOL_VERSION
@@ -665,3 +666,37 @@ def test_white_list(self):
665666
pass
666667
finally:
667668
cluster.shutdown()
669+
670+
def test_black_list_with_host_filter_policy(self):
671+
use_singledc()
672+
keyspace = 'test_black_list_with_hfp'
673+
ignored_address = (IP_FORMAT % 2)
674+
hfp = HostFilterPolicy(
675+
child_policy=RoundRobinPolicy(),
676+
predicate=lambda host: host.address != ignored_address
677+
)
678+
cluster = Cluster(
679+
(IP_FORMAT % 1,),
680+
load_balancing_policy=hfp,
681+
protocol_version=PROTOCOL_VERSION,
682+
topology_event_refresh_window=0,
683+
status_event_refresh_window=0
684+
)
685+
self.addCleanup(cluster.shutdown)
686+
session = cluster.connect()
687+
self._wait_for_nodes_up([1, 2, 3])
688+
689+
self.assertNotIn(ignored_address, [h.address for h in hfp.make_query_plan()])
690+
691+
create_schema(cluster, session, keyspace)
692+
self._insert(session, keyspace)
693+
self._query(session, keyspace)
694+
695+
self.coordinator_stats.assert_query_count_equals(self, 1, 6)
696+
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
697+
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
698+
699+
# policy should not allow reconnecting to ignored host
700+
force_stop(2)
701+
self._wait_for_nodes_down([2])
702+
self.assertFalse(cluster.metadata._hosts[ignored_address].is_currently_reconnecting())

0 commit comments

Comments
 (0)