-
-
Notifications
You must be signed in to change notification settings - Fork 267
feat: bulk checker goroutine cancellation #1694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,8 @@ import ( | |||||||||||||||
| "sync" | ||||||||||||||||
| "sync/atomic" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/pkg/errors" | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+9
to
+10
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion: Consider using the standard `errors` package. Since Go 1.13, the standard library's `errors` package provides `errors.Is`, which is the function this change relies on. Apply this diff to switch to the standard library:
- "github.com/pkg/errors"
+ "errors"
📝 Committable suggestion
Suggested change
|
||||||||||||||||
| "golang.org/x/sync/errgroup" | ||||||||||||||||
| "golang.org/x/sync/semaphore" | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -123,6 +125,15 @@ func (bc *BulkChecker) CollectAndSortRequests() { | |||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // prepareRequestList sorts and prepares a copy of the request list. | ||||||||||||||||
| func (bc *BulkChecker) prepareRequestList() []BulkCheckerRequest { | ||||||||||||||||
| bc.mu.Lock() | ||||||||||||||||
| defer bc.mu.Unlock() | ||||||||||||||||
|
|
||||||||||||||||
| bc.sortRequests() // Sort requests based on ID | ||||||||||||||||
| return append([]BulkCheckerRequest{}, bc.list...) // Return a copy of the list to avoid modifying the original | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // StopCollectingRequests Signal to stop collecting requests and close the channel | ||||||||||||||||
| func (bc *BulkChecker) StopCollectingRequests() { | ||||||||||||||||
| bc.mu.Lock() | ||||||||||||||||
|
|
@@ -152,17 +163,18 @@ func (bc *BulkChecker) ExecuteRequests(size uint32) error { | |||||||||||||||
| // Wait for request collection to complete before proceeding | ||||||||||||||||
| bc.wg.Wait() | ||||||||||||||||
|
|
||||||||||||||||
| // Create a context with cancellation that will be used to stop further processing | ||||||||||||||||
| ctx, cancel := context.WithCancel(bc.ctx) | ||||||||||||||||
| defer cancel() // Ensure the context is canceled when done | ||||||||||||||||
|
|
||||||||||||||||
| // Track the number of successful permission checks | ||||||||||||||||
| successCount := int64(0) | ||||||||||||||||
| // Semaphore to control the maximum number of concurrent permission checks | ||||||||||||||||
| sem := semaphore.NewWeighted(int64(bc.concurrencyLimit)) | ||||||||||||||||
| var mu sync.Mutex | ||||||||||||||||
|
|
||||||||||||||||
| // Lock the mutex to prevent race conditions while sorting and copying the list of requests | ||||||||||||||||
| bc.mu.Lock() | ||||||||||||||||
| bc.sortRequests() // Sort requests based on id | ||||||||||||||||
| listCopy := append([]BulkCheckerRequest{}, bc.list...) // Create a copy of the list to avoid modifying the original during processing | ||||||||||||||||
| bc.mu.Unlock() // Unlock the mutex after sorting and copying | ||||||||||||||||
| // Sort and copy the list of requests | ||||||||||||||||
| listCopy := bc.prepareRequestList() | ||||||||||||||||
|
|
||||||||||||||||
| // Pre-allocate a slice to store the results of the permission checks | ||||||||||||||||
| results := make([]base.CheckResult, len(listCopy)) | ||||||||||||||||
|
|
@@ -173,6 +185,7 @@ func (bc *BulkChecker) ExecuteRequests(size uint32) error { | |||||||||||||||
| for i, currentRequest := range listCopy { | ||||||||||||||||
| // If we've reached the success limit, stop processing further requests | ||||||||||||||||
| if atomic.LoadInt64(&successCount) >= int64(size) { | ||||||||||||||||
| cancel() // Cancel the context to stop further goroutines | ||||||||||||||||
| break | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -181,18 +194,31 @@ func (bc *BulkChecker) ExecuteRequests(size uint32) error { | |||||||||||||||
|
|
||||||||||||||||
| // Use errgroup to manage the goroutines, which allows for error handling and synchronization | ||||||||||||||||
| bc.g.Go(func() error { | ||||||||||||||||
| // Check if the context has been canceled, and return without error if so | ||||||||||||||||
| if err := ctx.Err(); err == context.Canceled { | ||||||||||||||||
| return nil // Gracefully exit the goroutine if context is canceled | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+198
to
+201
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Handle all context-related errors when checking context. Currently, only `context.Canceled` is checked, so a context that ended with `context.DeadlineExceeded` is not treated as done. Apply this diff:
- if err := ctx.Err(); err == context.Canceled {
+ if err := ctx.Err(); err != nil {
return nil // Gracefully exit the goroutine if context is canceled or deadline exceeded
}
Alternatively, use the `IsContextRelatedError` helper:
- if err := ctx.Err(); err == context.Canceled {
+ if IsContextRelatedError(ctx, ctx.Err()) {
return nil // Gracefully exit the goroutine if context is canceled or deadline exceeded
}
📝 Committable suggestion
Suggested change
|
||||||||||||||||
| // Acquire a slot in the semaphore to control concurrency | ||||||||||||||||
| if err := sem.Acquire(bc.ctx, 1); err != nil { | ||||||||||||||||
| return err // Return an error if semaphore acquisition fails | ||||||||||||||||
| if err := sem.Acquire(ctx, 1); err != nil { | ||||||||||||||||
| // Return nil instead of error if the context was canceled during semaphore acquisition | ||||||||||||||||
| if IsContextRelatedError(ctx, err) { | ||||||||||||||||
| return nil | ||||||||||||||||
| } | ||||||||||||||||
| return err // Return an error if semaphore acquisition fails for other reasons | ||||||||||||||||
| } | ||||||||||||||||
| defer sem.Release(1) // Ensure the semaphore slot is released after processing | ||||||||||||||||
|
|
||||||||||||||||
| var result base.CheckResult | ||||||||||||||||
| if req.Result == base.CheckResult_CHECK_RESULT_UNSPECIFIED { | ||||||||||||||||
| // Perform the permission check if the result is not already specified | ||||||||||||||||
| cr, err := bc.checker.Check(bc.ctx, req.Request) | ||||||||||||||||
| cr, err := bc.checker.Check(ctx, req.Request) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| return err // Return an error if the check fails | ||||||||||||||||
| // Handle context cancellation error here | ||||||||||||||||
| if IsContextRelatedError(ctx, err) { | ||||||||||||||||
| return nil // Ignore the cancellation error | ||||||||||||||||
| } | ||||||||||||||||
| return err // Return the actual error if it's not due to cancellation | ||||||||||||||||
| } | ||||||||||||||||
| result = cr.GetCan() // Get the result from the check | ||||||||||||||||
| } else { | ||||||||||||||||
|
|
@@ -230,13 +256,22 @@ func (bc *BulkChecker) ExecuteRequests(size uint32) error { | |||||||||||||||
| } | ||||||||||||||||
| mu.Unlock() // Unlock the mutex after updating the results and processed index | ||||||||||||||||
|
|
||||||||||||||||
| // If the success limit has been reached, cancel the context to stop further processing | ||||||||||||||||
| if atomic.LoadInt64(&successCount) >= int64(size) { | ||||||||||||||||
| cancel() | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| return nil // Return nil to indicate successful processing | ||||||||||||||||
| }) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Wait for all goroutines to complete and check for any errors | ||||||||||||||||
| if err := bc.g.Wait(); err != nil { | ||||||||||||||||
| return err // Return the error if any goroutine returned an error | ||||||||||||||||
| // Ignore context cancellation as an error | ||||||||||||||||
| if IsContextRelatedError(ctx, err) { | ||||||||||||||||
| return nil | ||||||||||||||||
| } | ||||||||||||||||
| return err // Return other errors if any goroutine returned an error | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| return nil // Return nil if all processing completed successfully | ||||||||||||||||
|
|
@@ -307,3 +342,8 @@ func (s *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.Per | |||||||||||||||
| Result: result, | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // IsContextRelatedError checks if the error is due to context cancellation, deadline exceedance, or closed connection | ||||||||||||||||
| func IsContextRelatedError(ctx context.Context, err error) bool { | ||||||||||||||||
| return errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded) | ||||||||||||||||
| } | ||||||||||||||||
|
Comment on lines
+347
to
+349
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Fix `IsContextRelatedError` to inspect the `err` argument. The function currently checks `ctx.Err()` and never examines the `err` value it is given. Apply this diff:
func IsContextRelatedError(ctx context.Context, err error) bool {
- return errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded)
+ return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
📝 Committable suggestion
Suggested change
|
||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -82,6 +82,7 @@ type CheckResponse struct { | |||||||||||||||||||||
| // VisitsMap - a thread-safe map of ENR records. | ||||||||||||||||||||||
| type VisitsMap struct { | ||||||||||||||||||||||
| er sync.Map | ||||||||||||||||||||||
| ea sync.Map | ||||||||||||||||||||||
| published sync.Map | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -91,6 +92,12 @@ func (s *VisitsMap) AddER(entity *base.Entity, relation string) bool { | |||||||||||||||||||||
| return !existed | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func (s *VisitsMap) AddEA(entityType, attribute string) bool { | ||||||||||||||||||||||
| key := fmt.Sprintf("%s$%s", entityType, attribute) | ||||||||||||||||||||||
| _, existed := s.er.LoadOrStore(key, struct{}{}) | ||||||||||||||||||||||
| return !existed | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+95
to
+99
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Fix the map used in `AddEA`. The method stores its key in `s.er` instead of the newly added `s.ea` map. Please apply the following fix:
func (s *VisitsMap) AddEA(entityType, attribute string) bool {
key := fmt.Sprintf("%s$%s", entityType, attribute)
- _, existed := s.er.LoadOrStore(key, struct{}{})
+ _, existed := s.ea.LoadOrStore(key, struct{}{})
return !existed
}
📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| func (s *VisitsMap) AddPublished(entity *base.Entity) bool { | ||||||||||||||||||||||
| key := tuple.EntityToString(entity) | ||||||||||||||||||||||
| _, existed := s.published.LoadOrStore(key, struct{}{}) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Issues with the change from `name_template` to `version_template`
The `version_template` key was not found in GoReleaser's documentation for snapshot configuration. Additionally, `name_template` is still present in `.goreleaser.yml`. Please verify and correct the configuration to ensure it aligns with GoReleaser's requirements.
🔗 Analysis chain
Confirm the change from `name_template` to `version_template`
The change from `name_template` to `version_template` in the `snapshot` section appears to be intentional. This modification alters how versioning for snapshots is defined.
To ensure this change aligns with GoReleaser's current best practices, please run the following verification script:
This script will help verify if the change is consistent with GoReleaser's current documentation and if all relevant occurrences have been updated.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 673