back to old cache

This commit is contained in:
yourok
2019-10-13 22:28:14 +03:00
parent 1607a39955
commit d977668ba3
4 changed files with 85 additions and 91 deletions

View File

@@ -1,9 +1,9 @@
package memcache package memcache
import ( import (
"fmt"
"sync" "sync"
"log"
"server/utils" "server/utils"
) )
@@ -33,11 +33,11 @@ func (b *BufferPool) mkBuffs() {
return return
} }
b.buffs = make(map[int]*buffer, b.frees) b.buffs = make(map[int]*buffer, b.frees)
log.Println("Create", b.frees, "buffers") fmt.Println("Create", b.frees, "buffers")
for i := 0; i < b.frees; i++ { for i := 0; i < b.frees; i++ {
buf := buffer{ buf := buffer{
-1, -1,
make([]byte, b.size, b.size), make([]byte, b.size),
false, false,
} }
b.buffs[i] = &buf b.buffs[i] = &buf
@@ -59,18 +59,8 @@ func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) {
return return
} }
} }
log.Println("Create slow buffer") fmt.Println("Create slow buffer")
return make([]byte, b.size), -1
buf := buffer{
p.Id,
make([]byte, b.size, b.size),
true,
}
b.frees++
b.buffs[b.frees] = &buf
buff = buf.buf
index = b.frees
return
} }
func (b *BufferPool) ReleaseBuffer(index int) { func (b *BufferPool) ReleaseBuffer(index int) {
@@ -85,26 +75,27 @@ func (b *BufferPool) ReleaseBuffer(index int) {
buff.used = false buff.used = false
buff.pieceId = -1 buff.pieceId = -1
b.frees++ b.frees++
//fmt.Println("Release buffer:", index, b.frees)
} else { } else {
utils.FreeOSMem() utils.FreeOSMem()
} }
} }
//func (b *BufferPool) Used() map[int]struct{} { func (b *BufferPool) Used() map[int]struct{} {
// b.mu.Lock() if len(b.buffs) == 0 {
// defer b.mu.Unlock() b.mu.Lock()
// if len(b.buffs) == 0 { b.mkBuffs()
// b.mkBuffs() b.mu.Unlock()
// } }
// used := make(map[int]struct{}) used := make(map[int]struct{})
// for _, b := range b.buffs { for _, b := range b.buffs {
// if b.used { if b.used {
// used[b.pieceId] = struct{}{} used[b.pieceId] = struct{}{}
// } }
// } }
// return used return used
//} }
//
//func (b *BufferPool) Len() int { func (b *BufferPool) Len() int {
// return b.frees return b.frees
//} }

View File

@@ -1,15 +1,16 @@
package memcache package memcache
import ( import (
"fmt"
"sort"
"sync" "sync"
"server/torr/reader"
"server/torr/storage/state" "server/torr/storage/state"
"server/utils" "server/utils"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"log"
"server/torr/reader"
) )
type Cache struct { type Cache struct {
@@ -32,30 +33,30 @@ type Cache struct {
pieces map[int]*Piece pieces map[int]*Piece
bufferPull *BufferPool bufferPull *BufferPool
usedPieces map[int]struct{}
prcLoaded int
readers map[*reader.Reader]struct{} readers map[*reader.Reader]struct{}
} }
func NewCache(capacity int64, storage *Storage) *Cache { func NewCache(capacity int64, storage *Storage) *Cache {
ret := &Cache{ ret := &Cache{
capacity: capacity, capacity: capacity,
filled: 0, filled: 0,
pieces: make(map[int]*Piece), pieces: make(map[int]*Piece),
readers: make(map[*reader.Reader]struct{}), s: storage,
usedPieces: make(map[int]struct{}), readers: make(map[*reader.Reader]struct{}),
s: storage,
} }
return ret return ret
} }
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
log.Println("Create cache for:", info.Name) fmt.Println("Create cache for:", info.Name)
//Min capacity of 2 pieces length //Min capacity of 2 pieces length
caps := info.PieceLength * 2 cap := info.PieceLength * 2
if c.capacity < caps { if c.capacity < cap {
c.capacity = caps c.capacity = cap
} }
c.pieceLength = info.PieceLength c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces() c.pieceCount = info.NumPieces()
@@ -75,10 +76,7 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock() c.muPiece.Lock()
defer func() { defer c.muPiece.Unlock()
c.muPiece.Unlock()
go utils.FreeOSMemGC(c.capacity)
}()
if val, ok := c.pieces[m.Index()]; ok { if val, ok := c.pieces[m.Index()]; ok {
return val return val
} }
@@ -87,7 +85,7 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
func (c *Cache) Close() error { func (c *Cache) Close() error {
c.isRemove = false c.isRemove = false
log.Println("Close cache for:", c.hash) fmt.Println("Close cache for:", c.hash)
if _, ok := c.s.caches[c.hash]; ok { if _, ok := c.s.caches[c.hash]; ok {
delete(c.s.caches, c.hash) delete(c.s.caches, c.hash)
} }
@@ -135,50 +133,64 @@ func (c *Cache) cleanPieces() {
defer func() { c.isRemove = false }() defer func() { c.isRemove = false }()
c.muRemove.Unlock() c.muRemove.Unlock()
bufPieces := c.getBufferedPieces() remPieces := c.getRemPieces()
if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) {
if len(bufPieces) > 0 && c.filled >= c.capacity { remCount := int((c.filled - c.capacity) / c.pieceLength)
c.muReader.Lock() if remCount < 1 {
for reader := range c.readers { remCount = 1
beg, end := c.getReaderPieces(reader)
for id := range bufPieces {
if id >= beg && id <= end {
delete(bufPieces, id)
}
}
} }
c.muReader.Unlock() if remCount > len(remPieces) {
if len(bufPieces) > 0 { remCount = len(remPieces)
for _, p := range bufPieces { }
p.Release()
} remPieces = remPieces[:remCount]
bufPieces = nil
go utils.FreeOSMemGC(c.capacity) for _, p := range remPieces {
c.removePiece(p)
} }
} }
} }
func (c *Cache) getBufferedPieces() map[int]*Piece { func (c *Cache) getRemPieces() []*Piece {
pieces := make(map[int]*Piece) pieces := make([]*Piece, 0)
fill := int64(0) fill := int64(0)
used := c.usedPieces loading := 0
used := c.bufferPull.Used()
for u := range used { for u := range used {
piece := c.pieces[u] v := c.pieces[u]
if piece.Size > 0 { if v.Size > 0 {
if piece.Id > 0 { if v.Id > 0 {
pieces[piece.Id] = piece pieces = append(pieces, v)
}
fill += v.Size
if !v.complete {
loading++
} }
fill += piece.Size
} }
} }
c.filled = fill c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed < pieces[j].accessed
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces return pieces
} }
func (c *Cache) removePiece(piece *Piece) { func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release() piece.Release()
return
if c.prcLoaded >= 95 {
utils.FreeOSMemGC(c.capacity)
} else {
utils.FreeOSMem()
}
}
func prc(val, of int) int {
return int(float64(val) * 100.0 / float64(of))
} }
func (c *Cache) AddReader(r *reader.Reader) { func (c *Cache) AddReader(r *reader.Reader) {
@@ -199,9 +211,3 @@ func (c *Cache) ReadersLen() int {
} }
return len(c.readers) return len(c.readers)
} }
func (c *Cache) getReaderPieces(reader *reader.Reader) (begin, end int) {
end = int((reader.Offset() + reader.Readahead()) / c.pieceLength)
begin = int((reader.Offset() - c.capacity + reader.Readahead()) / c.pieceLength)
return
}

View File

@@ -39,7 +39,6 @@ func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
if p.buffer == nil { if p.buffer == nil {
return 0, errors.New("Can't get buffer write") return 0, errors.New("Can't get buffer write")
} }
p.cache.usedPieces[p.Id] = struct{}{}
} }
n = copy(p.buffer[off:], b[:]) n = copy(p.buffer[off:], b[:])
p.Size += int64(n) p.Size += int64(n)
@@ -102,9 +101,6 @@ func (p *Piece) Release() {
} }
p.Size = 0 p.Size = 0
p.complete = false p.complete = false
if _, ok := p.cache.usedPieces[p.Id]; ok {
delete(p.cache.usedPieces, p.Id)
}
} }
func (p *Piece) Stat() state.ItemState { func (p *Piece) Stat() state.ItemState {

View File

@@ -3,14 +3,15 @@ package memcache
import ( import (
"sync" "sync"
"server/torr/storage"
"server/torr/storage/state" "server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage" storage2 "github.com/anacrolix/torrent/storage"
) )
type Storage struct { type Storage struct {
storage.TorrentImpl storage.Storage
caches map[metainfo.Hash]*Cache caches map[metainfo.Hash]*Cache
capacity int64 capacity int64
@@ -24,7 +25,7 @@ func NewStorage(capacity int64) *Storage {
return stor return stor
} }
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
ch := NewCache(s.capacity, s) ch := NewCache(s.capacity, s)