forked from facebookarchive/AsyncDisplayKit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathASRunLoopQueue.mm
More file actions
132 lines (110 loc) · 3.55 KB
/
ASRunLoopQueue.mm
File metadata and controls
132 lines (110 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//
// ASRunLoopQueue.m
// AsyncDisplayKit
//
// Created by Rahul Malik on 3/7/16.
// Copyright © 2016 Facebook. All rights reserved.
//
#import "ASRunLoopQueue.h"
#import "ASThread.h"
#import <cstdlib>
#import <deque>
static void runLoopSourceCallback(void *info) {
// No-op
}
@interface ASRunLoopQueue () {
CFRunLoopRef _runLoop;
CFRunLoopObserverRef _runLoopObserver;
CFRunLoopSourceRef _runLoopSource;
std::deque<id> _internalQueue;
ASDN::RecursiveMutex _internalQueueLock;
}
@property (nonatomic, copy) void (^queueConsumer)(id dequeuedItem, BOOL isQueueDrained);
@end
@implementation ASRunLoopQueue
- (instancetype)initWithRunLoop:(CFRunLoopRef)runloop andHandler:(void(^)(id dequeuedItem, BOOL isQueueDrained))handlerBlock
{
if (self = [super init]) {
_runLoop = runloop;
_internalQueue = std::deque<id>();
_queueConsumer = [handlerBlock copy];
_batchSize = 1;
void (^handlerBlock) (CFRunLoopObserverRef observer, CFRunLoopActivity activity) = ^(CFRunLoopObserverRef observer, CFRunLoopActivity activity) {
[self processQueue];
};
_runLoopObserver = CFRunLoopObserverCreateWithHandler(NULL, kCFRunLoopBeforeWaiting, true, 0, handlerBlock);
CFRunLoopAddObserver(_runLoop, _runLoopObserver, kCFRunLoopCommonModes);
// It is not guaranteed that the runloop will turn if it has no scheduled work, and this causes processing of
// the queue to stop. Attaching a custom loop source to the run loop and signal it if new work needs to be done
CFRunLoopSourceContext *runLoopSourceContext = (CFRunLoopSourceContext *)calloc(1, sizeof(CFRunLoopSourceContext));
runLoopSourceContext->perform = runLoopSourceCallback;
_runLoopSource = CFRunLoopSourceCreate(NULL, 0, runLoopSourceContext);
CFRunLoopAddSource(runloop, _runLoopSource, kCFRunLoopCommonModes);
free(runLoopSourceContext);
}
return self;
}
- (void)dealloc
{
if (CFRunLoopContainsSource(_runLoop, _runLoopSource, kCFRunLoopCommonModes)) {
CFRunLoopRemoveSource(_runLoop, _runLoopSource, kCFRunLoopCommonModes);
}
CFRelease(_runLoopSource);
_runLoopSource = nil;
if (CFRunLoopObserverIsValid(_runLoopObserver)) {
CFRunLoopObserverInvalidate(_runLoopObserver);
}
CFRelease(_runLoopObserver);
_runLoopObserver = nil;
}
- (void)processQueue
{
std::deque<id> itemsToProcess = std::deque<id>();
BOOL isQueueDrained = NO;
{
ASDN::MutexLocker l(_internalQueueLock);
// Early-exit if the queue is empty.
if (_internalQueue.empty()) {
return;
}
// Snatch the next batch of items.
NSUInteger totalNodeCount = _internalQueue.size();
for (int i = 0; i < MIN(self.batchSize, totalNodeCount); i++) {
id node = _internalQueue[0];
itemsToProcess.push_back(node);
_internalQueue.pop_front();
}
if (_internalQueue.empty()) {
isQueueDrained = YES;
}
}
unsigned long numberOfItems = itemsToProcess.size();
for (int i = 0; i < numberOfItems; i++) {
if (isQueueDrained && i == numberOfItems - 1) {
self.queueConsumer(itemsToProcess[i], YES);
} else {
self.queueConsumer(itemsToProcess[i], isQueueDrained);
}
}
}
- (void)enqueue:(id)object
{
if (!object) {
return;
}
ASDN::MutexLocker l(_internalQueueLock);
// Check if the object exists.
BOOL foundObject = NO;
for (id currentObject : _internalQueue) {
if (currentObject == object) {
foundObject = YES;
break;
}
}
if (!foundObject) {
_internalQueue.push_back(object);
CFRunLoopSourceSignal(_runLoopSource);
CFRunLoopWakeUp(_runLoop);
}
}
@end