1+ # Copyright 2022 The Regents of the University of California (Regents)
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at
6+ #
7+ # http://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ # See the License for the specific language governing permissions and
13+ # limitations under the License.
14+ #
15+ # Copyright ©2022. The Regents of the University of California (Regents).
16+ # All Rights Reserved. Permission to use, copy, modify, and distribute this
17+ # software and its documentation for educational, research, and not-for-profit
18+ # purposes, without fee and without a signed licensing agreement, is hereby
19+ # granted, provided that the above copyright notice, this paragraph and the
20+ # following two paragraphs appear in all copies, modifications, and
21+ # distributions. Contact The Office of Technology Licensing, UC Berkeley, 2150
22+ # Shattuck Avenue, Suite 510, Berkeley, CA 94720-1620, (510) 643-7201,
23+ # [email protected] , http://ipira.berkeley.edu/industry-info for commercial 24+ # licensing opportunities. IN NO EVENT SHALL REGENTS BE LIABLE TO ANY PARTY
25+ # FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
26+ # INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
27+ # DOCUMENTATION, EVEN IF REGENTS HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
28+ # DAMAGE. REGENTS SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
29+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
30+ # PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION, IF ANY,
31+ # PROVIDED HEREUNDER IS PROVIDED "AS IS". REGENTS HAS NO OBLIGATION TO PROVIDE
32+ # MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
33+
34+
35+ from builtins import str
36+ from builtins import range
37+ from io import open
38+ import os
39+
40+
41+ class WGConfig ():
42+ """A class for parsing and writing Wireguard configuration files"""
43+ SECTION_FIRSTLINE = '_index_firstline'
44+ SECTION_LASTLINE = '_index_lastline'
45+ SECTION_RAW = '_rawdata'
46+ _interface = None # interface attributes
47+ _peers = None # peer data
48+
49+ def __init__ (self , file , keyattr = 'PublicKey' ):
50+ """Object initialization"""
51+ self .filename = self .file2filename (file )
52+ self .keyattr = keyattr
53+ self .lines = []
54+ self .initialize_file ()
55+
56+ @staticmethod
57+ def file2filename (file ):
58+ """Handle special filenames: 'wg0' and 'wg0.conf' become '/etc/wireguard/wg0.conf' """
59+ if os .path .basename (file ) == file :
60+ if not file .endswith ('.conf' ):
61+ file += '.conf'
62+ file = os .path .join ('/etc/wireguard' , file )
63+ return file
64+
65+ def invalidate_data (self ):
66+ """Clears the data structs"""
67+ self ._interface = None
68+ self ._peers = None
69+
70+ def read_file (self ):
71+ """Reads the Wireguard config file into memory"""
72+ with open (self .filename , 'r' ) as wgfile :
73+ self .lines = [line .rstrip () for line in wgfile .readlines ()]
74+ self .invalidate_data ()
75+
76+ def write_file (self , file = None ):
77+ """Writes a Wireguard config file from memory to file"""
78+ if file is None :
79+ filename = self .filename
80+ else :
81+ filename = self .file2filename (file )
82+ with os .fdopen (os .open (filename , os .O_WRONLY | os .O_CREAT | os .O_TRUNC , 0o640 ), 'w' ) as wgfile :
83+ wgfile .writelines (line + '\n ' for line in self .lines )
84+
85+ @staticmethod
86+ def parse_line (line ):
87+ """Splits a single attr/value line into its parts"""
88+ attr , _ , value = line .partition ('=' )
89+ attr = attr .strip ()
90+ parts = value .partition ('#' )
91+ value = parts [0 ].strip () # strip comments and whitespace
92+ value = str (value ) # this line is for Python2 support only
93+ comment = parts [1 ] + parts [2 ]
94+ if value .isnumeric ():
95+ value = [int (value )]
96+ else :
97+ value = [item .strip () for item in value .split (',' )] # decompose into list based on commata as separator
98+ return attr , value , comment
99+
100+ def parse_lines (self ):
101+ """Parses the lines of a Wireguard config file into memory"""
102+
103+ # There will be two special attributes in the parsed data:
104+ #_index_firstline: Line (zero indexed) of the section header (including any leading lines with comments)
105+ #_index_lastline: Line (zero indexed) of the last attribute line of the section (including any directly following comments)
106+
107+ def close_section (section , section_data ):
108+ section_data = {k : (v if len (v ) > 1 else v [0 ]) for k , v in section_data .items ()}
109+ if section is None : # nothing to close on first section
110+ return
111+ elif section == 'interface' : # close interface section
112+ self ._interface = section_data
113+ else : # close peer section
114+ peername = section_data .get (self .keyattr )
115+ self ._peers [peername ] = section_data
116+ section_data [self .SECTION_RAW ] = self .lines [section_data [self .SECTION_FIRSTLINE ]:(section_data [self .SECTION_LASTLINE ] + 1 )]
117+
118+ self ._interface = dict ()
119+ self ._peers = dict ()
120+ section = None
121+ section_data = dict ()
122+ last_empty_line_in_section = - 1 # virtual empty line before start of file
123+ for i , line in enumerate (self .lines ):
124+ # Ignore leading whitespace and trailing whitespace
125+ line = line .strip ()
126+ # Ignore empty lines and comments
127+ if len (line ) == 0 :
128+ last_empty_line_in_section = i
129+ continue
130+ if line .startswith ('[' ): # section
131+ if last_empty_line_in_section is not None :
132+ section_data [self .SECTION_LASTLINE ] = [last_empty_line_in_section - 1 ]
133+ close_section (section , section_data )
134+ section_data = dict ()
135+ section = line [1 :].partition (']' )[0 ].lower ()
136+ if last_empty_line_in_section is None :
137+ section_data [self .SECTION_FIRSTLINE ] = [i ]
138+ else :
139+ section_data [self .SECTION_FIRSTLINE ] = [last_empty_line_in_section + 1 ]
140+ last_empty_line_in_section = None
141+ section_data [self .SECTION_LASTLINE ] = [i ]
142+ if not section in ['interface' , 'peer' ]:
143+ raise ValueError ('Unsupported section [{0}] in line {1}' .format (section , i ))
144+ elif line .startswith ('#' ):
145+ section_data [self .SECTION_LASTLINE ] = [i ]
146+ else : # regular line
147+ attr , value , _comment = self .parse_line (line )
148+ section_data [attr ] = section_data .get (attr , [])
149+ section_data [attr ].extend (value )
150+ section_data [self .SECTION_LASTLINE ] = [i ]
151+ close_section (section , section_data )
152+
153+ def handle_leading_comment (self , leading_comment ):
154+ """Appends a leading comment for a section"""
155+ if leading_comment is not None :
156+ if leading_comment .strip ()[0 ] != '#' :
157+ raise ValueError ('A comment needs to start with a "#"' )
158+ self .lines .append (leading_comment )
159+
160+ def initialize_file (self , leading_comment = None ):
161+ """Empties the file and adds the interface section header"""
162+ self .lines = list ()
163+ self .handle_leading_comment (leading_comment ) # add leading comment if needed
164+ self .lines .append ('[Interface]' )
165+ self .invalidate_data ()
166+
167+ def add_peer (self , key , leading_comment = None ):
168+ """Adds a new peer with the given (public) key"""
169+ if key in self .peers :
170+ raise KeyError ('Peer to be added already exists' )
171+ self .lines .append ('' ) # append an empty line for separation
172+ self .handle_leading_comment (leading_comment ) # add leading comment if needed
173+ # Append peer with key attribute
174+ self .lines .append ('[Peer]' )
175+ self .lines .append ('{0} = {1}' .format (self .keyattr , key ))
176+ # Invalidate data cache
177+ self .invalidate_data ()
178+
179+ def del_peer (self , key ):
180+ """Removes the peer with the given (public) key"""
181+ if not key in self .peers :
182+ raise KeyError ('The peer to be deleted does not exist' )
183+ section_firstline = self .peers [key ][self .SECTION_FIRSTLINE ]
184+ section_lastline = self .peers [key ][self .SECTION_LASTLINE ]
185+ # Remove a blank line directly before the peer section
186+ if section_firstline > 0 :
187+ if len (self .lines [section_firstline - 1 ]) == 0 :
188+ section_firstline -= 1
189+ # Only keep needed lines
190+ result = []
191+ if section_firstline > 0 :
192+ result .extend (self .lines [0 :section_firstline ])
193+ result .extend (self .lines [(section_lastline + 1 ):])
194+ self .lines = result
195+ # Invalidate data cache
196+ self .invalidate_data ()
197+
198+ def get_sectioninfo (self , key ):
199+ """Get first and last line of the section identified by the given key ("None" for interface section)"""
200+ if key is None : # interface
201+ section_firstline = self .interface [self .SECTION_FIRSTLINE ]
202+ section_lastline = self .interface [self .SECTION_LASTLINE ]
203+ else : # peer
204+ if not key in self .peers :
205+ raise KeyError ('The specified peer does not exist' )
206+ section_firstline = self .peers [key ][self .SECTION_FIRSTLINE ]
207+ section_lastline = self .peers [key ][self .SECTION_LASTLINE ]
208+ return section_firstline , section_lastline
209+
210+ def add_attr (self , key , attr , value , leading_comment = None , append_as_line = False ):
211+ """Adds an attribute/value pair to the given peer ("None" for adding an interface attribute)"""
212+ section_firstline , section_lastline = self .get_sectioninfo (key )
213+ if leading_comment is not None :
214+ if leading_comment .strip ()[0 ] != '#' :
215+ raise ValueError ('A comment needs to start with a "#"' )
216+ # Look for line with the attribute
217+ line_found = None
218+ for i in range (section_firstline + 1 , section_lastline + 1 ):
219+ line_attr , line_value , line_comment = self .parse_line (self .lines [i ])
220+ if attr == line_attr :
221+ line_found = i
222+ # Add the attribute at the right place
223+ if (line_found is None ) or append_as_line :
224+ line_found = section_lastline if (line_found is None ) else line_found
225+ line_found += 1
226+ self .lines .insert (line_found , '{0} = {1}' .format (attr , value ))
227+ else :
228+ line_attr , line_value , line_comment = self .parse_line (self .lines [line_found ])
229+ line_value .append (value )
230+ if len (line_comment ) > 0 :
231+ line_comment = ' ' + line_comment
232+ line_value = [str (item ) for item in line_value ]
233+ self .lines [line_found ] = line_attr + ' = ' + ', ' .join (line_value ) + line_comment
234+ # Handle leading comments
235+ if leading_comment is not None :
236+ self .lines .insert (line_found , leading_comment )
237+ # Invalidate data cache
238+ self .invalidate_data ()
239+
240+ def del_attr (self , key , attr , value = None , remove_leading_comments = True ):
241+ """Removes an attribute/value pair from the given peer ("None" for adding an interface attribute); set 'value' to 'None' to remove all values"""
242+ section_firstline , section_lastline = self .get_sectioninfo (key )
243+ # Find all lines with matching attribute name and (if requested) value
244+ line_found = []
245+ for i in range (section_firstline + 1 , section_lastline + 1 ):
246+ line_attr , line_value , line_comment = self .parse_line (self .lines [i ])
247+ if attr == line_attr :
248+ if (value is None ) or (value in line_value ):
249+ line_found .append (i )
250+ if len (line_found ) == 0 :
251+ raise ValueError ('The attribute/value to be deleted is not present' )
252+ # Process all relevant lines
253+ for i in reversed (line_found ): # reversed so that non-processed indices stay valid
254+ if value is None :
255+ del (self .lines [i ])
256+ else :
257+ line_attr , line_value , line_comment = self .parse_line (self .lines [i ])
258+ line_value .remove (value )
259+ if len (line_value ) > 0 : # keep remaining values in that line
260+ self .lines [i ] = line_attr + ' = ' + ', ' .join (line_value ) + line_comment
261+ else : # otherwise line is no longer needed
262+ del (self .lines [i ])
263+ # Handle leading comments
264+ if remove_leading_comments :
265+ i = line_found [0 ] - 1
266+ while i > 0 :
267+ if len (self .lines [i ]) and (self .lines [i ][0 ] == '#' ):
268+ del (self .lines [i ])
269+ i -= 1
270+ else :
271+ break
272+ # Invalidate data cache
273+ self .invalidate_data ()
274+
275+ @property
276+ def interface (self ):
277+ """Dictionary with interface attributes"""
278+ if self ._interface is None :
279+ self .parse_lines ()
280+ return self ._interface
281+
282+ @property
283+ def peers (self ):
284+ """Dictionary with peer data"""
285+ if self ._peers is None :
286+ self .parse_lines ()
287+ return self ._peers
0 commit comments