Skip to content

Commit 09ec127

Browse files
committed
CLOUDSTACK-6743: Use edge-triggering in MessageDetector to handle bogus wakeup gracefully. Level triggering plus bogus wakeup can cause a tight loop to spin
1 parent df6ce24 commit 09ec127

12 files changed

Lines changed: 522 additions & 18 deletions

File tree

api/src/org/apache/cloudstack/context/CallContext.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,20 @@ public Account getCallingAccount() {
114114
}
115115

116116
public static CallContext current() {
117-
return s_currentContext.get();
117+
CallContext context = s_currentContext.get();
118+
119+
// TODO other than async job and api dispatches, there are many system background running threads
120+
// that do not setup CallContext at all, however, many places in code that are touched by these background tasks
121+
// assume not-null CallContext. Following is a fix to address therefore caused NPE problems
122+
//
123+
// There are security implications with this. It assumes that all system background running threads are
124+
// indeed have no problem in running under system context.
125+
//
126+
if (context == null) {
127+
context = registerSystemCallContextOnceOnly();
128+
}
129+
130+
return context;
118131
}
119132

120133
/**

framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageDetector.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,31 @@
1818
*/
1919
package org.apache.cloudstack.framework.messagebus;
2020

21+
import org.apache.log4j.Logger;
22+
2123
public class MessageDetector implements MessageSubscriber {
24+
private static final Logger s_logger = Logger.getLogger(MessageDetector.class);
2225

2326
private MessageBus _messageBus;
2427
private String[] _subjects;
2528

26-
private volatile boolean _signalled = false;
27-
2829
public MessageDetector() {
2930
_messageBus = null;
3031
_subjects = null;
3132
}
3233

33-
public boolean waitAny(long timeoutInMiliseconds) {
34-
synchronized (this) {
35-
if (_signalled)
36-
return true;
34+
public void waitAny(long timeoutInMiliseconds) {
35+
if (timeoutInMiliseconds < 100) {
36+
s_logger.warn("waitAny is passed with a too short time-out interval. " + timeoutInMiliseconds + "ms");
37+
timeoutInMiliseconds = 100;
38+
}
3739

40+
synchronized (this) {
3841
try {
3942
wait(timeoutInMiliseconds);
4043
} catch (InterruptedException e) {
4144
}
4245
}
43-
return _signalled;
4446
}
4547

4648
public void open(MessageBus messageBus, String[] subjects) {
@@ -69,9 +71,20 @@ public void close() {
6971

7072
@Override
7173
public void onPublishMessage(String senderAddress, String subject, Object args) {
72-
synchronized (this) {
73-
_signalled = true;
74-
notifyAll();
74+
if (subjectMatched(subject)) {
75+
synchronized (this) {
76+
notifyAll();
77+
}
78+
}
79+
}
80+
81+
private boolean subjectMatched(String subject) {
82+
if (_subjects != null) {
83+
for (String sub : _subjects) {
84+
if (sub.equals(subject))
85+
return true;
86+
}
7587
}
88+
return false;
7689
}
7790
}

framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,7 @@ public void run() {
138138
try {
139139
int count = 0;
140140
while (count < 2) {
141-
if (detector.waitAny(1000)) {
142-
System.out.println("Detected signal on bus");
143-
count++;
144-
} else {
145-
System.out.println("Waiting timed out");
146-
}
141+
detector.waitAny(1000);
147142
}
148143
} finally {
149144
detector.close();

framework/jobs/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,21 @@
5656
<groupId>org.apache.cloudstack</groupId>
5757
<artifactId>cloud-framework-config</artifactId>
5858
<version>${project.version}</version>
59-
</dependency>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.apache.cloudstack</groupId>
62+
<artifactId>cloud-framework-events</artifactId>
63+
<version>${project.version}</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.apache.cloudstack</groupId>
67+
<artifactId>cloud-engine-schema</artifactId>
68+
<version>${project.version}</version>
69+
</dependency>
70+
<dependency>
71+
<groupId>commons-io</groupId>
72+
<artifactId>commons-io</artifactId>
73+
<scope>test</scope>
74+
</dependency>
6075
</dependencies>
6176
</project>
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
package org.apache.cloudstack.framework.jobs;
18+
19+
/*
20+
* This integration test requires real DB setup, it is not meant to run at per-build
21+
* basis, it can only be opened in developer's run
22+
*
23+
*
24+
25+
@RunWith(SpringJUnit4ClassRunner.class)
26+
@ContextConfiguration(locations = "classpath:/AsyncJobManagerTestContext.xml")
27+
public class AsyncJobManagerTest extends TestCase {
28+
private static final Logger s_logger =
29+
Logger.getLogger(AsyncJobManagerTest.class);
30+
31+
@Inject
32+
AsyncJobManager _jobMgr;
33+
34+
@Inject
35+
AsyncJobTestDashboard _testDashboard;
36+
37+
@Override
38+
@Before
39+
public void setUp() throws Exception {
40+
try {
41+
ComponentContext.initComponentsLifeCycle();
42+
} catch (Exception ex) {
43+
ex.printStackTrace();
44+
s_logger.error(ex.getMessage());
45+
}
46+
}
47+
48+
@Override
49+
@After
50+
public void tearDown() throws Exception {
51+
}
52+
53+
public void testWaitBehave() {
54+
55+
final Object me = this;
56+
new Thread(new Runnable() {
57+
58+
@Override
59+
public void run() {
60+
s_logger.info("Sleeping...");
61+
try {
62+
Thread.sleep(3000);
63+
} catch (InterruptedException e) {
64+
}
65+
66+
s_logger.info("wakeup");
67+
synchronized (me) {
68+
me.notifyAll();
69+
}
70+
}
71+
72+
}).start();
73+
74+
s_logger.info("First wait");
75+
synchronized (me) {
76+
try {
77+
wait(5000);
78+
} catch (InterruptedException e) {
79+
// TODO Auto-generated catch block
80+
e.printStackTrace();
81+
}
82+
}
83+
s_logger.info("First wait done");
84+
85+
s_logger.info("Second wait");
86+
synchronized (me) {
87+
try {
88+
wait(5000);
89+
} catch (InterruptedException e) {
90+
// TODO Auto-generated catch block
91+
e.printStackTrace();
92+
}
93+
}
94+
s_logger.info("Second wait done");
95+
}
96+
97+
@Test
98+
public void test() {
99+
final int TOTAL_JOBS_PER_QUEUE = 5;
100+
final int TOTAL_QUEUES = 100;
101+
102+
for (int i = 0; i < TOTAL_QUEUES; i++) {
103+
for (int j = 0; j < TOTAL_JOBS_PER_QUEUE; j++) {
104+
AsyncJobVO job = new AsyncJobVO();
105+
job.setCmd("TestCmd");
106+
job.setDispatcher("TestJobDispatcher");
107+
job.setCmdInfo("TestCmd info");
108+
109+
_jobMgr.submitAsyncJob(job, "fakequeue", i);
110+
111+
s_logger.info("Job submitted. job " + job.getId() + ", queue: " + i);
112+
}
113+
}
114+
115+
while (true) {
116+
if (_testDashboard.getCompletedJobCount() == TOTAL_JOBS_PER_QUEUE * TOTAL_QUEUES)
117+
break;
118+
119+
try {
120+
Thread.sleep(1000);
121+
} catch (InterruptedException e) {
122+
}
123+
}
124+
125+
s_logger.info("Test done with " + _testDashboard.getCompletedJobCount() + " job executed");
126+
}
127+
}
128+
129+
*/
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.jobs;
20+
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
24+
import org.apache.cloudstack.framework.config.ConfigDepot;
25+
import org.apache.cloudstack.framework.config.ScopedConfigStorage;
26+
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
27+
import org.apache.cloudstack.framework.config.dao.ConfigurationDaoImpl;
28+
import org.apache.cloudstack.framework.config.impl.ConfigDepotImpl;
29+
30+
import com.cloud.storage.dao.StoragePoolDetailsDaoImpl;
31+
32+
@Configuration
33+
public class AsyncJobManagerTestConfiguration {
34+
35+
@Bean
36+
public ConfigDepot configDepot() {
37+
return new ConfigDepotImpl();
38+
}
39+
40+
@Bean
41+
public ConfigurationDao configDao() {
42+
return new ConfigurationDaoImpl();
43+
}
44+
45+
@Bean
46+
public ScopedConfigStorage scopedConfigStorage() {
47+
return new StoragePoolDetailsDaoImpl();
48+
}
49+
50+
@Bean
51+
public AsyncJobTestDashboard testDashboard() {
52+
return new AsyncJobTestDashboard();
53+
}
54+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.jobs;
20+
21+
public class AsyncJobTestDashboard {
22+
int _completedJobCount = 0;
23+
int _concurrencyCount = 0;
24+
25+
public AsyncJobTestDashboard() {
26+
}
27+
28+
public synchronized int getCompletedJobCount() {
29+
return _completedJobCount;
30+
}
31+
32+
public synchronized void jobCompleted() {
33+
_completedJobCount++;
34+
}
35+
36+
public synchronized int getConcurrencyCount() {
37+
return _concurrencyCount;
38+
}
39+
40+
public synchronized void increaseConcurrency() {
41+
_concurrencyCount++;
42+
}
43+
44+
public synchronized void decreaseConcurrency() {
45+
_concurrencyCount--;
46+
}
47+
}

0 commit comments

Comments
 (0)