summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEugene Huang <1035393+elh@users.noreply.github.com>2024-02-03 04:37:28 +0800
committerGitHub <noreply@github.com>2024-02-02 21:37:28 +0100
commite1fb378acce53d8c3035ee4813ae377aaf51aa3c (patch)
tree7a4898a0682ef824cb9dc32a7f4e6fdadeec1c0d /src
parentd2fef8844da3e986c1b9b6ce297fb379d74b11e5 (diff)
Add elh's Go solution (#435)
* add elh's Go solution * update elh. fix a bad for loop and add some tuning env vars
Diffstat (limited to 'src')
-rw-r--r--src/main/go/elh/Dockerfile24
-rw-r--r--src/main/go/elh/go.mod3
-rw-r--r--src/main/go/elh/main.go301
3 files changed, 328 insertions, 0 deletions
diff --git a/src/main/go/elh/Dockerfile b/src/main/go/elh/Dockerfile
new file mode 100644
index 0000000..920570f
--- /dev/null
+++ b/src/main/go/elh/Dockerfile
@@ -0,0 +1,24 @@
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM golang AS builder
+WORKDIR /app
+COPY . ./
+RUN go build -ldflags "-w -s" -o /1brc-go .
+
+FROM scratch AS runner
+WORKDIR /
+COPY --from=builder /1brc-go /
diff --git a/src/main/go/elh/go.mod b/src/main/go/elh/go.mod
new file mode 100644
index 0000000..e05dc76
--- /dev/null
+++ b/src/main/go/elh/go.mod
@@ -0,0 +1,3 @@
+module github.com/elh/1brc-go
+
+go 1.21.5
diff --git a/src/main/go/elh/main.go b/src/main/go/elh/main.go
new file mode 100644
index 0000000..4f42d38
--- /dev/null
+++ b/src/main/go/elh/main.go
@@ -0,0 +1,301 @@
+package main
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "log"
+ "math"
+ "os"
+ "path/filepath"
+ "runtime"
+ "runtime/pprof"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+ "unsafe"
+)
+
+// go run main.go [measurements_file]
+// tune env vars for performance
+//
+// Environment variables:
+// - NUM_PARSERS: number of parsers to run concurrently. if unset, defaults
+// to runtime.NumCPU()
+// - PARSE_CHUNK_SIZE_MB: size of each chunk to parse. if unset, defaults to
+// defaultParseChunkSize
+// - PROFILE: if "true", enables profiling
+
+var (
+ // others: "heap", "threadcreate", "block", "mutex"
+ profileTypes = []string{"goroutine", "allocs"}
+)
+
+const (
+ defaultMeasurementsPath = "measurements.txt"
+ maxNameLen = 100
+ maxNameNum = 10000
+
+ // tuned for a 2023 Macbook M2 Pro
+ defaultParseChunkSizeMB = 64
+ mb = 1024 * 1024 // bytes
+)
+
+type Stats struct {
+ Min, Max, Sum float64
+ Count int
+}
+
+// rounding floats to 1 decimal place with 0.05 rounding up to 0.1
+func round(x float64) float64 {
+ return math.Floor((x+0.05)*10) / 10
+}
+
+// parseFloatFast is a high performance float parser using the assumption that
+// the byte slice will always have a single decimal digit.
+func parseFloatFast(bs []byte) float64 {
+ var intStartIdx int // is negative?
+ if bs[0] == '-' {
+ intStartIdx = 1
+ }
+
+ v := float64(bs[len(bs)-1]-'0') / 10 // single decimal digit
+ place := 1.0
+ for i := len(bs) - 3; i >= intStartIdx; i-- { // integer part
+ v += float64(bs[i]-'0') * place
+ place *= 10
+ }
+
+ if intStartIdx == 1 {
+ v *= -1
+ }
+ return v
+}
+
+// size is the intended number of bytes to parse. buffer should be longer than size
+// because we need to continue reading until the end of the line in order to
+// properly segment the entire file and not miss any data.
+func parseAt(f *os.File, buf []byte, offset int64, size int) map[string]*Stats {
+ stats := make(map[string]*Stats, maxNameNum)
+ n, err := f.ReadAt(buf, offset) // load the buffer
+ if err != nil && err != io.EOF {
+ log.Fatal(err)
+ }
+
+ lastName := make([]byte, maxNameLen) // last name parsed
+ var lastNameLen int
+ isScanningName := true // currently scanning name or value?
+
+ // if offset is non-zero, skip to the first new line
+ var idx, start int
+ if offset != 0 {
+ for idx < n {
+ if buf[idx] == '\n' {
+ idx++
+ start = idx
+ break
+ }
+ idx++
+ }
+ }
+ // tick tock between parsing names and values while accummulating stats
+ for {
+ if isScanningName {
+ for idx < n {
+ if buf[idx] == ';' {
+ nameBs := buf[start:idx]
+ lastNameLen = copy(lastName, nameBs)
+
+ idx++
+ start = idx
+ isScanningName = false
+ break
+ }
+ idx++
+ }
+ } else {
+ for idx < n {
+ if buf[idx] == '\n' {
+ valueBs := buf[start:idx]
+ value := parseFloatFast(valueBs)
+
+ nameUnsafe := unsafe.String(&lastName[0], lastNameLen)
+ if s, ok := stats[nameUnsafe]; !ok {
+ name := string(lastName[:lastNameLen]) // actually allocate string
+ stats[name] = &Stats{Min: value, Max: value, Sum: value, Count: 1}
+ } else {
+ if value < s.Min {
+ s.Min = value
+ }
+ if value > s.Max {
+ s.Max = value
+ }
+ s.Sum += value
+ s.Count++
+ }
+
+ idx++
+ start = idx
+ isScanningName = true
+ break
+ }
+ idx++
+ }
+ }
+ // terminate when we hit the first newline after the intended size OR
+ // when we hit the end of the file
+ if (isScanningName && idx >= size) || idx >= n {
+ break
+ }
+ }
+
+ return stats
+}
+
+func printResults(stats map[string]*Stats) { // doesn't help
+ // sorted alphabetically for output
+ names := make([]string, 0, len(stats))
+ for name := range stats {
+ names = append(names, name)
+ }
+ sort.Strings(names)
+
+ var builder strings.Builder
+ for i, name := range names {
+ s := stats[name]
+ // gotcha: first round the sum to to remove float precision errors!
+ avg := round(round(s.Sum) / float64(s.Count))
+ builder.WriteString(fmt.Sprintf("%s=%.1f/%.1f/%.1f", name, s.Min, avg, s.Max))
+ if i < len(names)-1 {
+ builder.WriteString(", ")
+ }
+ }
+
+ writer := bufio.NewWriter(os.Stdout)
+ fmt.Fprintf(writer, "{%s}\n", builder.String())
+ writer.Flush()
+}
+
+// Read file in chunks and parse concurrently. N parsers work off of a chunk
+// offset chan and send results on an output chan. The results are merged into a
+// single map of stats and printed.
+func main() {
+ // parse env vars and inputs
+ shouldProfile := os.Getenv("PROFILE") == "true"
+ var err error
+ var numParsers int
+ {
+ if os.Getenv("NUM_PARSERS") != "" {
+ numParsers, err = strconv.Atoi(os.Getenv("NUM_PARSERS"))
+ if err != nil {
+ log.Fatal(fmt.Errorf("failed to parse NUM_PARSERS: %w", err))
+ }
+ } else {
+ numParsers = runtime.NumCPU()
+ }
+ }
+ var parseChunkSize int
+ {
+ if os.Getenv("PARSE_CHUNK_SIZE_MB") != "" {
+ parseChunkSizeMB, err := strconv.Atoi(os.Getenv("PARSE_CHUNK_SIZE_MB"))
+ if err != nil {
+ log.Fatal(fmt.Errorf("failed to parse PARSE_CHUNK_SIZE_MB: %w", err))
+ }
+ parseChunkSize = parseChunkSizeMB * mb
+ } else {
+ parseChunkSize = defaultParseChunkSizeMB * mb
+ }
+ }
+
+ measurementsPath := defaultMeasurementsPath
+ if len(os.Args) > 1 {
+ measurementsPath = os.Args[1]
+ }
+
+ // profile code
+ if shouldProfile {
+ nowUnix := time.Now().Unix()
+ os.MkdirAll(fmt.Sprintf("profiles/%d", nowUnix), 0755)
+ for _, profileType := range profileTypes {
+ file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.%s.pprof",
+ nowUnix, filepath.Base(measurementsPath), profileType))
+ defer file.Close()
+ defer pprof.Lookup(profileType).WriteTo(file, 0)
+ }
+
+ file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.cpu.pprof",
+ nowUnix, filepath.Base(measurementsPath)))
+ defer file.Close()
+ pprof.StartCPUProfile(file)
+ defer pprof.StopCPUProfile()
+ }
+
+ // read file
+ f, err := os.Open(measurementsPath)
+ if err != nil {
+ log.Fatal(fmt.Errorf("failed to open %s file: %w", measurementsPath, err))
+ }
+ defer f.Close()
+
+ info, err := f.Stat()
+ if err != nil {
+ log.Fatal(fmt.Errorf("failed to read %s file: %w", measurementsPath, err))
+ }
+
+ // kick off "parser" workers
+ wg := sync.WaitGroup{}
+ wg.Add(numParsers)
+
+ // buffered to not block on merging
+ chunkOffsetCh := make(chan int64, numParsers)
+ chunkStatsCh := make(chan map[string]*Stats, numParsers)
+
+ go func() {
+ i := 0
+ for i < int(info.Size()) {
+ chunkOffsetCh <- int64(i)
+ i += parseChunkSize
+ }
+ close(chunkOffsetCh)
+ }()
+
+ for i := 0; i < numParsers; i++ {
+ // WARN: w/ extra padding for line overflow. Each chunk should be read past
+ // the intended size to the next new line. 128 bytes should be enough for
+ // a max 100 byte name + the float value.
+ buf := make([]byte, parseChunkSize+128)
+ go func() {
+ for chunkOffset := range chunkOffsetCh {
+ chunkStatsCh <- parseAt(f, buf, chunkOffset, parseChunkSize)
+ }
+ wg.Done()
+ }()
+ }
+
+ go func() {
+ wg.Wait()
+ close(chunkStatsCh)
+ }()
+
+ mergedStats := make(map[string]*Stats, maxNameNum)
+ for chunkStats := range chunkStatsCh {
+ for name, s := range chunkStats {
+ if ms, ok := mergedStats[name]; !ok {
+ mergedStats[name] = s
+ } else {
+ if s.Min < ms.Min {
+ ms.Min = s.Min
+ }
+ if s.Max > ms.Max {
+ ms.Max = s.Max
+ }
+ ms.Sum += s.Sum
+ ms.Count += s.Count
+ }
+ }
+ }
+
+ printResults(mergedStats)
+}