1515# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1616# License for the specific language governing permissions and limitations
1717# under the License.
18+ from taskflow .patterns import linear_flow
19+
1820from billingstack import exceptions
1921from billingstack import tasks
2022from billingstack .collector import states
2123from billingstack .openstack .common import log
2224from billingstack .payment_gateway import get_provider
23- from billingstack .taskflow .patterns import linear_flow , threaded_flow
2425
2526
2627ACTION = 'gateway_configuration:create'
3132class EntryCreateTask (tasks .RootTask ):
3233 def __init__ (self , storage , ** kw ):
3334 super (EntryCreateTask , self ).__init__ (** kw )
34- self .requires .update (['gateway_config' ])
35- self .provides .update (['gateway_config' ])
3635 self .storage = storage
3736
38- def __call__ (self , context , gateway_config ):
39- gateway_config ['state' ] = states .VERIFYING
40- values = self .storage .create_pg_config (context , gateway_config )
41- return {'gateway_config' : values }
42-
43-
44- class ThreadStartTask (tasks .RootTask ):
45- """
46- This is the end of the current flow, we'll fire off a new threaded flow
47- that does stuff towards the actual Gateway which may include blocking code.
48- """
49- def __init__ (self , storage , ** kw ):
50- super (ThreadStartTask , self ).__init__ (** kw )
51- self .requires .update (['gateway_config' ])
52- self .storage = storage
53-
54- def __call__ (self , ctxt , gateway_config ):
55- flow = threaded_flow .Flow (ACTION + ':backend' )
56- flow .add (tasks .ValuesInjectTask ({'gateway_config' : gateway_config }))
57- flow .add (PrerequirementsTask (self .storage ))
58- flow .add (BackendVerifyTask (self .storage ))
59- flow .run (ctxt )
37+ def execute (self , ctxt , values ):
38+ values ['state' ] = states .VERIFYING
39+ return self .storage .create_pg_config (ctxt , values )
6040
6141
6242class PrerequirementsTask (tasks .RootTask ):
@@ -65,20 +45,11 @@ class PrerequirementsTask(tasks.RootTask):
6545 """
6646 def __init__ (self , storage , ** kw ):
6747 super (PrerequirementsTask , self ).__init__ (** kw )
68- self .requires .update (['gateway_config' ])
69- self .provides .update ([
70- 'gateway_config' ,
71- 'gateway_provider'
72- ])
7348 self .storage = storage
7449
75- def __call__ (self , ctxt , gateway_config ):
76- gateway_provider = self .storage .get_pg_provider (
77- gateway_config ['providedr_id' ])
78- return {
79- 'gateway_config' : gateway_config ,
80- 'gateway_provider' : gateway_provider
81- }
50+ def execute (self , ctxt , gateway_config ):
51+ return self .storage .get_pg_provider (
52+ ctxt , gateway_config ['provider_id' ])
8253
8354
8455class BackendVerifyTask (tasks .RootTask ):
@@ -92,11 +63,10 @@ class BackendVerifyTask(tasks.RootTask):
9263 """
9364 def __init__ (self , storage , ** kw ):
9465 super (BackendVerifyTask , self ).__init__ (** kw )
95- self .requires .update (['gateway_config' , 'gateway_provider' ])
9666 self .storage = storage
9767
98- def __call__ (self , ctxt , gateway_config , gateway_provider ):
99- gateway_provider_cls = get_provider [ gateway_provider ['name' ]]
68+ def execute (self , ctxt , gateway_config , gateway_provider ):
69+ gateway_provider_cls = get_provider ( gateway_provider ['name' ])
10070 gateway_provider_obj = gateway_provider_cls (gateway_config )
10171
10272 try :
@@ -109,14 +79,19 @@ def __call__(self, ctxt, gateway_config, gateway_provider):
10979 ctxt , gateway_config ['id' ], {'state' : states .ACTIVE })
11080
11181
112- def create_flow (storage , values ):
113- flow = linear_flow .Flow (ACTION )
82+ def create_flow (storage ):
83+ flow = linear_flow .Flow (ACTION + ':initial' )
84+
85+ entry_task = EntryCreateTask (
86+ storage , provides = 'gateway_config' , prefix = ACTION )
87+ flow .add (entry_task )
11488
115- flow .add (tasks .ValuesInjectTask (
116- {'gateway_config' : values },
117- prefix = ACTION + ':initial' ))
89+ backend_flow = linear_flow .Flow (ACTION + ':backend' )
90+ prereq_task = PrerequirementsTask (
91+ storage , provides = 'gateway_provider' , prefix = ACTION )
92+ backend_flow .add (prereq_task )
93+ backend_flow .add (BackendVerifyTask (storage , prefix = ACTION ))
11894
119- entry_task = EntryCreateTask (storage , prefix = ACTION )
120- entry_task_id = flow .add (entry_task )
95+ flow .add (backend_flow )
12196
122- return entry_task_id , tasks . _attach_debug_listeners ( flow )
97+ return flow
0 commit comments