scylla_cql/frame/response/
event.rs1use crate::frame::frame_errors::{
4 ClusterChangeEventParseError, CqlEventParseError, SchemaChangeEventParseError,
5};
6use crate::frame::server_event_type::EventType;
7use crate::frame::types;
8use std::net::SocketAddr;
9
10#[derive(Debug)]
12#[expect(clippy::enum_variant_names)]
15pub enum Event {
16 TopologyChange(TopologyChangeEvent),
18 StatusChange(StatusChangeEvent),
20 SchemaChange(SchemaChangeEvent),
22}
23
24#[derive(Debug)]
26pub enum TopologyChangeEvent {
27 NewNode(SocketAddr),
29 RemovedNode(SocketAddr),
31}
32
33#[derive(Debug)]
35pub enum StatusChangeEvent {
36 Up(SocketAddr),
38 Down(SocketAddr),
40}
41
42#[derive(Debug)]
44#[expect(clippy::enum_variant_names)]
47pub enum SchemaChangeEvent {
48 KeyspaceChange {
50 change_type: SchemaChangeType,
52 keyspace_name: String,
54 },
55 TableChange {
57 change_type: SchemaChangeType,
59 keyspace_name: String,
61 object_name: String,
63 },
64 TypeChange {
66 change_type: SchemaChangeType,
68 keyspace_name: String,
70 type_name: String,
72 },
73 FunctionChange {
75 change_type: SchemaChangeType,
77 keyspace_name: String,
79 function_name: String,
81 arguments: Vec<String>,
83 },
84 AggregateChange {
86 change_type: SchemaChangeType,
88 keyspace_name: String,
90 aggregate_name: String,
92 arguments: Vec<String>,
94 },
95}
96
97#[derive(Debug)]
99pub enum SchemaChangeType {
100 Created,
102
103 Updated,
105
106 Dropped,
108
109 Invalid,
111}
112
113impl Event {
114 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, CqlEventParseError> {
116 let event_type: EventType = types::read_string(buf)
117 .map_err(CqlEventParseError::EventTypeParseError)?
118 .parse()?;
119 match event_type {
120 EventType::TopologyChange => Ok(Self::TopologyChange(
121 TopologyChangeEvent::deserialize(buf)
122 .map_err(CqlEventParseError::TopologyChangeEventParseError)?,
123 )),
124 EventType::StatusChange => Ok(Self::StatusChange(
125 StatusChangeEvent::deserialize(buf)
126 .map_err(CqlEventParseError::StatusChangeEventParseError)?,
127 )),
128 EventType::SchemaChange => Ok(Self::SchemaChange(SchemaChangeEvent::deserialize(buf)?)),
129 }
130 }
131}
132
133impl SchemaChangeEvent {
134 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, SchemaChangeEventParseError> {
136 let type_of_change_string =
137 types::read_string(buf).map_err(SchemaChangeEventParseError::TypeOfChangeParseError)?;
138 let type_of_change = match type_of_change_string {
139 "CREATED" => SchemaChangeType::Created,
140 "UPDATED" => SchemaChangeType::Updated,
141 "DROPPED" => SchemaChangeType::Dropped,
142 _ => SchemaChangeType::Invalid,
143 };
144
145 let target =
146 types::read_string(buf).map_err(SchemaChangeEventParseError::TargetTypeParseError)?;
147 let keyspace_affected = types::read_string(buf)
148 .map_err(SchemaChangeEventParseError::AffectedKeyspaceParseError)?
149 .to_string();
150
151 match target {
152 "KEYSPACE" => Ok(Self::KeyspaceChange {
153 change_type: type_of_change,
154 keyspace_name: keyspace_affected,
155 }),
156 "TABLE" => {
157 let table_name = types::read_string(buf)
158 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
159 .to_string();
160 Ok(Self::TableChange {
161 change_type: type_of_change,
162 keyspace_name: keyspace_affected,
163 object_name: table_name,
164 })
165 }
166 "TYPE" => {
167 let changed_type = types::read_string(buf)
168 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
169 .to_string();
170 Ok(Self::TypeChange {
171 change_type: type_of_change,
172 keyspace_name: keyspace_affected,
173 type_name: changed_type,
174 })
175 }
176 "FUNCTION" => {
177 let function = types::read_string(buf)
178 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
179 .to_string();
180 let number_of_arguments = types::read_short(buf).map_err(|err| {
181 SchemaChangeEventParseError::ArgumentCountParseError(err.into())
182 })?;
183
184 let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
185
186 for _ in 0..number_of_arguments {
187 argument_vector.push(
188 types::read_string(buf)
189 .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
190 .to_string(),
191 );
192 }
193
194 Ok(Self::FunctionChange {
195 change_type: type_of_change,
196 keyspace_name: keyspace_affected,
197 function_name: function,
198 arguments: argument_vector,
199 })
200 }
201 "AGGREGATE" => {
202 let name = types::read_string(buf)
203 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
204 .to_string();
205 let number_of_arguments = types::read_short(buf).map_err(|err| {
206 SchemaChangeEventParseError::ArgumentCountParseError(err.into())
207 })?;
208
209 let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
210
211 for _ in 0..number_of_arguments {
212 argument_vector.push(
213 types::read_string(buf)
214 .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
215 .to_string(),
216 );
217 }
218
219 Ok(Self::AggregateChange {
220 change_type: type_of_change,
221 keyspace_name: keyspace_affected,
222 aggregate_name: name,
223 arguments: argument_vector,
224 })
225 }
226
227 _ => Err(SchemaChangeEventParseError::UnknownTargetOfSchemaChange(
228 target.to_string(),
229 )),
230 }
231 }
232}
233
234impl TopologyChangeEvent {
235 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
237 let type_of_change = types::read_string(buf)
238 .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
239 let addr =
240 types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
241
242 match type_of_change {
243 "NEW_NODE" => Ok(Self::NewNode(addr)),
244 "REMOVED_NODE" => Ok(Self::RemovedNode(addr)),
245 _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
246 type_of_change.to_string(),
247 )),
248 }
249 }
250}
251
252impl StatusChangeEvent {
253 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
255 let type_of_change = types::read_string(buf)
256 .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
257 let addr =
258 types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
259
260 match type_of_change {
261 "UP" => Ok(Self::Up(addr)),
262 "DOWN" => Ok(Self::Down(addr)),
263 _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
264 type_of_change.to_string(),
265 )),
266 }
267 }
268}