Skip to content
Listing Objects

Listing Objects

ListObjects returns all objects of a given type that a subject has a specific relation on.

Basic Usage

// Find all repositories user can read (with pagination)
repoIDs, cursor, err := checker.ListObjects(ctx,
    authz.User("123"),
    authz.RelCanRead,
    "repository",
    melange.PageOptions{Limit: 100},
)
if err != nil {
    return err
}

// repoIDs = ["repo-1", "repo-456", "repo-789"]
// cursor = nil when no more pages, or a string to fetch the next page

Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| subject_type | text | Type of the subject (e.g., 'user') |
| subject_id | text | ID of the subject |
| relation | text | The relation to check |
| object_type | text | Type of objects to return |
| p_limit | int | Maximum number of results per page (NULL = no limit) |
| p_after | text | Cursor from previous page (NULL = start from beginning) |

Return Value

Returns a table with object_id and next_cursor columns. The next_cursor value is repeated on every row for convenience - use the last row’s cursor to fetch the next page. Returns an empty result set if no objects found (not an error).

Ordering: Results are ordered deterministically by object_id to ensure stable pagination across requests.

Pagination

Cursor-Based Pagination

Use cursor-based pagination to iterate through large result sets efficiently:

// Walk every page of repositories the user can read
var after *string
for {
    page := melange.PageOptions{Limit: 100, After: after}
    repoIDs, nextCursor, err := checker.ListObjects(ctx, authz.User("123"), authz.RelCanRead, "repository", page)
    if err != nil {
        return err
    }

    // Handle the repository IDs on this page
    for i := range repoIDs {
        fmt.Println("Accessible:", repoIDs[i])
    }

    // A nil cursor marks the final page
    if nextCursor == nil {
        break
    }
    after = nextCursor
}

Fetching All Results

For convenience, use the ListObjectsAll helper which automatically paginates through all results:

// Get all accessible repositories (auto-paginates internally)
allRepoIDs, err := checker.ListObjectsAll(ctx,
    authz.User("123"),
    authz.RelCanRead,
    "repository",
)
if err != nil {
    return err
}
// allRepoIDs contains all accessible repository IDs
Use with caution: ListObjectsAll loads all IDs into memory. For large datasets, prefer paginated queries with ListObjects to control memory usage.

Examples

Filter a List of Resources

// Load every repository from your data layer
repos, err := db.GetAllRepositories(ctx)
if err != nil {
    return err
}

// Ask the checker which repository IDs the user may read (auto-paginated)
accessibleIDs, err := checker.ListObjectsAll(ctx, user, "can_read", "repository")
if err != nil {
    return err
}

// Index the readable IDs so each membership test is a single map lookup
readable := make(map[string]struct{}, len(accessibleIDs))
for _, id := range accessibleIDs {
    readable[id] = struct{}{}
}

// Keep only the repositories whose ID appears in the readable set
var visibleRepos []Repository
for _, repo := range repos {
    if _, ok := readable[fmt.Sprint(repo.ID)]; !ok {
        continue
    }
    visibleRepos = append(visibleRepos, repo)
}

Fetch Only Accessible Resources

// Resolve the readable document IDs up front (ListObjectsAll paginates internally)
ids, err := checker.ListObjectsAll(ctx, user, "can_read", "document")
switch {
case err != nil:
    return nil, err
case len(ids) == 0:
    // Nothing is accessible, so skip the database round trip
    return []Document{}, nil
}

// Load just the documents that passed the permission check
return db.GetDocumentsByIDs(ctx, ids)

Check Multiple Permissions

// RepoWithPermissions decorates a Repository with three permission flags.
type RepoWithPermissions struct {
    Repository // embedded, so the repository's fields are promoted

    // One flag per relation queried for the repository.
    CanRead, CanWrite, CanDelete bool
}

// Fetch all permission sets in parallel (using ListObjectsAll).
// Each goroutine below assigns only its own ID slice and error variable, so
// the three writers never touch the same variable and no mutex is needed.
var (
    readIDs, writeIDs, deleteIDs []string
    readErr, writeErr, deleteErr error
)

// One WaitGroup slot per goroutine started below.
var wg sync.WaitGroup
wg.Add(3)

go func() {
    defer wg.Done()
    readIDs, readErr = checker.ListObjectsAll(ctx, user, "can_read", "repository")
}()
go func() {
    defer wg.Done()
    writeIDs, writeErr = checker.ListObjectsAll(ctx, user, "can_write", "repository")
}()
go func() {
    defer wg.Done()
    deleteIDs, deleteErr = checker.ListObjectsAll(ctx, user, "can_delete", "repository")
}()
// Block until all three lookups have finished; only after this point is it
// safe to read the ID slices and error variables.
wg.Wait()

// Check errors...
// (elided in this example: inspect readErr, writeErr, and deleteErr before
// using the corresponding ID slices)

// Build permission maps
// NOTE(review): toSet is not shown here; presumably it returns a set keyed by
// ID whose lookups yield a bool (e.g. map[string]bool) - confirm.
canRead := toSet(readIDs)
canWrite := toSet(writeIDs)
canDelete := toSet(deleteIDs)

// Annotate repos with permissions
// (repos and result are declared outside this snippet)
for _, repo := range repos {
    // Repository IDs are stringified so they can be looked up in the ID sets
    id := fmt.Sprint(repo.ID)
    result = append(result, RepoWithPermissions{
        Repository: repo,
        CanRead:    canRead[id],
        CanWrite:   canWrite[id],
        CanDelete:  canDelete[id],
    })
}

Performance Characteristics

ListObjects uses a recursive CTE that walks the permission graph in a single query. Performance depends primarily on the number of accessible objects (results), not total tuple count:

| Result Count | Typical Latency |
|--------------|-----------------|
| <10 objects | 300-500 μs |
| 10-100 | 1-10 ms |
| 100-1K | 10-50 ms |
| 1K-10K | 30-200 ms |
| 10K+ | 200-1000 ms |

Performance scales with result set size. For large datasets:

  1. Use pagination - Use p_limit to control result size and avoid loading unbounded data
  2. Pre-filter candidates - If you know the user only cares about certain objects, filter at the application layer first
  3. Cache results - Cache ListObjects results for repeated queries

Decision Override Behavior

| Decision | Behavior |
|----------|----------|
| DecisionUnset | Normal database query |
| DecisionDeny | Returns empty slice (no access) |
| DecisionAllow | Falls through to database query |

Note: DecisionAllow cannot enumerate “all” objects, so it performs the normal query. If you need to return all objects when in admin mode, query your database directly.

Caching

ListObjects does not use the single-tuple permission cache. For caching list results, implement application-level caching:

// CachedChecker wraps a melange.Checker with an application-level cache for
// list results.
type CachedChecker struct {
    *melange.Checker
    listCache *lru.Cache
}

// ListObjectsAll returns every object ID of objectType on which subject holds
// relation, answering from listCache when an entry exists.
//
// On a cache miss it delegates to the embedded Checker and stores the result
// under a key built from the subject type, subject ID, relation, and object
// type. Errors are returned unchanged and are never cached.
//
// This method only reads and adds entries; it never invalidates them, so a
// cached list can be stale after permissions change. Pair it with your own
// expiry or invalidation strategy.
func (c *CachedChecker) ListObjectsAll(ctx context.Context, subject SubjectLike, relation RelationLike, objectType ObjectType) ([]string, error) {
    subj := subject.FGASubject()
    key := fmt.Sprintf("list:%s:%s:%s:%s",
        subj.Type,
        subj.ID,
        relation.FGARelation(),
        objectType,
    )

    if cached, ok := c.listCache.Get(key); ok {
        // Checked assertion: an entry of an unexpected type is treated as a
        // miss instead of panicking.
        if ids, ok := cached.([]string); ok {
            // Hand out a copy so callers that sort or filter the slice in
            // place cannot corrupt the cached entry. The [:0:0] reslice keeps
            // nil and empty slices as they were.
            return append(ids[:0:0], ids...), nil
        }
    }

    ids, err := c.Checker.ListObjectsAll(ctx, subject, relation, objectType)
    if err != nil {
        return nil, err
    }

    // Cache a private copy; the caller owns the slice returned below.
    c.listCache.Add(key, append(ids[:0:0], ids...))
    return ids, nil
}

Common Patterns

Paginated Access Control

// GetAccessibleRepos returns one page of repositories the user can read.
//
// cursor is the cursor returned by the previous call (nil for the first page)
// and pageSize is passed to the checker as the page limit. The results are the
// repositories for this page and the cursor for the next page, which is nil
// when there are no more pages. On any error both the repositories and the
// cursor are nil.
func GetAccessibleRepos(ctx context.Context, user User, cursor *string, pageSize int) ([]Repository, *string, error) {
    // Get a page of accessible IDs with built-in pagination
    page := melange.PageOptions{Limit: pageSize, After: cursor}
    ids, nextCursor, err := checker.ListObjects(ctx, user, "can_read", "repository", page)
    if err != nil {
        return nil, nil, err
    }

    if len(ids) == 0 {
        return []Repository{}, nil, nil
    }

    // Fetch the repos for this page
    repos, err := db.GetRepositoriesByIDs(ctx, ids)
    if err != nil {
        // Do not return a next-page cursor alongside a failed page; a caller
        // that used it would silently skip the IDs that failed to load.
        return nil, nil, err
    }
    return repos, nextCursor, nil
}

Admin Override

// GetVisibleRepos returns the repositories the user is allowed to see.
//
// When isAdmin is true the permission check is bypassed and every repository
// from the data layer is returned. Otherwise only repositories the user holds
// "can_read" on are loaded; a user with no readable repositories gets an empty
// slice without a database query.
func GetVisibleRepos(ctx context.Context, user User, isAdmin bool) ([]Repository, error) {
    if isAdmin {
        // Admins see everything
        return db.GetAllRepositories(ctx)
    }

    // Regular users see only accessible repos
    ids, err := checker.ListObjectsAll(ctx, user, "can_read", "repository")
    if err != nil {
        return nil, err
    }

    // Nothing accessible: skip the lookup rather than querying with an empty
    // ID list, matching the other ID-based fetch examples.
    if len(ids) == 0 {
        return []Repository{}, nil
    }

    return db.GetRepositoriesByIDs(ctx, ids)
}

Transaction Consistency

Paginated queries across multiple calls can observe changes between pages. For consistency-critical flows, run paging inside a transaction with repeatable-read or snapshot semantics:

// For consistent pagination across pages, use a transaction
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
    return err
}
// Roll back on every early return. Once Commit has succeeded this call only
// reports that the transaction is already done, so its result is ignored.
defer tx.Rollback()

// Run every page through the same transaction so all pages share one snapshot
txChecker := melange.NewChecker(tx)

var allIDs []string
var cursor *string
for {
    ids, next, err := txChecker.ListObjects(ctx, user, "can_read", "document",
        melange.PageOptions{Limit: 100, After: cursor})
    if err != nil {
        return err
    }
    allIDs = append(allIDs, ids...)
    if next == nil {
        break // No more pages
    }
    cursor = next
}

// Surface a failed commit instead of silently discarding it
if err := tx.Commit(); err != nil {
    return err
}

See Also