Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add jason's unlocked (unencrypted) solution #40

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 362 additions & 0 deletions jason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,362 @@
package main

import (
"bytes"
"fmt"
"io"
"math"
"math/bits"
"os"
"os/exec"
"runtime"
"runtime/debug"
"sort"
"strings"
"syscall"
"unsafe"
)

type Counter struct {
Key uint64
Name string
Max int64
Min int64
Total int64
Count int64
}

func (c *Counter) Add(v int64) {
if v > c.Max {
c.Max = v
}
if v < c.Min {
c.Min = v
}
c.Total += v
c.Count++
}

func (c *Counter) Combine(other *Counter) {
if other.Max > c.Max {
c.Max = other.Max
}
if other.Min < c.Min {
c.Min = other.Min
}
c.Total += other.Total
c.Count += other.Count
}

func (c *Counter) Avg() int64 {
return int64(math.Round(float64(c.Total) / float64(c.Count)))
}

func (c *Counter) String() string {
sw := strings.Builder{}
sw.Write(intToBytes(c.Min))
sw.WriteByte('/')
sw.Write(intToBytes(c.Avg()))
sw.WriteByte('/')
sw.Write(intToBytes(c.Max))
return sw.String()
}

func intToBytes(num int64) []byte {
if num < 0 {
if num < -99 {
return []byte{'-', '0' + byte(-num/100), '0' + byte(((-num)/10)%10), '.', '0' + byte((-num)%10)}
} else {
return []byte{'-', '0' + byte((-num)/10), '.', '0' + byte((-num)%10)}
}
} else if num > 99 {
return []byte{'0' + byte(num/100), '0' + byte((num/10)%10), '.', '0' + byte(num%10)}
} else {
return []byte{'0' + byte(num/10), '.', '0' + byte(num%10)}
}
}

func getSemicolonMask(value uint64) uint64 {
match := value ^ 0x3b3b3b3b3b3b3b3b
return (match - 0x0101010101010101) & (^match & 0x8080808080808080)
}

const numBuckets = 16384

func runWorker(data []byte) []*Counter {
var buckets [numBuckets][]*Counter
var i uintptr

segments := uintptr(unsafe.Pointer(unsafe.SliceData(data)))
var key uint64
var nextI uintptr = segments
var maxBound uintptr = segments + uintptr(len(data)) - 5

// manyBuckets := make([][]map[uint64]struct{}, 64)
// for i := range manyBuckets {
// manyBuckets[i] = make([]map[uint64]struct{}, numBuckets)
// for j := range manyBuckets[i] {
// manyBuckets[i][j] = make(map[uint64]struct{})
// }
// }

outerLoop:
for nextI < maxBound {
i = nextI
key = 1
var mask uint64
var segment uint64 = *(*uint64)(unsafe.Pointer(nextI))
for {
mask = getSemicolonMask(segment)
key += ((mask - 1) & segment)
nextI += (uintptr(bits.TrailingZeros64(mask)) >> 3)
segment = *(*uint64)(unsafe.Pointer(nextI))
if mask != 0 {
break
}
}

lookup := uint64(key ^ key>>29)

// for i := 0; i < 64; i++ {
// manyBuckets[i][(key^(key>>i))&(numBuckets-1)][key] = struct{}{}
// }

bucket := buckets[lookup&(numBuckets-1)]

// Inspect the 4th bit of the first byte to determine if it is a '-' symbol.
negativity := (0b1 & (segment >> (8 + 4))) - 1

// Determine the position of the '.' point, one of 0, 1, 2. Where 0 means a number like 1.1.
decimalPos := (((segment >> (16 + 3)) & 0b10) | ((segment >> (24 + 4)) & 0b1)) - 1

// If negative, we need to clear the first 4 bits so it's treated as 0 in the next step.
segment &= (0xfffffffffffff00 ^ (negativity & 0xf00))

// Shift the number such that the hundreds field is in the LSB.
segment >>= (decimalPos << 3)

nextI += uintptr(decimalPos + 5)

// Mask out only the integer values of the string.
segment &= 0x0f000f0f
// 0x00000000UU00TTHH * 1
// 0x0000UU00TTHH0000 * 10
// 0x000000TTHH000000 * 100
// Perform fucked up multiplication to compute the integer representation of the string
// in very few instructions.
segment = (segment * (10<<16 | 100<<24 | 1)) >> 24

// Recover the 10 bit result from position 24, and multiply by the sign.
segment = ((segment & ((1 << 10) - 1)) ^ negativity) - negativity

for _, v := range bucket {
if v.Key == key {
v.Add(int64(segment))
continue outerLoop
}
}

buckets[lookup&(numBuckets-1)] = append(bucket, &Counter{
Key: key,
Name: string(data[i-segments : nextI-segments-uintptr(decimalPos)-5]),
Max: int64(segment),
Min: int64(segment),
Total: int64(segment),
Count: 1,
})
}

results := make([]*Counter, 0, 512)

for _, v := range buckets {
results = append(results, v...)
}

// for i, b := range manyBuckets {
// hits := 0
// depth := 0
// verification := 0
// for _, v := range b {
// if len(v) > 1 {
// depth += len(v)
// hits++
// }
// verification += len(v)
// }
// fmt.Printf("bucket %d has %d hits with %d depth and verification %d\n", i, hits, depth, verification)
// }

return results
}

var numThreads = runtime.NumCPU() * 4

type abortWriter struct {
w io.Writer
c chan struct{}
}

func (w *abortWriter) Write(p []byte) (n int, err error) {
if bytes.Contains(p, []byte("--- REAL EOF ---\n")) {
n, err := w.w.Write([]byte(strings.ReplaceAll(string(p), "--- REAL EOF ---\n", "")))
select {
case <-w.c:
default:
close(w.c)
}
return n, err
}

return w.w.Write(p)
}

func (w *abortWriter) Wait() {
<-w.c
}

const altModeVar = "BRCGO_ALT_MODE"

func main() {
debug.SetGCPercent(-1)
debug.SetMemoryLimit(1024 * 1024 * 1024 * 10)

if os.Getenv(altModeVar) == "" || os.Getenv(altModeVar) == "0" {
// im totally not doing this because i want to avoid the overhead of the kernel cleaning up
// the shared memory pages swapped in from the massive mmaped reads.
var args []string
if len(os.Args) > 1 {
args = os.Args[1:]
}
cmd := exec.Command(os.Args[0], args...)
cmd.Env = append(cmd.Env, altModeVar+"=2")
w := &abortWriter{
w: os.Stdout,
c: make(chan struct{}),
}
cmd.Stdout = w
cmd.Stderr = os.Stderr

err := cmd.Start()
if err != nil {
panic(err)
}

cmd.Process.Release()

w.Wait()
return
}

inAltMode := os.Getenv(altModeVar) == "2"

defer func() {
if inAltMode {
fmt.Println("--- REAL EOF ---")
}
}()

openFile := "measurements.txt"
if len(os.Args) > 1 {
openFile = os.Args[1]
}

data, err := mmap(openFile)
if err != nil {
panic(err)
}

// use a single thread if the problem is too small
if len(data) < 2048 {
numThreads = 1
}

type pk struct {
counters []*Counter
id int
}

results := make(chan pk, numThreads)

jobSize := (len(data) / numThreads) - 10
lastJobEnd := 0
for i := 0; i < numThreads; i++ {
i := i
if i == numThreads-1 {
go func(left int) {
results <- pk{
runWorker(data[left:]), i,
}
}(lastJobEnd)
} else {
// find an appropriate new line boundary to split on
delim := bytes.IndexByte(data[lastJobEnd+jobSize:], '\n')
go func(left, right int) {
results <- pk{
runWorker(data[left:right]), i,
}
}(lastJobEnd, lastJobEnd+jobSize+delim)
lastJobEnd += jobSize + delim + 1
}
}

combinedResults := make(map[string]*Counter)
for i := 0; i < numThreads; i++ {
p := <-results

for _, value := range p.counters {
if _, found := combinedResults[value.Name]; found {
combinedResults[value.Name].Combine(value)
} else {
combinedResults[value.Name] = value
}
}
}

var allResults []string
for _, v := range combinedResults {
allResults = append(allResults, v.Name+"="+v.String())
}
sort.Strings(allResults)

for _, v := range allResults {
fmt.Println(v)
}

if inAltMode {
fmt.Println("--- REAL EOF ---")
}

os.Stdout.Sync()

syscall.Munmap(data)
}

func mmap(filename string) ([]byte, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
}

size := fi.Size()
if size == 0 {
return nil, nil
}
if size < 0 {
return nil, fmt.Errorf("mmap: file %q has negative size", filename)
}
if size != int64(int(size)) {
return nil, fmt.Errorf("mmap: file %q is too large", filename)
}

data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return nil, err
}

return data, err
}