scylla_cql/frame/response/
event.rs1use crate::frame::frame_errors::{
2 ClusterChangeEventParseError, CqlEventParseError, SchemaChangeEventParseError,
3};
4use crate::frame::server_event_type::EventType;
5use crate::frame::types;
6use std::net::SocketAddr;
7
8#[derive(Debug)]
9pub enum Event {
10 TopologyChange(TopologyChangeEvent),
11 StatusChange(StatusChangeEvent),
12 SchemaChange(SchemaChangeEvent),
13}
14
15#[derive(Debug)]
16pub enum TopologyChangeEvent {
17 NewNode(SocketAddr),
18 RemovedNode(SocketAddr),
19}
20
21#[derive(Debug)]
22pub enum StatusChangeEvent {
23 Up(SocketAddr),
24 Down(SocketAddr),
25}
26
27#[derive(Debug)]
28pub enum SchemaChangeEvent {
29 KeyspaceChange {
30 change_type: SchemaChangeType,
31 keyspace_name: String,
32 },
33 TableChange {
34 change_type: SchemaChangeType,
35 keyspace_name: String,
36 object_name: String,
37 },
38 TypeChange {
39 change_type: SchemaChangeType,
40 keyspace_name: String,
41 type_name: String,
42 },
43 FunctionChange {
44 change_type: SchemaChangeType,
45 keyspace_name: String,
46 function_name: String,
47 arguments: Vec<String>,
48 },
49 AggregateChange {
50 change_type: SchemaChangeType,
51 keyspace_name: String,
52 aggregate_name: String,
53 arguments: Vec<String>,
54 },
55}
56
57#[derive(Debug)]
58pub enum SchemaChangeType {
59 Created,
60 Updated,
61 Dropped,
62 Invalid,
63}
64
65impl Event {
66 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, CqlEventParseError> {
67 let event_type: EventType = types::read_string(buf)
68 .map_err(CqlEventParseError::EventTypeParseError)?
69 .parse()?;
70 match event_type {
71 EventType::TopologyChange => Ok(Self::TopologyChange(
72 TopologyChangeEvent::deserialize(buf)
73 .map_err(CqlEventParseError::TopologyChangeEventParseError)?,
74 )),
75 EventType::StatusChange => Ok(Self::StatusChange(
76 StatusChangeEvent::deserialize(buf)
77 .map_err(CqlEventParseError::StatusChangeEventParseError)?,
78 )),
79 EventType::SchemaChange => Ok(Self::SchemaChange(SchemaChangeEvent::deserialize(buf)?)),
80 }
81 }
82}
83
84impl SchemaChangeEvent {
85 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, SchemaChangeEventParseError> {
86 let type_of_change_string =
87 types::read_string(buf).map_err(SchemaChangeEventParseError::TypeOfChangeParseError)?;
88 let type_of_change = match type_of_change_string {
89 "CREATED" => SchemaChangeType::Created,
90 "UPDATED" => SchemaChangeType::Updated,
91 "DROPPED" => SchemaChangeType::Dropped,
92 _ => SchemaChangeType::Invalid,
93 };
94
95 let target =
96 types::read_string(buf).map_err(SchemaChangeEventParseError::TargetTypeParseError)?;
97 let keyspace_affected = types::read_string(buf)
98 .map_err(SchemaChangeEventParseError::AffectedKeyspaceParseError)?
99 .to_string();
100
101 match target {
102 "KEYSPACE" => Ok(Self::KeyspaceChange {
103 change_type: type_of_change,
104 keyspace_name: keyspace_affected,
105 }),
106 "TABLE" => {
107 let table_name = types::read_string(buf)
108 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
109 .to_string();
110 Ok(Self::TableChange {
111 change_type: type_of_change,
112 keyspace_name: keyspace_affected,
113 object_name: table_name,
114 })
115 }
116 "TYPE" => {
117 let changed_type = types::read_string(buf)
118 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
119 .to_string();
120 Ok(Self::TypeChange {
121 change_type: type_of_change,
122 keyspace_name: keyspace_affected,
123 type_name: changed_type,
124 })
125 }
126 "FUNCTION" => {
127 let function = types::read_string(buf)
128 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
129 .to_string();
130 let number_of_arguments = types::read_short(buf).map_err(|err| {
131 SchemaChangeEventParseError::ArgumentCountParseError(err.into())
132 })?;
133
134 let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
135
136 for _ in 0..number_of_arguments {
137 argument_vector.push(
138 types::read_string(buf)
139 .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
140 .to_string(),
141 );
142 }
143
144 Ok(Self::FunctionChange {
145 change_type: type_of_change,
146 keyspace_name: keyspace_affected,
147 function_name: function,
148 arguments: argument_vector,
149 })
150 }
151 "AGGREGATE" => {
152 let name = types::read_string(buf)
153 .map_err(SchemaChangeEventParseError::AffectedTargetNameParseError)?
154 .to_string();
155 let number_of_arguments = types::read_short(buf).map_err(|err| {
156 SchemaChangeEventParseError::ArgumentCountParseError(err.into())
157 })?;
158
159 let mut argument_vector = Vec::with_capacity(number_of_arguments as usize);
160
161 for _ in 0..number_of_arguments {
162 argument_vector.push(
163 types::read_string(buf)
164 .map_err(SchemaChangeEventParseError::FunctionArgumentParseError)?
165 .to_string(),
166 );
167 }
168
169 Ok(Self::AggregateChange {
170 change_type: type_of_change,
171 keyspace_name: keyspace_affected,
172 aggregate_name: name,
173 arguments: argument_vector,
174 })
175 }
176
177 _ => Err(SchemaChangeEventParseError::UnknownTargetOfSchemaChange(
178 target.to_string(),
179 )),
180 }
181 }
182}
183
184impl TopologyChangeEvent {
185 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
186 let type_of_change = types::read_string(buf)
187 .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
188 let addr =
189 types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
190
191 match type_of_change {
192 "NEW_NODE" => Ok(Self::NewNode(addr)),
193 "REMOVED_NODE" => Ok(Self::RemovedNode(addr)),
194 _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
195 type_of_change.to_string(),
196 )),
197 }
198 }
199}
200
201impl StatusChangeEvent {
202 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, ClusterChangeEventParseError> {
203 let type_of_change = types::read_string(buf)
204 .map_err(ClusterChangeEventParseError::TypeOfChangeParseError)?;
205 let addr =
206 types::read_inet(buf).map_err(ClusterChangeEventParseError::NodeAddressParseError)?;
207
208 match type_of_change {
209 "UP" => Ok(Self::Up(addr)),
210 "DOWN" => Ok(Self::Down(addr)),
211 _ => Err(ClusterChangeEventParseError::UnknownTypeOfChange(
212 type_of_change.to_string(),
213 )),
214 }
215 }
216}