summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortivrfoa <lescoutinhovr@gmail.com>2024-02-01 07:49:47 -0300
committerGitHub <noreply@github.com>2024-02-01 11:49:47 +0100
commitfdd539e1f950bae64829036764c15869a00cd475 (patch)
treeb90406301425c0bb44de25767db450d04438f33c
parente7c92094bd1315115a38b8ddb1cec239d252f9ec (diff)
Exit earlier from loop when a new Result is created (#668)
* Exit earlier from loop when a new Result is created * 3) Make a cache of long[] name to String, to avoid `ByteBuffer.allocate` * and creating new UTF-8 strings. I didn't profile, so it's just a guess * that this map will be a bit faster. Although it's outside the main loop, so * not a big difference ...; * 4) Exit earlier from loop if a new entry was created. * revert: Remove cache to city name * As I was not able to make it faster... make it slower As I was not able to make it faster ... so I'll make it slower, because my current solution should *not* stay at the top, as it added basically nothing.
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java327
1 files changed, 127 insertions, 200 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java b/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java
index 54f13cb..e6e9632 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java
@@ -38,34 +38,44 @@ import java.util.concurrent.atomic.AtomicInteger;
* already, and maybe even 1st place for the 10k too.
* See: https://github.com/gunnarmorling/1brc/pull/606
*
- * But as I was already coding something, I'll submit just to
- * see if it will be faster than his *previous* 10k time of
- * 00:04.516
- *
- * Changes:
- * It's a similar idea of my previous solution, that if you split
- * the chunks evenly, some threads might finish much faster and
- * stay idle, so:
- * 1) Create more chunks than threads, so the ones that finish first
- * can do something;
- * 2) Decrease chunk sizes as we get closer to the end of the file.
+ * I was not able to make it faster, so I'll make it slower,
+ * because my current solution should *not* stay at the top, as it added
+ * basically nothing.
*/
public class CalculateAverage_tivrfoa {
private static final String FILE = "./measurements.txt";
- private static final int MIN_TEMP = -999;
- private static final int MAX_TEMP = 999;
+
+ private static final int MAX_CITIES = 10_000;
+ private static final int BUCKETS_LEN = 1 << 17;
+ private static final int LAST_BUCKET_ENTRY = BUCKETS_LEN - 1;
+ private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+ private static final AtomicInteger chunkIdx = new AtomicInteger();
+ private static long[] chunks;
+ private static int numChunks;
// Holding the current result for a single city.
private static class Result {
- long lastNameLong, secondLastNameLong;
+ long lastNameLong;
long[] name;
int count;
short min, max;
long sum;
- private Result() {
- this.min = MAX_TEMP;
- this.max = MIN_TEMP;
+ private Result(short number, long nameAddress, byte nameLength, Scanner scanner) {
+ this.min = number;
+ this.max = number;
+ this.sum = number;
+ this.count = 1;
+
+ name = new long[(nameLength / Long.BYTES) + 1];
+ int pos = 0, i = 0;
+ for (; i < nameLength + 1 - Long.BYTES; i += Long.BYTES) {
+ name[pos++] = scanner.getLongAt(nameAddress + i);
+ }
+
+ int remainingShift = (64 - (nameLength + 1 - i) << 3);
+ lastNameLong = (scanner.getLongAt(nameAddress + i) << remainingShift);
+ name[pos] = lastNameLong >> remainingShift;
}
public String toString() {
@@ -88,6 +98,17 @@ public class CalculateAverage_tivrfoa {
count += other.count;
}
+ private void add(short number) {
+ if (number < min) {
+ min = number;
+ }
+ if (number > max) {
+ max = number;
+ }
+ sum += number;
+ count++;
+ }
+
public String calcName() {
ByteBuffer bb = ByteBuffer.allocate(name.length * Long.BYTES).order(ByteOrder.nativeOrder());
bb.asLongBuffer().put(name);
@@ -99,139 +120,95 @@ public class CalculateAverage_tivrfoa {
}
}
- private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
- private static final AtomicInteger chunkIdx = new AtomicInteger();
- private static long[] chunks;
- private static int numChunks;
+ /**
+ * From:
+ * https://github.com/OpenHFT/Zero-Allocation-Hashing/blob/ea/src/main/java/net/openhft/hashing/XXH3.java
+ *
+ * Fewer collisions, but it will make the code slower. xD
+ *
+ * One interesting thing about Thomas' solution, which I
+ * started working from (d0a28599), is that it has almost no
+ * collisions for the small data set (sometimes none!), but it
+ * has lots of collisions for the 10k, hence its poor performance.
+ *
+ */
+ private static long XXH3_avalanche(long h64) {
+ h64 ^= h64 >>> 37;
+ h64 *= 0x165667919E3779F9L;
+ return h64 ^ (h64 >>> 32);
+ }
private static final class SolveChunk extends Thread {
- private long chunkStart, chunkEnd;
- private Result[] results = new Result[10_000];
- private Result[] buckets = new Result[1 << 17];
+ private int chunkStartIdx;
+ private Result[] results = new Result[MAX_CITIES];
+ private Result[] buckets = new Result[BUCKETS_LEN];
private int resIdx = 0;
- public SolveChunk(long chunkStart, long chunkEnd) {
- this.chunkStart = chunkStart;
- this.chunkEnd = chunkEnd;
+ public SolveChunk(int chunkStartIdx) {
+ this.chunkStartIdx = chunkStartIdx;
}
@Override
public void run() {
- parseLoop();
- int chunk = chunkIdx.getAndIncrement();
- if (chunk < numChunks) {
- chunkStart = chunks[chunk];
- chunkEnd = chunks[chunk + 1];
- run();
- }
- }
+ for (; chunkStartIdx < numChunks; chunkStartIdx = chunkIdx.getAndIncrement()) {
+ Scanner scanner = new Scanner(chunks[chunkStartIdx], chunks[chunkStartIdx + 1]);
+ long word = scanner.getLong();
+ long pos = findDelimiter(word);
+ while (scanner.hasNext()) {
+ long nameAddress = scanner.pos();
+ long hash = 0;
+
+ while (true) {
+ if (pos != 0) {
+ pos = Long.numberOfTrailingZeros(pos) >>> 3;
+ scanner.add(pos);
+ word = mask(word, pos);
+ hash ^= XXH3_avalanche(word);
+ break;
+ }
+ else {
+ scanner.add(8);
+ hash ^= XXH3_avalanche(word);
+ }
- private void parseLoop() {
- Scanner scanner = new Scanner(chunkStart, chunkEnd);
- long word = scanner.getLong();
- long pos = findDelimiter(word);
- while (scanner.hasNext()) {
- long nameAddress = scanner.pos();
- long hash = 0;
-
- // Search for ';', one long at a time.
- if (pos != 0) {
- pos = Long.numberOfTrailingZeros(pos) >>> 3;
- scanner.add(pos);
- word = mask(word, pos);
- hash = word;
-
- int number = scanNumber(scanner);
- long nextWord = scanner.getLong();
- long nextPos = findDelimiter(nextWord);
-
- Result existingResult = buckets[hashToIndex(hash, buckets)];
- if (existingResult != null && existingResult.lastNameLong == word) {
- word = nextWord;
- pos = nextPos;
- record(existingResult, number);
- continue;
+ word = scanner.getLong();
+ pos = findDelimiter(word);
}
- scanner.setPos(nameAddress + pos);
- }
- else {
- scanner.add(8);
- hash = word;
- long prevWord = word;
- word = scanner.getLong();
- pos = findDelimiter(word);
- if (pos != 0) {
- pos = Long.numberOfTrailingZeros(pos) >>> 3;
- scanner.add(pos);
- word = mask(word, pos);
- hash ^= word;
-
- Result existingResult = buckets[hashToIndex(hash, buckets)];
- if (existingResult != null && existingResult.lastNameLong == word && existingResult.secondLastNameLong == prevWord) {
- int number = scanNumber(scanner);
- word = scanner.getLong();
- pos = findDelimiter(word);
- record(existingResult, number);
- continue;
+ byte nameLength = (byte) (scanner.pos() - nameAddress);
+ short number = scanNumber(scanner);
+
+ int tableIndex = hashToIndex(hash);
+ outer: while (true) {
+ Result existingResult = buckets[tableIndex];
+ if (existingResult == null) {
+ var newResult = new Result(number, nameAddress, nameLength, scanner);
+ buckets[tableIndex] = newResult;
+ results[resIdx++] = newResult;
+ break;
}
- }
- else {
- scanner.add(8);
- hash ^= word;
- while (true) {
- word = scanner.getLong();
- pos = findDelimiter(word);
- if (pos != 0) {
- pos = Long.numberOfTrailingZeros(pos) >>> 3;
- scanner.add(pos);
- word = mask(word, pos);
- hash ^= word;
- break;
- }
- else {
- scanner.add(8);
- hash ^= word;
+ int i = 0;
+ int namePos = 0;
+ for (; i < nameLength + 1 - 8; i += 8) {
+ if (namePos >= existingResult.name.length || existingResult.name[namePos++] != scanner.getLongAt(nameAddress + i)) {
+ tableIndex = (tableIndex + 31) & (LAST_BUCKET_ENTRY);
+ continue outer;
}
}
- }
- }
- // Save length of name for later.
- int nameLength = (int) (scanner.pos() - nameAddress);
- int number = scanNumber(scanner);
-
- // Final calculation for index into hash table.
- int tableIndex = hashToIndex(hash, buckets);
- outer: while (true) {
- Result existingResult = buckets[tableIndex];
- if (existingResult == null) {
- existingResult = newEntry(buckets, nameAddress, tableIndex, nameLength, scanner);
- results[resIdx++] = existingResult;
- }
- // Check for collision.
- int i = 0;
- int namePos = 0;
- for (; i < nameLength + 1 - 8; i += 8) {
- if (namePos >= existingResult.name.length || existingResult.name[namePos++] != scanner.getLongAt(nameAddress + i)) {
- tableIndex = (tableIndex + 31) & (buckets.length - 1);
- continue outer;
+ int remainingShift = (64 - (nameLength + 1 - i) << 3);
+ if (((existingResult.lastNameLong ^ (scanner.getLongAt(nameAddress + i) << remainingShift)) == 0)) {
+ existingResult.add(number);
+ break;
+ }
+ else {
+ tableIndex = (tableIndex + 31) & (LAST_BUCKET_ENTRY);
}
}
- int remainingShift = (64 - (nameLength + 1 - i) << 3);
- if (((existingResult.lastNameLong ^ (scanner.getLongAt(nameAddress + i) << remainingShift)) == 0)) {
- record(existingResult, number);
- break;
- }
- else {
- // Collision error, try next.
- tableIndex = (tableIndex + 31) & (buckets.length - 1);
- }
+ word = scanner.getLong();
+ pos = findDelimiter(word);
}
-
- word = scanner.getLong();
- pos = findDelimiter(word);
}
}
}
@@ -247,77 +224,49 @@ public class CalculateAverage_tivrfoa {
}
}
- public static void main(String[] args) throws Exception {
- boolean runTrick = true;
- for (var arg : args) {
- if (arg.equals("--worker")) {
- runTrick = false;
- break;
- }
- }
- if (runTrick) {
- spawnWorker();
- return;
- }
-
+ public static void main(String[] args) throws InterruptedException, IOException {
chunks = getSegments(NUM_CPUS);
numChunks = chunks.length - 1;
final SolveChunk[] threads = new SolveChunk[NUM_CPUS];
chunkIdx.set(NUM_CPUS);
for (int i = 0; i < NUM_CPUS; i++) {
- threads[i] = new SolveChunk(chunks[i], chunks[i + 1]);
+ threads[i] = new SolveChunk(i);
threads[i].start();
}
+ System.out.println(getMap(threads));
+ System.out.close();
+ }
+
+ private static TreeMap<String, Result> getMap(SolveChunk[] threads) throws InterruptedException {
TreeMap<String, Result> map = new TreeMap<>();
- for (int i = 0; i < NUM_CPUS; ++i) {
+ threads[0].join();
+ for (var r : threads[0].results) {
+ if (r == null)
+ break;
+ map.put(r.calcName(), r);
+ }
+ for (int i = 1; i < NUM_CPUS; ++i) {
threads[i].join();
mergeIntoFinalMap(map, threads[i].results);
}
- System.out.println(map);
- System.out.close();
- }
-
- private static void spawnWorker() throws IOException {
- ProcessHandle.Info info = ProcessHandle.current().info();
- ArrayList<String> workerCommand = new ArrayList<>();
- info.command().ifPresent(workerCommand::add);
- info.arguments().ifPresent(args -> workerCommand.addAll(Arrays.asList(args)));
- workerCommand.add("--worker");
- new ProcessBuilder()
- .command(workerCommand)
- .inheritIO()
- .redirectOutput(ProcessBuilder.Redirect.PIPE)
- .start()
- .getInputStream()
- .transferTo(System.out);
+ return map;
}
- private static int scanNumber(Scanner scanPtr) {
+ private static short scanNumber(Scanner scanPtr) {
scanPtr.add(1);
long numberWord = scanPtr.getLong();
int decimalSepPos = Long.numberOfTrailingZeros(~numberWord & 0x10101000);
int number = convertIntoNumber(decimalSepPos, numberWord);
scanPtr.add((decimalSepPos >>> 3) + 3);
- return number;
+ return (short) number;
}
- private static void record(Result existingResult, int number) {
- if (number < existingResult.min) {
- existingResult.min = (short) number;
- }
- if (number > existingResult.max) {
- existingResult.max = (short) number;
- }
- existingResult.sum += number;
- existingResult.count++;
- }
-
- private static int hashToIndex(long hash, Result[] results) {
+ private static int hashToIndex(long hash) {
int hashAsInt = (int) (hash ^ (hash >>> 28));
int finalHash = (hashAsInt ^ (hashAsInt >>> 17));
- return (finalHash & (results.length - 1));
+ return (finalHash & LAST_BUCKET_ENTRY);
}
private static long mask(long word, long pos) {
@@ -346,28 +295,6 @@ public class CalculateAverage_tivrfoa {
return tmp;
}
- private static Result newEntry(Result[] results, long nameAddress, int hash, int nameLength, Scanner scanner) {
- Result r = new Result();
- results[hash] = r;
- long[] name = new long[(nameLength / Long.BYTES) + 1];
- int pos = 0;
- int i = 0;
- for (; i < nameLength + 1 - Long.BYTES; i += Long.BYTES) {
- name[pos++] = scanner.getLongAt(nameAddress + i);
- }
-
- if (pos > 0) {
- r.secondLastNameLong = name[pos - 1];
- }
-
- int remainingShift = (64 - (nameLength + 1 - i) << 3);
- long lastWord = (scanner.getLongAt(nameAddress + i) << remainingShift);
- r.lastNameLong = lastWord;
- name[pos] = lastWord >> remainingShift;
- r.name = name;
- return r;
- }
-
/**
* - Split 70% of the file in even chunks for all cpus;
* - Create smaller chunks for the remainder of the file.