-
Notifications
You must be signed in to change notification settings - Fork 126
Expand file tree
/
Copy pathByteBufferProxy.java
More file actions
339 lines (299 loc) · 11.9 KB
/
ByteBufferProxy.java
File metadata and controls
339 lines (299 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
* Copyright © 2016-2025 The LmdbJava Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lmdbjava;
import static java.lang.Long.reverseBytes;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.ByteBuffer.allocateDirect;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Objects.requireNonNull;
import static org.lmdbjava.Env.SHOULD_CHECK;
import static org.lmdbjava.UnsafeAccess.UNSAFE;
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.Comparator;
import jnr.ffi.Pointer;
/**
* {@link ByteBuffer}-based proxy.
*
* <p>There are two concrete {@link ByteBuffer} proxy implementations available:
*
* <ul>
* <li>A "fast" implementation: {@link UnsafeProxy}
* <li>A "safe" implementation: {@link ReflectiveProxy}
* </ul>
*
* <p>Users nominate which implementation they prefer by referencing the {@link #PROXY_OPTIMAL} or
* {@link #PROXY_SAFE} field when invoking {@link Env#create(org.lmdbjava.BufferProxy)}.
*/
public final class ByteBufferProxy {
/**
* The fastest {@link ByteBuffer} proxy that is available on this platform. This will always be
* the same instance as {@link #PROXY_SAFE} if the {@link UnsafeAccess#DISABLE_UNSAFE_PROP} has
* been set to <code>true</code> and/or {@link UnsafeAccess} is unavailable. Guaranteed to never
* be null.
*/
public static final BufferProxy<ByteBuffer> PROXY_OPTIMAL;
/** The safe, reflective {@link ByteBuffer} proxy for this system. Guaranteed to never be null. */
public static final BufferProxy<ByteBuffer> PROXY_SAFE;
private static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();
static {
PROXY_SAFE = new ReflectiveProxy();
PROXY_OPTIMAL = getProxyOptimal();
}
private ByteBufferProxy() {}
private static BufferProxy<ByteBuffer> getProxyOptimal() {
try {
return new UnsafeProxy();
} catch (final RuntimeException e) {
return PROXY_SAFE;
}
}
/** The buffer must be a direct buffer (not heap allocated). */
public static final class BufferMustBeDirectException extends LmdbException {
private static final long serialVersionUID = 1L;
/** Creates a new instance. */
public BufferMustBeDirectException() {
super("The buffer must be a direct buffer (not heap allocated");
}
}
/**
* Provides {@link ByteBuffer} pooling and address resolution for concrete {@link BufferProxy}
* implementations.
*/
abstract static class AbstractByteBufferProxy extends BufferProxy<ByteBuffer> {
protected static final String FIELD_NAME_ADDRESS = "address";
protected static final String FIELD_NAME_CAPACITY = "capacity";
/**
* A thread-safe pool for a given length. If the buffer found is valid (ie not of a negative
* length) then that buffer is used. If no valid buffer is found, a new buffer is created.
*/
private static final ThreadLocal<ArrayDeque<ByteBuffer>> BUFFERS =
withInitial(() -> new ArrayDeque<>(16));
/**
* Lexicographically compare two buffers.
*
* @param o1 left operand (required)
* @param o2 right operand (required)
* @return as specified by {@link Comparable} interface
*/
public static int compareLexicographically(final ByteBuffer o1, final ByteBuffer o2) {
requireNonNull(o1);
requireNonNull(o2);
final int minLength = Math.min(o1.limit(), o2.limit());
final int minWords = minLength / Long.BYTES;
final boolean reverse1 = o1.order() == LITTLE_ENDIAN;
final boolean reverse2 = o2.order() == LITTLE_ENDIAN;
for (int i = 0; i < minWords * Long.BYTES; i += Long.BYTES) {
final long lw = reverse1 ? reverseBytes(o1.getLong(i)) : o1.getLong(i);
final long rw = reverse2 ? reverseBytes(o2.getLong(i)) : o2.getLong(i);
final int diff = Long.compareUnsigned(lw, rw);
if (diff != 0) {
return diff;
}
}
for (int i = minWords * Long.BYTES; i < minLength; i++) {
final int lw = Byte.toUnsignedInt(o1.get(i));
final int rw = Byte.toUnsignedInt(o2.get(i));
final int result = Integer.compareUnsigned(lw, rw);
if (result != 0) {
return result;
}
}
return o1.remaining() - o2.remaining();
}
/**
* Buffer comparator specifically for 4/8 byte keys that are unsigned ints/longs, i.e. when
* using MDB_INTEGER_KEY/MDB_INTEGERDUP. Compares the buffers numerically.
*
* @param o1 left operand (required)
* @param o2 right operand (required)
* @return as specified by {@link Comparable} interface
*/
public static int compareAsIntegerKeys(final ByteBuffer o1, final ByteBuffer o2) {
requireNonNull(o1);
requireNonNull(o2);
// Both buffers should be same length according to LMDB API.
// From the LMDB docs for MDB_INTEGER_KEY
// numeric keys in native byte order: either unsigned int or size_t. The keys must all be of
// the same size.
final int len1 = o1.limit();
final int len2 = o2.limit();
if (len1 != len2) {
throw new RuntimeException(
"Length mismatch, len1: "
+ len1
+ ", len2: "
+ len2
+ ". Lengths must be identical and either 4 or 8 bytes.");
}
// Keys for MDB_INTEGER_KEY are written in native order so ensure we read them in that order
o1.order(NATIVE_ORDER);
o2.order(NATIVE_ORDER);
// TODO it might be worth the DbiBuilder having a method to capture fixedKeyLength() or -1
// for variable length keys. This can be passed to getComparator(..) so it can return a
// comparator that doesn't need to test the length every time. There may be other benefits
// to the Dbi knowing the key length if it is fixed.
if (len1 == 8) {
final long lw = o1.getLong(0);
final long rw = o2.getLong(0);
return Long.compareUnsigned(lw, rw);
} else if (len1 == 4) {
final int lw = o1.getInt(0);
final int rw = o2.getInt(0);
return Integer.compareUnsigned(lw, rw);
} else {
// size_t and int are likely to be 8bytes and 4bytes respectively on 64bit.
// If 32bit then would be 4/2 respectively.
// Short.compareUnsigned is not available in Java8.
// For now just fall back to our standard comparator
return compareLexicographically(o1, o2);
}
}
static Field findField(final Class<?> c, final String name) {
Class<?> clazz = c;
do {
try {
final Field field = clazz.getDeclaredField(name);
field.setAccessible(true);
return field;
} catch (final NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
} while (clazz != null);
throw new LmdbException(name + " not found");
}
protected final long address(final ByteBuffer buffer) {
if (SHOULD_CHECK && !buffer.isDirect()) {
throw new BufferMustBeDirectException();
}
return ((sun.nio.ch.DirectBuffer) buffer).address() + buffer.position();
}
@Override
protected final ByteBuffer allocate() {
final ArrayDeque<ByteBuffer> queue = BUFFERS.get();
final ByteBuffer buffer = queue.poll();
if (buffer != null && buffer.capacity() >= 0) {
return buffer;
} else {
return allocateDirect(0);
}
}
@Override
public Comparator<ByteBuffer> getComparator(final DbiFlagSet dbiFlagSet) {
if (dbiFlagSet.areAnySet(DbiFlagSet.INTEGER_KEY_FLAGS)) {
return AbstractByteBufferProxy::compareAsIntegerKeys;
} else {
return AbstractByteBufferProxy::compareLexicographically;
}
}
@Override
protected final void deallocate(final ByteBuffer buff) {
buff.order(BIG_ENDIAN);
final ArrayDeque<ByteBuffer> queue = BUFFERS.get();
queue.offer(buff);
}
@Override
protected byte[] getBytes(final ByteBuffer buffer) {
final byte[] dest = new byte[buffer.limit()];
buffer.get(dest, 0, buffer.limit());
return dest;
}
}
/**
* A proxy that uses Java reflection to modify byte buffer fields, and official JNR-FFF methods to
* manipulate native pointers.
*/
private static final class ReflectiveProxy extends AbstractByteBufferProxy {
private static final Field ADDRESS_FIELD;
private static final Field CAPACITY_FIELD;
static {
ADDRESS_FIELD = findField(Buffer.class, FIELD_NAME_ADDRESS);
CAPACITY_FIELD = findField(Buffer.class, FIELD_NAME_CAPACITY);
}
@Override
protected Pointer in(final ByteBuffer buffer, final Pointer ptr) {
ptr.putAddress(STRUCT_FIELD_OFFSET_DATA, address(buffer));
ptr.putLong(STRUCT_FIELD_OFFSET_SIZE, buffer.remaining());
return null;
}
@Override
protected Pointer in(final ByteBuffer buffer, final int size, final Pointer ptr) {
ptr.putLong(STRUCT_FIELD_OFFSET_SIZE, size);
ptr.putAddress(STRUCT_FIELD_OFFSET_DATA, address(buffer));
return null;
}
@Override
protected ByteBuffer out(final ByteBuffer buffer, final Pointer ptr) {
final long ptrAddr = ptr.address();
final long addr = ptr.getAddress(STRUCT_FIELD_OFFSET_DATA);
final long size = ptr.getLong(STRUCT_FIELD_OFFSET_SIZE);
try {
ADDRESS_FIELD.set(buffer, addr);
CAPACITY_FIELD.set(buffer, (int) size);
} catch (final IllegalArgumentException | IllegalAccessException e) {
throw new LmdbException("Cannot modify buffer", e);
}
buffer.clear();
return buffer;
}
}
/**
* A proxy that uses Java's "unsafe" class to directly manipulate byte buffer fields and JNR-FFF
* allocated memory pointers.
*/
private static final class UnsafeProxy extends AbstractByteBufferProxy {
private static final long ADDRESS_OFFSET;
private static final long CAPACITY_OFFSET;
static {
try {
final Field address = findField(Buffer.class, FIELD_NAME_ADDRESS);
final Field capacity = findField(Buffer.class, FIELD_NAME_CAPACITY);
ADDRESS_OFFSET = UNSAFE.objectFieldOffset(address);
CAPACITY_OFFSET = UNSAFE.objectFieldOffset(capacity);
} catch (final SecurityException e) {
throw new LmdbException("Field access error", e);
}
}
@Override
protected Pointer in(final ByteBuffer buffer, final Pointer ptr) {
final long ptrAddr = ptr.address();
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE, buffer.remaining());
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA, address(buffer));
return null;
}
@Override
protected Pointer in(final ByteBuffer buffer, final int size, final Pointer ptr) {
final long ptrAddr = ptr.address();
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE, size);
UNSAFE.putLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA, address(buffer));
return null;
}
@Override
protected ByteBuffer out(final ByteBuffer buffer, final Pointer ptr) {
final long ptrAddr = ptr.address();
final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
UNSAFE.putLong(buffer, ADDRESS_OFFSET, addr);
UNSAFE.putInt(buffer, CAPACITY_OFFSET, (int) size);
buffer.clear();
return buffer;
}
}
}