diff --git a/internal/blocking/factory.go b/internal/blocking/factory.go index a754b1c..2bc3353 100644 --- a/internal/blocking/factory.go +++ b/internal/blocking/factory.go @@ -3,8 +3,8 @@ package blocking import ( "context" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" - "github.com/bitmagnet-io/bitmagnet/internal/database/dao" "github.com/bitmagnet-io/bitmagnet/internal/protocol" + "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/fx" "sync" "time" @@ -12,7 +12,7 @@ import ( type Params struct { fx.In - Dao lazy.Lazy[*dao.Query] + Pool lazy.Lazy[*pgxpool.Pool] PgxPoolWait *sync.WaitGroup `name:"pgx_pool_wait"` } @@ -24,13 +24,13 @@ type Result struct { func New(params Params) Result { lazyManager := lazy.New[Manager](func() (Manager, error) { - d, err := params.Dao.Get() + pool, err := params.Pool.Get() if err != nil { return nil, err } params.PgxPoolWait.Add(1) return &manager{ - dao: d, + pool: pool, buffer: make(map[protocol.ID]struct{}, 1000), maxBufferSize: 1000, maxFlushWait: time.Minute * 5, diff --git a/internal/blocking/manager.go b/internal/blocking/manager.go index bf84944..ec6a286 100644 --- a/internal/blocking/manager.go +++ b/internal/blocking/manager.go @@ -2,24 +2,28 @@ package blocking import ( "context" + "database/sql" + "errors" + "fmt" "github.com/bitmagnet-io/bitmagnet/internal/bloom" - "github.com/bitmagnet-io/bitmagnet/internal/database/dao" "github.com/bitmagnet-io/bitmagnet/internal/protocol" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "sync" "time" ) type Manager interface { Filter(ctx context.Context, hashes []protocol.ID) ([]protocol.ID, error) - Block(ctx context.Context, hashes []protocol.ID) error + Block(ctx context.Context, hashes []protocol.ID, flush bool) error Flush(ctx context.Context) error } type manager struct { mutex sync.Mutex - dao *dao.Query + pool *pgxpool.Pool buffer map[protocol.ID]struct{} - filter bloom.StableBloomFilter + filter *bloom.StableBloomFilter maxBufferSize int lastFlushedAt time.Time maxFlushWait time.Duration @@ -28,7 +32,7 @@ type manager struct { func (m *manager) Filter(ctx context.Context, hashes []protocol.ID) ([]protocol.ID, error) { m.mutex.Lock() defer m.mutex.Unlock() - if m.filter.Cells() == 0 || m.shouldFlush() { + if m.filter == nil || m.shouldFlush() { if flushErr := m.flush(ctx); flushErr != nil { return nil, flushErr } @@ -46,13 +50,13 @@ func (m *manager) Filter(ctx context.Context, hashes []protocol.ID) ([]protocol. return filtered, nil } -func (m *manager) Block(ctx context.Context, hashes []protocol.ID) error { +func (m *manager) Block(ctx context.Context, hashes []protocol.ID, flush bool) error { m.mutex.Lock() defer m.mutex.Unlock() for _, hash := range hashes { m.buffer[hash] = struct{}{} } - if m.shouldFlush() { + if flush || m.shouldFlush() { if flushErr := m.flush(ctx); flushErr != nil { return flushErr } @@ -69,18 +73,103 @@ func (m *manager) Flush(ctx context.Context) error { return m.flush(ctx) } +const blockedTorrentsBloomFilterKey = "blocked_torrents" + func (m *manager) flush(ctx context.Context) error { var hashes []protocol.ID for hash := range m.buffer { hashes = append(hashes, hash) } - bf, blockErr := m.dao.BlockTorrents(ctx, hashes) - if blockErr != nil { - return blockErr + + tx, err := m.pool.BeginTx(ctx, pgx.TxOptions{ + AccessMode: pgx.ReadWrite, + }) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) } + defer func() { + _ = tx.Rollback(ctx) + }() + + if len(hashes) > 0 { + _, err = tx.Exec(ctx, "DELETE FROM torrents WHERE info_hash = any($1)", hashes) + if err != nil { + return fmt.Errorf("failed to delete from torrents table: %w", err) + } + } + + bf := bloom.NewDefaultStableBloomFilter() + + lobs := tx.LargeObjects() + + found := false + + var oid uint32 + var nullOid sql.NullInt32 + err = tx.QueryRow(ctx, "SELECT oid FROM bloom_filters WHERE key = $1", blockedTorrentsBloomFilterKey).Scan(&nullOid) + if err == nil { + found = true + if nullOid.Valid { + oid = uint32(nullOid.Int32) + obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeRead) + if err != nil { + return fmt.Errorf("failed to open large object for reading: %w", err) + } + _, err = bf.ReadFrom(obj) + obj.Close() + if err != nil { + return fmt.Errorf("failed to read current bloom filter: %w", err) + } + } + } else if !errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("failed to get bloom filter object ID: %w", err) + } + + if oid == 0 { + // Create a new Large Object. + // We pass 0, so the DB can pick an available oid for us. + oid, err = lobs.Create(ctx, 0) + if err != nil { + return fmt.Errorf("failed to create large object: %w", err) + } + } + + for _, hash := range hashes { + bf.Add(hash[:]) + } + + obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeWrite) + if err != nil { + return fmt.Errorf("failed to open large object for writing: %w", err) + } + + _, err = bf.WriteTo(obj) + if err != nil { + return fmt.Errorf("failed to write to large object: %w", err) + } + + now := time.Now() + if !found { + _, err = tx.Exec(ctx, "INSERT INTO bloom_filters (key, oid, created_at, updated_at) VALUES ($1, $2, $3, $4)", blockedTorrentsBloomFilterKey, oid, now, now) + if err != nil { + return fmt.Errorf("failed to save new bloom filter record: %w", err) + } + } else if !nullOid.Valid { + _, err = tx.Exec(ctx, "UPDATE bloom_filters SET oid = $1, updated_at = $2 WHERE key = $3", oid, now, blockedTorrentsBloomFilterKey) + if err != nil { + return fmt.Errorf("failed to update bloom filter record: %w", err) + } + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + m.buffer = make(map[protocol.ID]struct{}) - m.filter = bf.Filter - m.lastFlushedAt = time.Now() + m.filter = bf + m.lastFlushedAt = now + return nil } diff --git a/internal/database/dao/bloom_filters.gen.go b/internal/database/dao/bloom_filters.gen.go deleted file mode 100644 index b4da39d..0000000 --- a/internal/database/dao/bloom_filters.gen.go +++ /dev/null @@ -1,392 +0,0 @@ -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. - -package dao - -import ( - "context" - - "gorm.io/gorm" - "gorm.io/gorm/clause" - "gorm.io/gorm/schema" - - "gorm.io/gen" - "gorm.io/gen/field" - - "gorm.io/plugin/dbresolver" - - "github.com/bitmagnet-io/bitmagnet/internal/model" -) - -func newBloomFilter(db *gorm.DB, opts ...gen.DOOption) bloomFilter { - _bloomFilter := bloomFilter{} - - _bloomFilter.bloomFilterDo.UseDB(db, opts...) - _bloomFilter.bloomFilterDo.UseModel(&model.BloomFilter{}) - - tableName := _bloomFilter.bloomFilterDo.TableName() - _bloomFilter.ALL = field.NewAsterisk(tableName) - _bloomFilter.Key = field.NewString(tableName, "key") - _bloomFilter.Filter = field.NewField(tableName, "bytes") - _bloomFilter.CreatedAt = field.NewTime(tableName, "created_at") - _bloomFilter.UpdatedAt = field.NewTime(tableName, "updated_at") - - _bloomFilter.fillFieldMap() - - return _bloomFilter -} - -type bloomFilter struct { - bloomFilterDo - - ALL field.Asterisk - Key field.String - Filter field.Field - CreatedAt field.Time - UpdatedAt field.Time - - fieldMap map[string]field.Expr -} - -func (b bloomFilter) Table(newTableName string) *bloomFilter { - b.bloomFilterDo.UseTable(newTableName) - return b.updateTableName(newTableName) -} - -func (b bloomFilter) As(alias string) *bloomFilter { - b.bloomFilterDo.DO = *(b.bloomFilterDo.As(alias).(*gen.DO)) - return b.updateTableName(alias) -} - -func (b *bloomFilter) updateTableName(table string) *bloomFilter { - b.ALL = field.NewAsterisk(table) - b.Key = field.NewString(table, "key") - b.Filter = field.NewField(table, "bytes") - b.CreatedAt = field.NewTime(table, "created_at") - b.UpdatedAt = field.NewTime(table, "updated_at") - - b.fillFieldMap() - - return b -} - -func (b *bloomFilter) GetFieldByName(fieldName string) (field.OrderExpr, bool) { - _f, ok := b.fieldMap[fieldName] - if !ok || _f == nil { - return nil, false - } - _oe, ok := _f.(field.OrderExpr) - return _oe, ok -} - -func (b *bloomFilter) fillFieldMap() { - b.fieldMap = make(map[string]field.Expr, 4) - b.fieldMap["key"] = b.Key - b.fieldMap["bytes"] = b.Filter - b.fieldMap["created_at"] = b.CreatedAt - b.fieldMap["updated_at"] = b.UpdatedAt -} - -func (b bloomFilter) clone(db *gorm.DB) bloomFilter { - b.bloomFilterDo.ReplaceConnPool(db.Statement.ConnPool) - return b -} - -func (b bloomFilter) replaceDB(db *gorm.DB) bloomFilter { - b.bloomFilterDo.ReplaceDB(db) - return b -} - -type bloomFilterDo struct{ gen.DO } - -type IBloomFilterDo interface { - gen.SubQuery - Debug() IBloomFilterDo - WithContext(ctx context.Context) IBloomFilterDo - WithResult(fc func(tx gen.Dao)) gen.ResultInfo - ReplaceDB(db *gorm.DB) - ReadDB() IBloomFilterDo - WriteDB() IBloomFilterDo - As(alias string) gen.Dao - Session(config *gorm.Session) IBloomFilterDo - Columns(cols ...field.Expr) gen.Columns - Clauses(conds ...clause.Expression) IBloomFilterDo - Not(conds ...gen.Condition) IBloomFilterDo - Or(conds ...gen.Condition) IBloomFilterDo - Select(conds ...field.Expr) IBloomFilterDo - Where(conds ...gen.Condition) IBloomFilterDo - Order(conds ...field.Expr) IBloomFilterDo - Distinct(cols ...field.Expr) IBloomFilterDo - Omit(cols ...field.Expr) IBloomFilterDo - Join(table schema.Tabler, on ...field.Expr) IBloomFilterDo - LeftJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo - RightJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo - Group(cols ...field.Expr) IBloomFilterDo - Having(conds ...gen.Condition) IBloomFilterDo - Limit(limit int) IBloomFilterDo - Offset(offset int) IBloomFilterDo - Count() (count int64, err error) - Scopes(funcs ...func(gen.Dao) gen.Dao) IBloomFilterDo - Unscoped() IBloomFilterDo - Create(values ...*model.BloomFilter) error - CreateInBatches(values []*model.BloomFilter, batchSize int) error - Save(values ...*model.BloomFilter) error - First() (*model.BloomFilter, error) - Take() (*model.BloomFilter, error) - Last() (*model.BloomFilter, error) - Find() ([]*model.BloomFilter, error) - FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.BloomFilter, err error) - FindInBatches(result *[]*model.BloomFilter, batchSize int, fc func(tx gen.Dao, batch int) error) error - Pluck(column field.Expr, dest interface{}) error - Delete(...*model.BloomFilter) (info gen.ResultInfo, err error) - Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error) - UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error) - Updates(value interface{}) (info gen.ResultInfo, err error) - UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error) - UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error) - UpdateColumns(value interface{}) (info gen.ResultInfo, err error) - UpdateFrom(q gen.SubQuery) gen.Dao - Attrs(attrs ...field.AssignExpr) IBloomFilterDo - Assign(attrs ...field.AssignExpr) IBloomFilterDo - Joins(fields ...field.RelationField) IBloomFilterDo - Preload(fields ...field.RelationField) IBloomFilterDo - FirstOrInit() (*model.BloomFilter, error) - FirstOrCreate() (*model.BloomFilter, error) - FindByPage(offset int, limit int) (result []*model.BloomFilter, count int64, err error) - ScanByPage(result interface{}, offset int, limit int) (count int64, err error) - Scan(result interface{}) (err error) - Returning(value interface{}, columns ...string) IBloomFilterDo - UnderlyingDB() *gorm.DB - schema.Tabler -} - -func (b bloomFilterDo) Debug() IBloomFilterDo { - return b.withDO(b.DO.Debug()) -} - -func (b bloomFilterDo) WithContext(ctx context.Context) IBloomFilterDo { - return b.withDO(b.DO.WithContext(ctx)) -} - -func (b bloomFilterDo) ReadDB() IBloomFilterDo { - return b.Clauses(dbresolver.Read) -} - -func (b bloomFilterDo) WriteDB() IBloomFilterDo { - return b.Clauses(dbresolver.Write) -} - -func (b bloomFilterDo) Session(config *gorm.Session) IBloomFilterDo { - return b.withDO(b.DO.Session(config)) -} - -func (b bloomFilterDo) Clauses(conds ...clause.Expression) IBloomFilterDo { - return b.withDO(b.DO.Clauses(conds...)) -} - -func (b bloomFilterDo) Returning(value interface{}, columns ...string) IBloomFilterDo { - return b.withDO(b.DO.Returning(value, columns...)) -} - -func (b bloomFilterDo) Not(conds ...gen.Condition) IBloomFilterDo { - return b.withDO(b.DO.Not(conds...)) -} - -func (b bloomFilterDo) Or(conds ...gen.Condition) IBloomFilterDo { - return b.withDO(b.DO.Or(conds...)) -} - -func (b bloomFilterDo) Select(conds ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Select(conds...)) -} - -func (b bloomFilterDo) Where(conds ...gen.Condition) IBloomFilterDo { - return b.withDO(b.DO.Where(conds...)) -} - -func (b bloomFilterDo) Order(conds ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Order(conds...)) -} - -func (b bloomFilterDo) Distinct(cols ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Distinct(cols...)) -} - -func (b bloomFilterDo) Omit(cols ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Omit(cols...)) -} - -func (b bloomFilterDo) Join(table schema.Tabler, on ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Join(table, on...)) -} - -func (b bloomFilterDo) LeftJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.LeftJoin(table, on...)) -} - -func (b bloomFilterDo) RightJoin(table schema.Tabler, on ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.RightJoin(table, on...)) -} - -func (b bloomFilterDo) Group(cols ...field.Expr) IBloomFilterDo { - return b.withDO(b.DO.Group(cols...)) -} - -func (b bloomFilterDo) Having(conds ...gen.Condition) IBloomFilterDo { - return b.withDO(b.DO.Having(conds...)) -} - -func (b bloomFilterDo) Limit(limit int) IBloomFilterDo { - return b.withDO(b.DO.Limit(limit)) -} - -func (b bloomFilterDo) Offset(offset int) IBloomFilterDo { - return b.withDO(b.DO.Offset(offset)) -} - -func (b bloomFilterDo) Scopes(funcs ...func(gen.Dao) gen.Dao) IBloomFilterDo { - return b.withDO(b.DO.Scopes(funcs...)) -} - -func (b bloomFilterDo) Unscoped() IBloomFilterDo { - return b.withDO(b.DO.Unscoped()) -} - -func (b bloomFilterDo) Create(values ...*model.BloomFilter) error { - if len(values) == 0 { - return nil - } - return b.DO.Create(values) -} - -func (b bloomFilterDo) CreateInBatches(values []*model.BloomFilter, batchSize int) error { - return b.DO.CreateInBatches(values, batchSize) -} - -// Save : !!! underlying implementation is different with GORM -// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values) -func (b bloomFilterDo) Save(values ...*model.BloomFilter) error { - if len(values) == 0 { - return nil - } - return b.DO.Save(values) -} - -func (b bloomFilterDo) First() (*model.BloomFilter, error) { - if result, err := b.DO.First(); err != nil { - return nil, err - } else { - return result.(*model.BloomFilter), nil - } -} - -func (b bloomFilterDo) Take() (*model.BloomFilter, error) { - if result, err := b.DO.Take(); err != nil { - return nil, err - } else { - return result.(*model.BloomFilter), nil - } -} - -func (b bloomFilterDo) Last() (*model.BloomFilter, error) { - if result, err := b.DO.Last(); err != nil { - return nil, err - } else { - return result.(*model.BloomFilter), nil - } -} - -func (b bloomFilterDo) Find() ([]*model.BloomFilter, error) { - result, err := b.DO.Find() - return result.([]*model.BloomFilter), err -} - -func (b bloomFilterDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.BloomFilter, err error) { - buf := make([]*model.BloomFilter, 0, batchSize) - err = b.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error { - defer func() { results = append(results, buf...) }() - return fc(tx, batch) - }) - return results, err -} - -func (b bloomFilterDo) FindInBatches(result *[]*model.BloomFilter, batchSize int, fc func(tx gen.Dao, batch int) error) error { - return b.DO.FindInBatches(result, batchSize, fc) -} - -func (b bloomFilterDo) Attrs(attrs ...field.AssignExpr) IBloomFilterDo { - return b.withDO(b.DO.Attrs(attrs...)) -} - -func (b bloomFilterDo) Assign(attrs ...field.AssignExpr) IBloomFilterDo { - return b.withDO(b.DO.Assign(attrs...)) -} - -func (b bloomFilterDo) Joins(fields ...field.RelationField) IBloomFilterDo { - for _, _f := range fields { - b = *b.withDO(b.DO.Joins(_f)) - } - return &b -} - -func (b bloomFilterDo) Preload(fields ...field.RelationField) IBloomFilterDo { - for _, _f := range fields { - b = *b.withDO(b.DO.Preload(_f)) - } - return &b -} - -func (b bloomFilterDo) FirstOrInit() (*model.BloomFilter, error) { - if result, err := b.DO.FirstOrInit(); err != nil { - return nil, err - } else { - return result.(*model.BloomFilter), nil - } -} - -func (b bloomFilterDo) FirstOrCreate() (*model.BloomFilter, error) { - if result, err := b.DO.FirstOrCreate(); err != nil { - return nil, err - } else { - return result.(*model.BloomFilter), nil - } -} - -func (b bloomFilterDo) FindByPage(offset int, limit int) (result []*model.BloomFilter, count int64, err error) { - result, err = b.Offset(offset).Limit(limit).Find() - if err != nil { - return - } - - if size := len(result); 0 < limit && 0 < size && size < limit { - count = int64(size + offset) - return - } - - count, err = b.Offset(-1).Limit(-1).Count() - return -} - -func (b bloomFilterDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) { - count, err = b.Count() - if err != nil { - return - } - - err = b.Offset(offset).Limit(limit).Scan(result) - return -} - -func (b bloomFilterDo) Scan(result interface{}) (err error) { - return b.DO.Scan(result) -} - -func (b bloomFilterDo) Delete(models ...*model.BloomFilter) (result gen.ResultInfo, err error) { - return b.DO.Delete(models) -} - -func (b *bloomFilterDo) withDO(do gen.Dao) *bloomFilterDo { - b.DO = *do.(*gen.DO) - return b -} diff --git a/internal/database/dao/gen.go b/internal/database/dao/gen.go index 2b82725..39c90af 100644 --- a/internal/database/dao/gen.go +++ b/internal/database/dao/gen.go @@ -17,7 +17,6 @@ import ( var ( Q = new(Query) - BloomFilter *bloomFilter Content *content ContentAttribute *contentAttribute ContentCollection *contentCollection @@ -37,7 +36,6 @@ var ( func SetDefault(db *gorm.DB, opts ...gen.DOOption) { *Q = *Use(db, opts...) - BloomFilter = &Q.BloomFilter Content = &Q.Content ContentAttribute = &Q.ContentAttribute ContentCollection = &Q.ContentCollection @@ -58,7 +56,6 @@ func SetDefault(db *gorm.DB, opts ...gen.DOOption) { func Use(db *gorm.DB, opts ...gen.DOOption) *Query { return &Query{ db: db, - BloomFilter: newBloomFilter(db, opts...), Content: newContent(db, opts...), ContentAttribute: newContentAttribute(db, opts...), ContentCollection: newContentCollection(db, opts...), @@ -80,7 +77,6 @@ func Use(db *gorm.DB, opts ...gen.DOOption) *Query { type Query struct { db *gorm.DB - BloomFilter bloomFilter Content content ContentAttribute contentAttribute ContentCollection contentCollection @@ -103,7 +99,6 @@ func (q *Query) Available() bool { return q.db != nil } func (q *Query) clone(db *gorm.DB) *Query { return &Query{ db: db, - BloomFilter: q.BloomFilter.clone(db), Content: q.Content.clone(db), ContentAttribute: q.ContentAttribute.clone(db), ContentCollection: q.ContentCollection.clone(db), @@ -133,7 +128,6 @@ func (q *Query) WriteDB() *Query { func (q *Query) ReplaceDB(db *gorm.DB) *Query { return &Query{ db: db, - BloomFilter: q.BloomFilter.replaceDB(db), Content: q.Content.replaceDB(db), ContentAttribute: q.ContentAttribute.replaceDB(db), ContentCollection: q.ContentCollection.replaceDB(db), @@ -153,7 +147,6 @@ func (q *Query) ReplaceDB(db *gorm.DB) *Query { } type queryCtx struct { - BloomFilter IBloomFilterDo Content IContentDo ContentAttribute IContentAttributeDo ContentCollection IContentCollectionDo @@ -173,7 +166,6 @@ type queryCtx struct { func (q *Query) WithContext(ctx context.Context) *queryCtx { return &queryCtx{ - BloomFilter: q.BloomFilter.WithContext(ctx), Content: q.Content.WithContext(ctx), ContentAttribute: q.ContentAttribute.WithContext(ctx), ContentCollection: q.ContentCollection.WithContext(ctx), diff --git a/internal/database/dao/torrents_delete_block.go b/internal/database/dao/torrents_delete_block.go deleted file mode 100644 index 54dc4f1..0000000 --- a/internal/database/dao/torrents_delete_block.go +++ /dev/null @@ -1,74 +0,0 @@ -package dao - -import ( - "context" - "database/sql/driver" - "errors" - "github.com/bitmagnet-io/bitmagnet/internal/bloom" - "github.com/bitmagnet-io/bitmagnet/internal/model" - "github.com/bitmagnet-io/bitmagnet/internal/protocol" - "gorm.io/gorm" -) - -const blockedTorrentsBloomFilterKey = "blocked_torrents" - -func (q *Query) DeleteAndBlockTorrents(ctx context.Context, infoHashes []protocol.ID) (model.BloomFilter, error) { - var valuers []driver.Valuer - for _, infoHash := range infoHashes { - valuers = append(valuers, infoHash) - } - var bf model.BloomFilter - if txErr := q.Transaction(func(tx *Query) error { - if _, deleteErr := tx.Torrent.WithContext(ctx).Where(tx.Torrent.InfoHash.In(valuers...)).Delete(); deleteErr != nil { - return deleteErr - } - pBf, txErr := blockTx(ctx, tx, infoHashes) - if txErr != nil { - return txErr - } - bf = *pBf - return nil - }); txErr != nil { - return bf, txErr - } - return bf, nil -} - -func (q *Query) BlockTorrents(ctx context.Context, infoHashes []protocol.ID) (model.BloomFilter, error) { - var bf model.BloomFilter - if txErr := q.Transaction(func(tx *Query) error { - pBf, txErr := blockTx(ctx, tx, infoHashes) - if txErr != nil { - return txErr - } - bf = *pBf - return nil - }); txErr != nil { - return bf, txErr - } - return bf, nil -} - -func blockTx(ctx context.Context, tx *Query, infoHashes []protocol.ID) (*model.BloomFilter, error) { - bf, bfErr := tx.BloomFilter.WithContext(ctx).Where( - tx.BloomFilter.Key.Eq(blockedTorrentsBloomFilterKey), - ).First() - if errors.Is(bfErr, gorm.ErrRecordNotFound) { - bf = &model.BloomFilter{ - Key: blockedTorrentsBloomFilterKey, - Filter: *bloom.NewDefaultStableBloomFilter(), - } - } else if bfErr != nil { - return nil, bfErr - } - if len(infoHashes) == 0 { - return bf, nil - } - for _, infoHash := range infoHashes { - bf.Filter.Add(infoHash[:]) - } - if saveErr := tx.BloomFilter.WithContext(ctx).Save(bf); saveErr != nil { - return nil, saveErr - } - return bf, nil -} diff --git a/internal/database/gen/gen.go b/internal/database/gen/gen.go index 434561e..8ff536b 100644 --- a/internal/database/gen/gen.go +++ b/internal/database/gen/gen.go @@ -380,12 +380,6 @@ func BuildGenerator(db *gorm.DB) *gen.Generator { torrentContentBaseOptions..., )..., ) - bloomFilters := g.GenerateModel( - "bloom_filters", - gen.FieldRename("bytes", "Filter"), - gen.FieldType("bytes", "bloom.StableBloomFilter"), - createdAtReadOnly, - ) keyValues := g.GenerateModel( "key_values", createdAtReadOnly, @@ -446,7 +440,6 @@ func BuildGenerator(db *gorm.DB) *gen.Generator { content, contentCollectionContent, contentAttributes, - bloomFilters, keyValues, queueJobs, ) diff --git a/internal/dhtcrawler/request_meta_info.go b/internal/dhtcrawler/request_meta_info.go index 23c774a..718b7aa 100644 --- a/internal/dhtcrawler/request_meta_info.go +++ b/internal/dhtcrawler/request_meta_info.go @@ -44,7 +44,7 @@ func (c *crawler) doRequestMetaInfo( continue } if banErr := c.banningChecker.Check(res.Info); banErr != nil { - _ = c.blockingManager.Block(ctx, []protocol.ID{hash}) + _ = c.blockingManager.Block(ctx, []protocol.ID{hash}, false) return metainforequester.Response{}, banErr } return res, nil diff --git a/internal/gql/gqlfx/module.go b/internal/gql/gqlfx/module.go index 1a31f82..98d7e96 100644 --- a/internal/gql/gqlfx/module.go +++ b/internal/gql/gqlfx/module.go @@ -2,6 +2,7 @@ package gqlfx import ( "github.com/99designs/gqlgen/graphql" + "github.com/bitmagnet-io/bitmagnet/internal/blocking" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker" "github.com/bitmagnet-io/bitmagnet/internal/database/dao" @@ -68,6 +69,10 @@ func New() fx.Option { if err != nil { return nil, err } + bm, err := p.BlockingManager.Get() + if err != nil { + return nil, err + } return &resolvers.Resolver{ Dao: d, Search: s, @@ -76,6 +81,7 @@ func New() fx.Option { QueueManager: qm, TorrentMetricsClient: tm, Processor: pr, + BlockingManager: bm, }, nil }), } @@ -103,6 +109,7 @@ type Params struct { QueueManager lazy.Lazy[manager.Manager] TorrentMetricsClient lazy.Lazy[torrentmetrics.Client] Processor lazy.Lazy[processor.Processor] + BlockingManager lazy.Lazy[blocking.Manager] } type Result struct { diff --git a/internal/gql/resolvers/mutation.resolvers.go b/internal/gql/resolvers/mutation.resolvers.go index c20e1e3..15e1946 100644 --- a/internal/gql/resolvers/mutation.resolvers.go +++ b/internal/gql/resolvers/mutation.resolvers.go @@ -27,7 +27,7 @@ func (r *mutationResolver) Queue(ctx context.Context) (gqlmodel.QueueMutation, e // Delete is the resolver for the delete field. func (r *torrentMutationResolver) Delete(ctx context.Context, obj *gqlmodel.TorrentMutation, infoHashes []protocol.ID) (*string, error) { - _, err := r.Dao.DeleteAndBlockTorrents(ctx, infoHashes) + err := r.BlockingManager.Block(ctx, infoHashes, true) return nil, err } diff --git a/internal/gql/resolvers/resolver.go b/internal/gql/resolvers/resolver.go index 74fd24e..ce8af5d 100644 --- a/internal/gql/resolvers/resolver.go +++ b/internal/gql/resolvers/resolver.go @@ -1,6 +1,7 @@ package resolvers import ( + "github.com/bitmagnet-io/bitmagnet/internal/blocking" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker" "github.com/bitmagnet-io/bitmagnet/internal/database/dao" "github.com/bitmagnet-io/bitmagnet/internal/database/search" @@ -24,4 +25,5 @@ type Resolver struct { QueueManager manager.Manager TorrentMetricsClient torrentmetrics.Client Processor processor.Processor + BlockingManager blocking.Manager } diff --git a/internal/model/bloom_filters.gen.go b/internal/model/bloom_filters.gen.go deleted file mode 100644 index 89a2b67..0000000 --- a/internal/model/bloom_filters.gen.go +++ /dev/null @@ -1,26 +0,0 @@ -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. - -package model - -import ( - "time" - - "github.com/bitmagnet-io/bitmagnet/internal/bloom" -) - -const TableNameBloomFilter = "bloom_filters" - -// BloomFilter mapped from table -type BloomFilter struct { - Key string `gorm:"column:key;primaryKey" json:"key"` - Filter bloom.StableBloomFilter `gorm:"column:bytes;not null" json:"bytes"` - CreatedAt time.Time `gorm:"column:created_at;not null;<-:create" json:"createdAt"` - UpdatedAt time.Time `gorm:"column:updated_at;not null" json:"updatedAt"` -} - -// TableName BloomFilter's table name -func (*BloomFilter) TableName() string { - return TableNameBloomFilter -} diff --git a/internal/processor/persist.go b/internal/processor/persist.go index 69207b9..279ea6f 100644 --- a/internal/processor/persist.go +++ b/internal/processor/persist.go @@ -44,7 +44,7 @@ func (c processor) persist(ctx context.Context, payload persistPayload) error { } } if len(payload.deleteInfoHashes) > 0 { - if blockErr := c.blockingManager.Block(ctx, payload.deleteInfoHashes); blockErr != nil { + if blockErr := c.blockingManager.Block(ctx, payload.deleteInfoHashes, false); blockErr != nil { return blockErr } } diff --git a/migrations/00020_bloom_filters_large_object.sql b/migrations/00020_bloom_filters_large_object.sql new file mode 100644 index 0000000..77cd55d --- /dev/null +++ b/migrations/00020_bloom_filters_large_object.sql @@ -0,0 +1,18 @@ +-- +goose Up +-- +goose StatementBegin + +alter table bloom_filters add column oid oid; +update bloom_filters set oid = lo_from_bytea(0, bytes::bytea) where true; +alter table bloom_filters drop column bytes; + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +alter table bloom_filters add column bytes bytea; +update bloom_filters set bytes = ''::bytea where true; +alter table bloom_filters alter column bytes set not null; +alter table bloom_filters drop column oid; + +-- +goose StatementEnd