ledger.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package blockchain
  14. import (
  15. "bytes"
  16. "compress/gzip"
  17. "context"
  18. "encoding/json"
  19. "io"
  20. "io/ioutil"
  21. "log"
  22. "sync"
  23. "time"
  24. "github.com/mudler/edgevpn/pkg/hub"
  25. "github.com/mudler/edgevpn/pkg/utils"
  26. "github.com/pkg/errors"
  27. )
  28. type Ledger struct {
  29. sync.Mutex
  30. blockchain Store
  31. channel io.Writer
  32. }
  33. type Store interface {
  34. Add(Block)
  35. Len() int
  36. Last() Block
  37. }
  38. // New returns a new ledger which writes to the writer
  39. func New(w io.Writer, s Store) *Ledger {
  40. c := &Ledger{channel: w, blockchain: s}
  41. if s.Len() == 0 {
  42. c.newGenesis()
  43. }
  44. return c
  45. }
  46. func (l *Ledger) newGenesis() {
  47. t := time.Now()
  48. genesisBlock := Block{}
  49. genesisBlock = Block{0, t.String(), map[string]map[string]Data{}, genesisBlock.Checksum(), ""}
  50. l.blockchain.Add(genesisBlock)
  51. }
  52. // Syncronizer starts a goroutine which
  53. // writes the blockchain to the periodically
  54. func (l *Ledger) Syncronizer(ctx context.Context, t time.Duration) {
  55. go func() {
  56. t := utils.NewBackoffTicker(utils.BackoffMaxInterval(t))
  57. defer t.Stop()
  58. for {
  59. select {
  60. case <-t.C:
  61. l.Lock()
  62. bytes, err := json.Marshal(l.blockchain.Last())
  63. if err != nil {
  64. log.Println(err)
  65. }
  66. l.channel.Write(compress(bytes).Bytes())
  67. l.Unlock()
  68. case <-ctx.Done():
  69. return
  70. }
  71. }
  72. }()
  73. }
  // compress gzips b and returns a buffer holding the compressed bytes.
  // Write and Close errors are discarded.
  func compress(b []byte) *bytes.Buffer {
  	out := new(bytes.Buffer)
  	zw := gzip.NewWriter(out)
  	_, _ = zw.Write(b)
  	_ = zw.Close()
  	return out
  }
  // deCompress gunzips b and returns a buffer holding the decompressed bytes.
  // It returns an error when b does not start with a valid gzip header or
  // when the stream cannot be read to its end.
  func deCompress(b []byte) (*bytes.Buffer, error) {
  	zr, err := gzip.NewReader(bytes.NewReader(b))
  	if err != nil {
  		return nil, err
  	}
  	// Release the decompressor once the payload has been consumed.
  	defer zr.Close()

  	plain, err := ioutil.ReadAll(zr)
  	if err != nil {
  		return nil, err
  	}
  	return bytes.NewBuffer(plain), nil
  }
  92. // Update the blockchain from a message
  93. func (l *Ledger) Update(f *Ledger, h *hub.Message, c chan *hub.Message) (err error) {
  94. //chain := make(Blockchain, 0)
  95. block := &Block{}
  96. b, err := deCompress([]byte(h.Message))
  97. if err != nil {
  98. err = errors.Wrap(err, "failed decompressing")
  99. return
  100. }
  101. err = json.Unmarshal(b.Bytes(), block)
  102. if err != nil {
  103. err = errors.Wrap(err, "failed unmarshalling blockchain data")
  104. return
  105. }
  106. l.Lock()
  107. if block.Index > l.blockchain.Len() {
  108. l.blockchain.Add(*block)
  109. }
  110. l.Unlock()
  111. return
  112. }
  113. // Announce keeps updating async data to the blockchain.
  114. // Sends a broadcast at the specified interval
  115. // by making sure the async retrieved value is written to the
  116. // blockchain
  117. func (l *Ledger) Announce(ctx context.Context, d time.Duration, async func()) {
  118. go func() {
  119. //t := time.NewTicker(t)
  120. t := utils.NewBackoffTicker(utils.BackoffMaxInterval(d))
  121. defer t.Stop()
  122. for {
  123. select {
  124. case <-t.C:
  125. async()
  126. case <-ctx.Done():
  127. return
  128. }
  129. }
  130. }()
  131. }
  132. // AnnounceDeleteBucket Announce a deletion of a bucket. It stops when the bucket is deleted
  133. // It takes an interval time and a max timeout.
  134. // It is best effort, and the timeout is necessary, or we might flood network with requests
  135. // if more writers are attempting to write to the same resource
  136. func (l *Ledger) AnnounceDeleteBucket(ctx context.Context, interval, timeout time.Duration, bucket string) {
  137. del, cancel := context.WithTimeout(ctx, timeout)
  138. l.Announce(del, interval, func() {
  139. _, exists := l.CurrentData()[bucket]
  140. if exists {
  141. l.DeleteBucket(bucket)
  142. } else {
  143. cancel()
  144. }
  145. })
  146. }
  147. // AnnounceDeleteBucketKey Announce a deletion of a key from a bucket. It stops when the key is deleted
  148. func (l *Ledger) AnnounceDeleteBucketKey(ctx context.Context, interval, timeout time.Duration, bucket, key string) {
  149. del, cancel := context.WithTimeout(ctx, timeout)
  150. l.Announce(del, interval, func() {
  151. _, exists := l.CurrentData()[bucket][key]
  152. if exists {
  153. l.Delete(bucket, key)
  154. } else {
  155. cancel()
  156. }
  157. })
  158. }
  159. // AnnounceUpdate Keeps announcing something into the blockchain if state is differing
  160. func (l *Ledger) AnnounceUpdate(ctx context.Context, interval time.Duration, bucket, key string, value interface{}) {
  161. l.Announce(ctx, interval, func() {
  162. v, exists := l.CurrentData()[bucket][key]
  163. realv, _ := json.Marshal(value)
  164. switch {
  165. case !exists || string(v) != string(realv):
  166. l.Add(bucket, map[string]interface{}{key: value})
  167. }
  168. })
  169. }
  170. // Persist Keeps announcing something into the blockchain until it is reconciled
  171. func (l *Ledger) Persist(ctx context.Context, interval, timeout time.Duration, bucket, key string, value interface{}) {
  172. put, cancel := context.WithTimeout(ctx, timeout)
  173. l.Announce(put, interval, func() {
  174. v, exists := l.CurrentData()[bucket][key]
  175. realv, _ := json.Marshal(value)
  176. switch {
  177. case !exists || string(v) != string(realv):
  178. l.Add(bucket, map[string]interface{}{key: value})
  179. case exists && string(v) == string(realv):
  180. cancel()
  181. }
  182. })
  183. }
  184. // GetKey retrieve the current key from the blockchain
  185. func (l *Ledger) GetKey(b, s string) (value Data, exists bool) {
  186. l.Lock()
  187. defer l.Unlock()
  188. if l.blockchain.Len() > 0 {
  189. last := l.blockchain.Last()
  190. if _, exists = last.Storage[b]; !exists {
  191. return
  192. }
  193. value, exists = last.Storage[b][s]
  194. if exists {
  195. return
  196. }
  197. }
  198. return
  199. }
  200. // Exists returns true if there is one element with a matching value
  201. func (l *Ledger) Exists(b string, f func(Data) bool) (exists bool) {
  202. l.Lock()
  203. defer l.Unlock()
  204. if l.blockchain.Len() > 0 {
  205. for _, bv := range l.blockchain.Last().Storage[b] {
  206. if f(bv) {
  207. exists = true
  208. return
  209. }
  210. }
  211. }
  212. return
  213. }
  214. // CurrentData returns the current ledger data (locking)
  215. func (l *Ledger) CurrentData() map[string]map[string]Data {
  216. l.Lock()
  217. defer l.Unlock()
  218. return buckets(l.blockchain.Last().Storage).copy()
  219. }
  220. // LastBlock returns the last block in the blockchain
  221. func (l *Ledger) LastBlock() Block {
  222. l.Lock()
  223. defer l.Unlock()
  224. return l.blockchain.Last()
  225. }
  226. type bucket map[string]Data
  227. func (b bucket) copy() map[string]Data {
  228. copy := map[string]Data{}
  229. for k, v := range b {
  230. copy[k] = v
  231. }
  232. return copy
  233. }
  234. type buckets map[string]map[string]Data
  235. func (b buckets) copy() map[string]map[string]Data {
  236. copy := map[string]map[string]Data{}
  237. for k, v := range b {
  238. copy[k] = bucket(v).copy()
  239. }
  240. return copy
  241. }
  242. // Add data to the blockchain
  243. func (l *Ledger) Add(b string, s map[string]interface{}) {
  244. l.Lock()
  245. current := buckets(l.blockchain.Last().Storage).copy()
  246. for s, k := range s {
  247. if _, exists := current[b]; !exists {
  248. current[b] = make(map[string]Data)
  249. }
  250. dat, _ := json.Marshal(k)
  251. current[b][s] = Data(string(dat))
  252. }
  253. l.Unlock()
  254. l.writeData(current)
  255. }
  256. // Delete data from the ledger (locking)
  257. func (l *Ledger) Delete(b string, k string) {
  258. l.Lock()
  259. new := make(map[string]map[string]Data)
  260. for bb, kk := range l.blockchain.Last().Storage {
  261. if _, exists := new[bb]; !exists {
  262. new[bb] = make(map[string]Data)
  263. }
  264. // Copy all keys/v except b/k
  265. for kkk, v := range kk {
  266. if !(bb == b && kkk == k) {
  267. new[bb][kkk] = v
  268. }
  269. }
  270. }
  271. l.Unlock()
  272. l.writeData(new)
  273. }
  274. // DeleteBucket deletes a bucket from the ledger (locking)
  275. func (l *Ledger) DeleteBucket(b string) {
  276. l.Lock()
  277. new := make(map[string]map[string]Data)
  278. for bb, kk := range l.blockchain.Last().Storage {
  279. // Copy all except the specified bucket
  280. if bb == b {
  281. continue
  282. }
  283. if _, exists := new[bb]; !exists {
  284. new[bb] = make(map[string]Data)
  285. }
  286. for kkk, v := range kk {
  287. new[bb][kkk] = v
  288. }
  289. }
  290. l.Unlock()
  291. l.writeData(new)
  292. }
  293. // String returns the blockchain as string
  294. func (l *Ledger) String() string {
  295. bytes, _ := json.MarshalIndent(l.blockchain, "", " ")
  296. return string(bytes)
  297. }
  298. // Index returns last known blockchain index
  299. func (l *Ledger) Index() int {
  300. return l.blockchain.Len()
  301. }
  302. func (l *Ledger) writeData(s map[string]map[string]Data) {
  303. newBlock := l.blockchain.Last().NewBlock(s)
  304. if newBlock.IsValid(l.blockchain.Last()) {
  305. l.Lock()
  306. l.blockchain.Add(newBlock)
  307. l.Unlock()
  308. }
  309. bytes, err := json.Marshal(l.blockchain.Last())
  310. if err != nil {
  311. log.Println(err)
  312. }
  313. l.channel.Write(compress(bytes).Bytes())
  314. }