Skip to content

Commit cd8f0b1

Browse files
committed
Bring code up to speed.
* Use taskflow as a library * Move requires to root * Fix git path * Update oslo Change-Id: Iae8329a639e26881fbc3286479a429ae75149493
1 parent e9d2aac commit cd8f0b1

56 files changed

Lines changed: 778 additions & 3252 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

billingstack/central/flows/merchant.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# Author: Endre Karlson <endre.karlson@hp.com>
66
#
77
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8-
# not use this file except in compliance with the License. You may obtain
8+
# not use this file except in co68mpliance with the License. You may obtain
99
# a copy of the License at
1010
#
1111
# http://www.apache.org/licenses/LICENSE-2.0
@@ -15,9 +15,10 @@
1515
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1616
# License for the specific language governing permissions and limitations
1717
# under the License.
18+
from taskflow.patterns import linear_flow
19+
1820
from billingstack import tasks
1921
from billingstack.openstack.common import log
20-
from billingstack.taskflow.patterns import linear_flow
2122

2223
ACTION = 'merchant:create'
2324

@@ -27,23 +28,16 @@
2728
class EntryCreateTask(tasks.RootTask):
2829
def __init__(self, storage, **kw):
2930
super(EntryCreateTask, self).__init__(**kw)
30-
self.requires.update(['merchant'])
31-
self.provides.update(['merchant'])
3231
self.storage = storage
3332

34-
def __call__(self, context, merchant):
35-
values = self.storage.create_merchant(context, merchant)
36-
return {'merchant': values}
33+
def execute(self, ctxt, values):
34+
return self.storage.create_merchant(ctxt, values)
3735

3836

39-
def create_flow(storage, values):
37+
def create_flow(storage):
4038
flow = linear_flow.Flow(ACTION)
4139

42-
flow.add(tasks.ValuesInjectTask(
43-
{'merchant': values},
44-
prefix=ACTION + ':initial'))
45-
46-
entry_task = EntryCreateTask(storage, prefix=ACTION)
47-
entry_task_id = flow.add(entry_task)
40+
entry_task = EntryCreateTask(storage, provides='merchant', prefix=ACTION)
41+
flow.add(entry_task)
4842

49-
return entry_task_id, tasks._attach_debug_listeners(flow)
43+
return flow

billingstack/central/service.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import sys
1717

1818
from oslo.config import cfg
19+
from taskflow.engines import run as run_flow
20+
21+
1922
from billingstack.openstack.common import log as logging
2023
from billingstack.openstack.common.rpc import service as rpc_service
2124
from billingstack.openstack.common import service as os_service
@@ -103,9 +106,10 @@ def get_pg_provider(self, ctxt, pgp_id):
103106

104107
# Merchant
105108
def create_merchant(self, ctxt, values):
106-
id_, flow = merchant.create_flow(self.storage_conn, values)
107-
flow.run(ctxt)
108-
return flow.results[id_]['merchant']
109+
flow = merchant.create_flow(self.storage_conn)
110+
result = run_flow(flow, engine_conf="parallel",
111+
store={'values': values, 'ctxt': ctxt})
112+
return result['merchant']
109113

110114
def list_merchants(self, ctxt, **kw):
111115
return self.storage_conn.list_merchants(ctxt, **kw)

billingstack/collector/flows/gateway_configuration.py

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1616
# License for the specific language governing permissions and limitations
1717
# under the License.
18+
from taskflow.patterns import linear_flow
19+
1820
from billingstack import exceptions
1921
from billingstack import tasks
2022
from billingstack.collector import states
2123
from billingstack.openstack.common import log
2224
from billingstack.payment_gateway import get_provider
23-
from billingstack.taskflow.patterns import linear_flow, threaded_flow
2425

2526

2627
ACTION = 'gateway_configuration:create'
@@ -31,32 +32,11 @@
3132
class EntryCreateTask(tasks.RootTask):
3233
def __init__(self, storage, **kw):
3334
super(EntryCreateTask, self).__init__(**kw)
34-
self.requires.update(['gateway_config'])
35-
self.provides.update(['gateway_config'])
3635
self.storage = storage
3736

38-
def __call__(self, context, gateway_config):
39-
gateway_config['state'] = states.VERIFYING
40-
values = self.storage.create_pg_config(context, gateway_config)
41-
return {'gateway_config': values}
42-
43-
44-
class ThreadStartTask(tasks.RootTask):
45-
"""
46-
This is the end of the current flow, we'll fire off a new threaded flow
47-
that does stuff towards the actual Gateway which may include blocking code.
48-
"""
49-
def __init__(self, storage, **kw):
50-
super(ThreadStartTask, self).__init__(**kw)
51-
self.requires.update(['gateway_config'])
52-
self.storage = storage
53-
54-
def __call__(self, ctxt, gateway_config):
55-
flow = threaded_flow.Flow(ACTION + ':backend')
56-
flow.add(tasks.ValuesInjectTask({'gateway_config': gateway_config}))
57-
flow.add(PrerequirementsTask(self.storage))
58-
flow.add(BackendVerifyTask(self.storage))
59-
flow.run(ctxt)
37+
def execute(self, ctxt, values):
38+
values['state'] = states.VERIFYING
39+
return self.storage.create_pg_config(ctxt, values)
6040

6141

6242
class PrerequirementsTask(tasks.RootTask):
@@ -65,20 +45,11 @@ class PrerequirementsTask(tasks.RootTask):
6545
"""
6646
def __init__(self, storage, **kw):
6747
super(PrerequirementsTask, self).__init__(**kw)
68-
self.requires.update(['gateway_config'])
69-
self.provides.update([
70-
'gateway_config',
71-
'gateway_provider'
72-
])
7348
self.storage = storage
7449

75-
def __call__(self, ctxt, gateway_config):
76-
gateway_provider = self.storage.get_pg_provider(
77-
gateway_config['providedr_id'])
78-
return {
79-
'gateway_config': gateway_config,
80-
'gateway_provider': gateway_provider
81-
}
50+
def execute(self, ctxt, gateway_config):
51+
return self.storage.get_pg_provider(
52+
ctxt, gateway_config['provider_id'])
8253

8354

8455
class BackendVerifyTask(tasks.RootTask):
@@ -92,11 +63,10 @@ class BackendVerifyTask(tasks.RootTask):
9263
"""
9364
def __init__(self, storage, **kw):
9465
super(BackendVerifyTask, self).__init__(**kw)
95-
self.requires.update(['gateway_config', 'gateway_provider'])
9666
self.storage = storage
9767

98-
def __call__(self, ctxt, gateway_config, gateway_provider):
99-
gateway_provider_cls = get_provider[gateway_provider['name']]
68+
def execute(self, ctxt, gateway_config, gateway_provider):
69+
gateway_provider_cls = get_provider(gateway_provider['name'])
10070
gateway_provider_obj = gateway_provider_cls(gateway_config)
10171

10272
try:
@@ -109,14 +79,19 @@ def __call__(self, ctxt, gateway_config, gateway_provider):
10979
ctxt, gateway_config['id'], {'state': states.ACTIVE})
11080

11181

112-
def create_flow(storage, values):
113-
flow = linear_flow.Flow(ACTION)
82+
def create_flow(storage):
83+
flow = linear_flow.Flow(ACTION + ':initial')
84+
85+
entry_task = EntryCreateTask(
86+
storage, provides='gateway_config', prefix=ACTION)
87+
flow.add(entry_task)
11488

115-
flow.add(tasks.ValuesInjectTask(
116-
{'gateway_config': values},
117-
prefix=ACTION + ':initial'))
89+
backend_flow = linear_flow.Flow(ACTION + ':backend')
90+
prereq_task = PrerequirementsTask(
91+
storage, provides='gateway_provider', prefix=ACTION)
92+
backend_flow.add(prereq_task)
93+
backend_flow.add(BackendVerifyTask(storage, prefix=ACTION))
11894

119-
entry_task = EntryCreateTask(storage, prefix=ACTION)
120-
entry_task_id = flow.add(entry_task)
95+
flow.add(backend_flow)
12196

122-
return entry_task_id, tasks._attach_debug_listeners(flow)
97+
return flow

billingstack/collector/flows/payment_method.py

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
1616
# License for the specific language governing permissions and limitations
1717
# under the License.
18+
from taskflow.patterns import linear_flow
19+
1820
from billingstack import exceptions
1921
from billingstack import tasks
2022
from billingstack.collector import states
2123
from billingstack.openstack.common import log
2224
from billingstack.payment_gateway import get_provider
23-
from billingstack.taskflow.patterns import linear_flow, threaded_flow
2425

2526

2627
ACTION = 'payment_method:create'
@@ -34,34 +35,11 @@ class EntryCreateTask(tasks.RootTask):
3435
"""
3536
def __init__(self, storage, **kw):
3637
super(EntryCreateTask, self).__init__(**kw)
37-
self.requires.update(['payment_method'])
38-
self.provides.update(['payment_method'])
39-
self.storage = storage
40-
41-
def __call__(self, ctxt, payment_method):
42-
payment_method['state'] = states.PENDING
43-
values = self.storage.create_payment_method(ctxt, payment_method)
44-
return {'payment_method': values}
45-
46-
47-
class ThreadStartTask(tasks.RootTask):
48-
"""
49-
This is the end of the current flow, we'll fire off a new threaded flow
50-
that does stuff towards the actual Gateway which may include blocking code.
51-
52-
This fires off a new flow that is threaded / greenthreads?
53-
"""
54-
def __init__(self, storage, **kw):
55-
super(ThreadStartTask, self).__init__(**kw)
56-
self.requires.update(['payment_method'])
5738
self.storage = storage
5839

59-
def __call__(self, ctxt, payment_method):
60-
flow = threaded_flow.Flow(ACTION + ':backend')
61-
flow.add(tasks.ValuesInjectTask({'payment_method': payment_method}))
62-
flow.add(PrerequirementsTask(self.storage))
63-
flow.add(BackendCreateTask(self.storage))
64-
flow.run(ctxt)
40+
def execute(self, ctxt, values):
41+
values['state'] = states.PENDING
42+
return self.storage.create_payment_method(ctxt, values)
6543

6644

6745
class PrerequirementsTask(tasks.RootTask):
@@ -70,33 +48,23 @@ class PrerequirementsTask(tasks.RootTask):
7048
"""
7149
def __init__(self, storage, **kw):
7250
super(PrerequirementsTask, self).__init__(**kw)
73-
self.requires.update(['payment_method'])
74-
self.provides.update([
75-
'payment_method',
76-
'gateway_config',
77-
'gateway_provider'])
7851
self.storage = storage
7952

80-
def __call__(self, ctxt, **kw):
81-
kw['gateway_config'] = self.storage.get_pg_config(
82-
ctxt, kw['payment_method']['provider_config_id'])
83-
84-
kw['gateway_provider'] = self.storage.get_pg_provider(
85-
ctxt, kw['gateway_config']['provider_id'])
86-
87-
return kw
53+
def execute(self, ctxt, values):
54+
data = {}
55+
data['gateway_config'] = self.storage.get_pg_config(
56+
ctxt, values['provider_config_id'])
57+
data['gateway_provider'] = self.storage.get_pg_provider(
58+
ctxt, data['gateway_config']['provider_id'])
59+
return data
8860

8961

9062
class BackendCreateTask(tasks.RootTask):
9163
def __init__(self, storage, **kw):
9264
super(BackendCreateTask, self).__init__(**kw)
93-
self.requires.update([
94-
'payment_method',
95-
'gateway_config',
96-
'gateway_provider'])
9765
self.storage = storage
9866

99-
def __call__(self, ctxt, payment_method, gateway_config, gateway_provider):
67+
def execute(self, ctxt, payment_method, gateway_config, gateway_provider):
10068
gateway_provider_cls = get_provider(gateway_provider['name'])
10169
gateway_provider_obj = gateway_provider_cls(gateway_config)
10270

@@ -110,19 +78,26 @@ def __call__(self, ctxt, payment_method, gateway_config, gateway_provider):
11078
raise
11179

11280

113-
def create_flow(storage, payment_method):
81+
def create_flow(storage):
11482
"""
11583
The flow for the service to start
11684
"""
11785
flow = linear_flow.Flow(ACTION + ':initial')
11886

119-
flow.add(tasks.ValuesInjectTask(
120-
{'payment_method': payment_method},
121-
prefix=ACTION))
87+
entry_task = EntryCreateTask(storage, provides='payment_method',
88+
prefix=ACTION)
89+
flow.add(entry_task)
12290

123-
entry_task = EntryCreateTask(storage, prefix=ACTION)
124-
entry_task_id = flow.add(entry_task)
91+
backend_flow = linear_flow.Flow(ACTION + ':backend')
92+
prereq_task = PrerequirementsTask(
93+
storage,
94+
provides=set([
95+
'gateway_config',
96+
'gateway_provider']),
97+
prefix=ACTION)
98+
backend_flow.add(prereq_task)
99+
backend_flow.add(BackendCreateTask(storage, prefix=ACTION))
125100

126-
flow.add(ThreadStartTask(storage, prefix=ACTION))
101+
flow.add(backend_flow)
127102

128-
return entry_task_id, tasks._attach_debug_listeners(flow)
103+
return flow

billingstack/collector/service.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import sys
2121

2222
from oslo.config import cfg
23+
from taskflow.engines import run as run_flow
24+
2325
from billingstack.openstack.common import log as logging
2426
from billingstack.openstack.common.rpc import service as rpc_service
2527
from billingstack.openstack.common import service as os_service
@@ -64,10 +66,9 @@ def list_pg_providers(self, ctxt, **kw):
6466

6567
# PGC
6668
def create_pg_config(self, ctxt, values):
67-
id_, flow = gateway_configuration.create_flow(
68-
self.storage_conn, values)
69-
flow.run(ctxt)
70-
return flow.results[id_]['gateway_config']
69+
flow = gateway_configuration.create_flow(self.storage_conn)
70+
results = run_flow(flow, store={'values': values, 'ctxt': ctxt})
71+
return results['gateway_config']
7172

7273
def list_pg_configs(self, ctxt, **kw):
7374
return self.storage_conn.list_pg_configs(ctxt, **kw)
@@ -83,10 +84,9 @@ def delete_pg_config(self, ctxt, id_):
8384

8485
# PM
8586
def create_payment_method(self, ctxt, values):
86-
id_, flow = payment_method.create_flow(
87-
self.storage_conn, values)
88-
flow.run(ctxt)
89-
return flow.results[id_]['payment_method']
87+
flow = payment_method.create_flow(self.storage_conn)
88+
results = run_flow(flow, store={'values': values, 'ctxt': ctxt})
89+
return results['payment_method']
9090

9191
def list_payment_methods(self, ctxt, **kw):
9292
return self.storage_conn.list_payment_methods(ctxt, **kw)

billingstack/openstack/common/context.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,15 @@ class RequestContext(object):
4040
"""
4141

4242
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
43-
read_only=False, show_deleted=False, request_id=None):
43+
read_only=False, show_deleted=False, request_id=None,
44+
instance_uuid=None):
4445
self.auth_token = auth_token
4546
self.user = user
4647
self.tenant = tenant
4748
self.is_admin = is_admin
4849
self.read_only = read_only
4950
self.show_deleted = show_deleted
51+
self.instance_uuid = instance_uuid
5052
if not request_id:
5153
request_id = generate_request_id()
5254
self.request_id = request_id
@@ -58,7 +60,8 @@ def to_dict(self):
5860
'read_only': self.read_only,
5961
'show_deleted': self.show_deleted,
6062
'auth_token': self.auth_token,
61-
'request_id': self.request_id}
63+
'request_id': self.request_id,
64+
'instance_uuid': self.instance_uuid}
6265

6366

6467
def get_admin_context(show_deleted=False):

0 commit comments

Comments
 (0)