diff --git a/src/server/torr/BTServer.go b/src/server/torr/BTServer.go index f984642..8da29f7 100644 --- a/src/server/torr/BTServer.go +++ b/src/server/torr/BTServer.go @@ -7,21 +7,21 @@ import ( "sync" "server/settings" - "server/torr/storage" - "server/torr/storage/memcacheV2" + "server/torr/storage/memcache" "server/torr/storage/state" "server/utils" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" + "log" ) type BTServer struct { config *torrent.ClientConfig client *torrent.Client - storage storage.Storage + storage *memcache.Storage torrents map[metainfo.Hash]*Torrent @@ -53,7 +53,7 @@ func (bt *BTServer) Disconnect() { if bt.client != nil { bt.client.Close() bt.client = nil - utils.FreeOSMemGC() + utils.FreeOSMemGC(0) } } @@ -63,12 +63,13 @@ func (bt *BTServer) Reconnect() error { } func (bt *BTServer) configure() { - bt.storage = memcacheV2.NewStorage(settings.Get().CacheSize) + bt.storage = memcache.NewStorage(settings.Get().CacheSize) blocklist, _ := iplist.MMapPackedFile(filepath.Join(settings.Path, "blocklist")) - userAgent := "uTorrent/3.4.9" - peerID := "-UT3490-" + userAgent := "uTorrent/3.5.5" + peerID := "-UT3550-" + cliVers := "µTorrent 3.5.5" bt.config = torrent.NewDefaultClientConfig() @@ -87,14 +88,12 @@ func (bt *BTServer) configure() { bt.config.Bep20 = peerID bt.config.PeerID = utils.PeerIDRandom(peerID) bt.config.HTTPUserAgent = userAgent + bt.config.ExtendedHandshakeClientVersion = cliVers bt.config.EstablishedConnsPerTorrent = settings.Get().ConnectionsLimit if settings.Get().DhtConnectionLimit > 0 { bt.config.ConnTracker.SetMaxEntries(settings.Get().DhtConnectionLimit) } - bt.config.TorrentPeersHighWater = 3000 - bt.config.HalfOpenConnsPerTorrent = 50 - if settings.Get().DownloadRateLimit > 0 { bt.config.DownloadRateLimiter = utils.Limit(settings.Get().DownloadRateLimit * 1024) } @@ -105,9 +104,7 @@ func (bt *BTServer) configure() { bt.config.ListenPort = settings.Get().PeersListenPort } - //bt.config.Debug = true - - fmt.Println("Configure client:", settings.Get()) + log.Println("Configure client:", settings.Get()) } func (bt *BTServer) AddTorrent(magnet metainfo.Magnet, onAdd func(*Torrent)) (*Torrent, error) { @@ -183,16 +180,6 @@ func (bt *BTServer) CacheState(hash metainfo.Hash) *state.CacheState { return cacheState } -func (bt *BTServer) GetCache(hash metainfo.Hash) *memcacheV2.Cache { - st := bt.GetTorrent(hash) - if st == nil { - return nil - } - - cacheState := bt.storage.GetCache(hash) - return cacheState.(*memcacheV2.Cache) -} - func (bt *BTServer) WriteState(w io.Writer) { bt.client.WriteStatus(w) } diff --git a/src/server/torr/Play.go b/src/server/torr/Play.go index 60ce699..1c1d1d8 100644 --- a/src/server/torr/Play.go +++ b/src/server/torr/Play.go @@ -11,20 +11,20 @@ import ( "github.com/anacrolix/missinggo/httptoo" "github.com/anacrolix/torrent" "github.com/labstack/echo" + "log" ) func (bt *BTServer) View(torr *Torrent, file *torrent.File, c echo.Context) error { go settings.SetViewed(torr.Hash().HexString(), file.Path()) - reader := NewReader(file, bt.GetCache(torr.hash)) - //reader := torr.NewReader(file, 0) + reader := torr.NewReader(file, 0) - fmt.Println("Connect reader:", len(torr.readers)) + log.Println("Connect client") c.Response().Header().Set("Connection", "close") c.Response().Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", torr.Hash().HexString(), file.Path()))) http.ServeContent(c.Response(), c.Request(), file.Path(), time.Time{}, reader) - fmt.Println("Disconnect reader:", len(torr.readers)) + log.Println("Disconnect client") torr.CloseReader(reader) return c.NoContent(http.StatusOK) } diff --git a/src/server/torr/Torrent.go b/src/server/torr/Torrent.go index aa236d2..1d54208 100644 --- a/src/server/torr/Torrent.go +++ b/src/server/torr/Torrent.go @@ -1,8 +1,8 @@ package torr import ( - "fmt" "io" + "log" "sort" "sync" "time" @@ -13,6 +13,8 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" "github.com/labstack/gommon/bytes" + "server/torr/reader" + "server/torr/storage/memcache" ) type TorrentStatus int @@ -47,12 +49,11 @@ type Torrent struct { status TorrentStatus - readers map[torrent.Reader]struct{} - muTorrent sync.Mutex muReader sync.Mutex - bt *BTServer + bt *BTServer + cache *memcache.Cache lastTimeSpeed time.Time DownloadSpeed float64 @@ -102,7 +103,6 @@ func NewTorrent(magnet metainfo.Magnet, bt *BTServer) (*Torrent, error) { torr.status = TorrentAdded torr.lastTimeSpeed = time.Now() torr.bt = bt - torr.readers = make(map[torrent.Reader]struct{}) torr.hash = magnet.InfoHash torr.closed = goTorrent.Closed() @@ -116,11 +116,17 @@ func (t *Torrent) WaitInfo() bool { if t.Torrent == nil { return false } + + tm := time.NewTimer(time.Minute * 10) + select { case <-t.Torrent.GotInfo(): + t.cache = t.bt.storage.GetCache(t.hash) return true case <-t.closed: return false + case <-tm.C: + return false } } @@ -181,7 +187,7 @@ func (t *Torrent) progressEvent() { } func (t *Torrent) expired() bool { - return len(t.readers) == 0 && t.expiredTime.Before(time.Now()) && (t.status == TorrentWorking || t.status == TorrentClosed) + return t.cache.ReadersLen() == 0 && t.expiredTime.Before(time.Now()) && (t.status == TorrentWorking || t.status == TorrentClosed) } func (t *Torrent) Files() []*torrent.File { @@ -207,7 +213,7 @@ func (t *Torrent) Length() int64 { return t.Torrent.Length() } -func (t *Torrent) NewReader(file *torrent.File, readahead int64) torrent.Reader { +func (t *Torrent) NewReader(file *torrent.File, readahead int64) *reader.Reader { t.muReader.Lock() if t.status == TorrentClosed { @@ -215,23 +221,27 @@ func (t *Torrent) NewReader(file *torrent.File, readahead int64) torrent.Reader } defer t.muReader.Unlock() - reader := file.NewReader() + reader := reader.NewReader(file) if readahead <= 0 { readahead = utils.GetReadahead() } reader.SetReadahead(readahead) - t.readers[reader] = struct{}{} + t.cache.AddReader(reader) return reader } -func (t *Torrent) CloseReader(reader torrent.Reader) { +func (t *Torrent) CloseReader(reader *reader.Reader) { t.muReader.Lock() reader.Close() - delete(t.readers, reader) - t.expiredTime = time.Now().Add(time.Second * 5) + t.cache.RemReader(reader) + t.expiredTime = time.Now().Add(time.Minute) t.muReader.Unlock() } +func (t *Torrent) GetCache() *memcache.Cache { + return t.cache +} + func (t *Torrent) Preload(file *torrent.File, size int64) { if size < 0 { return @@ -276,7 +286,7 @@ func (t *Torrent) Preload(file *torrent.File, size int64) { } defer func() { t.CloseReader(readerPre) - t.expiredTime = time.Now().Add(time.Minute * 1) + t.expiredTime = time.Now().Add(time.Minute * 5) }() if endPreloadOffset > 0 { @@ -288,7 +298,7 @@ func (t *Torrent) Preload(file *torrent.File, size int64) { readerPost.SetReadahead(buff5mb) defer func() { t.CloseReader(readerPost) - t.expiredTime = time.Now().Add(time.Minute * 1) + t.expiredTime = time.Now().Add(time.Minute * 5) }() } @@ -300,9 +310,9 @@ func (t *Torrent) Preload(file *torrent.File, size int64) { var lastSize int64 = 0 errCount := 0 for t.status == TorrentPreload { - t.expiredTime = time.Now().Add(time.Minute * 1) + t.expiredTime = time.Now().Add(time.Minute * 5) t.PreloadedBytes = t.Torrent.BytesCompleted() - fmt.Println("Preload:", file.Torrent().InfoHash().HexString(), bytes.Format(t.PreloadedBytes), "/", bytes.Format(t.PreloadSize), "Speed:", utils.Format(t.DownloadSpeed), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) + log.Println("Preload:", file.Torrent().InfoHash().HexString(), bytes.Format(t.PreloadedBytes), "/", bytes.Format(t.PreloadSize), "Speed:", utils.Format(t.DownloadSpeed), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) if t.PreloadedBytes >= t.PreloadSize { return } @@ -337,10 +347,6 @@ func (t *Torrent) Close() { t.muReader.Lock() defer t.muReader.Unlock() - for r := range t.readers { - r.Close() - } - if _, ok := t.bt.torrents[t.hash]; ok { delete(t.bt.torrents, t.hash) } diff --git a/src/server/torr/Reader.go b/src/server/torr/reader/Reader.go similarity index 63% rename from src/server/torr/Reader.go rename to src/server/torr/reader/Reader.go index 98063a7..3833eef 100644 --- a/src/server/torr/Reader.go +++ b/src/server/torr/reader/Reader.go @@ -1,27 +1,21 @@ -package torr +package reader import ( - "io" - "github.com/anacrolix/torrent" - "server/torr/storage/memcacheV2" + "io" ) type Reader struct { torrent.Reader - - offset int64 - - file *torrent.File - cache *memcacheV2.Cache + offset int64 + readahead int64 + file *torrent.File } -func NewReader(file *torrent.File, cache *memcacheV2.Cache) *Reader { +func NewReader(file *torrent.File) *Reader { r := new(Reader) r.file = file r.Reader = file.NewReader() - r.Reader.SetReadahead(0) - r.cache = cache return r } @@ -36,13 +30,24 @@ func (r *Reader) Seek(offset int64, whence int) (n int64, err error) { } n, err = r.Reader.Seek(offset, whence) r.offset = n - r.cache.SetPos(int(n)) return } func (r *Reader) Read(p []byte) (n int, err error) { n, err = r.Reader.Read(p) r.offset += int64(n) - r.cache.SetPos(int(n)) return } + +func (r *Reader) SetReadahead(length int64) { + r.Reader.SetReadahead(length) + r.readahead = length +} + +func (r *Reader) Offset() int64 { + return r.offset +} + +func (r *Reader) Readahead() int64 { + return r.readahead +} diff --git a/src/server/torr/storage/Storage.go b/src/server/torr/storage/Storage.go index 7a174b4..e80cc3a 100644 --- a/src/server/torr/storage/Storage.go +++ b/src/server/torr/storage/Storage.go @@ -11,6 +11,5 @@ type Storage interface { storage.ClientImpl GetStats(hash metainfo.Hash) *state.CacheState - GetCache(hash metainfo.Hash) interface{} CloseHash(hash metainfo.Hash) } diff --git a/src/server/torr/storage/memcache/Buffer.go b/src/server/torr/storage/memcache/Buffer.go index b636ba9..a49b91b 100644 --- a/src/server/torr/storage/memcache/Buffer.go +++ b/src/server/torr/storage/memcache/Buffer.go @@ -1,9 +1,9 @@ package memcache import ( - "fmt" "sync" + "log" "server/utils" ) @@ -14,42 +14,62 @@ type buffer struct { } type BufferPool struct { - buffs map[int]*buffer - bufferLength int64 - bufferCount int - mu sync.Mutex + buffs map[int]*buffer + frees int + size int64 + mu sync.Mutex } -func NewBufferPool(bufferLength int64) *BufferPool { +func NewBufferPool(bufferLength int64, capacity int64) *BufferPool { bp := new(BufferPool) - bp.bufferLength = bufferLength - bp.buffs = make(map[int]*buffer) + buffsSize := int(capacity/bufferLength) + 3 + bp.frees = buffsSize + bp.size = bufferLength return bp } +func (b *BufferPool) mkBuffs() { + if b.buffs != nil { + return + } + b.buffs = make(map[int]*buffer, b.frees) + log.Println("Create", b.frees, "buffers") + for i := 0; i < b.frees; i++ { + buf := buffer{ + -1, + make([]byte, b.size, b.size), + false, + } + b.buffs[i] = &buf + } +} + func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { b.mu.Lock() defer b.mu.Unlock() + b.mkBuffs() for id, buf := range b.buffs { if !buf.used { - fmt.Println("Get buffer:", id) buf.used = true buf.pieceId = p.Id buff = buf.buf index = id + b.frees-- + //fmt.Printf("Get buffer: %v %v %v %p\n", id, p.Id, b.frees, buff) return } } + log.Println("Create slow buffer") - fmt.Println("Create buffer:", b.bufferCount) - buf := new(buffer) - buf.buf = make([]byte, b.bufferLength) - buf.used = true - buf.pieceId = p.Id - b.buffs[b.bufferCount] = buf - index = b.bufferCount + buf := buffer{ + p.Id, + make([]byte, b.size, b.size), + true, + } + b.frees++ + b.buffs[b.frees] = &buf buff = buf.buf - b.bufferCount++ + index = b.frees return } @@ -60,10 +80,11 @@ func (b *BufferPool) ReleaseBuffer(index int) { } b.mu.Lock() defer b.mu.Unlock() + b.mkBuffs() if buff, ok := b.buffs[index]; ok { - fmt.Println("Release buffer:", index) buff.used = false buff.pieceId = -1 + b.frees++ } else { utils.FreeOSMem() } @@ -72,6 +93,9 @@ func (b *BufferPool) ReleaseBuffer(index int) { func (b *BufferPool) Used() map[int]struct{} { b.mu.Lock() defer b.mu.Unlock() + if len(b.buffs) == 0 { + b.mkBuffs() + } used := make(map[int]struct{}) for _, b := range b.buffs { if b.used { @@ -82,13 +106,5 @@ func (b *BufferPool) Used() map[int]struct{} { } func (b *BufferPool) Len() int { - b.mu.Lock() - defer b.mu.Unlock() - count := 0 - for _, b := range b.buffs { - if b.used { - count++ - } - } - return count + return b.frees } diff --git a/src/server/torr/storage/memcache/Cache.go b/src/server/torr/storage/memcache/Cache.go index db7be18..138331d 100644 --- a/src/server/torr/storage/memcache/Cache.go +++ b/src/server/torr/storage/memcache/Cache.go @@ -1,8 +1,6 @@ package memcache import ( - "fmt" - "sort" "sync" "server/torr/storage/state" @@ -10,11 +8,15 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" + "log" + "server/torr/reader" ) type Cache struct { storage.TorrentImpl + s *Storage + capacity int64 filled int64 hash metainfo.Hash @@ -25,37 +27,39 @@ type Cache struct { muPiece sync.Mutex muRemove sync.Mutex + muReader sync.Mutex isRemove bool pieces map[int]*Piece bufferPull *BufferPool - prcLoaded int - position int + readers map[*reader.Reader]struct{} } -func NewCache(capacity int64) *Cache { +func NewCache(capacity int64, storage *Storage) *Cache { ret := &Cache{ capacity: capacity, filled: 0, pieces: make(map[int]*Piece), + readers: make(map[*reader.Reader]struct{}), + s: storage, } return ret } func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { - fmt.Println("Create cache for:", info.Name) + log.Println("Create cache for:", info.Name) //Min capacity of 2 pieces length - cap := info.PieceLength * 2 - if c.capacity < cap { - c.capacity = cap + caps := info.PieceLength * 2 + if c.capacity < caps { + c.capacity = caps } c.pieceLength = info.PieceLength c.pieceCount = info.NumPieces() c.piecesBuff = int(c.capacity / c.pieceLength) c.hash = hash - c.bufferPull = NewBufferPool(c.pieceLength) + c.bufferPull = NewBufferPool(c.pieceLength, c.capacity) for i := 0; i < c.pieceCount; i++ { c.pieces[i] = &Piece{ @@ -69,7 +73,10 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { c.muPiece.Lock() - defer c.muPiece.Unlock() + defer func() { + c.muPiece.Unlock() + go utils.FreeOSMemGC(c.capacity) + }() if val, ok := c.pieces[m.Index()]; ok { return val } @@ -78,10 +85,14 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Close() error { c.isRemove = false - fmt.Println("Close cache for:", c.hash) + log.Println("Close cache for:", c.hash) + if _, ok := c.s.caches[c.hash]; ok { + delete(c.s.caches, c.hash) + } c.pieces = nil c.bufferPull = nil - utils.FreeOSMemGC() + c.readers = nil + utils.FreeOSMemGC(0) return nil } @@ -109,11 +120,6 @@ func (c *Cache) GetState() state.CacheState { return cState } -func (c *Cache) setPos(pos int) { - c.position = (c.position + pos) / 2 - //fmt.Println("Read:", c.position) -} - func (c *Cache) cleanPieces() { if c.isRemove { return @@ -127,70 +133,74 @@ func (c *Cache) cleanPieces() { defer func() { c.isRemove = false }() c.muRemove.Unlock() - remPieces := c.getRemPieces() - if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) { - remCount := int((c.filled - c.capacity) / c.pieceLength) - if remCount < 1 { - remCount = 1 - } - if remCount > len(remPieces) { - remCount = len(remPieces) - } + bufPieces := c.getBufferedPieces() - remPieces = remPieces[:remCount] - - for _, p := range remPieces { - c.removePiece(p) + if len(bufPieces) > 0 && c.filled >= c.capacity { + c.muReader.Lock() + for reader := range c.readers { + beg, end := c.getReaderPieces(reader) + for id := range bufPieces { + if id >= beg && id <= end { + delete(bufPieces, id) + } + } + } + c.muReader.Unlock() + if len(bufPieces) > 0 { + for _, p := range bufPieces { + p.Release() + } + bufPieces = nil + go utils.FreeOSMemGC(c.capacity) } } } -func (c *Cache) getRemPieces() []*Piece { - pieces := make([]*Piece, 0) +func (c *Cache) getBufferedPieces() map[int]*Piece { + pieces := make(map[int]*Piece) fill := int64(0) - loading := 0 used := c.bufferPull.Used() - - fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength) - low := c.position - fpices + 1 - high := c.position + c.piecesBuff - fpices + 3 - for u := range used { - v := c.pieces[u] - if v.Size > 0 { - if v.Id > 0 && (v.Id < low || v.Id > high) { - pieces = append(pieces, v) - } - fill += v.Size - if !v.complete { - loading++ + piece := c.pieces[u] + if piece.Size > 0 { + if piece.Id > 0 { + pieces[piece.Id] = piece + //pieces = append(pieces, piece) } + fill += piece.Size } } c.filled = fill - sort.Slice(pieces, func(i, j int) bool { - return pieces[i].accessed < pieces[j].accessed - }) - c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) return pieces } func (c *Cache) removePiece(piece *Piece) { - c.muPiece.Lock() - defer c.muPiece.Unlock() piece.Release() + return +} - //st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash) - if c.prcLoaded >= 95 { - //fmt.Println("Clean memory GC:", st) - utils.FreeOSMemGC() - } else { - //fmt.Println("Clean memory:", st) - utils.FreeOSMem() +func (c *Cache) AddReader(r *reader.Reader) { + c.muReader.Lock() + defer c.muReader.Unlock() + c.readers[r] = struct{}{} +} + +func (c *Cache) RemReader(r *reader.Reader) { + c.muReader.Lock() + defer c.muReader.Unlock() + delete(c.readers, r) +} + +func (c *Cache) ReadersLen() int { + if c == nil || c.readers == nil { + return 0 } + return len(c.readers) } -func prc(val, of int) int { - return int(float64(val) * 100.0 / float64(of)) +func (c *Cache) getReaderPieces(reader *reader.Reader) (begin, end int) { + end = int((reader.Offset() + reader.Readahead()) / c.pieceLength) + begin = int((reader.Offset() - c.capacity + reader.Readahead()) / c.pieceLength) + return } diff --git a/src/server/torr/storage/memcache/Piece.go b/src/server/torr/storage/memcache/Piece.go index a03c67a..f21f75b 100644 --- a/src/server/torr/storage/memcache/Piece.go +++ b/src/server/torr/storage/memcache/Piece.go @@ -42,13 +42,14 @@ func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { } n = copy(p.buffer[off:], b[:]) p.Size += int64(n) - p.accessed = time.Now().Unix() + 2000 + p.accessed = time.Now().Unix() return } func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { p.mu.RLock() defer p.mu.RUnlock() + size := len(b) if size+int(off) > len(p.buffer) { size = len(p.buffer) - int(off) @@ -60,19 +61,13 @@ func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { return 0, io.ErrUnexpectedEOF } n = copy(b, p.buffer[int(off) : int(off)+size][:]) - + p.accessed = time.Now().Unix() if int(off)+size >= len(p.buffer) { p.readed = true } if int64(len(b))+off >= p.Size { go p.cache.cleanPieces() } - - if p.complete { - p.accessed = time.Now().Unix() - p.cache.setPos(p.Id) - } - return n, nil } @@ -86,13 +81,12 @@ func (p *Piece) MarkComplete() error { func (p *Piece) MarkNotComplete() error { p.complete = false - p.accessed = 0 return nil } func (p *Piece) Completion() storage.Completion { return storage.Completion{ - Complete: p.complete, + Complete: p.complete && len(p.buffer) > 0, Ok: true, } } diff --git a/src/server/torr/storage/memcache/Storage.go b/src/server/torr/storage/memcache/Storage.go index aea901a..83a9064 100644 --- a/src/server/torr/storage/memcache/Storage.go +++ b/src/server/torr/storage/memcache/Storage.go @@ -3,32 +3,31 @@ package memcache import ( "sync" - "server/torr/storage" "server/torr/storage/state" "github.com/anacrolix/torrent/metainfo" - storage2 "github.com/anacrolix/torrent/storage" + "github.com/anacrolix/torrent/storage" ) type Storage struct { - storage.Storage + storage.TorrentImpl caches map[metainfo.Hash]*Cache capacity int64 mu sync.Mutex } -func NewStorage(capacity int64) storage.Storage { +func NewStorage(capacity int64) *Storage { stor := new(Storage) stor.capacity = capacity stor.caches = make(map[metainfo.Hash]*Cache) return stor } -func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { +func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { s.mu.Lock() defer s.mu.Unlock() - ch := NewCache(s.capacity) + ch := NewCache(s.capacity, s) ch.Init(info, infoHash) s.caches[infoHash] = ch return ch, nil @@ -64,3 +63,14 @@ func (s *Storage) Close() error { } return nil } + +func (s *Storage) GetCache(hash metainfo.Hash) *Cache { + s.mu.Lock() + defer s.mu.Unlock() + for _, c := range s.caches { + if c.hash == hash { + return c + } + } + return nil +} diff --git a/src/server/torr/storage/memcacheV2/Buffer.go b/src/server/torr/storage/memcacheV2/Buffer.go deleted file mode 100644 index 4c60d2d..0000000 --- a/src/server/torr/storage/memcacheV2/Buffer.go +++ /dev/null @@ -1,94 +0,0 @@ -package memcacheV2 - -import ( - "fmt" - "sync" - - "server/utils" -) - -type buffer struct { - pieceId int - buf []byte - used bool -} - -type BufferPool struct { - buffs map[int]*buffer - bufferLength int64 - bufferCount int - mu sync.Mutex -} - -func NewBufferPool(bufferLength int64) *BufferPool { - bp := new(BufferPool) - bp.bufferLength = bufferLength - bp.buffs = make(map[int]*buffer) - return bp -} - -func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { - b.mu.Lock() - defer b.mu.Unlock() - for id, buf := range b.buffs { - if !buf.used { - fmt.Println("Get buffer:", id) - buf.used = true - buf.pieceId = p.Id - buff = buf.buf - index = id - return - } - } - - fmt.Println("Create buffer:", b.bufferCount) - buf := new(buffer) - buf.buf = make([]byte, b.bufferLength) - buf.used = true - buf.pieceId = p.Id - b.buffs[b.bufferCount] = buf - index = b.bufferCount - buff = buf.buf - b.bufferCount++ - return -} - -func (b *BufferPool) ReleaseBuffer(index int) { - if index == -1 { - utils.FreeOSMem() - return - } - b.mu.Lock() - defer b.mu.Unlock() - if buff, ok := b.buffs[index]; ok { - fmt.Println("Release buffer:", index) - buff.used = false - buff.pieceId = -1 - } else { - utils.FreeOSMem() - } -} - -func (b *BufferPool) Used() map[int]struct{} { - b.mu.Lock() - defer b.mu.Unlock() - used := make(map[int]struct{}) - for _, b := range b.buffs { - if b.used { - used[b.pieceId] = struct{}{} - } - } - return used -} - -func (b *BufferPool) Len() int { - b.mu.Lock() - defer b.mu.Unlock() - count := 0 - for _, b := range b.buffs { - if b.used { - count++ - } - } - return count -} diff --git a/src/server/torr/storage/memcacheV2/Cache.go b/src/server/torr/storage/memcacheV2/Cache.go deleted file mode 100644 index 2f63a70..0000000 --- a/src/server/torr/storage/memcacheV2/Cache.go +++ /dev/null @@ -1,197 +0,0 @@ -package memcacheV2 - -import ( - "fmt" - "sort" - "sync" - - "server/torr/storage/state" - "server/utils" - - "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/storage" -) - -type Cache struct { - storage.TorrentImpl - - capacity int64 - filled int64 - hash metainfo.Hash - - pieceLength int64 - pieceCount int - piecesBuff int - - muPiece sync.Mutex - muRemove sync.Mutex - isRemove bool - - pieces map[int]*Piece - bufferPull *BufferPool - - prcLoaded int - position int -} - -func NewCache(capacity int64) *Cache { - ret := &Cache{ - capacity: capacity, - filled: 0, - pieces: make(map[int]*Piece), - } - - return ret -} - -func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { - fmt.Println("Create cache for:", info.Name) - //Min capacity of 2 pieces length - cap := info.PieceLength * 2 - if c.capacity < cap { - c.capacity = cap - } - c.pieceLength = info.PieceLength - c.pieceCount = info.NumPieces() - c.piecesBuff = int(c.capacity / c.pieceLength) - c.hash = hash - c.bufferPull = NewBufferPool(c.pieceLength) - - for i := 0; i < c.pieceCount; i++ { - c.pieces[i] = &Piece{ - Id: i, - Length: info.Piece(i).Length(), - Hash: info.Piece(i).Hash().HexString(), - cache: c, - } - } -} - -func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { - c.muPiece.Lock() - defer c.muPiece.Unlock() - if val, ok := c.pieces[m.Index()]; ok { - return val - } - return nil -} - -func (c *Cache) Close() error { - c.isRemove = false - fmt.Println("Close cache for:", c.hash) - c.pieces = nil - c.bufferPull = nil - utils.FreeOSMemGC() - return nil -} - -func (c *Cache) GetState() state.CacheState { - cState := state.CacheState{} - cState.Capacity = c.capacity - cState.PiecesLength = c.pieceLength - cState.PiecesCount = c.pieceCount - cState.Hash = c.hash.HexString() - - stats := make(map[int]state.ItemState, 0) - c.muPiece.Lock() - var fill int64 = 0 - for _, value := range c.pieces { - stat := value.Stat() - if stat.BufferSize > 0 { - fill += stat.BufferSize - stats[stat.Id] = stat - } - } - c.filled = fill - c.muPiece.Unlock() - cState.Filled = c.filled - cState.Pieces = stats - return cState -} - -func (c *Cache) SetPos(pos int) { - //c.position = (c.position + pos) / 2 - c.position = pos - //fmt.Println("Read:", c.position) -} - -func (c *Cache) cleanPieces() { - if c.isRemove { - return - } - c.muRemove.Lock() - if c.isRemove { - c.muRemove.Unlock() - return - } - c.isRemove = true - defer func() { c.isRemove = false }() - c.muRemove.Unlock() - - remPieces := c.getRemPieces() - if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) { - remCount := int((c.filled - c.capacity) / c.pieceLength) - if remCount < 1 { - remCount = 1 - } - if remCount > len(remPieces) { - remCount = len(remPieces) - } - - remPieces = remPieces[:remCount] - - for _, p := range remPieces { - c.removePiece(p) - } - } -} - -func (c *Cache) getRemPieces() []*Piece { - pieces := make([]*Piece, 0) - fill := int64(0) - loading := 0 - used := c.bufferPull.Used() - - fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength) - low := c.position - fpices + 1 - high := c.position + c.piecesBuff - fpices + 3 - - for u := range used { - v := c.pieces[u] - if v.Size > 0 { - if v.Id > 0 && (v.Id < low || v.Id > high) { - pieces = append(pieces, v) - } - fill += v.Size - if !v.complete { - loading++ - } - } - } - c.filled = fill - sort.Slice(pieces, func(i, j int) bool { - return pieces[i].accessed < pieces[j].accessed - }) - - c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) - return pieces -} - -func (c *Cache) removePiece(piece *Piece) { - c.muPiece.Lock() - defer c.muPiece.Unlock() - piece.Release() - - //st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash) - if c.prcLoaded >= 95 { - //fmt.Println("Clean memory GC:", st) - utils.FreeOSMemGC() - } else { - //fmt.Println("Clean memory:", st) - utils.FreeOSMem() - } -} - -func prc(val, of int) int { - return int(float64(val) * 100.0 / float64(of)) -} diff --git a/src/server/torr/storage/memcacheV2/Piece.go b/src/server/torr/storage/memcacheV2/Piece.go deleted file mode 100644 index 31cba64..0000000 --- a/src/server/torr/storage/memcacheV2/Piece.go +++ /dev/null @@ -1,118 +0,0 @@ -package memcacheV2 - -import ( - "errors" - "io" - "sync" - "time" - - "server/torr/storage/state" - - "github.com/anacrolix/torrent/storage" -) - -type Piece struct { - storage.PieceImpl - - Id int - Hash string - Length int64 - Size int64 - - complete bool - accessed int64 - buffer []byte - bufIndex int - - mu sync.RWMutex - cache *Cache -} - -func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { - p.mu.Lock() - defer p.mu.Unlock() - - if p.buffer == nil { - go p.cache.cleanPieces() - p.buffer, p.bufIndex = p.cache.bufferPull.GetBuffer(p) - if p.buffer == nil { - return 0, errors.New("Can't get buffer write") - } - } - n = copy(p.buffer[off:], b[:]) - p.Size += int64(n) - p.accessed = time.Now().Unix() + 2000 - return -} - -func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { - p.mu.RLock() - defer p.mu.RUnlock() - size := len(b) - if size+int(off) > len(p.buffer) { - size = len(p.buffer) - int(off) - if size < 0 { - size = 0 - } - } - if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size { - return 0, io.ErrUnexpectedEOF - } - n = copy(b, p.buffer[int(off) : int(off)+size][:]) - - if int64(len(b))+off >= p.Size { - go p.cache.cleanPieces() - time.Sleep(time.Millisecond * 2000) - } - - if p.complete { - p.accessed = time.Now().Unix() - p.cache.SetPos(p.Id) - } - - return n, nil -} - -func (p *Piece) MarkComplete() error { - if len(p.buffer) == 0 { - return errors.New("piece is not complete") - } - p.complete = true - return nil -} - -func (p *Piece) MarkNotComplete() error { - p.complete = false - p.accessed = 0 - return nil -} - -func (p *Piece) Completion() storage.Completion { - return storage.Completion{ - Complete: p.complete, - Ok: true, - } -} - -func (p *Piece) Release() { - p.mu.Lock() - defer p.mu.Unlock() - if p.buffer != nil { - p.buffer = nil - p.cache.bufferPull.ReleaseBuffer(p.bufIndex) - p.bufIndex = -1 - } - p.Size = 0 - p.complete = false -} - -func (p *Piece) Stat() state.ItemState { - itm := state.ItemState{ - Id: p.Id, - Hash: p.Hash, - Accessed: p.accessed, - Completed: p.complete, - BufferSize: p.Size, - } - return itm -} diff --git a/src/server/torr/storage/memcacheV2/Reader.go b/src/server/torr/storage/memcacheV2/Reader.go deleted file mode 100644 index 0798119..0000000 --- a/src/server/torr/storage/memcacheV2/Reader.go +++ /dev/null @@ -1,27 +0,0 @@ -package memcacheV2 - -import "github.com/anacrolix/torrent" - -type Reader struct { - torrent.Reader - - pos int64 -} - -func NewReader(file torrent.File) *Reader { - r := new(Reader) - r.Reader = file.NewReader() - return r -} - -func (r *Reader) Read(p []byte) (n int, err error) { - n, err = r.Read(p) - r.pos += int64(n) - return -} - -func (r *Reader) Seek(offset int64, whence int) (ret int64, err error) { - ret, err = r.Reader.Seek(offset, whence) - r.pos = ret - return -} diff --git a/src/server/torr/storage/memcacheV2/Storage.go b/src/server/torr/storage/memcacheV2/Storage.go deleted file mode 100644 index 5a002db..0000000 --- a/src/server/torr/storage/memcacheV2/Storage.go +++ /dev/null @@ -1,75 +0,0 @@ -package memcacheV2 - -import ( - "sync" - - "server/torr/storage" - "server/torr/storage/state" - - "github.com/anacrolix/torrent/metainfo" - storage2 "github.com/anacrolix/torrent/storage" -) - -type Storage struct { - storage.Storage - - caches map[metainfo.Hash]*Cache - capacity int64 - mu sync.Mutex -} - -func NewStorage(capacity int64) storage.Storage { - stor := new(Storage) - stor.capacity = capacity - stor.caches = make(map[metainfo.Hash]*Cache) - return stor -} - -func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { - s.mu.Lock() - defer s.mu.Unlock() - ch := NewCache(s.capacity) - ch.Init(info, infoHash) - s.caches[infoHash] = ch - return ch, nil -} - -func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState { - s.mu.Lock() - defer s.mu.Unlock() - if c, ok := s.caches[hash]; ok { - st := c.GetState() - return &st - } - return nil -} - -func (s *Storage) GetCache(hash metainfo.Hash) interface{} { - s.mu.Lock() - defer s.mu.Unlock() - if c, ok := s.caches[hash]; ok { - return c - } - return nil -} - -func (s *Storage) CloseHash(hash metainfo.Hash) { - if s.caches == nil { - return - } - s.mu.Lock() - defer s.mu.Unlock() - if ch, ok := s.caches[hash]; ok { - ch.Close() - delete(s.caches, hash) - } -} - -func (s *Storage) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - for _, ch := range s.caches { - ch.Close() - } - return nil -}