mirror of
https://github.com/bitmagnet-io/bitmagnet.git
synced 2025-12-28 06:34:17 +00:00
Convert bloom filters to streamed large objects (#396)
This commit is contained in:
parent
542fecc33d
commit
00f46125f9
@ -3,8 +3,8 @@ package blocking
|
||||
import (
|
||||
"context"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/database/dao"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"go.uber.org/fx"
|
||||
"sync"
|
||||
"time"
|
||||
@ -12,7 +12,7 @@ import (
|
||||
|
||||
type Params struct {
|
||||
fx.In
|
||||
Dao lazy.Lazy[*dao.Query]
|
||||
Pool lazy.Lazy[*pgxpool.Pool]
|
||||
PgxPoolWait *sync.WaitGroup `name:"pgx_pool_wait"`
|
||||
}
|
||||
|
||||
@ -24,13 +24,13 @@ type Result struct {
|
||||
|
||||
func New(params Params) Result {
|
||||
lazyManager := lazy.New[Manager](func() (Manager, error) {
|
||||
d, err := params.Dao.Get()
|
||||
pool, err := params.Pool.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params.PgxPoolWait.Add(1)
|
||||
return &manager{
|
||||
dao: d,
|
||||
pool: pool,
|
||||
buffer: make(map[protocol.ID]struct{}, 1000),
|
||||
maxBufferSize: 1000,
|
||||
maxFlushWait: time.Minute * 5,
|
||||
|
||||
@ -2,24 +2,28 @@ package blocking
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/bloom"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/database/dao"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
Filter(ctx context.Context, hashes []protocol.ID) ([]protocol.ID, error)
|
||||
Block(ctx context.Context, hashes []protocol.ID) error
|
||||
Block(ctx context.Context, hashes []protocol.ID, flush bool) error
|
||||
Flush(ctx context.Context) error
|
||||
}
|
||||
|
||||
type manager struct {
|
||||
mutex sync.Mutex
|
||||
dao *dao.Query
|
||||
pool *pgxpool.Pool
|
||||
buffer map[protocol.ID]struct{}
|
||||
filter bloom.StableBloomFilter
|
||||
filter *bloom.StableBloomFilter
|
||||
maxBufferSize int
|
||||
lastFlushedAt time.Time
|
||||
maxFlushWait time.Duration
|
||||
@ -28,7 +32,7 @@ type manager struct {
|
||||
func (m *manager) Filter(ctx context.Context, hashes []protocol.ID) ([]protocol.ID, error) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
if m.filter.Cells() == 0 || m.shouldFlush() {
|
||||
if m.filter == nil || m.shouldFlush() {
|
||||
if flushErr := m.flush(ctx); flushErr != nil {
|
||||
return nil, flushErr
|
||||
}
|
||||
@ -46,13 +50,13 @@ func (m *manager) Filter(ctx context.Context, hashes []protocol.ID) ([]protocol.
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
func (m *manager) Block(ctx context.Context, hashes []protocol.ID) error {
|
||||
func (m *manager) Block(ctx context.Context, hashes []protocol.ID, flush bool) error {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
for _, hash := range hashes {
|
||||
m.buffer[hash] = struct{}{}
|
||||
}
|
||||
if m.shouldFlush() {
|
||||
if flush || m.shouldFlush() {
|
||||
if flushErr := m.flush(ctx); flushErr != nil {
|
||||
return flushErr
|
||||
}
|
||||
@ -69,18 +73,103 @@ func (m *manager) Flush(ctx context.Context) error {
|
||||
return m.flush(ctx)
|
||||
}
|
||||
|
||||
const blockedTorrentsBloomFilterKey = "blocked_torrents"
|
||||
|
||||
func (m *manager) flush(ctx context.Context) error {
|
||||
var hashes []protocol.ID
|
||||
for hash := range m.buffer {
|
||||
hashes = append(hashes, hash)
|
||||
}
|
||||
bf, blockErr := m.dao.BlockTorrents(ctx, hashes)
|
||||
if blockErr != nil {
|
||||
return blockErr
|
||||
|
||||
tx, err := m.pool.BeginTx(ctx, pgx.TxOptions{
|
||||
AccessMode: pgx.ReadWrite,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = tx.Rollback(ctx)
|
||||
}()
|
||||
|
||||
if len(hashes) > 0 {
|
||||
_, err = tx.Exec(ctx, "DELETE FROM torrents WHERE info_hash = any($1)", hashes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete from torrents table: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
bf := bloom.NewDefaultStableBloomFilter()
|
||||
|
||||
lobs := tx.LargeObjects()
|
||||
|
||||
found := false
|
||||
|
||||
var oid uint32
|
||||
var nullOid sql.NullInt32
|
||||
err = tx.QueryRow(ctx, "SELECT oid FROM bloom_filters WHERE key = $1", blockedTorrentsBloomFilterKey).Scan(&nullOid)
|
||||
if err == nil {
|
||||
found = true
|
||||
if nullOid.Valid {
|
||||
oid = uint32(nullOid.Int32)
|
||||
obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeRead)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open large object for reading: %w", err)
|
||||
}
|
||||
_, err = bf.ReadFrom(obj)
|
||||
obj.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read current bloom filter: %w", err)
|
||||
}
|
||||
}
|
||||
} else if !errors.Is(err, pgx.ErrNoRows) {
|
||||
return fmt.Errorf("failed to get bloom filter object ID: %w", err)
|
||||
}
|
||||
|
||||
if oid == 0 {
|
||||
// Create a new Large Object.
|
||||
// We pass 0, so the DB can pick an available oid for us.
|
||||
oid, err = lobs.Create(ctx, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create large object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, hash := range hashes {
|
||||
bf.Add(hash[:])
|
||||
}
|
||||
|
||||
obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeWrite)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open large object for writing: %w", err)
|
||||
}
|
||||
|
||||
_, err = bf.WriteTo(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to large object: %w", err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if !found {
|
||||
_, err = tx.Exec(ctx, "INSERT INTO bloom_filters (key, oid, created_at, updated_at) VALUES ($1, $2, $3, $4)", blockedTorrentsBloomFilterKey, oid, now, now)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save new bloom filter record: %w", err)
|
||||
}
|
||||
} else if !nullOid.Valid {
|
||||
_, err = tx.Exec(ctx, "UPDATE bloom_filters SET oid = $1, updated_at = $2 WHERE key = $3", oid, now, blockedTorrentsBloomFilterKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update bloom filter record: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||
}
|
||||
|
||||
m.buffer = make(map[protocol.ID]struct{})
|
||||
m.filter = bf.Filter
|
||||
m.lastFlushedAt = time.Now()
|
||||
m.filter = bf
|
||||
m.lastFlushedAt = now
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -1,392 +0,0 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"gorm.io/gorm/schema"
|
||||
|
||||
"gorm.io/gen"
|
||||
"gorm.io/gen/field"
|
||||
|
||||
"gorm.io/plugin/dbresolver"
|
||||
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/model"
|
||||
)
|
||||
|
||||
func newBloomFilter(db *gorm.DB, opts ...gen.DOOption) bloomFilter {
|
||||
_bloomFilter := bloomFilter{}
|
||||
|
||||
_bloomFilter.bloomFilterDo.UseDB(db, opts...)
|
||||
_bloomFilter.bloomFilterDo.UseModel(&model.BloomFilter{})
|
||||
|
||||
tableName := _bloomFilter.bloomFilterDo.TableName()
|
||||
_bloomFilter.ALL = field.NewAsterisk(tableName)
|
||||
_bloomFilter.Key = field.NewString(tableName, "key")
|
||||
_bloomFilter.Filter = field.NewField(tableName, "bytes")
|
||||
_bloomFilter.CreatedAt = field.NewTime(tableName, "created_at")
|
||||
_bloomFilter.UpdatedAt = field.NewTime(tableName, "updated_at")
|
||||
|
||||
_bloomFilter.fillFieldMap()
|
||||
|
||||
return _bloomFilter
|
||||
}
|
||||
|
||||
type bloomFilter struct {
|
||||
bloomFilterDo
|
||||
|
||||
ALL field.Asterisk
|
||||
Key field.String
|
||||
Filter field.Field
|
||||
CreatedAt field.Time
|
||||
UpdatedAt field.Time
|
||||
|
||||
fieldMap map[string]field.Expr
|
||||
}
|
||||
|
||||
func (b bloomFilter) Table(newTableName string) *bloomFilter {
|
||||
b.bloomFilterDo.UseTable(newTableName)
|
||||
return b.updateTableName(newTableName)
|
||||
}
|
||||
|
||||
func (b bloomFilter) As(alias string) *bloomFilter {
|
||||
b.bloomFilterDo.DO = *(b.bloomFilterDo.As(alias).(*gen.DO))
|
||||
return b.updateTableName(alias)
|
||||
}
|
||||
|
||||
func (b *bloomFilter) updateTableName(table string) *bloomFilter {
|
||||
b.ALL = field.NewAsterisk(table)
|
||||
b.Key = field.NewString(table, "key")
|
||||
b.Filter = field.NewField(table, "bytes")
|
||||
b.CreatedAt = field.NewTime(table, "created_at")
|
||||
b.UpdatedAt = field.NewTime(table, "updated_at")
|
||||
|
||||
b.fillFieldMap()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *bloomFilter) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
|
||||
_f, ok := b.fieldMap[fieldName]
|
||||
if !ok || _f == nil {
|
||||
return nil, false
|
||||
}
|
||||
_oe, ok := _f.(field.OrderExpr)
|
||||
return _oe, ok
|
||||
}
|
||||
|
||||
func (b *bloomFilter) fillFieldMap() {
|
||||
b.fieldMap = make(map[string]field.Expr, 4)
|
||||
b.fieldMap["key"] = b.Key
|
||||
b.fieldMap["bytes"] = b.Filter
|
||||
b.fieldMap["created_at"] = b.CreatedAt
|
||||
b.fieldMap["updated_at"] = b.UpdatedAt
|
||||
}
|
||||
|
||||
func (b bloomFilter) clone(db *gorm.DB) bloomFilter {
|
||||
b.bloomFilterDo.ReplaceConnPool(db.Statement.ConnPool)
|
||||
return b
|
||||
}
|
||||
|
||||
func (b bloomFilter) replaceDB(db *gorm.DB) bloomFilter {
|
||||
b.bloomFilterDo.ReplaceDB(db)
|
||||
return b
|
||||
}
|
||||
|
||||
type bloomFilterDo struct{ gen.DO }
|
||||
|
||||
type IBloomFilterDo interface {
|
||||
gen.SubQuery
|
||||
Debug() IBloomFilterDo
|
||||
WithContext(ctx context.Context) IBloomFilterDo
|
||||
WithResult(fc func(tx gen.Dao)) gen.ResultInfo
|
||||
ReplaceDB(db *gorm.DB)
|
||||
ReadDB() IBloomFilterDo
|
||||
WriteDB() IBloomFilterDo
|
||||
As(alias string) gen.Dao
|
||||
Session(config *gorm.Session) IBloomFilterDo
|
||||
Columns(cols ...field.Expr) gen.Columns
|
||||
Clauses(conds ...clause.Expression) IBloomFilterDo
|
||||
Not(conds ...gen.Condition) IBloomFilterDo
|
||||
Or(conds ...gen.Condition) IBloomFilterDo
|
||||
Select(conds ...field.Expr) IBloomFilterDo
|
||||
Where(conds ...gen.Condition) IBloomFilterDo
|
||||
Order(conds ...field.Expr) IBloomFilterDo
|
||||
Distinct(cols ...field.Expr) IBloomFilterDo
|
||||
Omit(cols ...field.Expr) IBloomFilterDo
|
||||
Join(table schema.Tabler, on ...field.Expr) IBloomFilterDo
|
||||
LeftJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo
|
||||
RightJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo
|
||||
Group(cols ...field.Expr) IBloomFilterDo
|
||||
Having(conds ...gen.Condition) IBloomFilterDo
|
||||
Limit(limit int) IBloomFilterDo
|
||||
Offset(offset int) IBloomFilterDo
|
||||
Count() (count int64, err error)
|
||||
Scopes(funcs ...func(gen.Dao) gen.Dao) IBloomFilterDo
|
||||
Unscoped() IBloomFilterDo
|
||||
Create(values ...*model.BloomFilter) error
|
||||
CreateInBatches(values []*model.BloomFilter, batchSize int) error
|
||||
Save(values ...*model.BloomFilter) error
|
||||
First() (*model.BloomFilter, error)
|
||||
Take() (*model.BloomFilter, error)
|
||||
Last() (*model.BloomFilter, error)
|
||||
Find() ([]*model.BloomFilter, error)
|
||||
FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.BloomFilter, err error)
|
||||
FindInBatches(result *[]*model.BloomFilter, batchSize int, fc func(tx gen.Dao, batch int) error) error
|
||||
Pluck(column field.Expr, dest interface{}) error
|
||||
Delete(...*model.BloomFilter) (info gen.ResultInfo, err error)
|
||||
Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
Updates(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
UpdateColumns(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateFrom(q gen.SubQuery) gen.Dao
|
||||
Attrs(attrs ...field.AssignExpr) IBloomFilterDo
|
||||
Assign(attrs ...field.AssignExpr) IBloomFilterDo
|
||||
Joins(fields ...field.RelationField) IBloomFilterDo
|
||||
Preload(fields ...field.RelationField) IBloomFilterDo
|
||||
FirstOrInit() (*model.BloomFilter, error)
|
||||
FirstOrCreate() (*model.BloomFilter, error)
|
||||
FindByPage(offset int, limit int) (result []*model.BloomFilter, count int64, err error)
|
||||
ScanByPage(result interface{}, offset int, limit int) (count int64, err error)
|
||||
Scan(result interface{}) (err error)
|
||||
Returning(value interface{}, columns ...string) IBloomFilterDo
|
||||
UnderlyingDB() *gorm.DB
|
||||
schema.Tabler
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Debug() IBloomFilterDo {
|
||||
return b.withDO(b.DO.Debug())
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) WithContext(ctx context.Context) IBloomFilterDo {
|
||||
return b.withDO(b.DO.WithContext(ctx))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) ReadDB() IBloomFilterDo {
|
||||
return b.Clauses(dbresolver.Read)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) WriteDB() IBloomFilterDo {
|
||||
return b.Clauses(dbresolver.Write)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Session(config *gorm.Session) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Session(config))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Clauses(conds ...clause.Expression) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Clauses(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Returning(value interface{}, columns ...string) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Returning(value, columns...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Not(conds ...gen.Condition) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Not(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Or(conds ...gen.Condition) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Or(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Select(conds ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Select(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Where(conds ...gen.Condition) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Where(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Order(conds ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Order(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Distinct(cols ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Distinct(cols...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Omit(cols ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Omit(cols...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Join(table schema.Tabler, on ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Join(table, on...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) LeftJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.LeftJoin(table, on...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) RightJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.RightJoin(table, on...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Group(cols ...field.Expr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Group(cols...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Having(conds ...gen.Condition) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Having(conds...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Limit(limit int) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Limit(limit))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Offset(offset int) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Offset(offset))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Scopes(funcs ...func(gen.Dao) gen.Dao) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Scopes(funcs...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Unscoped() IBloomFilterDo {
|
||||
return b.withDO(b.DO.Unscoped())
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Create(values ...*model.BloomFilter) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return b.DO.Create(values)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) CreateInBatches(values []*model.BloomFilter, batchSize int) error {
|
||||
return b.DO.CreateInBatches(values, batchSize)
|
||||
}
|
||||
|
||||
// Save : !!! underlying implementation is different with GORM
|
||||
// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values)
|
||||
func (b bloomFilterDo) Save(values ...*model.BloomFilter) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return b.DO.Save(values)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) First() (*model.BloomFilter, error) {
|
||||
if result, err := b.DO.First(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.BloomFilter), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Take() (*model.BloomFilter, error) {
|
||||
if result, err := b.DO.Take(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.BloomFilter), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Last() (*model.BloomFilter, error) {
|
||||
if result, err := b.DO.Last(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.BloomFilter), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Find() ([]*model.BloomFilter, error) {
|
||||
result, err := b.DO.Find()
|
||||
return result.([]*model.BloomFilter), err
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.BloomFilter, err error) {
|
||||
buf := make([]*model.BloomFilter, 0, batchSize)
|
||||
err = b.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error {
|
||||
defer func() { results = append(results, buf...) }()
|
||||
return fc(tx, batch)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) FindInBatches(result *[]*model.BloomFilter, batchSize int, fc func(tx gen.Dao, batch int) error) error {
|
||||
return b.DO.FindInBatches(result, batchSize, fc)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Attrs(attrs ...field.AssignExpr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Attrs(attrs...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Assign(attrs ...field.AssignExpr) IBloomFilterDo {
|
||||
return b.withDO(b.DO.Assign(attrs...))
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Joins(fields ...field.RelationField) IBloomFilterDo {
|
||||
for _, _f := range fields {
|
||||
b = *b.withDO(b.DO.Joins(_f))
|
||||
}
|
||||
return &b
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Preload(fields ...field.RelationField) IBloomFilterDo {
|
||||
for _, _f := range fields {
|
||||
b = *b.withDO(b.DO.Preload(_f))
|
||||
}
|
||||
return &b
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) FirstOrInit() (*model.BloomFilter, error) {
|
||||
if result, err := b.DO.FirstOrInit(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.BloomFilter), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) FirstOrCreate() (*model.BloomFilter, error) {
|
||||
if result, err := b.DO.FirstOrCreate(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.BloomFilter), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) FindByPage(offset int, limit int) (result []*model.BloomFilter, count int64, err error) {
|
||||
result, err = b.Offset(offset).Limit(limit).Find()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if size := len(result); 0 < limit && 0 < size && size < limit {
|
||||
count = int64(size + offset)
|
||||
return
|
||||
}
|
||||
|
||||
count, err = b.Offset(-1).Limit(-1).Count()
|
||||
return
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) {
|
||||
count, err = b.Count()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = b.Offset(offset).Limit(limit).Scan(result)
|
||||
return
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Scan(result interface{}) (err error) {
|
||||
return b.DO.Scan(result)
|
||||
}
|
||||
|
||||
func (b bloomFilterDo) Delete(models ...*model.BloomFilter) (result gen.ResultInfo, err error) {
|
||||
return b.DO.Delete(models)
|
||||
}
|
||||
|
||||
func (b *bloomFilterDo) withDO(do gen.Dao) *bloomFilterDo {
|
||||
b.DO = *do.(*gen.DO)
|
||||
return b
|
||||
}
|
||||
@ -17,7 +17,6 @@ import (
|
||||
|
||||
var (
|
||||
Q = new(Query)
|
||||
BloomFilter *bloomFilter
|
||||
Content *content
|
||||
ContentAttribute *contentAttribute
|
||||
ContentCollection *contentCollection
|
||||
@ -37,7 +36,6 @@ var (
|
||||
|
||||
func SetDefault(db *gorm.DB, opts ...gen.DOOption) {
|
||||
*Q = *Use(db, opts...)
|
||||
BloomFilter = &Q.BloomFilter
|
||||
Content = &Q.Content
|
||||
ContentAttribute = &Q.ContentAttribute
|
||||
ContentCollection = &Q.ContentCollection
|
||||
@ -58,7 +56,6 @@ func SetDefault(db *gorm.DB, opts ...gen.DOOption) {
|
||||
func Use(db *gorm.DB, opts ...gen.DOOption) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
BloomFilter: newBloomFilter(db, opts...),
|
||||
Content: newContent(db, opts...),
|
||||
ContentAttribute: newContentAttribute(db, opts...),
|
||||
ContentCollection: newContentCollection(db, opts...),
|
||||
@ -80,7 +77,6 @@ func Use(db *gorm.DB, opts ...gen.DOOption) *Query {
|
||||
type Query struct {
|
||||
db *gorm.DB
|
||||
|
||||
BloomFilter bloomFilter
|
||||
Content content
|
||||
ContentAttribute contentAttribute
|
||||
ContentCollection contentCollection
|
||||
@ -103,7 +99,6 @@ func (q *Query) Available() bool { return q.db != nil }
|
||||
func (q *Query) clone(db *gorm.DB) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
BloomFilter: q.BloomFilter.clone(db),
|
||||
Content: q.Content.clone(db),
|
||||
ContentAttribute: q.ContentAttribute.clone(db),
|
||||
ContentCollection: q.ContentCollection.clone(db),
|
||||
@ -133,7 +128,6 @@ func (q *Query) WriteDB() *Query {
|
||||
func (q *Query) ReplaceDB(db *gorm.DB) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
BloomFilter: q.BloomFilter.replaceDB(db),
|
||||
Content: q.Content.replaceDB(db),
|
||||
ContentAttribute: q.ContentAttribute.replaceDB(db),
|
||||
ContentCollection: q.ContentCollection.replaceDB(db),
|
||||
@ -153,7 +147,6 @@ func (q *Query) ReplaceDB(db *gorm.DB) *Query {
|
||||
}
|
||||
|
||||
type queryCtx struct {
|
||||
BloomFilter IBloomFilterDo
|
||||
Content IContentDo
|
||||
ContentAttribute IContentAttributeDo
|
||||
ContentCollection IContentCollectionDo
|
||||
@ -173,7 +166,6 @@ type queryCtx struct {
|
||||
|
||||
func (q *Query) WithContext(ctx context.Context) *queryCtx {
|
||||
return &queryCtx{
|
||||
BloomFilter: q.BloomFilter.WithContext(ctx),
|
||||
Content: q.Content.WithContext(ctx),
|
||||
ContentAttribute: q.ContentAttribute.WithContext(ctx),
|
||||
ContentCollection: q.ContentCollection.WithContext(ctx),
|
||||
|
||||
@ -1,74 +0,0 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/bloom"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/model"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/protocol"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const blockedTorrentsBloomFilterKey = "blocked_torrents"
|
||||
|
||||
func (q *Query) DeleteAndBlockTorrents(ctx context.Context, infoHashes []protocol.ID) (model.BloomFilter, error) {
|
||||
var valuers []driver.Valuer
|
||||
for _, infoHash := range infoHashes {
|
||||
valuers = append(valuers, infoHash)
|
||||
}
|
||||
var bf model.BloomFilter
|
||||
if txErr := q.Transaction(func(tx *Query) error {
|
||||
if _, deleteErr := tx.Torrent.WithContext(ctx).Where(tx.Torrent.InfoHash.In(valuers...)).Delete(); deleteErr != nil {
|
||||
return deleteErr
|
||||
}
|
||||
pBf, txErr := blockTx(ctx, tx, infoHashes)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
bf = *pBf
|
||||
return nil
|
||||
}); txErr != nil {
|
||||
return bf, txErr
|
||||
}
|
||||
return bf, nil
|
||||
}
|
||||
|
||||
func (q *Query) BlockTorrents(ctx context.Context, infoHashes []protocol.ID) (model.BloomFilter, error) {
|
||||
var bf model.BloomFilter
|
||||
if txErr := q.Transaction(func(tx *Query) error {
|
||||
pBf, txErr := blockTx(ctx, tx, infoHashes)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
}
|
||||
bf = *pBf
|
||||
return nil
|
||||
}); txErr != nil {
|
||||
return bf, txErr
|
||||
}
|
||||
return bf, nil
|
||||
}
|
||||
|
||||
func blockTx(ctx context.Context, tx *Query, infoHashes []protocol.ID) (*model.BloomFilter, error) {
|
||||
bf, bfErr := tx.BloomFilter.WithContext(ctx).Where(
|
||||
tx.BloomFilter.Key.Eq(blockedTorrentsBloomFilterKey),
|
||||
).First()
|
||||
if errors.Is(bfErr, gorm.ErrRecordNotFound) {
|
||||
bf = &model.BloomFilter{
|
||||
Key: blockedTorrentsBloomFilterKey,
|
||||
Filter: *bloom.NewDefaultStableBloomFilter(),
|
||||
}
|
||||
} else if bfErr != nil {
|
||||
return nil, bfErr
|
||||
}
|
||||
if len(infoHashes) == 0 {
|
||||
return bf, nil
|
||||
}
|
||||
for _, infoHash := range infoHashes {
|
||||
bf.Filter.Add(infoHash[:])
|
||||
}
|
||||
if saveErr := tx.BloomFilter.WithContext(ctx).Save(bf); saveErr != nil {
|
||||
return nil, saveErr
|
||||
}
|
||||
return bf, nil
|
||||
}
|
||||
@ -380,12 +380,6 @@ func BuildGenerator(db *gorm.DB) *gen.Generator {
|
||||
torrentContentBaseOptions...,
|
||||
)...,
|
||||
)
|
||||
bloomFilters := g.GenerateModel(
|
||||
"bloom_filters",
|
||||
gen.FieldRename("bytes", "Filter"),
|
||||
gen.FieldType("bytes", "bloom.StableBloomFilter"),
|
||||
createdAtReadOnly,
|
||||
)
|
||||
keyValues := g.GenerateModel(
|
||||
"key_values",
|
||||
createdAtReadOnly,
|
||||
@ -446,7 +440,6 @@ func BuildGenerator(db *gorm.DB) *gen.Generator {
|
||||
content,
|
||||
contentCollectionContent,
|
||||
contentAttributes,
|
||||
bloomFilters,
|
||||
keyValues,
|
||||
queueJobs,
|
||||
)
|
||||
|
||||
@ -44,7 +44,7 @@ func (c *crawler) doRequestMetaInfo(
|
||||
continue
|
||||
}
|
||||
if banErr := c.banningChecker.Check(res.Info); banErr != nil {
|
||||
_ = c.blockingManager.Block(ctx, []protocol.ID{hash})
|
||||
_ = c.blockingManager.Block(ctx, []protocol.ID{hash}, false)
|
||||
return metainforequester.Response{}, banErr
|
||||
}
|
||||
return res, nil
|
||||
|
||||
@ -2,6 +2,7 @@ package gqlfx
|
||||
|
||||
import (
|
||||
"github.com/99designs/gqlgen/graphql"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/blocking"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/database/dao"
|
||||
@ -68,6 +69,10 @@ func New() fx.Option {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bm, err := p.BlockingManager.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resolvers.Resolver{
|
||||
Dao: d,
|
||||
Search: s,
|
||||
@ -76,6 +81,7 @@ func New() fx.Option {
|
||||
QueueManager: qm,
|
||||
TorrentMetricsClient: tm,
|
||||
Processor: pr,
|
||||
BlockingManager: bm,
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
@ -103,6 +109,7 @@ type Params struct {
|
||||
QueueManager lazy.Lazy[manager.Manager]
|
||||
TorrentMetricsClient lazy.Lazy[torrentmetrics.Client]
|
||||
Processor lazy.Lazy[processor.Processor]
|
||||
BlockingManager lazy.Lazy[blocking.Manager]
|
||||
}
|
||||
|
||||
type Result struct {
|
||||
|
||||
@ -27,7 +27,7 @@ func (r *mutationResolver) Queue(ctx context.Context) (gqlmodel.QueueMutation, e
|
||||
|
||||
// Delete is the resolver for the delete field.
|
||||
func (r *torrentMutationResolver) Delete(ctx context.Context, obj *gqlmodel.TorrentMutation, infoHashes []protocol.ID) (*string, error) {
|
||||
_, err := r.Dao.DeleteAndBlockTorrents(ctx, infoHashes)
|
||||
err := r.BlockingManager.Block(ctx, infoHashes, true)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package resolvers
|
||||
|
||||
import (
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/blocking"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/database/dao"
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/database/search"
|
||||
@ -24,4 +25,5 @@ type Resolver struct {
|
||||
QueueManager manager.Manager
|
||||
TorrentMetricsClient torrentmetrics.Client
|
||||
Processor processor.Processor
|
||||
BlockingManager blocking.Manager
|
||||
}
|
||||
|
||||
@ -1,26 +0,0 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/bitmagnet-io/bitmagnet/internal/bloom"
|
||||
)
|
||||
|
||||
const TableNameBloomFilter = "bloom_filters"
|
||||
|
||||
// BloomFilter mapped from table <bloom_filters>
|
||||
type BloomFilter struct {
|
||||
Key string `gorm:"column:key;primaryKey" json:"key"`
|
||||
Filter bloom.StableBloomFilter `gorm:"column:bytes;not null" json:"bytes"`
|
||||
CreatedAt time.Time `gorm:"column:created_at;not null;<-:create" json:"createdAt"`
|
||||
UpdatedAt time.Time `gorm:"column:updated_at;not null" json:"updatedAt"`
|
||||
}
|
||||
|
||||
// TableName BloomFilter's table name
|
||||
func (*BloomFilter) TableName() string {
|
||||
return TableNameBloomFilter
|
||||
}
|
||||
@ -44,7 +44,7 @@ func (c processor) persist(ctx context.Context, payload persistPayload) error {
|
||||
}
|
||||
}
|
||||
if len(payload.deleteInfoHashes) > 0 {
|
||||
if blockErr := c.blockingManager.Block(ctx, payload.deleteInfoHashes); blockErr != nil {
|
||||
if blockErr := c.blockingManager.Block(ctx, payload.deleteInfoHashes, false); blockErr != nil {
|
||||
return blockErr
|
||||
}
|
||||
}
|
||||
|
||||
18
migrations/00020_bloom_filters_large_object.sql
Normal file
18
migrations/00020_bloom_filters_large_object.sql
Normal file
@ -0,0 +1,18 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
|
||||
alter table bloom_filters add column oid oid;
|
||||
update bloom_filters set oid = lo_from_bytea(0, bytes::bytea) where true;
|
||||
alter table bloom_filters drop column bytes;
|
||||
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
|
||||
alter table bloom_filters add column bytes bytea;
|
||||
update bloom_filters set bytes = ''::bytea where true;
|
||||
alter table bloom_filters alter column bytes set not null;
|
||||
alter table bloom_filters drop column oid;
|
||||
|
||||
-- +goose StatementEnd
|
||||
Loading…
Reference in New Issue
Block a user