1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 import org.vertx.java.deploy.impl.VertxLocator
16 import org.vertx.java.core.buffer
17 import org.vertx.java.core
18 import org.vertx.java.core.json
19 import java.lang
20
21 from core.javautils import map_to_java, map_from_java
22 from core.buffer import Buffer
23
24 __author__ = "Scott Horn"
25 __email__ = "scott@hornmicro.com"
26 __credits__ = "Based entirely on work by Tim Fox http://tfox.org"
29 """This class represents a distributed lightweight event bus which can encompass multiple vert.x instances.
30 It is very useful for otherwise isolated vert.x application instances to communicate with each other.
31
32 Messages sent over the event bus are JSON objects represented as Ruby Hash instances.
33
34 The event bus implements a distributed publish / subscribe network.
35
36 Messages are sent to an address.
37
38 There can be multiple handlers registered against that address.
39 Any handlers with a matching name will receive the message irrespective of what vert.x application instance and
40 what vert.x instance they are located in.
41
42 All messages sent over the bus are transient. On event of failure of all or part of the event bus messages
43 may be lost. Applications should be coded to cope with lost messages, e.g. by resending them, and making application
44 services idempotent.
45
46 The order of messages received by any specific handler from a specific sender will match the order of messages
47 sent from that sender.
48
49 When sending a message, a reply handler can be provided. If so, it will be called when the reply from the receiver
50 has been received.
51
52 When receiving a message in a handler the received object is an instance of EventBus::Message - this contains
53 the actual Hash of the message plus a reply method which can be used to reply to it.
54 """
55 handler_dict = {}
56
57 @staticmethod
59 return org.vertx.java.deploy.impl.VertxLocator.vertx.eventBus()
60
61 @staticmethod
62 - def send(address, message, reply_handler=None):
63 """Send a message on the event bus
64
65 Keyword arguments:
66 @param address: the address to publish to
67 @param message: The message to send
68 @param reply_handler: An optional reply handler.
69 It will be called when the reply from a receiver is received.
70 """
71 EventBus.send_or_pub(True, address, message, reply_handler)
72
73 @staticmethod
75 """Publish a message on the event bus
76
77 Keyword arguments:
78 @param address: the address to publish to
79 @param message: The message to publish
80 """
81 EventBus.send_or_pub(False, address, message)
82
83 @staticmethod
84 - def send_or_pub(send, address, message, reply_handler=None):
97
98
99 @staticmethod
101 """ Register a handler.
102
103 Keyword arguments:
104 @param address: the address to register for. Any messages sent to that address will be
105 received by the handler. A single handler can be registered against many addresses.
106 @param local_only: if True then handler won't be propagated across cluster
107 @param handler: The handler
108
109 @return: id of the handler which can be used in EventBus.unregister_handler
110 """
111 if handler is None:
112 raise RuntimeError("handler is required")
113 internal = InternalHandler(handler)
114 if local_only:
115 EventBus.java_eventbus().registerLocalHandler(address, internal)
116 else:
117 EventBus.java_eventbus().registerHandler(address, internal)
118 id = java.util.UUID.randomUUID().toString()
119 EventBus.handler_dict[id] = address, internal
120 return id
121
122 @staticmethod
124 """
125 Registers a handler against a uniquely generated address, the address is returned as the id
126 received by the handler. A single handler can be registered against many addresses.
127
128 Keyword arguments:
129 @param local_only: If Rrue then handler won't be propagated across cluster
130 @param handler: The handler
131
132 @return: id of the handler which can be used in EventBus.unregister_handler
133 """
134 if handler is None:
135 raise RuntimeError("Handler is required")
136 internal = InternalHandler(handler)
137 id = java.util.UUID.randomUUID().toString()
138 if local_only:
139 EventBus.java_eventbus().registerLocalHandler(id, internal)
140 else:
141 EventBus.java_eventbus().registerHandler(id, internal)
142 EventBus.handler_dict[id] = id, internal
143 return id
144
145 @staticmethod
147 """Unregisters a handler
148
149 Keyword arguments:
150 @param handler_id: the id of the handler to unregister. Returned from EventBus.register_handler
151 """
152 [address, handler] = EventBus.handler_dict.pop(handler_id)
153
154 EventBus.java_eventbus().unregisterHandler(address, handler)
155
156 @staticmethod
158 if isinstance(message, dict):
159 message = org.vertx.java.core.json.JsonObject(map_to_java(message))
160 elif isinstance(message, Buffer):
161 message = message._to_java_buffer()
162 elif isinstance(message, long):
163 message = java.lang.Long(message)
164 elif isinstance(message, float):
165 message = java.lang.Double(message)
166 elif isinstance(message, int):
167 message = java.lang.Integer(message)
168 else:
169 message = map_to_java(message)
170 return message
171
174 self.handler = handler
175
178
180 """Represents a message received from the event bus"""
182 self.java_obj = message
183 if isinstance(message.body, org.vertx.java.core.json.JsonObject):
184 self.body = map_from_java(message.body.toMap())
185 elif isinstance(message.body, org.vertx.java.core.buffer.Buffer):
186 self.body = Buffer(message.body)
187 else:
188 self.body = map_from_java(message.body)
189
190 - def reply(self, reply, handler=None):
191 """Reply to this message. If the message was sent specifying a receipt handler, that handler will be
192 called when it has received a reply. If the message wasn't sent specifying a receipt handler
193 this method does nothing.
194 Replying to a message this way is equivalent to sending a message to an address which is the same as the message id
195 of the original message.
196
197 Keyword arguments:
198 @param reply: message to send as reply
199 @param handler: the reply handler
200 """
201 reply = EventBus.convert_msg(reply)
202 if handler is None:
203 self.java_obj.reply(reply)
204 else:
205 self.java_obj.reply(reply, InternalHandler(handler))
206