Semaphore và semaphore package trong Golang

Chào các bác,

Trong bài ngày hôm nay tôi sẽ giới thiệu với các bác sơ lược về khái niệm semaphore và package semaphore trong Golang.

Semaphore là một khái niệm phổ biến trong đồng bộ tiến trình (process synchronization) nhưng ít khi được nhắc tới trong cộng đồng lập trình tại Việt Nam.

Semaphore là gì?

Một semaphore là một biến hoặc kiểu dữ liệu được dùng để kiểm soát truy cập vào tài nguyên chung bởi các tiến trình hoặc luồng xử lý và tránh các vấn đề về vùng trọng yếu (đọc thêm tại: critical section/critical region) trong một hệ thống thực thi đồng thời (concurrent system/environment).

Semaphore kiểm soát truy cập vào tài nguyên chung bằng cách giới hạn số lượng tiến trình hoặc luồng xử lý được phép truy cập vào tài nguyên tại một thời điểm nhất định. Điều này giúp tránh các bác tránh việc hỏng dữ liệu xảy ra trong điều kiện chạy đua giữa các tiến trình hoặc luồng xử lý (race conditions).

Semaphore hoạt động như thế nào?

Như đã định nghĩa ở trên, semaphore là một biến hoặc một kiểu dữ liệu chứa thông tin về các tài nguyên được sử dụng chung giữa các tiến trình/luồng xử lý (từ đây tôi sẽ tạm gọi chung là các processes).

Dù được định nghĩa theo cách nào, Semaphore luôn có một giá trị kiểu số nguyên (sau đây tôi sẽ tạm gọi giá trị này là S). Giá trị S giảm đi khi một process nhận (acquire) được tài nguyên cần xử lý và tăng lên khi process đã xử lý xong và giải phóng (release) tài nguyên ấy. Semaphore có nguyên lý hoạt động dựa trên 2 thao tác là waitsignal hay tạm gọi dịch là đợibáo (tôi biên lại thế này cho dễ hiểu, không nhằm dịch thoát ý).

  • Đợi (wait): Giảm giá trị S. Nếu giá trị S trở thành âm, các process sẽ bị chặn cho đến khi được thông báo.

  • Báo (signal): Tăng giá trị S. Nếu các processes đang bị chặn bởi thao tác đợi ở trên và giá trị của S là một số dương, semaphore thông báo tới các processes ấy rằng chúng có thể sử dụng tài nguyên.

Cụ thể hơn, Khi S > 0, một process có thể lấy được tài nguyên để xử lý và giảm giá trị của S. Khi S = 0, process này phải đợi cho đến khi một process khác xử lý và giải phóng tài nguyên rồi tăng giá trị của S để báo cho hệ thống rằng tài nguyên có thể sử dụng được.

Các loại Semaphore

Semaphore được phân thành 2 loại:

  • Semaphore nhị phân (binary semaphore): Chỉ chứa giá trị 0 và 1 và được được dùng để bảo vệ quyền truy cập vào một tài nguyên.

  • Semaphore đếm (counting semaphore): Có bao gồm một giá trị là số nguyên dương bất kỳ trong kiểu dữ liệu. Được dùng để bảo vệ quyền truy cập tới nhiều tài nguyên một cách giới hạn.

Rồi thế cuối cùng dùng Semaphore làm gì?

Đọc xong đoạn ở trên các bác sẽ thấy khó hiểu, cũng như tôi vậy! Tôi cũng vậy, phải gãi đầu gãi tai khi mới tiếp cận khái niệm này lần đầu ở trang này.

Hãy lấy một ví dụ thực tế: Giả sử các bác được yêu cầu làm chức năng xuất một danh sách các sản phẩm ra một file Excel sử dụng API do bên thứ 3 cung cấp, có thể API này là của một hệ thống thương mại điện tử hoặc một hệ thống ERP. Đường dẫn của API trông na ná thế này:

some-api.com/products?page=1&limit=200

API liệt kê danh sách sản phẩm này cho phép phân trang dữ liệu. Vì dữ liệu sản phẩm khá lớn với khoảng vài trăm nghìn bản ghi, các bác quyết định sẽ tận dụng goroutines để tạo nhiều API request cùng lúc cho nhanh. Vấn đề ở đây là API đặt ra giới hạn số request các bác có thể gửi cùng lúc. Trong ví dụ này tôi để giới hạn ấy là client chỉ được gọi tối đa 5 request cùng lúc tới API, quá giới hạn này API sẽ báo lỗi "API rate limit exceeded".

Nếu chỉ thuần túy dùng goroutines, code của các bác sẽ trông na ná như thế này và đương nhiên là sẽ có lỗi khi thực thi chương trình:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

const API_URL = "https://some-api.com/products"

type Product struct {
    Name  string `json:"name,omitempty"`
    Price string `json:"price,omitempty"`
}

func main() {
    var products []*Product

    for i := 0; i <= 100; i++ {
        go func(page int) {
            resp, err := http.Get(fmt.Sprintf("%s?page=%d&limit=250", API_URL, page+1))
            if err != nil {
                return
            }

            defer resp.Body.Close()

            prod := new(Product)
            if err := json.NewDecoder(resp.Body).Decode(&prod); err != nil {
                return
            }

            products = append(products, prod)
        }(i)
    }

    // Xử lý logic xuất dữ liệu
}

Bài toán của chúng ta ở đây sẽ là: Làm thế nào để chương trình của chúng ta gọi 5 request cùng lúc, đợi 5 request ấy thành công, rồi lại gọi 5 request tiếp theo và cứ thế lặp lại quy trình này cho tới khi chương trình có đủ dữ liệu xuất file và kết thúc mà không báo lỗi API rate limit?

Sử dụng Semaphore trong Golang

Giờ thì đến đoạn hay nhé. Để giải quyết vấn đề trên trong Golang, hẳn các bác sẽ nghĩ ngay đến các từ khóa là, goroutines, wait groupchannel. Nhưng chúng ta có thể sử dụng kết hợp chúng như thế nào?

Trước hết, chúng ta sẽ cần tạo ra một worker pool. Một worker pool là một nhóm goroutines cố định chạy nền đợi công việc được giao cho chúng. Hãy thử triển khai worker pool với wait group:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
)

const API_URL = "https://some-api.com/products"

type Product struct {
    Name  string `json:"name,omitempty"`
    Price string `json:"price,omitempty"`
}

func main() {
    var products []*Product

    // Tạo 1 worker pool với 100 processes chạy đồng thời
    wg := &sync.WaitGroup{}
    wg.Add(100)

    for i := 0; i <= 100; i++ {
        go func(page int) {
            defer wg.Done()

            resp, err := http.Get(fmt.Sprintf("%s?page=%d&limit=250", API_URL, page+1))
            if err != nil {
                return
            }

            defer resp.Body.Close()

            prod := new(Product)
            if err := json.NewDecoder(resp.Body).Decode(&prod); err != nil {
                return
            }

            products = append(products, prod)
        }(i)
    }

    wg.Wait()

    // Xử lý logic xuất dữ liệu
}

Chắc chắn với worker pool chứa 100 processes chạy cùng lúc như ở trên, các bác vẫn chưa đạt được mục tiêu vì cần giới hạn mỗi lần gọi API chỉ có 5 request được thực thi.

Chiếu theo phần lý thuyết về Semaphore ở trên, chúng ta sẽ tạo một Semaphore đếm (counting semaphore). Trong ví dụ này tôi sử dụng một buffered channel có capacity là 5.

func main() {
    var products []*Product

    // Tạo 1 worker pool với 100 process chạy đồng thời
    wg := &sync.WaitGroup{}
    wg.Add(100)

    // Tạo một semaphore có sức chứa tối đa là 5, lúc này S = 0 vì semaphore chưa có gì
    sem := make(chan bool, 5)
    for i := 0; i <= 100; i++ {
        // Thêm vào semaphore làm tăng số lượng tài nguyên chưa xử lý, 
        // khi tổng số lượng bằng sức chứa hay S = 5, 
        // semaphore sẽ chặn việc thêm mới cho đến khi tài nguyên được xử lý hết
        sem <- true

        go func(page int) {
            defer wg.Done()
            // Giải phóng tài nguyên đã được xử lý trong semaphore, 
            // làm cho S < 5 và bỏ chặn để cho phép tài nguyên chưa xử lý khác
            // được thêm vào semaphore
            defer func() { <-sem }()

            resp, err := http.Get(fmt.Sprintf("%s?page=%d&limit=250", API_URL, page+1))
            if err != nil {
                return
            }

            defer resp.Body.Close()

            prod := new(Product)
            if err := json.NewDecoder(resp.Body).Decode(&prod); err != nil {
                return
            }

            products = append(products, prod)
        }(i)
    }

    wg.Wait()

    // Xử lý logic xuất dữ liệu
}

Trong đoạn code kể trên, các bác có thể thấy tôi vẫn dùng wait group để tạo worker pool với nhóm 100 API request chạy đồng thời, nhưng tôi có khai báo thêm biến sem là một semaphore giới hạn tại mỗi thời điểm chỉ có 5 request được chạy mà thôi, nếu vượt quá số lượng 5 sẽ chặn. Để hiểu được tại sao điều này có thể xảy ra, các bác nên đọc lại cơ chế của buffered channel.

Ứng dụng package semaphore trong Golang

Để tạo ra cơ chế semaphore thì chỉ đơn giản như trên, nhưng trong nhiều trường hợp chúng ta sẽ cần định nghĩa cấu trúc semaphore phức tạp hơn. Cụ thể là khi các process của chúng ta có sự ưu tiên nhất định về thực thi. Chẳng hạn như khi ta muốn ưu tiên chạy trước 10 trang đầu tiên của API. Khi ấy ta sẽ cần đặt trọng số cho công việc gọi 10 trang đầu tiên cao hơn. Về package semaphore trong Golang, các bác có thể đọc về package này chi tiết tại đây.

Trong ví dụ dưới đây, tôi sẽ sử dụng package semaphore cho đoạn code trên và đặt trọng số tương đương nhau cho các phần gọi API.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"

    "golang.org/x/sync/semaphore"
)

const API_URL = "https://some-api.com/products"

type Product struct {
    Name  string `json:"name,omitempty"`
    Price string `json:"price,omitempty"`
}

func main() {
    var products []*Product

    // Tạo 1 worker pool với 100 process chạy đồng thời
    wg := &sync.WaitGroup{}
    wg.Add(100)

    // Tạo một semaphore có sức chứa tối đa là 5, lúc này S = 0 vì semaphore chưa có gì
    sem := semaphore.NewWeighted(5)
    for i := 0; i <= 100; i++ {
        // Thêm vào semaphore làm tăng số lượng tài nguyên chưa xử lý,
        // khi tổng số lượng bằng sức chứa hay S = 5,
        // semaphore sẽ chặn việc thêm mới cho đến khi tài nguyên được xử lý hết
        if err := sem.Acquire(context.Background(), 1); err != nil {
            return
        }

        go func(page int) {
            defer wg.Done()
            // Giải phóng tài nguyên đã được xử lý trong semaphore,
            // làm cho S < 5 và bỏ chặn để cho phép tài nguyên chưa xử lý khác
            defer sem.Release(1)

            resp, err := http.Get(fmt.Sprintf("%s?page=%d&limit=250", API_URL, page+1))
            if err != nil {
                return
            }

            defer resp.Body.Close()

            prod := new(Product)
            if err := json.NewDecoder(resp.Body).Decode(&prod); err != nil {
                return
            }

            products = append(products, prod)
        }(i)
    }

    wg.Wait()

    // Xử lý logic xuất dữ liệu
}

Trong code ở trên, trọng số của chúng ta là 5. Theo định nghĩa của package thì trọng số này là giá trị trọng số kết hợp lớn nhất cho việc truy cập đồng thời (maximum combined weight for concurrent access). Phương thức Acquire() của package semaphore cho phép tiếp nhận process với trọng số n vào semaphore. Ở đây ta để trọng số là 1 đối với tất cả các process. Nó sẽ chặn cho đến khi process trong semaphore được xử lý xong hoặc context của chương trình kết thúc. Sau mỗi lần xử lý xong một process, các bác gọi phương thức Release() để giải phóng process khỏi semaphore, cho phép process khác được tiếp nhận. Phương thức này cũng cần truyền vào trọng số n để biết được tiến trình với trọng số nào đang được giải phóng.

Nếu muốn ưu tiên 10 trang đầu thì ta làm thế nào?

Rất đơn giản! Các bác có thể nhìn code mẫu sau và sẽ hiểu ngay:

// ...
for i := 0; i <= 100; i++ {
    var weight int64 = 1
    if i >= 0 && i <= 10 {
        weight = 3
    }

    if err := sem.Acquire(context.Background(), weight); err != nil {
        return
    }

    go func(page int, weight int64) {
        defer wg.Done()
        defer sem.Release(weight)
        // ...
    }(i, weight)
}
//...

Thêm câu điều kiện vào vòng lặp, các bác có thể thay đổi trọng số đối với các process mà các bác muốn ưu tiên chạy trước.

Rồi sao nữa?

Code ở trên cũng khá ổn rồi nhỉ các bác? Tuy nhiên có lẽ chúng ta sẽ cần cải tiến nó để xử lý lỗi tốt hơn. Tôi sẽ không đi sâu vào chi tiết phần cải tiến này vì thời lượng có hạn. Nhưng bí quyết nằm ở việc sử dụng error group thay cho wait group nhé các bác! Ngoài ra, việc ứng dụng semaphore có liên quan tới một thuật toán có tên là leaky bucket, trong đó worker pool được coi như một cái xô đựng nước và các process có thể được ẩn dụ là các giọt nước. Các bác có thể đọc thêm nếu hứng thú với thuật toán này.

Chúc các bác code vui vẻ và hẹn gặp lại ở các bài viết sau!