
Goroutine Worker Pools - Go Optimization Guide
Goroutine Worker Pools in Go Go’s lightweight concurrency model makes spawning goroutines nearly free in terms of syntax and initial memory footprint—but that freedom isn’t as cheap as it may seem at first glance. Under high load, unbounded concurren
goperf.dev
1. Goroutine Worker Pools
대부분의 작업에 있어서 Goroutine을 사용하면 효과적이지만 대규모로 사용하면 문제가 생길 수 있습니다.
각 Goroutine에는 스택 공간이 필요하고, 스케줄링 오버헤드가 발생합니다.
활성 Goroutine 수가 증가하면 HTTP요청, queue의 작업 혹은 channel의 task들은 성능을 크게 낮춥니다.
그래서 Worker pool은 Goroutine 수를 고정하고 shared job queue에서 task들을 당겨서 가져가게 합니다.
이런 동작은 시스템이 처리 가능한 능력보다 더 많은 동작을 하는것을 막아줍니다.
따라서, Worker pool은 특히 각 task가 예측가능한 비용을 가지고 있고 시스템 전체의 쓰루풋 성능이 안정적이어야 하는 경우에 유용합니다.
기본 Worker Pool 구현
func worker(id int, jobs <-chan int, results chan<- [32]byte) {
for j := range jobs {
results <- doWork(j)
}
}
func doWork(n int) [32]byte {
data := []byte(fmt.Sprintf("payload-%d", n))
return sha256.Sum256(data) //
}
func main() {
jobs := make(chan int, 100)
results := make(chan [32]byte, 100)
for w := 1; w <= 5; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= 10; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= 10; a++ {
<-results
}
}
위 구현은 5개의 worker가 channel로 부터 일을 받고 결과를 channel로 전송하는 예시입니다.
worker pool은 동시에 5개의 task를 처리할 수 있죠
Worker Count와 CPU Cores의 관계
최적의 Worker 수는 CPU 개수와 크게 연관이 있습니다.
일반적으로 worker의 수는 logical CPU core 수보다 같거나 약간 적게 운용하면 좋습니다.
만약 당신의 task가 I/O bound (e.g. 네트워크 콜, disk I/O, database query)라면, pool size는 core 수보다 클 수 있습니다
왜냐면, worker는 다른 task들이 실행되게 허용하면서 block 되는데 시간을 쓸거거든요.
반면에 CPU-heavy 한 작업이라면 contention과 context switching을 피하기 위해서 좀 작은 pool을 유지하면 좋습니다.
worker가 너무 많으면 아래와 같은 단점이 있을 수 있습니다.
- 스케줄러 경합: Go의 런타임은 CPU 코어보다 더 많은 실행 가능한 고루틴을 관리해야 합니다.
- 컨텍스트 전환: 과도한 고루틴은 CPU 컨텍스트 전환을 자주 일으켜 사이클을 낭비합니다.
- 메모리 압력: 각 고루틴은 스택 공간을 소비합니다. 작업자가 많을수록 메모리 사용량이 증가합니다.
- 캐시 스래싱: 고루틴이 코어 간에 바운스됨에 따라 CPU 캐시 효율성이 저하됩니다.
2. Atomic 연산과 Synchronization Primitives
동시성이 높은 시스템에서는 성능 관점에서 단순히 무엇을 처리 하느냐가 아니라 무엇을 피하느냐가 중요합니다.
Lock Contention, Cache line bouncing, Memory fence 등은 스케일링 한계에 도달하기 훨씬 전에 처리량을 제한하는 일반적인 함정입니다.
Atomic 연산은 이런 함정들을 피하기 위해 Go가 제공하는 도구중에 하나입니다.
Atomic 연산에 대한 이해
Atomic 연산을 이용하면 Mutex와 같은 명시적인 잠금 메커니즘 없이 공유 데이터에 안전하게 동시 접근이 가능합니다.
`sync/atomic` 패키지는 카운터, 플래그 또는 간단한 상태 전환에 이상적인 하위 수준 메모리 접근을 지원합니다.
Atomic 연산은 메모리 접근 경합시에 성능을 높여줍니다.
Lock은 오버헤드를 유발하고, 많은 Goroutine이 mutex를 두고 경쟁할때 context switching, lock queue 관리로 인해 성능이 저하됩니다.
아래와 같은 경우에 Atomic 연산이 유용합니다.
- High-throughput Counter and flags
- lock-free queue와 list
- lock이 너무 비용이 드는 low-latency path
일반적으로 사용하는 Atomic 연산 함수들
- atomic.AddInt64, , 등: 값을 원자성으로 추가합니다.atomic.AddUint32
- atomic.LoadInt64, : 값을 원자적으로 읽습니다.atomic.LoadPointer
- atomic.StoreInt64, : 값을 원자적으로 씁니다.atomic.StorePointer
- atomic.CompareAndSwapInt64: 조건부로 값을 원자적으로 업데이트합니다.
실제 예제로 보는 Atomic 연산 사용
1. High-throughput metrics and counter
var requests atomic.Int64
func handleRequest() {
requests.Add(1)
}
이 코드는 여러개의 goroutine이 lock없이 동시에 값을 업데이트 할 수 있게 해줍니다.
성능 저하도 덜하구요.
2. Fast, Lock-free flags
var shutdown atomic.Int32
func mainLoop() {
for {
if shutdown.Load() == 1 {
break
}
// do work
}
}
func stop() {
shutdown.Store(1)
}
이 패턴은 하나의 goroutine이 다른 goroutine을 멈추게 할 수 있습니다.
3. Once-only initialization
sync.Once를 대체 할 수 있습니다.
var initialized atomic.Int32
func maybeInit() {
if initialized.CompareAndSwap(0, 1) {
// initialize resources
}
}
이 동작은 첫번째 goroutine만 initialize하고 나머지는 안하게 할 수 있습니다.
4. Lock-Free Queues or Freelist Structures
고성능의 자료구조를 아래와 같이 만들 수 있습니다.
type node struct {
next *node
val any
}
var head atomic.Pointer[node]
func push(n *node) {
for {
old := head.Load()
n.next = old
if head.CompareAndSwap(old, n) {
return
}
}
}
이 예시는 lock-free stack입니다.
일반적으로 object pools와 work-stealing queues에서 많이 사용됩니다.
5. Reducing Lock Contention
불필요한 lock contention을 줄일 수 있습니다.
특정 기능을 켜고 끄는데 사용할 수 있죠.
if !atomic.CompareAndSwapInt32(&someFlag, 0, 1) {
return // work already in progress or completed
}
mu.Lock()
defer mu.Unlock()
// perform one-time expensive initialization
위의 동작은 하나의 Goroutine만 실행되고 다른 고루틴은 일찍 종료되도록 보장합니다.
검사와 업데이트가 모두 원자적으로 발생합니다.
Synchronization Primitive
`sync.Mutex`, `sync.RWMutex`, `sync.Cond`를 사용합니다.
더 많은 설명은 Go 문서에 잘 나와있으니 생략하겠습니다.
3. Lazy Initialization
Go에서는 몇가지 리소스는 initialize하는데 비용이 큽니다.
왜 Lazy initialization이 중요한가.
database connection, cache 혹은 큰 자료구조를 application 실행시에 initialize하는것은 launch time을 상당히 늘리고, 불필요한 메모리를 소모합니다.
Thread-safe init을 위해 Sync.Once 쓰기
Go는 `sync.Once` type을 lazy init을 위해서 제공합니다.
var (
resource *MyResource
once sync.Once
)
func getResource() *MyResource {
once.Do(func() {
resource = expensiveInit()
})
return resource
}
얼마나 많은 Goroutine이 있던지 `expensiveInit()`은 정확히 한번만 실행됩니다.
Output Value가 있다면 `sync.OnceValue`와 `sync.OnceValues`를 씁니다.
Go 1.21부터 만약 init 로직이 value를 반환해야한다면 `sync.OnceValue`나 `sync.OnceValues`를 쓰면됩니다.
var getResource = sync.OnceValue(func() *MyResource {
return expensiveInit()
})
func processData() {
res := getResource()
// use res
}
-------------------------------------------------------
var getConfig = sync.OnceValues(func() (*Config, error) {
return loadConfig("config.yml")
})
func processData() {
config, err := getConfig()
if err != nil {
log.Fatal(err)
}
// use config
}
4. Immutable Data Sharing
고성능 Go 어플리케이션을 구축할때는 일반적인 병목 중 하나는 Shared Data에 대한 동시 접근입니다.
전통적인 방식은 mutex 또는 채널을 활용하는데 신중하게 사용하지 않으면 버그나 복잡성만 늘어 날 수 있습니다.
강력한 대안은 Immutable Data Sharing 입니다.
공유 데이터가 생성 된 후 변경되지 않도록 시스템을 설계하는것입니다.
Immutable Data Sharing을 쓰는 이유
- Lock이 필요없습니다. : 여러 Goroutine이 동기화 없이 데이터를 읽을 수 있습니다.
- 더 쉬운 추론 : 데이터를 변경할 수 없는 경우 전체 클래스의 경합 조건을 피할 수 있습니다.
- Copy-on-write 최적화 : 원본을 변경하지 않고 구조체의 새 버전을 만들 수 있으며, 이는 상태를 다시 로드하거나 버전 관리에 유용합니다.
실제 예시 : Shared Config
Step 1 : Config struct를 구성
// config.go
type Config struct {
LogLevel string
Timeout time.Duration
Features map[string]bool // This needs attention!
}
Step 2 : Deep Immutability를 보장
config의 변경이 없더라도 누군가 실수로 Shared map을 변경할 수 있습니다.
이를 방지하기 위해 방어용 복사본을 만듭니다.
func NewConfig(logLevel string, timeout time.Duration, features map[string]bool) *Config {
copiedFeatures := make(map[string]bool, len(features))
for k, v := range features {
copiedFeatures[k] = v
}
return &Config{
LogLevel: logLevel,
Timeout: timeout,
Features: copiedFeatures,
}
}
Step 3 : Atomic swapping
config를 안전하게 저장하고 업데이트 위해서 `atomic.Value`를 씁니다.
var currentConfig atomic.Pointer[Config]
func LoadInitialConfig() {
cfg := NewConfig("info", 5*time.Second, map[string]bool{"beta": true})
currentConfig.Store(cfg)
}
func GetConfig() *Config {
return currentConfig.Load()
}
Step 4 : Handler 안에서 사용하기
var currentConfig atomic.Pointer[Config]
func LoadInitialConfig() {
cfg := NewConfig("info", 5*time.Second, map[string]bool{"beta": true})
currentConfig.Store(cfg)
}
func GetConfig() *Config {
return currentConfig.Load()
}
또 다른 예시 : Immutable Routing Table
Step 1 : Route Struct 정의하기
type Route struct {
Path string
Backend string
}
type RoutingTable struct {
Routes []Route
}
Step 2 : Immutable version 만들기
func NewRoutingTable(routes []Route) *RoutingTable {
copied := make([]Route, len(routes))
copy(copied, routes)
return &RoutingTable{Routes: copied}
}
Step 3 : Atomic하게 데이터 저장하기
var currentRoutes atomic.Pointer[RoutingTable]
func LoadInitialRoutes() {
table := NewRoutingTable([]Route{
{Path: "/api", Backend: "http://api.internal"},
{Path: "/admin", Backend: "http://admin.internal"},
})
currentRoutes.Store(table)
}
func GetRoutingTable() *RoutingTable {
return currentRoutes.Load()
}
Step 4 : Concurrent하게 Request Route하기
func routeRequest(path string) string {
table := GetRoutingTable()
for _, route := range table.Routes {
if strings.HasPrefix(path, route.Path) {
return route.Backend
}
}
return ""
}
5. 효율적인 Context 관리
http 요청이나 worker goroutine의 동작, 혹은 외부 서비스의 query를 작업시에 종종 timeout이나 다른 이유로 인해 중간에 취소를 해야합니다.
GO의 Context는 이를 thread-safe하고 consistent하게 관리할 수 있게 합니다.
Context가 중요한 이유
Go는 두개의 Context 생성자를 제공합니다.
- context.Background() : 일반적으로 응용 프로그램의 최 상위 수준에서 사용되는 루트 컨텍스트 입니다. (main, init 등)
- context.TODO() : 사용할 컨텍스트가 명확하지 않거나 주변 코드가 컨텍스트 전파를 위해 아직 연결되지 않은 경우에 사용됩니다. 이는 context logic이 나중에 채워져야하는것을 상기시킵니다.
Go의 패키지는 API 경계를 넘어 기한, 취소신호 및 기타 요청 범위 값을 전달하도록 설계되었습니다.
일반적은 context workflow는 http handler, main함수 혹은 RPC 서버와 같은 프로그램의 진입점에서 시작됩니다.
- context.WithCancel(parent) 을 통해 취소 가능한 컨텍스트를 만들 수 있습니다.
- context.WithTimeout(parent, duration) : 특정 시간이 지나면 자동으로 취소됩니다.
- context.WithDeadline(parent, time) : 특정 시점에 cancel을 합니다.
- context.WithValue(parent, key, value) : request-scoped data를 연결하기 위함입니다.
Context가 수동 혹은 timeout에 의해서 취소되면 바로 `<- ctx.Done()`으로 알려집니다.
실용적인 Context 사용 예시
1. HTTP 서버 요청 취소
func handler(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
select {
case <-time.After(5 * time.Second):
fmt.Fprintln(w, "Response after delay")
case <-ctx.Done():
log.Println("Client disconnected")
}
}
2. Database 연산 Timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, "SELECT * FROM users")
if err != nil {
log.Fatal(err)
}
defer rows.Close()
timeout이 있으면 데이터베이스가 동작하지 않더라도 hang이 걸리지 않습니다.
3. Request ID를 trace를 위해 전파시키기
func main() {
ctx := context.WithValue(context.Background(), "requestID", "12345")
handleRequest(ctx)
}
func handleRequest(ctx context.Context) {
log.Printf("Handling request with ID: %v", ctx.Value("requestID"))
}
4. Concurrent Worker Management
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
go worker(ctx, i)
}
// Cancel workers after some condition or signal
cancel()
5. Graceful Shutdown in CLI tools
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
<-ctx.Done()
fmt.Println("Shutting down...")
6. Streaming and Real-time data pipeline
context는 Kafka consumer, WebSocket reader, 그리고 custom pub/sub pipeline과 같은 steaming system에 이상적입니다.
func streamData(ctx context.Context, ch <-chan Data) {
for {
select {
case <-ctx.Done():
return
case data := <-ch:
process(data)
}
}
}
7. Middleware와 Rate limiting
func rateLimitMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Suppose this is the result of some rate-limiting logic
rateLimited := true // or false depending on logic
// Embed the result into the context
ctx := context.WithValue(r.Context(), "rateLimited", rateLimited)
// Pass the updated context to the next handler
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func handler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if limited, ok := ctx.Value("rateLimited").(bool); ok && limited {
http.Error(w, "Too many requests", http.StatusTooManyRequests)
return
}
fmt.Fprintln(w, "Request accepted")
}
'백엔드 > Golang' 카테고리의 다른 글
Golang 애플리케이션 성능 최적화 - 1. 메모리 최적화 (0) | 2025.04.05 |
---|---|
Golang Convention : Error wrapping vs Opaque error (0) | 2024.11.17 |
Golang Convention 중 논의를 해야 할 사항 정리 (0) | 2024.11.10 |
Golang 에러 처리 - (1) Google Guide/Best Practice 찾아보기 (2) | 2024.09.07 |
Golang zero-value 알아보기 (0) | 2024.07.09 |
개발 및 IT 관련 포스팅을 작성 하는 블로그입니다.
IT 기술 및 개인 개발에 대한 내용을 작성하는 블로그입니다. 많은 분들과 소통하며 의견을 나누고 싶습니다.