room.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package hub
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/libp2p/go-libp2p-core/peer"
  6. pubsub "github.com/libp2p/go-libp2p-pubsub"
  7. )
  8. // RoomBufSize is the number of incoming messages to buffer for each topic.
  9. const RoomBufSize = 128
  10. // Room represents a subscription to a single PubSub topic. Messages
  11. // can be published to the topic with Room.Publish, and received
  12. // messages are pushed to the Messages channel.
  13. type Room struct {
  14. // Messages is a channel of messages received from other peers in the chat room
  15. Messages chan *Message
  16. ctx context.Context
  17. ps *pubsub.PubSub
  18. topic *pubsub.Topic
  19. sub *pubsub.Subscription
  20. roomName string
  21. self peer.ID
  22. nick string
  23. }
  24. // JoinRoom tries to subscribe to the PubSub topic for the room name, returning
  25. // a Room on success.
  26. func JoinRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, roomName string) (*Room, error) {
  27. // join the pubsub topic
  28. topic, err := ps.Join(topicName(roomName))
  29. if err != nil {
  30. return nil, err
  31. }
  32. // and subscribe to it
  33. sub, err := topic.Subscribe()
  34. if err != nil {
  35. return nil, err
  36. }
  37. cr := &Room{
  38. ctx: ctx,
  39. ps: ps,
  40. topic: topic,
  41. sub: sub,
  42. self: selfID,
  43. roomName: roomName,
  44. Messages: make(chan *Message, RoomBufSize),
  45. }
  46. // start reading messages from the subscription in a loop
  47. go cr.readLoop()
  48. return cr, nil
  49. }
  50. // Publish sends a message to the pubsub topic.
  51. func (cr *Room) Publish(message string, o ...func(*Message)) error {
  52. m := &Message{
  53. Message: message,
  54. }
  55. for _, f := range o {
  56. f(m)
  57. }
  58. return cr.PublishMessage(m)
  59. }
  60. // Publish sends a message to the pubsub topic.
  61. func (cr *Room) PublishMessage(m *Message) error {
  62. m.SenderID = cr.self.Pretty()
  63. msgBytes, err := json.Marshal(m)
  64. if err != nil {
  65. return err
  66. }
  67. return cr.topic.Publish(cr.ctx, msgBytes)
  68. }
  69. // readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
  70. func (cr *Room) readLoop() {
  71. for {
  72. msg, err := cr.sub.Next(cr.ctx)
  73. if err != nil {
  74. close(cr.Messages)
  75. return
  76. }
  77. // only forward messages delivered by others
  78. if msg.ReceivedFrom == cr.self {
  79. continue
  80. }
  81. cm := new(Message)
  82. err = json.Unmarshal(msg.Data, cm)
  83. if err != nil {
  84. continue
  85. }
  86. // send valid messages onto the Messages channel
  87. cr.Messages <- cm
  88. }
  89. }
  90. func topicName(roomName string) string {
  91. return "chat-room:" + roomName
  92. }