| Class | Jabber::Bytestreams::IBB |
| In: |
lib/xmpp4r/bytestreams/helper/ibb/base.rb
|
| Parent: | Object |
In-Band Bytestreams (JEP-0047) implementation
Don‘t use directly, use IBBInitiator and IBBTarget
In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.
Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.
| NS_IBB | = | 'http://jabber.org/protocol/ibb' |
Create a new bytestream
Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 30
30: def initialize(stream, session_id, my_jid, peer_jid)
31: @stream = stream
32: @session_id = session_id
33: @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid)
34: @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid)
35:
36: @active = false
37: @seq_send = 0
38: @seq_recv = 0
39: @queue = []
40: @queue_lock = Mutex.new
41: @pending = Semaphore.new
42: @sendbuf = ''
43: @sendbuf_lock = Mutex.new
44:
45: @block_size = 4096 # Recommended by JEP0047
46: end
Close the stream
Waits for acknowledge from peer, may throw ErrorException
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 128
128: def close
129: if active?
130: flush
131: deactivate
132:
133: iq = Iq.new(:set, @peer_jid)
134: close = iq.add REXML::Element.new('close')
135: close.add_namespace IBB::NS_IBB
136: close.attributes['sid'] = @session_id
137:
138: @stream.send_with_id(iq) { |answer|
139: answer.type == :result
140: }
141: end
142: end
Empty the send-buffer by sending remaining data
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 72
72: def flush
73: @sendbuf_lock.synchronize {
74: while @sendbuf.size > 0
75: send_data(@sendbuf[0..@block_size-1])
76: @sendbuf = @sendbuf[@block_size..-1].to_s
77: end
78: }
79: end
Receive data
Will wait until the Message with the next sequence number is in the stanza queue.
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 86
86: def read
87: if active?
88: res = nil
89:
90: while res.nil?
91: @queue_lock.synchronize {
92: @queue.each { |item|
93: # Find next data
94: if item.type == :data and item.seq == @seq_recv.to_s
95: res = item
96: break
97: # No data? Find close
98: elsif item.type == :close and res.nil?
99: res = item
100: end
101: }
102:
103: @queue.delete_if { |item| item == res }
104: }
105:
106: # No data? Wait for next to arrive...
107: @pending.wait unless res
108: end
109:
110: if res.type == :data
111: @seq_recv += 1
112: @seq_recv = 0 if @seq_recv > 65535
113: res.data
114: elsif res.type == :close
115: deactivate
116: nil # Closed
117: end
118: else
119: nil
120: end
121: end
Send data
Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.
| buf: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 59
59: def write(buf)
60: @sendbuf_lock.synchronize {
61: @sendbuf += buf
62:
63: while @sendbuf.size >= @block_size
64: send_data(@sendbuf[0..@block_size-1])
65: @sendbuf = @sendbuf[@block_size..-1].to_s
66: end
67: }
68: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 182
182: def activate
183: unless active?
184: @stream.add_message_callback(200, self) { |msg|
185: data = msg.first_element('data')
186: if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id
187: if msg.type == nil
188: @queue_lock.synchronize {
189: @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s)
190: @pending.run
191: }
192: elsif msg.type == :error
193: @queue_lock.synchronize {
194: @queue << IBBQueueItem.new(:close)
195: @pending.run
196: }
197: end
198: true
199: else
200: false
201: end
202: }
203:
204: @stream.add_iq_callback(200, self) { |iq|
205: close = iq.first_element('close')
206: if iq.type == :set and close and close.attributes['sid'] == @session_id
207: answer = iq.answer(false)
208: answer.type = :result
209: @stream.send(answer)
210:
211: @queue_lock.synchronize {
212: @queue << IBBQueueItem.new(:close)
213: @pending.run
214: }
215: true
216: else
217: false
218: end
219: }
220:
221: @active = true
222: end
223: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 225
225: def deactivate
226: if active?
227: @stream.delete_message_callback(self)
228: @stream.delete_iq_callback(self)
229:
230: @active = false
231: end
232: end
Send data directly
| data: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 149
149: def send_data(databuf)
150: if active?
151: msg = Message.new
152: msg.from = @my_jid
153: msg.to = @peer_jid
154:
155: data = msg.add REXML::Element.new('data')
156: data.add_namespace NS_IBB
157: data.attributes['sid'] = @session_id
158: data.attributes['seq'] = @seq_send.to_s
159: data.text = Base64::encode64 databuf
160:
161: # TODO: Implement AMP correctly
162: amp = msg.add REXML::Element.new('amp')
163: amp.add_namespace 'http://jabber.org/protocol/amp'
164: deliver_at = amp.add REXML::Element.new('rule')
165: deliver_at.attributes['condition'] = 'deliver-at'
166: deliver_at.attributes['value'] = 'stored'
167: deliver_at.attributes['action'] = 'error'
168: match_resource = amp.add REXML::Element.new('rule')
169: match_resource.attributes['condition'] = 'match-resource'
170: match_resource.attributes['value'] = 'exact'
171: match_resource.attributes['action'] = 'error'
172:
173: @stream.send(msg)
174:
175: @seq_send += 1
176: @seq_send = 0 if @seq_send > 65535
177: else
178: raise 'Attempt to send data when not activated'
179: end
180: end